nautilus_tardis/
replay.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};

use anyhow::Context;
use arrow::array::RecordBatch;
use chrono::{DateTime, Duration, NaiveDate};
use futures_util::{StreamExt, future::join_all, pin_mut};
use heck::ToSnakeCase;
use nautilus_core::{UnixNanos, parsing::precision_from_str};
use nautilus_model::{
    data::{
        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
        TradeTick,
    },
    identifiers::InstrumentId,
};
use nautilus_serialization::{
    arrow::{
        bars_to_arrow_record_batch_bytes, order_book_deltas_to_arrow_record_batch_bytes,
        order_book_depth10_to_arrow_record_batch_bytes, quote_ticks_to_arrow_record_batch_bytes,
        trade_ticks_to_arrow_record_batch_bytes,
    },
    parquet::write_batch_to_parquet,
};
use thousands::Separable;
use ustr::Ustr;

use super::{enums::Exchange, http::models::InstrumentInfo};
use crate::{
    config::TardisReplayConfig,
    http::TardisHttpClient,
    machine::{TardisMachineClient, types::InstrumentMiniInfo},
    parse::{normalize_instrument_id, parse_instrument_id},
};
52
/// Tracks the current UTC date for one output stream and the final nanosecond
/// belonging to that date, so handlers can detect when data crosses into a new day.
struct DateCursor {
    /// Cursor date UTC.
    date_utc: NaiveDate,
    /// Cursor end timestamp UNIX nanoseconds (last nanosecond of `date_utc`).
    end_ns: UnixNanos,
}
59
60impl DateCursor {
61    /// Creates a new [`DateCursor`] instance.
62    fn new(current_ns: UnixNanos) -> Self {
63        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
64        let date_utc = current_utc.date_naive();
65
66        // Calculate end of the current UTC day
67        // SAFETY: Known safe input values
68        let end_utc =
69            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
70        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
71
72        Self { date_utc, end_ns }
73    }
74}
75
76async fn gather_instruments_info(
77    config: &TardisReplayConfig,
78    http_client: &TardisHttpClient,
79) -> HashMap<Exchange, Vec<InstrumentInfo>> {
80    let futures = config.options.iter().map(|options| {
81        let exchange = options.exchange.clone();
82        let client = &http_client;
83
84        tracing::info!("Requesting instruments for {exchange}");
85
86        async move {
87            match client.instruments_info(exchange.clone(), None, None).await {
88                Ok(instruments) => Some((exchange, instruments)),
89                Err(e) => {
90                    tracing::error!("Error fetching instruments for {exchange}: {e}");
91                    None
92                }
93            }
94        }
95    });
96
97    let results: Vec<(Exchange, Vec<InstrumentInfo>)> =
98        join_all(futures).await.into_iter().flatten().collect();
99
100    tracing::info!("Received all instruments");
101
102    results.into_iter().collect()
103}
104
105pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
106    tracing::info!("Starting replay");
107    tracing::info!("Config filepath: {}", config_filepath.display());
108
109    let config_data = fs::read_to_string(config_filepath).expect("Failed to read config file");
110    let config: TardisReplayConfig =
111        serde_json::from_str(&config_data).expect("Failed to parse config JSON");
112
113    let path = config
114        .output_path
115        .as_deref()
116        .map(Path::new)
117        .map(Path::to_path_buf)
118        .or_else(|| {
119            std::env::var("NAUTILUS_CATALOG_PATH")
120                .ok()
121                .map(|env_path| PathBuf::from(env_path).join("data"))
122        })
123        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));
124
125    tracing::info!("Output path: {}", path.display());
126
127    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
128    tracing::info!("normalize_symbols={normalize_symbols}");
129
130    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
131    let mut machine_client =
132        TardisMachineClient::new(config.tardis_ws_url.as_deref(), normalize_symbols)?;
133
134    let info_map = gather_instruments_info(&config, &http_client).await;
135
136    for (exchange, instruments) in &info_map {
137        for inst in instruments {
138            let instrument_type = inst.instrument_type.clone();
139            let price_precision = precision_from_str(&inst.price_increment.to_string());
140            let size_precision = precision_from_str(&inst.amount_increment.to_string());
141
142            let instrument_id = if normalize_symbols {
143                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
144            } else {
145                parse_instrument_id(exchange, inst.id)
146            };
147
148            let info = InstrumentMiniInfo::new(
149                instrument_id,
150                Some(Ustr::from(&inst.id)),
151                exchange.clone(),
152                price_precision,
153                size_precision,
154            );
155            machine_client.add_instrument_info(info);
156        }
157    }
158
159    tracing::info!("Starting tardis-machine stream");
160    let stream = machine_client.replay(config.options).await;
161    pin_mut!(stream);
162
163    // Initialize date cursors
164    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
165    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
166    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
167    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
168    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();
169
170    // Initialize date collection maps
171    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
172    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
173    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
174    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
175    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();
176
177    let mut msg_count = 0;
178
179    while let Some(msg) = stream.next().await {
180        match msg {
181            Data::Deltas(msg) => {
182                handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
183            }
184            Data::Depth10(msg) => {
185                handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
186            }
187            Data::Quote(msg) => handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path),
188            Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
189            Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
190            Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
191        }
192
193        msg_count += 1;
194        if msg_count % 100_000 == 0 {
195            tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
196        }
197    }
198
199    // Naively iterate through every remaining type and instrument sequentially
200
201    for (instrument_id, deltas) in deltas_map {
202        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
203        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
204    }
205
206    for (instrument_id, depths) in depths_map {
207        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
208        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
209    }
210
211    for (instrument_id, quotes) in quotes_map {
212        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
213        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
214    }
215
216    for (instrument_id, trades) in trades_map {
217        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
218        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
219    }
220
221    for (bar_type, bars) in bars_map {
222        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
223        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
224    }
225
226    tracing::info!(
227        "Replay completed after {} messages",
228        msg_count.separate_with_commas()
229    );
230    Ok(())
231}
232
233fn handle_deltas_msg(
234    deltas: OrderBookDeltas_API,
235    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
236    cursors: &mut HashMap<InstrumentId, DateCursor>,
237    path: &Path,
238) {
239    let cursor = cursors
240        .entry(deltas.instrument_id)
241        .or_insert_with(|| DateCursor::new(deltas.ts_init));
242
243    if deltas.ts_init > cursor.end_ns {
244        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
245            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
246        }
247        // Update cursor
248        *cursor = DateCursor::new(deltas.ts_init);
249    }
250
251    map.entry(deltas.instrument_id)
252        .or_insert_with(|| Vec::with_capacity(1_000_000))
253        .extend(&*deltas.deltas);
254}
255
256fn handle_depth10_msg(
257    depth10: OrderBookDepth10,
258    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
259    cursors: &mut HashMap<InstrumentId, DateCursor>,
260    path: &Path,
261) {
262    let cursor = cursors
263        .entry(depth10.instrument_id)
264        .or_insert_with(|| DateCursor::new(depth10.ts_init));
265
266    if depth10.ts_init > cursor.end_ns {
267        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
268            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
269        }
270        // Update cursor
271        *cursor = DateCursor::new(depth10.ts_init);
272    }
273
274    map.entry(depth10.instrument_id)
275        .or_insert_with(|| Vec::with_capacity(1_000_000))
276        .push(depth10);
277}
278
279fn handle_quote_msg(
280    quote: QuoteTick,
281    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
282    cursors: &mut HashMap<InstrumentId, DateCursor>,
283    path: &Path,
284) {
285    let cursor = cursors
286        .entry(quote.instrument_id)
287        .or_insert_with(|| DateCursor::new(quote.ts_init));
288
289    if quote.ts_init > cursor.end_ns {
290        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
291            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
292        }
293        // Update cursor
294        *cursor = DateCursor::new(quote.ts_init);
295    }
296
297    map.entry(quote.instrument_id)
298        .or_insert_with(|| Vec::with_capacity(1_000_000))
299        .push(quote);
300}
301
302fn handle_trade_msg(
303    trade: TradeTick,
304    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
305    cursors: &mut HashMap<InstrumentId, DateCursor>,
306    path: &Path,
307) {
308    let cursor = cursors
309        .entry(trade.instrument_id)
310        .or_insert_with(|| DateCursor::new(trade.ts_init));
311
312    if trade.ts_init > cursor.end_ns {
313        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
314            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
315        }
316        // Update cursor
317        *cursor = DateCursor::new(trade.ts_init);
318    }
319
320    map.entry(trade.instrument_id)
321        .or_insert_with(|| Vec::with_capacity(1_000_000))
322        .push(trade);
323}
324
325fn handle_bar_msg(
326    bar: Bar,
327    map: &mut HashMap<BarType, Vec<Bar>>,
328    cursors: &mut HashMap<BarType, DateCursor>,
329    path: &Path,
330) {
331    let cursor = cursors
332        .entry(bar.bar_type)
333        .or_insert_with(|| DateCursor::new(bar.ts_init));
334
335    if bar.ts_init > cursor.end_ns {
336        if let Some(bars_vec) = map.remove(&bar.bar_type) {
337            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
338        }
339        // Update cursor
340        *cursor = DateCursor::new(bar.ts_init);
341    }
342
343    map.entry(bar.bar_type)
344        .or_insert_with(|| Vec::with_capacity(1_000_000))
345        .push(bar);
346}
347
348fn batch_and_write_deltas(
349    deltas: Vec<OrderBookDelta>,
350    instrument_id: &InstrumentId,
351    date: NaiveDate,
352    path: &Path,
353) {
354    let typename = stringify!(OrderBookDeltas);
355    match order_book_deltas_to_arrow_record_batch_bytes(deltas) {
356        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
357        Err(e) => {
358            tracing::error!("Error converting `{typename}` to Arrow: {e:?}",);
359        }
360    }
361}
362
363fn batch_and_write_depths(
364    depths: Vec<OrderBookDepth10>,
365    instrument_id: &InstrumentId,
366    date: NaiveDate,
367    path: &Path,
368) {
369    let typename = stringify!(OrderBookDepth10);
370    match order_book_depth10_to_arrow_record_batch_bytes(depths) {
371        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
372        Err(e) => {
373            tracing::error!("Error converting `{typename}` to Arrow: {e:?}",);
374        }
375    }
376}
377
378fn batch_and_write_quotes(
379    quotes: Vec<QuoteTick>,
380    instrument_id: &InstrumentId,
381    date: NaiveDate,
382    path: &Path,
383) {
384    let typename = stringify!(QuoteTick);
385    match quote_ticks_to_arrow_record_batch_bytes(quotes) {
386        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
387        Err(e) => {
388            tracing::error!("Error converting `{typename}` to Arrow: {e:?}",);
389        }
390    }
391}
392
393fn batch_and_write_trades(
394    trades: Vec<TradeTick>,
395    instrument_id: &InstrumentId,
396    date: NaiveDate,
397    path: &Path,
398) {
399    let typename = stringify!(TradeTick);
400    match trade_ticks_to_arrow_record_batch_bytes(trades) {
401        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
402        Err(e) => {
403            tracing::error!("Error converting `{typename}` to Arrow: {e:?}",);
404        }
405    }
406}
407
408fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
409    let typename = stringify!(Bar);
410    let batch = match bars_to_arrow_record_batch_bytes(bars) {
411        Ok(batch) => batch,
412        Err(e) => {
413            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
414            return;
415        }
416    };
417
418    let filepath = path.join(parquet_filepath_bars(bar_type, date));
419    match write_batch_to_parquet(batch, &filepath, None) {
420        Ok(()) => tracing::info!("File written: {}", filepath.display()),
421        Err(e) => tracing::error!("Error writing {}: {e:?}", filepath.display()),
422    }
423}
424
425fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
426    let typename = typename.to_snake_case();
427    let instrument_id_str = instrument_id.to_string().replace('/', "");
428    let date_str = date.to_string().replace('-', "");
429    PathBuf::new()
430        .join(typename)
431        .join(instrument_id_str)
432        .join(format!("{date_str}.parquet"))
433}
434
435fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
436    let bar_type_str = bar_type.to_string().replace('/', "");
437    let date_str = date.to_string().replace('-', "");
438    PathBuf::new()
439        .join("bar")
440        .join(bar_type_str)
441        .join(format!("{date_str}.parquet"))
442}
443
444fn write_batch(
445    batch: RecordBatch,
446    typename: &str,
447    instrument_id: &InstrumentId,
448    date: NaiveDate,
449    path: &Path,
450) {
451    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
452    match write_batch_to_parquet(batch, &filepath, None) {
453        Ok(()) => tracing::info!("File written: {}", filepath.display()),
454        Err(e) => tracing::error!("Error writing {}: {e:?}", filepath.display()),
455    }
456}
457
458///////////////////////////////////////////////////////////////////////////////////////////////////
459// Tests
460///////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    // Each case supplies (input timestamp, expected cursor date, expected end-of-day ns).
    #[rstest]
    #[case(
    // Start of day: 2024-01-01 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Midday: 2024-01-01 12:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // End of day: 2024-01-01 23:59:59.999999999 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Start of new day: 2024-01-02 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    /// `DateCursor::new` should resolve the UTC date containing `timestamp` and set
    /// `end_ns` to the final nanosecond (23:59:59.999999999) of that same day.
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}