1use std::{
17 collections::HashMap,
18 fs,
19 path::{Path, PathBuf},
20};
21
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, future::join_all, pin_mut};
26use heck::ToSnakeCase;
27use nautilus_core::{
28 UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable, parsing::precision_from_str,
29};
30use nautilus_model::{
31 data::{
32 Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
33 TradeTick,
34 },
35 identifiers::InstrumentId,
36};
37use nautilus_serialization::arrow::{
38 bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39 book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
40 trades_to_arrow_record_batch_bytes,
41};
42use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
43use ustr::Ustr;
44
45use super::{enums::TardisExchange, http::models::TardisInstrumentInfo};
46use crate::{
47 config::{BookSnapshotOutput, TardisReplayConfig},
48 http::TardisHttpClient,
49 machine::{TardisMachineClient, types::TardisInstrumentMiniInfo},
50 parse::{normalize_instrument_id, parse_instrument_id},
51};
52
/// Tracks the current UTC calendar date for a stream of timestamped messages,
/// so each completed day of buffered data can be flushed to its own file.
struct DateCursor {
    // The UTC date this cursor currently covers.
    date_utc: NaiveDate,
    // Inclusive end of that date in UNIX nanoseconds (23:59:59.999999999 UTC);
    // any `ts_init` greater than this belongs to a later date.
    end_ns: UnixNanos,
}
59
60impl DateCursor {
61 fn new(current_ns: UnixNanos) -> Self {
63 let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
64 let date_utc = current_utc.date_naive();
65
66 let end_utc =
69 date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
70 let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
71
72 Self { date_utc, end_ns }
73 }
74}
75
76async fn gather_instruments_info(
77 config: &TardisReplayConfig,
78 http_client: &TardisHttpClient,
79) -> HashMap<TardisExchange, Vec<TardisInstrumentInfo>> {
80 let futures = config.options.iter().map(|options| {
81 let exchange = options.exchange;
82 let client = &http_client;
83
84 tracing::info!("Requesting instruments for {exchange}");
85
86 async move {
87 match client.instruments_info(exchange, None, None).await {
88 Ok(instruments) => Some((exchange, instruments)),
89 Err(e) => {
90 tracing::error!("Error fetching instruments for {exchange}: {e}");
91 None
92 }
93 }
94 }
95 });
96
97 let results: HashMap<TardisExchange, Vec<TardisInstrumentInfo>> =
98 join_all(futures).await.into_iter().flatten().collect();
99
100 tracing::info!("Received all instruments");
101
102 results
103}
104
/// Runs a Tardis Machine replay from the JSON config at `config_filepath`,
/// streaming market data and writing each instrument's data to per-UTC-day
/// Parquet files under the resolved output path.
///
/// # Errors
///
/// Returns an error if the config file cannot be read or parsed, if the HTTP
/// or machine clients fail to construct, or if the replay stream cannot start.
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("failed to parse config JSON into TardisReplayConfig")?;

    // Output path resolution order: explicit config value, then
    // $NAUTILUS_PATH/catalog/data, then the current working directory.
    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    // Symbol normalization defaults to on when unspecified.
    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let book_snapshot_output = config
        .book_snapshot_output
        .clone()
        .unwrap_or(BookSnapshotOutput::Deltas);
    tracing::info!("book_snapshot_output={book_snapshot_output:?}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client = TardisMachineClient::new(
        config.tardis_ws_url.as_deref(),
        normalize_symbols,
        book_snapshot_output,
    )?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    // Register per-instrument precision and ID info with the machine client so
    // incoming messages can be parsed into typed data.
    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type;
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = TardisInstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                *exchange,
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await?;
    pin_mut!(stream);

    // Cursors track which UTC day each buffer currently covers.
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

    // Per-key buffers accumulate one UTC day of data before being written out.
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                match msg {
                    Data::Deltas(msg) => {
                        handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
                    }
                    Data::Depth10(msg) => {
                        handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
                    }
                    Data::Quote(msg) => {
                        handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path);
                    }
                    Data::Trade(msg) => {
                        handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path);
                    }
                    Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
                    Data::Delta(delta) => {
                        tracing::warn!(
                            "Skipping individual delta message for {} (use Deltas batch instead)",
                            delta.instrument_id
                        );
                    }
                    Data::MarkPriceUpdate(_)
                    | Data::IndexPriceUpdate(_)
                    | Data::InstrumentClose(_) => {
                        tracing::debug!(
                            "Skipping unsupported data type for instrument {}",
                            msg.instrument_id()
                        );
                    }
                }

                msg_count += 1;
                if msg_count % 100_000 == 0 {
                    tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
                }
            }
            Err(e) => {
                // A stream error ends the replay; buffered data is still flushed below.
                tracing::error!("Stream error: {e:?}");
                break;
            }
        }
    }

    // Flush any remaining buffered data for the final (possibly partial) day.
    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}
