nautilus_tardis/csv/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16mod record;
17
18use std::{error::Error, fs::File, io::BufReader, path::Path};
19
20use csv::{Reader, ReaderBuilder, StringRecord};
21use flate2::read::GzDecoder;
22use nautilus_core::UnixNanos;
23use nautilus_model::{
24    data::{
25        BookOrder, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, DEPTH10_LEN, NULL_ORDER,
26    },
27    enums::{OrderSide, RecordFlag},
28    identifiers::{InstrumentId, TradeId},
29    types::{Price, Quantity},
30};
31
32use crate::parse::parse_price;
33
34use super::{
35    csv::record::{
36        TardisBookUpdateRecord, TardisOrderBookSnapshot25Record, TardisOrderBookSnapshot5Record,
37        TardisQuoteRecord, TardisTradeRecord,
38    },
39    parse::{
40        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
41        parse_timestamp,
42    },
43};
44
45/// Creates a new CSV reader which can handle gzip compression.
46pub fn create_csv_reader<P: AsRef<Path>>(
47    filepath: P,
48) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
49    let file = File::open(filepath.as_ref())?;
50    let buf_reader = BufReader::new(file);
51
52    // Determine if the file is gzipped by its extension
53    let reader: Box<dyn std::io::Read> =
54        if filepath.as_ref().extension().unwrap_or_default() == "gz" {
55            Box::new(GzDecoder::new(buf_reader)) // Decompress the gzipped file
56        } else {
57            Box::new(buf_reader) // Regular file reader
58        };
59
60    Ok(ReaderBuilder::new().has_headers(true).from_reader(reader))
61}
62
63/// Load [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`.
64pub fn load_deltas<P: AsRef<Path>>(
65    filepath: P,
66    price_precision: u8,
67    size_precision: u8,
68    instrument_id: Option<InstrumentId>,
69    limit: Option<usize>,
70) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
71    let mut csv_reader = create_csv_reader(filepath)?;
72    let mut deltas: Vec<OrderBookDelta> = Vec::new();
73    let mut last_ts_event = UnixNanos::default();
74
75    let mut raw_record = StringRecord::new();
76    while csv_reader.read_record(&mut raw_record)? {
77        let record: TardisBookUpdateRecord = raw_record.deserialize(None)?;
78
79        let instrument_id = match &instrument_id {
80            Some(id) => *id,
81            None => parse_instrument_id(&record.exchange, record.symbol),
82        };
83        let side = parse_order_side(&record.side);
84        let price = parse_price(record.price, price_precision);
85        let size = Quantity::new(record.amount, size_precision);
86        let order_id = 0; // Not applicable for L2 data
87        let order = BookOrder::new(side, price, size, order_id);
88
89        let action = parse_book_action(record.is_snapshot, record.amount);
90        let flags = 0; // Flags always zero until timestamp changes
91        let sequence = 0; // Sequence not available
92        let ts_event = parse_timestamp(record.timestamp);
93        let ts_init = parse_timestamp(record.local_timestamp);
94
95        // Check if timestamp is different from last timestamp
96        if last_ts_event != ts_event {
97            if let Some(last_delta) = deltas.last_mut() {
98                // Set previous delta flags as F_LAST
99                last_delta.flags = RecordFlag::F_LAST.value();
100            }
101        }
102
103        last_ts_event = ts_event;
104
105        let delta = OrderBookDelta::new(
106            instrument_id,
107            action,
108            order,
109            flags,
110            sequence,
111            ts_event,
112            ts_init,
113        );
114
115        deltas.push(delta);
116
117        if let Some(limit) = limit {
118            if deltas.len() >= limit {
119                break;
120            }
121        }
122    }
123
124    // Set F_LAST flag for final delta
125    if let Some(last_delta) = deltas.last_mut() {
126        last_delta.flags = RecordFlag::F_LAST.value();
127    }
128
129    Ok(deltas)
130}
131
132fn create_book_order(
133    side: OrderSide,
134    price: Option<f64>,
135    amount: Option<f64>,
136    price_precision: u8,
137    size_precision: u8,
138) -> (BookOrder, u32) {
139    match price {
140        Some(price) => (
141            BookOrder::new(
142                side,
143                Price::new(price, price_precision),
144                Quantity::new(amount.unwrap_or(0.0), size_precision),
145                0,
146            ),
147            1, // Count set to 1 if order exists
148        ),
149        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
150    }
151}
152
153/// Load [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`.
154pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
155    filepath: P,
156    price_precision: u8,
157    size_precision: u8,
158    instrument_id: Option<InstrumentId>,
159    limit: Option<usize>,
160) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
161    let mut csv_reader = create_csv_reader(filepath)?;
162    let mut depths: Vec<OrderBookDepth10> = Vec::new();
163
164    let mut raw_record = StringRecord::new();
165    while csv_reader.read_record(&mut raw_record)? {
166        let record: TardisOrderBookSnapshot5Record = raw_record.deserialize(None)?;
167        let instrument_id = match &instrument_id {
168            Some(id) => *id,
169            None => parse_instrument_id(&record.exchange, record.symbol),
170        };
171        let flags = RecordFlag::F_LAST.value();
172        let sequence = 0; // Sequence not available
173        let ts_event = parse_timestamp(record.timestamp);
174        let ts_init = parse_timestamp(record.local_timestamp);
175
176        // Initialize empty arrays
177        let mut bids = [NULL_ORDER; DEPTH10_LEN];
178        let mut asks = [NULL_ORDER; DEPTH10_LEN];
179        let mut bid_counts = [0u32; DEPTH10_LEN];
180        let mut ask_counts = [0u32; DEPTH10_LEN];
181
182        for i in 0..=4 {
183            // Create bids
184            let (bid_order, bid_count) = create_book_order(
185                OrderSide::Buy,
186                match i {
187                    0 => record.bids_0_price,
188                    1 => record.bids_1_price,
189                    2 => record.bids_2_price,
190                    3 => record.bids_3_price,
191                    4 => record.bids_4_price,
192                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
193                },
194                match i {
195                    0 => record.bids_0_amount,
196                    1 => record.bids_1_amount,
197                    2 => record.bids_2_amount,
198                    3 => record.bids_3_amount,
199                    4 => record.bids_4_amount,
200                    _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
201                },
202                price_precision,
203                size_precision,
204            );
205            bids[i] = bid_order;
206            bid_counts[i] = bid_count;
207
208            // Create asks
209            let (ask_order, ask_count) = create_book_order(
210                OrderSide::Sell,
211                match i {
212                    0 => record.asks_0_price,
213                    1 => record.asks_1_price,
214                    2 => record.asks_2_price,
215                    3 => record.asks_3_price,
216                    4 => record.asks_4_price,
217                    _ => None, // Unreachable, but for safety
218                },
219                match i {
220                    0 => record.asks_0_amount,
221                    1 => record.asks_1_amount,
222                    2 => record.asks_2_amount,
223                    3 => record.asks_3_amount,
224                    4 => record.asks_4_amount,
225                    _ => None, // Unreachable, but for safety
226                },
227                price_precision,
228                size_precision,
229            );
230            asks[i] = ask_order;
231            ask_counts[i] = ask_count;
232        }
233
234        let depth = OrderBookDepth10::new(
235            instrument_id,
236            bids,
237            asks,
238            bid_counts,
239            ask_counts,
240            flags,
241            sequence,
242            ts_event,
243            ts_init,
244        );
245
246        depths.push(depth);
247
248        if let Some(limit) = limit {
249            if depths.len() >= limit {
250                break;
251            }
252        }
253    }
254
255    Ok(depths)
256}
257
258pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
259    filepath: P,
260    price_precision: u8,
261    size_precision: u8,
262    instrument_id: Option<InstrumentId>,
263    limit: Option<usize>,
264) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
265    let mut csv_reader = create_csv_reader(filepath)?;
266    let mut depths: Vec<OrderBookDepth10> = Vec::new();
267
268    let mut raw_record = StringRecord::new();
269    while csv_reader.read_record(&mut raw_record)? {
270        let record: TardisOrderBookSnapshot25Record = raw_record.deserialize(None)?;
271
272        let instrument_id = match &instrument_id {
273            Some(id) => *id,
274            None => parse_instrument_id(&record.exchange, record.symbol),
275        };
276        let flags = RecordFlag::F_LAST.value();
277        let sequence = 0; // Sequence not available
278        let ts_event = parse_timestamp(record.timestamp);
279        let ts_init = parse_timestamp(record.local_timestamp);
280
281        // Initialize empty arrays for the first 10 levels only
282        let mut bids = [NULL_ORDER; DEPTH10_LEN];
283        let mut asks = [NULL_ORDER; DEPTH10_LEN];
284        let mut bid_counts = [0u32; DEPTH10_LEN];
285        let mut ask_counts = [0u32; DEPTH10_LEN];
286
287        // Fill only the first 10 levels from the 25-level record
288        for i in 0..DEPTH10_LEN {
289            // Create bids
290            let (bid_order, bid_count) = create_book_order(
291                OrderSide::Buy,
292                match i {
293                    0 => record.bids_0_price,
294                    1 => record.bids_1_price,
295                    2 => record.bids_2_price,
296                    3 => record.bids_3_price,
297                    4 => record.bids_4_price,
298                    5 => record.bids_5_price,
299                    6 => record.bids_6_price,
300                    7 => record.bids_7_price,
301                    8 => record.bids_8_price,
302                    9 => record.bids_9_price,
303                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
304                },
305                match i {
306                    0 => record.bids_0_amount,
307                    1 => record.bids_1_amount,
308                    2 => record.bids_2_amount,
309                    3 => record.bids_3_amount,
310                    4 => record.bids_4_amount,
311                    5 => record.bids_5_amount,
312                    6 => record.bids_6_amount,
313                    7 => record.bids_7_amount,
314                    8 => record.bids_8_amount,
315                    9 => record.bids_9_amount,
316                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
317                },
318                price_precision,
319                size_precision,
320            );
321            bids[i] = bid_order;
322            bid_counts[i] = bid_count;
323
324            // Create asks
325            let (ask_order, ask_count) = create_book_order(
326                OrderSide::Sell,
327                match i {
328                    0 => record.asks_0_price,
329                    1 => record.asks_1_price,
330                    2 => record.asks_2_price,
331                    3 => record.asks_3_price,
332                    4 => record.asks_4_price,
333                    5 => record.asks_5_price,
334                    6 => record.asks_6_price,
335                    7 => record.asks_7_price,
336                    8 => record.asks_8_price,
337                    9 => record.asks_9_price,
338                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
339                },
340                match i {
341                    0 => record.asks_0_amount,
342                    1 => record.asks_1_amount,
343                    2 => record.asks_2_amount,
344                    3 => record.asks_3_amount,
345                    4 => record.asks_4_amount,
346                    5 => record.asks_5_amount,
347                    6 => record.asks_6_amount,
348                    7 => record.asks_7_amount,
349                    8 => record.asks_8_amount,
350                    9 => record.asks_9_amount,
351                    _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
352                },
353                price_precision,
354                size_precision,
355            );
356            asks[i] = ask_order;
357            ask_counts[i] = ask_count;
358        }
359
360        let depth = OrderBookDepth10::new(
361            instrument_id,
362            bids,
363            asks,
364            bid_counts,
365            ask_counts,
366            flags,
367            sequence,
368            ts_event,
369            ts_init,
370        );
371
372        depths.push(depth);
373
374        if let Some(limit) = limit {
375            if depths.len() >= limit {
376                break;
377            }
378        }
379    }
380
381    Ok(depths)
382}
383
384/// Load [`QuoteTick`]s from a Tardis format CSV at the given `filepath`.
385pub fn load_quote_ticks<P: AsRef<Path>>(
386    filepath: P,
387    price_precision: u8,
388    size_precision: u8,
389    instrument_id: Option<InstrumentId>,
390    limit: Option<usize>,
391) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
392    let mut csv_reader = create_csv_reader(filepath)?;
393    let mut quotes = Vec::new();
394
395    let mut raw_record = StringRecord::new();
396    while csv_reader.read_record(&mut raw_record)? {
397        let record: TardisQuoteRecord = raw_record.deserialize(None)?;
398
399        let instrument_id = match &instrument_id {
400            Some(id) => *id,
401            None => parse_instrument_id(&record.exchange, record.symbol),
402        };
403        let bid_price = Price::new(record.bid_price.unwrap_or(0.0), price_precision);
404        let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
405        let ask_price = Price::new(record.ask_price.unwrap_or(0.0), price_precision);
406        let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
407        let ts_event = parse_timestamp(record.timestamp);
408        let ts_init = parse_timestamp(record.local_timestamp);
409
410        let quote = QuoteTick::new(
411            instrument_id,
412            bid_price,
413            ask_price,
414            bid_size,
415            ask_size,
416            ts_event,
417            ts_init,
418        );
419
420        quotes.push(quote);
421
422        if let Some(limit) = limit {
423            if quotes.len() >= limit {
424                break;
425            }
426        }
427    }
428
429    Ok(quotes)
430}
431
432/// Load [`TradeTick`]s from a Tardis format CSV at the given `filepath`.
433pub fn load_trade_ticks<P: AsRef<Path>>(
434    filepath: P,
435    price_precision: u8,
436    size_precision: u8,
437    instrument_id: Option<InstrumentId>,
438    limit: Option<usize>,
439) -> Result<Vec<TradeTick>, Box<dyn Error>> {
440    let mut csv_reader = create_csv_reader(filepath)?;
441    let mut trades = Vec::new();
442
443    let mut raw_record = StringRecord::new();
444    while csv_reader.read_record(&mut raw_record)? {
445        let record: TardisTradeRecord = raw_record.deserialize(None)?;
446
447        let instrument_id = match &instrument_id {
448            Some(id) => *id,
449            None => parse_instrument_id(&record.exchange, record.symbol),
450        };
451        let price = Price::new(record.price, price_precision);
452        let size = Quantity::new(record.amount, size_precision);
453        let aggressor_side = parse_aggressor_side(&record.side);
454        let trade_id = TradeId::new(&record.id);
455        let ts_event = parse_timestamp(record.timestamp);
456        let ts_init = parse_timestamp(record.local_timestamp);
457
458        let trade = TradeTick::new(
459            instrument_id,
460            price,
461            size,
462            aggressor_side,
463            trade_id,
464            ts_event,
465            ts_init,
466        );
467
468        trades.push(trade);
469
470        if let Some(limit) = limit {
471            if trades.len() >= limit {
472                break;
473            }
474        }
475    }
476
477    Ok(trades)
478}
479
480////////////////////////////////////////////////////////////////////////////////
481// Tests
482////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::InstrumentId,
    };
    use nautilus_test_kit::common::{
        ensure_data_exists_tardis_binance_snapshot25, ensure_data_exists_tardis_binance_snapshot5,
        ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
        ensure_data_exists_tardis_huobi_quotes,
    };
    use rstest::*;

    use super::*;

    /// Loads a limited number of deltas and checks every field of the first one.
    #[rstest]
    pub fn test_read_deltas() {
        let path = ensure_data_exists_tardis_deribit_book_l2();
        let deltas = load_deltas(path, 1, 0, None, Some(1_000)).unwrap();
        assert_eq!(deltas.len(), 1_000);

        let first = &deltas[0];
        assert_eq!(
            first.instrument_id,
            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
        );
        assert_eq!(first.action, BookAction::Add);
        assert_eq!(first.order.side, OrderSide::Sell);
        assert_eq!(first.order.price, Price::from("6421.5"));
        assert_eq!(first.order.size, Quantity::from("18640"));
        assert_eq!(first.flags, 0);
        assert_eq!(first.sequence, 0);
        assert_eq!(first.ts_event, 1585699200245000000);
        assert_eq!(first.ts_init, 1585699200355684000);
    }

    /// Loads depths from 5-level snapshots and checks the top of book of the first one.
    #[rstest]
    pub fn test_read_depth10s_from_snapshot5() {
        let path = ensure_data_exists_tardis_binance_snapshot5();
        let depths = load_depth10_from_snapshot5(path, 1, 0, None, Some(100_000)).unwrap();
        assert_eq!(depths.len(), 100_000);

        let first = &depths[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTCUSDT.BINANCE"));

        // Bid side
        assert_eq!(first.bids.len(), 10);
        let best_bid = &first.bids[0];
        assert_eq!(best_bid.price, Price::from("11657.1"));
        assert_eq!(best_bid.size, Quantity::from("11"));
        assert_eq!(best_bid.side, OrderSide::Buy);
        assert_eq!(best_bid.order_id, 0);

        // Ask side
        assert_eq!(first.asks.len(), 10);
        let best_ask = &first.asks[0];
        assert_eq!(best_ask.price, Price::from("11657.1"));
        assert_eq!(best_ask.size, Quantity::from("2"));
        assert_eq!(best_ask.side, OrderSide::Sell);
        assert_eq!(best_ask.order_id, 0);

        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    /// Loads depths from 25-level snapshots and checks the top of book of the first one.
    #[rstest]
    pub fn test_read_depth10s_from_snapshot25() {
        let path = ensure_data_exists_tardis_binance_snapshot25();
        let depths = load_depth10_from_snapshot25(path, 1, 0, None, Some(100_000)).unwrap();
        assert_eq!(depths.len(), 100_000);

        let first = &depths[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTCUSDT.BINANCE"));

        // Bid side
        assert_eq!(first.bids.len(), 10);
        let best_bid = &first.bids[0];
        assert_eq!(best_bid.price, Price::from("11657.1"));
        assert_eq!(best_bid.size, Quantity::from("11"));
        assert_eq!(best_bid.side, OrderSide::Buy);
        assert_eq!(best_bid.order_id, 0);

        // Ask side
        assert_eq!(first.asks.len(), 10);
        let best_ask = &first.asks[0];
        assert_eq!(best_ask.price, Price::from("11657.1"));
        assert_eq!(best_ask.size, Quantity::from("2"));
        assert_eq!(best_ask.side, OrderSide::Sell);
        assert_eq!(best_ask.order_id, 0);

        assert_eq!(first.bid_counts[0], 1);
        assert_eq!(first.ask_counts[0], 1);
        assert_eq!(first.flags, 128);
        assert_eq!(first.ts_event, 1598918403696000000);
        assert_eq!(first.ts_init, 1598918403810979000);
        assert_eq!(first.sequence, 0);
    }

    /// Loads a limited number of quotes and checks every field of the first one.
    #[rstest]
    pub fn test_read_quotes() {
        let path = ensure_data_exists_tardis_huobi_quotes();
        let quotes = load_quote_ticks(path, 1, 0, None, Some(100_000)).unwrap();
        assert_eq!(quotes.len(), 100_000);

        let first = &quotes[0];
        assert_eq!(first.instrument_id, InstrumentId::from("BTC-USD.HUOBI"));
        assert_eq!(first.bid_price, Price::from("8629.2"));
        assert_eq!(first.bid_size, Quantity::from("806"));
        assert_eq!(first.ask_price, Price::from("8629.3"));
        assert_eq!(first.ask_size, Quantity::from("5494"));
        assert_eq!(first.ts_event, 1588291201099000000);
        assert_eq!(first.ts_init, 1588291201234268000);
    }

    /// Loads a limited number of trades and checks every field of the first one.
    #[rstest]
    pub fn test_read_trades() {
        let path = ensure_data_exists_tardis_bitmex_trades();
        let trades = load_trade_ticks(path, 1, 0, None, Some(100_000)).unwrap();
        assert_eq!(trades.len(), 100_000);

        let first = &trades[0];
        assert_eq!(first.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(first.price, Price::from("8531.5"));
        assert_eq!(first.size, Quantity::from("2152"));
        assert_eq!(first.aggressor_side, AggressorSide::Seller);
        assert_eq!(
            first.trade_id,
            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
        );
        assert_eq!(first.ts_event, 1583020803145000000);
        assert_eq!(first.ts_init, 1583020803307160000);
    }
}