1use std::{
17 collections::HashMap,
18 fs,
19 path::{Path, PathBuf},
20};
21
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, future::join_all, pin_mut};
26use heck::ToSnakeCase;
27use nautilus_core::{UnixNanos, datetime::unix_nanos_to_iso8601, parsing::precision_from_str};
28use nautilus_model::{
29 data::{
30 Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
31 TradeTick,
32 },
33 identifiers::InstrumentId,
34};
35use nautilus_serialization::arrow::{
36 bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
37 book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
38 trades_to_arrow_record_batch_bytes,
39};
40use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
41use thousands::Separable;
42use ustr::Ustr;
43
44use super::{enums::TardisExchange, http::models::TardisInstrumentInfo};
45use crate::{
46 config::{BookSnapshotOutput, TardisReplayConfig},
47 http::TardisHttpClient,
48 machine::{TardisMachineClient, types::TardisInstrumentMiniInfo},
49 parse::{normalize_instrument_id, parse_instrument_id},
50};
51
52struct DateCursor {
53 date_utc: NaiveDate,
55 end_ns: UnixNanos,
57}
58
59impl DateCursor {
60 fn new(current_ns: UnixNanos) -> Self {
62 let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
63 let date_utc = current_utc.date_naive();
64
65 let end_utc =
68 date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
69 let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
70
71 Self { date_utc, end_ns }
72 }
73}
74
75async fn gather_instruments_info(
76 config: &TardisReplayConfig,
77 http_client: &TardisHttpClient,
78) -> HashMap<TardisExchange, Vec<TardisInstrumentInfo>> {
79 let futures = config.options.iter().map(|options| {
80 let exchange = options.exchange;
81 let client = &http_client;
82
83 tracing::info!("Requesting instruments for {exchange}");
84
85 async move {
86 match client.instruments_info(exchange, None, None).await {
87 Ok(instruments) => Some((exchange, instruments)),
88 Err(e) => {
89 tracing::error!("Error fetching instruments for {exchange}: {e}");
90 None
91 }
92 }
93 }
94 });
95
96 let results: HashMap<TardisExchange, Vec<TardisInstrumentInfo>> =
97 join_all(futures).await.into_iter().flatten().collect();
98
99 tracing::info!("Received all instruments");
100
101 results
102}
103
104pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
116 tracing::info!("Starting replay");
117 tracing::info!("Config filepath: {config_filepath:?}");
118
119 let config_data = fs::read_to_string(config_filepath)
121 .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
122 let config: TardisReplayConfig = serde_json::from_str(&config_data)
123 .context("failed to parse config JSON into TardisReplayConfig")?;
124
125 let path = config
126 .output_path
127 .as_deref()
128 .map(Path::new)
129 .map(Path::to_path_buf)
130 .or_else(|| {
131 std::env::var("NAUTILUS_PATH")
132 .ok()
133 .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
134 })
135 .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));
136
137 tracing::info!("Output path: {path:?}");
138
139 let normalize_symbols = config.normalize_symbols.unwrap_or(true);
140 tracing::info!("normalize_symbols={normalize_symbols}");
141
142 let book_snapshot_output = config
143 .book_snapshot_output
144 .clone()
145 .unwrap_or(BookSnapshotOutput::Deltas);
146 tracing::info!("book_snapshot_output={book_snapshot_output:?}");
147
148 let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
149 let mut machine_client = TardisMachineClient::new(
150 config.tardis_ws_url.as_deref(),
151 normalize_symbols,
152 book_snapshot_output,
153 )?;
154
155 let info_map = gather_instruments_info(&config, &http_client).await;
156
157 for (exchange, instruments) in &info_map {
158 for inst in instruments {
159 let instrument_type = inst.instrument_type;
160 let price_precision = precision_from_str(&inst.price_increment.to_string());
161 let size_precision = precision_from_str(&inst.amount_increment.to_string());
162
163 let instrument_id = if normalize_symbols {
164 normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
165 } else {
166 parse_instrument_id(exchange, inst.id)
167 };
168
169 let info = TardisInstrumentMiniInfo::new(
170 instrument_id,
171 Some(Ustr::from(&inst.id)),
172 *exchange,
173 price_precision,
174 size_precision,
175 );
176 machine_client.add_instrument_info(info);
177 }
178 }
179
180 tracing::info!("Starting tardis-machine stream");
181 let stream = machine_client.replay(config.options).await?;
182 pin_mut!(stream);
183
184 let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
186 let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
187 let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
188 let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
189 let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();
190
191 let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
193 let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
194 let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
195 let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
196 let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();
197
198 let mut msg_count = 0;
199
200 while let Some(result) = stream.next().await {
201 match result {
202 Ok(msg) => {
203 match msg {
204 Data::Deltas(msg) => {
205 handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
206 }
207 Data::Depth10(msg) => {
208 handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
209 }
210 Data::Quote(msg) => {
211 handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path);
212 }
213 Data::Trade(msg) => {
214 handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path);
215 }
216 Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
217 Data::Delta(delta) => {
218 tracing::warn!(
219 "Skipping individual delta message for {} (use Deltas batch instead)",
220 delta.instrument_id
221 );
222 }
223 Data::MarkPriceUpdate(_)
224 | Data::IndexPriceUpdate(_)
225 | Data::InstrumentClose(_) => {
226 tracing::debug!(
227 "Skipping unsupported data type for instrument {}",
228 msg.instrument_id()
229 );
230 }
231 }
232
233 msg_count += 1;
234 if msg_count % 100_000 == 0 {
235 tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
236 }
237 }
238 Err(e) => {
239 tracing::error!("Stream error: {e:?}");
240 break;
241 }
242 }
243 }
244
245 for (instrument_id, deltas) in deltas_map {
248 let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
249 batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
250 }
251
252 for (instrument_id, depths) in depths_map {
253 let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
254 batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
255 }
256
257 for (instrument_id, quotes) in quotes_map {
258 let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
259 batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
260 }
261
262 for (instrument_id, trades) in trades_map {
263 let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
264 batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
265 }
266
267 for (bar_type, bars) in bars_map {
268 let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
269 batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
270 }
271
272 tracing::info!(
273 "Replay completed after {} messages",
274 msg_count.separate_with_commas()
275 );
276 Ok(())
277}
278
279fn handle_deltas_msg(
280 deltas: OrderBookDeltas_API,
281 map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
282 cursors: &mut HashMap<InstrumentId, DateCursor>,
283 path: &Path,
284) {
285 let cursor = cursors
286 .entry(deltas.instrument_id)
287 .or_insert_with(|| DateCursor::new(deltas.ts_init));
288
289 if deltas.ts_init > cursor.end_ns {
290 if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
291 batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
292 }
293 *cursor = DateCursor::new(deltas.ts_init);
295 }
296
297 map.entry(deltas.instrument_id)
298 .or_insert_with(|| Vec::with_capacity(100_000))
299 .extend(&*deltas.deltas);
300}
301
302fn handle_depth10_msg(
303 depth10: OrderBookDepth10,
304 map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
305 cursors: &mut HashMap<InstrumentId, DateCursor>,
306 path: &Path,
307) {
308 let cursor = cursors
309 .entry(depth10.instrument_id)
310 .or_insert_with(|| DateCursor::new(depth10.ts_init));
311
312 if depth10.ts_init > cursor.end_ns {
313 if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
314 batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
315 }
316 *cursor = DateCursor::new(depth10.ts_init);
318 }
319
320 map.entry(depth10.instrument_id)
321 .or_insert_with(|| Vec::with_capacity(100_000))
322 .push(depth10);
323}
324
325fn handle_quote_msg(
326 quote: QuoteTick,
327 map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
328 cursors: &mut HashMap<InstrumentId, DateCursor>,
329 path: &Path,
330) {
331 let cursor = cursors
332 .entry(quote.instrument_id)
333 .or_insert_with(|| DateCursor::new(quote.ts_init));
334
335 if quote.ts_init > cursor.end_ns {
336 if let Some(quotes_vec) = map.remove("e.instrument_id) {
337 batch_and_write_quotes(quotes_vec, "e.instrument_id, cursor.date_utc, path);
338 }
339 *cursor = DateCursor::new(quote.ts_init);
341 }
342
343 map.entry(quote.instrument_id)
344 .or_insert_with(|| Vec::with_capacity(100_000))
345 .push(quote);
346}
347
348fn handle_trade_msg(
349 trade: TradeTick,
350 map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
351 cursors: &mut HashMap<InstrumentId, DateCursor>,
352 path: &Path,
353) {
354 let cursor = cursors
355 .entry(trade.instrument_id)
356 .or_insert_with(|| DateCursor::new(trade.ts_init));
357
358 if trade.ts_init > cursor.end_ns {
359 if let Some(trades_vec) = map.remove(&trade.instrument_id) {
360 batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
361 }
362 *cursor = DateCursor::new(trade.ts_init);
364 }
365
366 map.entry(trade.instrument_id)
367 .or_insert_with(|| Vec::with_capacity(100_000))
368 .push(trade);
369}
370
371fn handle_bar_msg(
372 bar: Bar,
373 map: &mut HashMap<BarType, Vec<Bar>>,
374 cursors: &mut HashMap<BarType, DateCursor>,
375 path: &Path,
376) {
377 let cursor = cursors
378 .entry(bar.bar_type)
379 .or_insert_with(|| DateCursor::new(bar.ts_init));
380
381 if bar.ts_init > cursor.end_ns {
382 if let Some(bars_vec) = map.remove(&bar.bar_type) {
383 batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
384 }
385 *cursor = DateCursor::new(bar.ts_init);
387 }
388
389 map.entry(bar.bar_type)
390 .or_insert_with(|| Vec::with_capacity(100_000))
391 .push(bar);
392}
393
394fn batch_and_write_deltas(
395 deltas: Vec<OrderBookDelta>,
396 instrument_id: &InstrumentId,
397 date: NaiveDate,
398 path: &Path,
399) {
400 let typename = stringify!(OrderBookDeltas);
401 match book_deltas_to_arrow_record_batch_bytes(deltas) {
402 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
403 Err(e) => {
404 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
405 }
406 }
407}
408
409fn batch_and_write_depths(
410 depths: Vec<OrderBookDepth10>,
411 instrument_id: &InstrumentId,
412 date: NaiveDate,
413 path: &Path,
414) {
415 let typename = "order_book_depths";
417 match book_depth10_to_arrow_record_batch_bytes(depths) {
418 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
419 Err(e) => {
420 tracing::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
421 }
422 }
423}
424
425fn batch_and_write_quotes(
426 quotes: Vec<QuoteTick>,
427 instrument_id: &InstrumentId,
428 date: NaiveDate,
429 path: &Path,
430) {
431 let typename = stringify!(QuoteTick);
432 match quotes_to_arrow_record_batch_bytes(quotes) {
433 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
434 Err(e) => {
435 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
436 }
437 }
438}
439
440fn batch_and_write_trades(
441 trades: Vec<TradeTick>,
442 instrument_id: &InstrumentId,
443 date: NaiveDate,
444 path: &Path,
445) {
446 let typename = stringify!(TradeTick);
447 match trades_to_arrow_record_batch_bytes(trades) {
448 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
449 Err(e) => {
450 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
451 }
452 }
453}
454
455fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
456 let typename = stringify!(Bar);
457 let batch = match bars_to_arrow_record_batch_bytes(bars) {
458 Ok(batch) => batch,
459 Err(e) => {
460 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
461 return;
462 }
463 };
464
465 let filepath = path.join(parquet_filepath_bars(bar_type, date));
466 if let Err(e) = write_parquet_local(batch, &filepath) {
467 tracing::error!("Error writing {filepath:?}: {e:?}");
468 } else {
469 tracing::info!("File written: {filepath:?}");
470 }
471}
472
473fn assert_post_epoch(date: NaiveDate) {
480 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
481 if date < epoch {
482 panic!("Tardis replay filenames require dates on or after 1970-01-01; received {date}");
483 }
484}
485
/// Returns `iso_timestamp` with every `:` and `.` replaced by `-`, so the result can be
/// used inside a file name.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|ch| if matches!(ch, ':' | '.') { '-' } else { ch })
        .collect()
}
493
494fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
499 let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
500 let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
501
502 format!("{datetime_1}_{datetime_2}.parquet")
503}
504
505fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
506 assert_post_epoch(date);
507
508 let typename = typename.to_snake_case();
509 let instrument_id_str = instrument_id.to_string().replace('/', "");
510
511 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
512 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
513
514 let start_nanos = start_utc
515 .timestamp_nanos_opt()
516 .expect("valid nanosecond timestamp");
517 let end_nanos = (end_utc.and_utc())
518 .timestamp_nanos_opt()
519 .expect("valid nanosecond timestamp");
520
521 let filename = timestamps_to_filename(
522 UnixNanos::from(start_nanos as u64),
523 UnixNanos::from(end_nanos as u64),
524 );
525
526 PathBuf::new()
527 .join(typename)
528 .join(instrument_id_str)
529 .join(filename)
530}
531
532fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
533 assert_post_epoch(date);
534
535 let bar_type_str = bar_type.to_string().replace('/', "");
536
537 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
539 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
540
541 let start_nanos = start_utc
542 .timestamp_nanos_opt()
543 .expect("valid nanosecond timestamp");
544 let end_nanos = (end_utc.and_utc())
545 .timestamp_nanos_opt()
546 .expect("valid nanosecond timestamp");
547
548 let filename = timestamps_to_filename(
549 UnixNanos::from(start_nanos as u64),
550 UnixNanos::from(end_nanos as u64),
551 );
552
553 PathBuf::new().join("bar").join(bar_type_str).join(filename)
554}
555
556fn write_batch(
557 batch: RecordBatch,
558 typename: &str,
559 instrument_id: &InstrumentId,
560 date: NaiveDate,
561 path: &Path,
562) {
563 let filepath = path.join(parquet_filepath(typename, instrument_id, date));
564 if let Err(e) = write_parquet_local(batch, &filepath) {
565 tracing::error!("Error writing {filepath:?}: {e:?}");
566 } else {
567 tracing::info!("File written: {filepath:?}");
568 }
569}
570
571fn write_parquet_local(batch: RecordBatch, file_path: &Path) -> anyhow::Result<()> {
572 if let Some(parent) = file_path.parent() {
573 std::fs::create_dir_all(parent)?;
574 }
575
576 let file = std::fs::File::create(file_path)?;
577 let props = WriterProperties::builder()
578 .set_compression(Compression::SNAPPY)
579 .build();
580
581 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
582 writer.write(&batch)?;
583 writer.close()?;
584 Ok(())
585}
586
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    /// UNIX nanoseconds of the given UTC wall-clock time on 2024-01-`day`.
    fn jan_2024_nanos(day: u32, hour: u32, min: u32, sec: u32) -> u64 {
        Utc.with_ymd_and_hms(2024, 1, day, hour, min, sec)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap() as u64
    }

    /// UNIX nanoseconds of the last nanosecond of 2024-01-`day` in UTC.
    fn jan_2024_end_of_day_nanos(day: u32) -> u64 {
        jan_2024_nanos(day, 23, 59, 59) + 999_999_999
    }

    /// The calendar date 2024-01-`day`.
    fn jan_2024_date(day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, day).unwrap()
    }

    #[rstest]
    // Start of the day
    #[case(jan_2024_nanos(1, 0, 0, 0), jan_2024_date(1), jan_2024_end_of_day_nanos(1))]
    // Midday
    #[case(jan_2024_nanos(1, 12, 0, 0), jan_2024_date(1), jan_2024_end_of_day_nanos(1))]
    // Last nanosecond of the day
    #[case(jan_2024_end_of_day_nanos(1), jan_2024_date(1), jan_2024_end_of_day_nanos(1))]
    // First nanosecond of the following day
    #[case(jan_2024_nanos(2, 0, 0, 0), jan_2024_date(2), jan_2024_end_of_day_nanos(2))]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let cursor = DateCursor::new(UnixNanos::from(timestamp));

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}