279
280fn handle_deltas_msg(
281 deltas: OrderBookDeltas_API,
282 map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
283 cursors: &mut HashMap<InstrumentId, DateCursor>,
284 path: &Path,
285) {
286 let cursor = cursors
287 .entry(deltas.instrument_id)
288 .or_insert_with(|| DateCursor::new(deltas.ts_init));
289
290 if deltas.ts_init > cursor.end_ns {
291 if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
292 batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
293 }
294 *cursor = DateCursor::new(deltas.ts_init);
296 }
297
298 map.entry(deltas.instrument_id)
299 .or_insert_with(|| Vec::with_capacity(100_000))
300 .extend(&*deltas.deltas);
301}
302
303fn handle_depth10_msg(
304 depth10: OrderBookDepth10,
305 map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
306 cursors: &mut HashMap<InstrumentId, DateCursor>,
307 path: &Path,
308) {
309 let cursor = cursors
310 .entry(depth10.instrument_id)
311 .or_insert_with(|| DateCursor::new(depth10.ts_init));
312
313 if depth10.ts_init > cursor.end_ns {
314 if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
315 batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
316 }
317 *cursor = DateCursor::new(depth10.ts_init);
319 }
320
321 map.entry(depth10.instrument_id)
322 .or_insert_with(|| Vec::with_capacity(100_000))
323 .push(depth10);
324}
325
326fn handle_quote_msg(
327 quote: QuoteTick,
328 map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
329 cursors: &mut HashMap<InstrumentId, DateCursor>,
330 path: &Path,
331) {
332 let cursor = cursors
333 .entry(quote.instrument_id)
334 .or_insert_with(|| DateCursor::new(quote.ts_init));
335
336 if quote.ts_init > cursor.end_ns {
337 if let Some(quotes_vec) = map.remove("e.instrument_id) {
338 batch_and_write_quotes(quotes_vec, "e.instrument_id, cursor.date_utc, path);
339 }
340 *cursor = DateCursor::new(quote.ts_init);
342 }
343
344 map.entry(quote.instrument_id)
345 .or_insert_with(|| Vec::with_capacity(100_000))
346 .push(quote);
347}
348
349fn handle_trade_msg(
350 trade: TradeTick,
351 map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
352 cursors: &mut HashMap<InstrumentId, DateCursor>,
353 path: &Path,
354) {
355 let cursor = cursors
356 .entry(trade.instrument_id)
357 .or_insert_with(|| DateCursor::new(trade.ts_init));
358
359 if trade.ts_init > cursor.end_ns {
360 if let Some(trades_vec) = map.remove(&trade.instrument_id) {
361 batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
362 }
363 *cursor = DateCursor::new(trade.ts_init);
365 }
366
367 map.entry(trade.instrument_id)
368 .or_insert_with(|| Vec::with_capacity(100_000))
369 .push(trade);
370}
371
372fn handle_bar_msg(
373 bar: Bar,
374 map: &mut HashMap<BarType, Vec<Bar>>,
375 cursors: &mut HashMap<BarType, DateCursor>,
376 path: &Path,
377) {
378 let cursor = cursors
379 .entry(bar.bar_type)
380 .or_insert_with(|| DateCursor::new(bar.ts_init));
381
382 if bar.ts_init > cursor.end_ns {
383 if let Some(bars_vec) = map.remove(&bar.bar_type) {
384 batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
385 }
386 *cursor = DateCursor::new(bar.ts_init);
388 }
389
390 map.entry(bar.bar_type)
391 .or_insert_with(|| Vec::with_capacity(100_000))
392 .push(bar);
393}
394
395fn batch_and_write_deltas(
396 deltas: Vec<OrderBookDelta>,
397 instrument_id: &InstrumentId,
398 date: NaiveDate,
399 path: &Path,
400) {
401 let typename = stringify!(OrderBookDeltas);
402 match book_deltas_to_arrow_record_batch_bytes(deltas) {
403 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
404 Err(e) => {
405 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
406 }
407 }
408}
409
410fn batch_and_write_depths(
411 depths: Vec<OrderBookDepth10>,
412 instrument_id: &InstrumentId,
413 date: NaiveDate,
414 path: &Path,
415) {
416 let typename = "order_book_depths";
418 match book_depth10_to_arrow_record_batch_bytes(depths) {
419 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
420 Err(e) => {
421 tracing::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
422 }
423 }
424}
425
426fn batch_and_write_quotes(
427 quotes: Vec<QuoteTick>,
428 instrument_id: &InstrumentId,
429 date: NaiveDate,
430 path: &Path,
431) {
432 let typename = stringify!(QuoteTick);
433 match quotes_to_arrow_record_batch_bytes(quotes) {
434 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
435 Err(e) => {
436 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
437 }
438 }
439}
440
441fn batch_and_write_trades(
442 trades: Vec<TradeTick>,
443 instrument_id: &InstrumentId,
444 date: NaiveDate,
445 path: &Path,
446) {
447 let typename = stringify!(TradeTick);
448 match trades_to_arrow_record_batch_bytes(trades) {
449 Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
450 Err(e) => {
451 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
452 }
453 }
454}
455
456fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
457 let typename = stringify!(Bar);
458 let batch = match bars_to_arrow_record_batch_bytes(bars) {
459 Ok(batch) => batch,
460 Err(e) => {
461 tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
462 return;
463 }
464 };
465
466 let filepath = path.join(parquet_filepath_bars(bar_type, date));
467 if let Err(e) = write_parquet_local(batch, &filepath) {
468 tracing::error!("Error writing {filepath:?}: {e:?}");
469 } else {
470 tracing::info!("File written: {filepath:?}");
471 }
472}
473
474fn assert_post_epoch(date: NaiveDate) {
481 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
482 if date < epoch {
483 panic!("Tardis replay filenames require dates on or after 1970-01-01; received {date}");
484 }
485}
486
/// Makes an ISO 8601 timestamp filesystem-safe by replacing `:` and `.`
/// (which are problematic in filenames on some platforms) with `-`.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if matches!(c, ':' | '.') { '-' } else { c })
        .collect()
}
494
495fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
500 let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
501 let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
502
503 format!("{datetime_1}_{datetime_2}.parquet")
504}
505
506fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
507 assert_post_epoch(date);
508
509 let typename = typename.to_snake_case();
510 let instrument_id_str = instrument_id.to_string().replace('/', "");
511
512 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
513 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
514
515 let start_nanos = start_utc
516 .timestamp_nanos_opt()
517 .expect("valid nanosecond timestamp");
518 let end_nanos = (end_utc.and_utc())
519 .timestamp_nanos_opt()
520 .expect("valid nanosecond timestamp");
521
522 let filename = timestamps_to_filename(
523 UnixNanos::from(start_nanos as u64),
524 UnixNanos::from(end_nanos as u64),
525 );
526
527 PathBuf::new()
528 .join(typename)
529 .join(instrument_id_str)
530 .join(filename)
531}
532
533fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
534 assert_post_epoch(date);
535
536 let bar_type_str = bar_type.to_string().replace('/', "");
537
538 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
540 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
541
542 let start_nanos = start_utc
543 .timestamp_nanos_opt()
544 .expect("valid nanosecond timestamp");
545 let end_nanos = (end_utc.and_utc())
546 .timestamp_nanos_opt()
547 .expect("valid nanosecond timestamp");
548
549 let filename = timestamps_to_filename(
550 UnixNanos::from(start_nanos as u64),
551 UnixNanos::from(end_nanos as u64),
552 );
553
554 PathBuf::new().join("bar").join(bar_type_str).join(filename)
555}
556
557fn write_batch(
558 batch: RecordBatch,
559 typename: &str,
560 instrument_id: &InstrumentId,
561 date: NaiveDate,
562 path: &Path,
563) {
564 let filepath = path.join(parquet_filepath(typename, instrument_id, date));
565 if let Err(e) = write_parquet_local(batch, &filepath) {
566 tracing::error!("Error writing {filepath:?}: {e:?}");
567 } else {
568 tracing::info!("File written: {filepath:?}");
569 }
570}
571
572fn write_parquet_local(batch: RecordBatch, file_path: &Path) -> anyhow::Result<()> {
573 if let Some(parent) = file_path.parent() {
574 std::fs::create_dir_all(parent)?;
575 }
576
577 let file = std::fs::File::create(file_path)?;
578 let props = WriterProperties::builder()
579 .set_compression(Compression::SNAPPY)
580 .build();
581
582 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
583 writer.write(&batch)?;
584 writer.close()?;
585 Ok(())
586}
587
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    // Each case supplies: an input timestamp, the expected UTC date it maps to,
    // and the expected inclusive end-of-day nanosecond (23:59:59.999999999 of
    // that same date).
    //
    // Case 1: exact midnight belongs to that day.
    #[rstest]
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // Case 2: midday maps to the same day.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // Case 3: the final nanosecond of the day is still inclusive (ts == end_ns).
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    // Case 4: one nanosecond past midnight rolls the cursor onto the next day.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}
631}