1use std::{
17 collections::HashMap,
18 fs,
19 path::{Path, PathBuf},
20};
21
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, future::join_all, pin_mut};
26use heck::ToSnakeCase;
27use nautilus_core::{UnixNanos, datetime::unix_nanos_to_iso8601, parsing::precision_from_str};
28use nautilus_model::{
29 data::{
30 Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
31 TradeTick,
32 },
33 identifiers::InstrumentId,
34};
35use nautilus_serialization::arrow::{
36 bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
37 book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
38 trades_to_arrow_record_batch_bytes,
39};
40use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
41use thousands::Separable;
42use ustr::Ustr;
43
44use super::{enums::TardisExchange, http::models::TardisInstrumentInfo};
45use crate::{
46 config::TardisReplayConfig,
47 http::TardisHttpClient,
48 machine::{TardisMachineClient, types::TardisInstrumentMiniInfo},
49 parse::{normalize_instrument_id, parse_instrument_id},
50};
51
/// Tracks the current UTC calendar day for one output stream, along with the
/// last nanosecond timestamp that still belongs to that day. Used to detect
/// when the stream crosses into a new day so buffered data can be flushed.
struct DateCursor {
    // Current UTC calendar date for this stream.
    date_utc: NaiveDate,
    // Inclusive end of `date_utc` in UNIX nanoseconds (23:59:59.999999999 UTC).
    end_ns: UnixNanos,
}
58
59impl DateCursor {
60 fn new(current_ns: UnixNanos) -> Self {
62 let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
63 let date_utc = current_utc.date_naive();
64
65 let end_utc =
68 date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
69 let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
70
71 Self { date_utc, end_ns }
72 }
73}
74
75async fn gather_instruments_info(
76 config: &TardisReplayConfig,
77 http_client: &TardisHttpClient,
78) -> HashMap<TardisExchange, Vec<TardisInstrumentInfo>> {
79 let futures = config.options.iter().map(|options| {
80 let exchange = options.exchange;
81 let client = &http_client;
82
83 tracing::info!("Requesting instruments for {exchange}");
84
85 async move {
86 match client.instruments_info(exchange, None, None).await {
87 Ok(instruments) => Some((exchange, instruments)),
88 Err(e) => {
89 tracing::error!("Error fetching instruments for {exchange}: {e}");
90 None
91 }
92 }
93 }
94 });
95
96 let results: Vec<(TardisExchange, Vec<TardisInstrumentInfo>)> =
97 join_all(futures).await.into_iter().flatten().collect();
98
99 tracing::info!("Received all instruments");
100
101 results.into_iter().collect()
102}
103
/// Runs a Tardis Machine replay driven by the JSON config at `config_filepath`,
/// writing the streamed data to daily-partitioned parquet files.
///
/// Output directory resolution order: `config.output_path`, then
/// `$NAUTILUS_PATH/catalog/data`, then the current working directory.
///
/// # Errors
///
/// Returns an error if the config file cannot be read or parsed, or if the
/// HTTP/machine clients fail to construct or start the replay stream.
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("Failed to parse config JSON into TardisReplayConfig")?;

    // Resolve output path: explicit config value, else $NAUTILUS_PATH/catalog/data,
    // else the current working directory.
    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client =
        TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    // Register precision/ID info for every instrument so the machine client can
    // parse incoming ticks correctly.
    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type;
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = TardisInstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                *exchange,
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await?;
    pin_mut!(stream);

    // Per-stream date cursors: track which UTC day each instrument/bar-type is
    // currently accumulating, so a day boundary triggers a parquet flush.
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

    // In-memory buffers for the current (incomplete) day of each stream.
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                match msg {
                    Data::Deltas(msg) => {
                        handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
                    }
                    Data::Depth10(msg) => {
                        handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
                    }
                    Data::Quote(msg) => {
                        handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path)
                    }
                    Data::Trade(msg) => {
                        handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path)
                    }
                    Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
                    Data::Delta(_) => {
                        panic!("Individual delta message not implemented (or required)")
                    }
                    _ => panic!("Not implemented"),
                }

                msg_count += 1;
                if msg_count % 100_000 == 0 {
                    tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
                }
            }
            Err(e) => {
                tracing::error!("Stream error: {e:?}");
                break;
            }
        }
    }

    // Stream ended: flush whatever remains buffered for each stream's final day.
    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}
259
260fn handle_deltas_msg(
261 deltas: OrderBookDeltas_API,
262 map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
263 cursors: &mut HashMap<InstrumentId, DateCursor>,
264 path: &Path,
265) {
266 let cursor = cursors
267 .entry(deltas.instrument_id)
268 .or_insert_with(|| DateCursor::new(deltas.ts_init));
269
270 if deltas.ts_init > cursor.end_ns {
271 if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
272 batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
273 }
274 *cursor = DateCursor::new(deltas.ts_init);
276 }
277
278 map.entry(deltas.instrument_id)
279 .or_insert_with(|| Vec::with_capacity(1_000_000))
280 .extend(&*deltas.deltas);
281}
282
283fn handle_depth10_msg(
284 depth10: OrderBookDepth10,
285 map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
286 cursors: &mut HashMap<InstrumentId, DateCursor>,
287 path: &Path,
288) {
289 let cursor = cursors
290 .entry(depth10.instrument_id)
291 .or_insert_with(|| DateCursor::new(depth10.ts_init));
292
293 if depth10.ts_init > cursor.end_ns {
294 if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
295 batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
296 }
297 *cursor = DateCursor::new(depth10.ts_init);
299 }
300
301 map.entry(depth10.instrument_id)
302 .or_insert_with(|| Vec::with_capacity(1_000_000))
303 .push(depth10);
304}
305
306fn handle_quote_msg(
307 quote: QuoteTick,
308 map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
309 cursors: &mut HashMap<InstrumentId, DateCursor>,
310 path: &Path,
311) {
312 let cursor = cursors
313 .entry(quote.instrument_id)
314 .or_insert_with(|| DateCursor::new(quote.ts_init));
315
316 if quote.ts_init > cursor.end_ns {
317 if let Some(quotes_vec) = map.remove("e.instrument_id) {
318 batch_and_write_quotes(quotes_vec, "e.instrument_id, cursor.date_utc, path);
319 }
320 *cursor = DateCursor::new(quote.ts_init);
322 }
323
324 map.entry(quote.instrument_id)
325 .or_insert_with(|| Vec::with_capacity(1_000_000))
326 .push(quote);
327}
328
329fn handle_trade_msg(
330 trade: TradeTick,
331 map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
332 cursors: &mut HashMap<InstrumentId, DateCursor>,
333 path: &Path,
334) {
335 let cursor = cursors
336 .entry(trade.instrument_id)
337 .or_insert_with(|| DateCursor::new(trade.ts_init));
338
339 if trade.ts_init > cursor.end_ns {
340 if let Some(trades_vec) = map.remove(&trade.instrument_id) {
341 batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
342 }
343 *cursor = DateCursor::new(trade.ts_init);
345 }
346
347 map.entry(trade.instrument_id)
348 .or_insert_with(|| Vec::with_capacity(1_000_000))
349 .push(trade);
350}
351
352fn handle_bar_msg(
353 bar: Bar,
354 map: &mut HashMap<BarType, Vec<Bar>>,
355 cursors: &mut HashMap<BarType, DateCursor>,
356 path: &Path,
357) {
358 let cursor = cursors
359 .entry(bar.bar_type)
360 .or_insert_with(|| DateCursor::new(bar.ts_init));
361
362 if bar.ts_init > cursor.end_ns {
363 if let Some(bars_vec) = map.remove(&bar.bar_type) {
364 batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
365 }
366 *cursor = DateCursor::new(bar.ts_init);
368 }
369
370 map.entry(bar.bar_type)
371 .or_insert_with(|| Vec::with_capacity(1_000_000))
372 .push(bar);
373}
374
375fn batch_and_write_deltas(
376 deltas: Vec<OrderBookDelta>,
377 instrument_id: &InstrumentId,
378 date: NaiveDate,
379 path: &Path,
380) {
381 let typename = stringify!(OrderBookDeltas);
382 match book_deltas_to_arrow_record_batch_bytes(deltas) {
383 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
384 Err(e) => {
385 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
386 }
387 }
388}
389
390fn batch_and_write_depths(
391 depths: Vec<OrderBookDepth10>,
392 instrument_id: &InstrumentId,
393 date: NaiveDate,
394 path: &Path,
395) {
396 let typename = stringify!(OrderBookDepth10);
397 match book_depth10_to_arrow_record_batch_bytes(depths) {
398 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
399 Err(e) => {
400 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
401 }
402 }
403}
404
405fn batch_and_write_quotes(
406 quotes: Vec<QuoteTick>,
407 instrument_id: &InstrumentId,
408 date: NaiveDate,
409 path: &Path,
410) {
411 let typename = stringify!(QuoteTick);
412 match quotes_to_arrow_record_batch_bytes(quotes) {
413 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
414 Err(e) => {
415 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
416 }
417 }
418}
419
420fn batch_and_write_trades(
421 trades: Vec<TradeTick>,
422 instrument_id: &InstrumentId,
423 date: NaiveDate,
424 path: &Path,
425) {
426 let typename = stringify!(TradeTick);
427 match trades_to_arrow_record_batch_bytes(trades) {
428 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
429 Err(e) => {
430 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
431 }
432 }
433}
434
435fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
436 let typename = stringify!(Bar);
437 let batch = match bars_to_arrow_record_batch_bytes(bars) {
438 Ok(batch) => batch,
439 Err(e) => {
440 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
441 return;
442 }
443 };
444
445 let filepath = path.join(parquet_filepath_bars(bar_type, date));
446 if let Err(e) = write_parquet_local(batch, &filepath) {
447 tracing::error!("Error writing {filepath:?}: {e:?}");
448 } else {
449 tracing::info!("File written: {filepath:?}");
450 }
451}
452
453fn assert_post_epoch(date: NaiveDate) {
460 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
461 if date < epoch {
462 panic!("Tardis replay filenames require dates on or after 1970-01-01; received {date}");
463 }
464}
465
/// Makes an ISO 8601 timestamp filesystem-safe by replacing `:` and `.` with `-`.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if matches!(c, ':' | '.') { '-' } else { c })
        .collect()
}
473
474fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
479 let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
480 let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
481
482 format!("{datetime_1}_{datetime_2}.parquet")
483}
484
485fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
486 assert_post_epoch(date);
487
488 let typename = typename.to_snake_case();
489 let instrument_id_str = instrument_id.to_string().replace('/', "");
490
491 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
492 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
493
494 let start_nanos = start_utc
495 .timestamp_nanos_opt()
496 .expect("valid nanosecond timestamp");
497 let end_nanos = (end_utc.and_utc())
498 .timestamp_nanos_opt()
499 .expect("valid nanosecond timestamp");
500
501 let filename = timestamps_to_filename(
502 UnixNanos::from(start_nanos as u64),
503 UnixNanos::from(end_nanos as u64),
504 );
505
506 PathBuf::new()
507 .join(typename)
508 .join(instrument_id_str)
509 .join(filename)
510}
511
512fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
513 assert_post_epoch(date);
514
515 let bar_type_str = bar_type.to_string().replace('/', "");
516
517 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
519 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
520
521 let start_nanos = start_utc
522 .timestamp_nanos_opt()
523 .expect("valid nanosecond timestamp");
524 let end_nanos = (end_utc.and_utc())
525 .timestamp_nanos_opt()
526 .expect("valid nanosecond timestamp");
527
528 let filename = timestamps_to_filename(
529 UnixNanos::from(start_nanos as u64),
530 UnixNanos::from(end_nanos as u64),
531 );
532
533 PathBuf::new().join("bar").join(bar_type_str).join(filename)
534}
535
536fn write_batch(
537 batch: RecordBatch,
538 typename: &str,
539 instrument_id: &InstrumentId,
540 date: NaiveDate,
541 path: &Path,
542) {
543 let filepath = path.join(parquet_filepath(typename, instrument_id, date));
544 if let Err(e) = write_parquet_local(batch, &filepath) {
545 tracing::error!("Error writing {filepath:?}: {e:?}");
546 } else {
547 tracing::info!("File written: {filepath:?}");
548 }
549}
550
551fn write_parquet_local(batch: RecordBatch, file_path: &Path) -> anyhow::Result<()> {
552 if let Some(parent) = file_path.parent() {
553 std::fs::create_dir_all(parent)?;
554 }
555
556 let file = std::fs::File::create(file_path)?;
557 let props = WriterProperties::builder()
558 .set_compression(Compression::SNAPPY)
559 .build();
560
561 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
562 writer.write(&batch)?;
563 writer.close()?;
564 Ok(())
565}
566
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    // Verifies `DateCursor::new` anchors to the correct UTC date and computes
    // `end_ns` as the last nanosecond of that day (23:59:59.999999999).
    #[rstest]
    // Exactly midnight: cursor covers the whole of that day.
    #[case(
    Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // Midday: same date and same end-of-day as midnight.
    #[case(
    Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // Last nanosecond of the day: still belongs to Jan 1 (end_ns is inclusive).
    #[case(
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // One nanosecond later (midnight Jan 2): cursor rolls to the next day.
    #[case(
    Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}
613}