1mod record;
17
18use std::{error::Error, fs::File, io::BufReader, path::Path};
19
20use csv::{Reader, ReaderBuilder, StringRecord};
21use flate2::read::GzDecoder;
22use nautilus_core::UnixNanos;
23use nautilus_model::{
24 data::{
25 BookOrder, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, DEPTH10_LEN, NULL_ORDER,
26 },
27 enums::{OrderSide, RecordFlag},
28 identifiers::{InstrumentId, TradeId},
29 types::{Price, Quantity},
30};
31
32use crate::parse::parse_price;
33
34use super::{
35 csv::record::{
36 TardisBookUpdateRecord, TardisOrderBookSnapshot25Record, TardisOrderBookSnapshot5Record,
37 TardisQuoteRecord, TardisTradeRecord,
38 },
39 parse::{
40 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
41 parse_timestamp,
42 },
43};
44
45pub fn create_csv_reader<P: AsRef<Path>>(
47 filepath: P,
48) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
49 let file = File::open(filepath.as_ref())?;
50 let buf_reader = BufReader::new(file);
51
52 let reader: Box<dyn std::io::Read> =
54 if filepath.as_ref().extension().unwrap_or_default() == "gz" {
55 Box::new(GzDecoder::new(buf_reader)) } else {
57 Box::new(buf_reader) };
59
60 Ok(ReaderBuilder::new().has_headers(true).from_reader(reader))
61}
62
63pub fn load_deltas<P: AsRef<Path>>(
65 filepath: P,
66 price_precision: u8,
67 size_precision: u8,
68 instrument_id: Option<InstrumentId>,
69 limit: Option<usize>,
70) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
71 let mut csv_reader = create_csv_reader(filepath)?;
72 let mut deltas: Vec<OrderBookDelta> = Vec::new();
73 let mut last_ts_event = UnixNanos::default();
74
75 let mut raw_record = StringRecord::new();
76 while csv_reader.read_record(&mut raw_record)? {
77 let record: TardisBookUpdateRecord = raw_record.deserialize(None)?;
78
79 let instrument_id = match &instrument_id {
80 Some(id) => *id,
81 None => parse_instrument_id(&record.exchange, record.symbol),
82 };
83 let side = parse_order_side(&record.side);
84 let price = parse_price(record.price, price_precision);
85 let size = Quantity::new(record.amount, size_precision);
86 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
88
89 let action = parse_book_action(record.is_snapshot, record.amount);
90 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
93 let ts_init = parse_timestamp(record.local_timestamp);
94
95 if last_ts_event != ts_event {
97 if let Some(last_delta) = deltas.last_mut() {
98 last_delta.flags = RecordFlag::F_LAST.value();
100 }
101 }
102
103 last_ts_event = ts_event;
104
105 let delta = OrderBookDelta::new(
106 instrument_id,
107 action,
108 order,
109 flags,
110 sequence,
111 ts_event,
112 ts_init,
113 );
114
115 deltas.push(delta);
116
117 if let Some(limit) = limit {
118 if deltas.len() >= limit {
119 break;
120 }
121 }
122 }
123
124 if let Some(last_delta) = deltas.last_mut() {
126 last_delta.flags = RecordFlag::F_LAST.value();
127 }
128
129 Ok(deltas)
130}
131
132fn create_book_order(
133 side: OrderSide,
134 price: Option<f64>,
135 amount: Option<f64>,
136 price_precision: u8,
137 size_precision: u8,
138) -> (BookOrder, u32) {
139 match price {
140 Some(price) => (
141 BookOrder::new(
142 side,
143 Price::new(price, price_precision),
144 Quantity::new(amount.unwrap_or(0.0), size_precision),
145 0,
146 ),
147 1, ),
149 None => (NULL_ORDER, 0), }
151}
152
153pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
155 filepath: P,
156 price_precision: u8,
157 size_precision: u8,
158 instrument_id: Option<InstrumentId>,
159 limit: Option<usize>,
160) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
161 let mut csv_reader = create_csv_reader(filepath)?;
162 let mut depths: Vec<OrderBookDepth10> = Vec::new();
163
164 let mut raw_record = StringRecord::new();
165 while csv_reader.read_record(&mut raw_record)? {
166 let record: TardisOrderBookSnapshot5Record = raw_record.deserialize(None)?;
167 let instrument_id = match &instrument_id {
168 Some(id) => *id,
169 None => parse_instrument_id(&record.exchange, record.symbol),
170 };
171 let flags = RecordFlag::F_LAST.value();
172 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
174 let ts_init = parse_timestamp(record.local_timestamp);
175
176 let mut bids = [NULL_ORDER; DEPTH10_LEN];
178 let mut asks = [NULL_ORDER; DEPTH10_LEN];
179 let mut bid_counts = [0u32; DEPTH10_LEN];
180 let mut ask_counts = [0u32; DEPTH10_LEN];
181
182 for i in 0..=4 {
183 let (bid_order, bid_count) = create_book_order(
185 OrderSide::Buy,
186 match i {
187 0 => record.bids_0_price,
188 1 => record.bids_1_price,
189 2 => record.bids_2_price,
190 3 => record.bids_3_price,
191 4 => record.bids_4_price,
192 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
193 },
194 match i {
195 0 => record.bids_0_amount,
196 1 => record.bids_1_amount,
197 2 => record.bids_2_amount,
198 3 => record.bids_3_amount,
199 4 => record.bids_4_amount,
200 _ => panic!("Invalid level for snapshot5 -> depth10 parsing"),
201 },
202 price_precision,
203 size_precision,
204 );
205 bids[i] = bid_order;
206 bid_counts[i] = bid_count;
207
208 let (ask_order, ask_count) = create_book_order(
210 OrderSide::Sell,
211 match i {
212 0 => record.asks_0_price,
213 1 => record.asks_1_price,
214 2 => record.asks_2_price,
215 3 => record.asks_3_price,
216 4 => record.asks_4_price,
217 _ => None, },
219 match i {
220 0 => record.asks_0_amount,
221 1 => record.asks_1_amount,
222 2 => record.asks_2_amount,
223 3 => record.asks_3_amount,
224 4 => record.asks_4_amount,
225 _ => None, },
227 price_precision,
228 size_precision,
229 );
230 asks[i] = ask_order;
231 ask_counts[i] = ask_count;
232 }
233
234 let depth = OrderBookDepth10::new(
235 instrument_id,
236 bids,
237 asks,
238 bid_counts,
239 ask_counts,
240 flags,
241 sequence,
242 ts_event,
243 ts_init,
244 );
245
246 depths.push(depth);
247
248 if let Some(limit) = limit {
249 if depths.len() >= limit {
250 break;
251 }
252 }
253 }
254
255 Ok(depths)
256}
257
258pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
259 filepath: P,
260 price_precision: u8,
261 size_precision: u8,
262 instrument_id: Option<InstrumentId>,
263 limit: Option<usize>,
264) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
265 let mut csv_reader = create_csv_reader(filepath)?;
266 let mut depths: Vec<OrderBookDepth10> = Vec::new();
267
268 let mut raw_record = StringRecord::new();
269 while csv_reader.read_record(&mut raw_record)? {
270 let record: TardisOrderBookSnapshot25Record = raw_record.deserialize(None)?;
271
272 let instrument_id = match &instrument_id {
273 Some(id) => *id,
274 None => parse_instrument_id(&record.exchange, record.symbol),
275 };
276 let flags = RecordFlag::F_LAST.value();
277 let sequence = 0; let ts_event = parse_timestamp(record.timestamp);
279 let ts_init = parse_timestamp(record.local_timestamp);
280
281 let mut bids = [NULL_ORDER; DEPTH10_LEN];
283 let mut asks = [NULL_ORDER; DEPTH10_LEN];
284 let mut bid_counts = [0u32; DEPTH10_LEN];
285 let mut ask_counts = [0u32; DEPTH10_LEN];
286
287 for i in 0..DEPTH10_LEN {
289 let (bid_order, bid_count) = create_book_order(
291 OrderSide::Buy,
292 match i {
293 0 => record.bids_0_price,
294 1 => record.bids_1_price,
295 2 => record.bids_2_price,
296 3 => record.bids_3_price,
297 4 => record.bids_4_price,
298 5 => record.bids_5_price,
299 6 => record.bids_6_price,
300 7 => record.bids_7_price,
301 8 => record.bids_8_price,
302 9 => record.bids_9_price,
303 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
304 },
305 match i {
306 0 => record.bids_0_amount,
307 1 => record.bids_1_amount,
308 2 => record.bids_2_amount,
309 3 => record.bids_3_amount,
310 4 => record.bids_4_amount,
311 5 => record.bids_5_amount,
312 6 => record.bids_6_amount,
313 7 => record.bids_7_amount,
314 8 => record.bids_8_amount,
315 9 => record.bids_9_amount,
316 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
317 },
318 price_precision,
319 size_precision,
320 );
321 bids[i] = bid_order;
322 bid_counts[i] = bid_count;
323
324 let (ask_order, ask_count) = create_book_order(
326 OrderSide::Sell,
327 match i {
328 0 => record.asks_0_price,
329 1 => record.asks_1_price,
330 2 => record.asks_2_price,
331 3 => record.asks_3_price,
332 4 => record.asks_4_price,
333 5 => record.asks_5_price,
334 6 => record.asks_6_price,
335 7 => record.asks_7_price,
336 8 => record.asks_8_price,
337 9 => record.asks_9_price,
338 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
339 },
340 match i {
341 0 => record.asks_0_amount,
342 1 => record.asks_1_amount,
343 2 => record.asks_2_amount,
344 3 => record.asks_3_amount,
345 4 => record.asks_4_amount,
346 5 => record.asks_5_amount,
347 6 => record.asks_6_amount,
348 7 => record.asks_7_amount,
349 8 => record.asks_8_amount,
350 9 => record.asks_9_amount,
351 _ => panic!("Invalid level for snapshot25 -> depth10 parsing"),
352 },
353 price_precision,
354 size_precision,
355 );
356 asks[i] = ask_order;
357 ask_counts[i] = ask_count;
358 }
359
360 let depth = OrderBookDepth10::new(
361 instrument_id,
362 bids,
363 asks,
364 bid_counts,
365 ask_counts,
366 flags,
367 sequence,
368 ts_event,
369 ts_init,
370 );
371
372 depths.push(depth);
373
374 if let Some(limit) = limit {
375 if depths.len() >= limit {
376 break;
377 }
378 }
379 }
380
381 Ok(depths)
382}
383
384pub fn load_quote_ticks<P: AsRef<Path>>(
386 filepath: P,
387 price_precision: u8,
388 size_precision: u8,
389 instrument_id: Option<InstrumentId>,
390 limit: Option<usize>,
391) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
392 let mut csv_reader = create_csv_reader(filepath)?;
393 let mut quotes = Vec::new();
394
395 let mut raw_record = StringRecord::new();
396 while csv_reader.read_record(&mut raw_record)? {
397 let record: TardisQuoteRecord = raw_record.deserialize(None)?;
398
399 let instrument_id = match &instrument_id {
400 Some(id) => *id,
401 None => parse_instrument_id(&record.exchange, record.symbol),
402 };
403 let bid_price = Price::new(record.bid_price.unwrap_or(0.0), price_precision);
404 let bid_size = Quantity::new(record.bid_amount.unwrap_or(0.0), size_precision);
405 let ask_price = Price::new(record.ask_price.unwrap_or(0.0), price_precision);
406 let ask_size = Quantity::new(record.ask_amount.unwrap_or(0.0), size_precision);
407 let ts_event = parse_timestamp(record.timestamp);
408 let ts_init = parse_timestamp(record.local_timestamp);
409
410 let quote = QuoteTick::new(
411 instrument_id,
412 bid_price,
413 ask_price,
414 bid_size,
415 ask_size,
416 ts_event,
417 ts_init,
418 );
419
420 quotes.push(quote);
421
422 if let Some(limit) = limit {
423 if quotes.len() >= limit {
424 break;
425 }
426 }
427 }
428
429 Ok(quotes)
430}
431
432pub fn load_trade_ticks<P: AsRef<Path>>(
434 filepath: P,
435 price_precision: u8,
436 size_precision: u8,
437 instrument_id: Option<InstrumentId>,
438 limit: Option<usize>,
439) -> Result<Vec<TradeTick>, Box<dyn Error>> {
440 let mut csv_reader = create_csv_reader(filepath)?;
441 let mut trades = Vec::new();
442
443 let mut raw_record = StringRecord::new();
444 while csv_reader.read_record(&mut raw_record)? {
445 let record: TardisTradeRecord = raw_record.deserialize(None)?;
446
447 let instrument_id = match &instrument_id {
448 Some(id) => *id,
449 None => parse_instrument_id(&record.exchange, record.symbol),
450 };
451 let price = Price::new(record.price, price_precision);
452 let size = Quantity::new(record.amount, size_precision);
453 let aggressor_side = parse_aggressor_side(&record.side);
454 let trade_id = TradeId::new(&record.id);
455 let ts_event = parse_timestamp(record.timestamp);
456 let ts_init = parse_timestamp(record.local_timestamp);
457
458 let trade = TradeTick::new(
459 instrument_id,
460 price,
461 size,
462 aggressor_side,
463 trade_id,
464 ts_event,
465 ts_init,
466 );
467
468 trades.push(trade);
469
470 if let Some(limit) = limit {
471 if trades.len() >= limit {
472 break;
473 }
474 }
475 }
476
477 Ok(trades)
478}
479
480#[cfg(test)]
484mod tests {
485 use nautilus_model::{
486 enums::{AggressorSide, BookAction},
487 identifiers::InstrumentId,
488 };
489 use nautilus_test_kit::common::{
490 ensure_data_exists_tardis_binance_snapshot25, ensure_data_exists_tardis_binance_snapshot5,
491 ensure_data_exists_tardis_bitmex_trades, ensure_data_exists_tardis_deribit_book_l2,
492 ensure_data_exists_tardis_huobi_quotes,
493 };
494 use rstest::*;
495
496 use super::*;
497
498 #[rstest]
499 pub fn test_read_deltas() {
500 let filepath = ensure_data_exists_tardis_deribit_book_l2();
501 let deltas = load_deltas(filepath, 1, 0, None, Some(1_000)).unwrap();
502
503 assert_eq!(deltas.len(), 1_000);
504 assert_eq!(
505 deltas[0].instrument_id,
506 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
507 );
508 assert_eq!(deltas[0].action, BookAction::Add);
509 assert_eq!(deltas[0].order.side, OrderSide::Sell);
510 assert_eq!(deltas[0].order.price, Price::from("6421.5"));
511 assert_eq!(deltas[0].order.size, Quantity::from("18640"));
512 assert_eq!(deltas[0].flags, 0);
513 assert_eq!(deltas[0].sequence, 0);
514 assert_eq!(deltas[0].ts_event, 1585699200245000000);
515 assert_eq!(deltas[0].ts_init, 1585699200355684000);
516 }
517
518 #[rstest]
519 pub fn test_read_depth10s_from_snapshot5() {
520 let filepath = ensure_data_exists_tardis_binance_snapshot5();
521 let depths = load_depth10_from_snapshot5(filepath, 1, 0, None, Some(100_000)).unwrap();
522
523 assert_eq!(depths.len(), 100_000);
524 assert_eq!(
525 depths[0].instrument_id,
526 InstrumentId::from("BTCUSDT.BINANCE")
527 );
528 assert_eq!(depths[0].bids.len(), 10);
529 assert_eq!(depths[0].bids[0].price, Price::from("11657.1"));
530 assert_eq!(depths[0].bids[0].size, Quantity::from("11"));
531 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
532 assert_eq!(depths[0].bids[0].order_id, 0);
533 assert_eq!(depths[0].asks.len(), 10);
534 assert_eq!(depths[0].asks[0].price, Price::from("11657.1"));
535 assert_eq!(depths[0].asks[0].size, Quantity::from("2"));
536 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
537 assert_eq!(depths[0].asks[0].order_id, 0);
538 assert_eq!(depths[0].bid_counts[0], 1);
539 assert_eq!(depths[0].ask_counts[0], 1);
540 assert_eq!(depths[0].flags, 128);
541 assert_eq!(depths[0].ts_event, 1598918403696000000);
542 assert_eq!(depths[0].ts_init, 1598918403810979000);
543 assert_eq!(depths[0].sequence, 0);
544 }
545
546 #[rstest]
547 pub fn test_read_depth10s_from_snapshot25() {
548 let filepath = ensure_data_exists_tardis_binance_snapshot25();
549 let depths = load_depth10_from_snapshot25(filepath, 1, 0, None, Some(100_000)).unwrap();
550
551 assert_eq!(depths.len(), 100_000);
552 assert_eq!(
553 depths[0].instrument_id,
554 InstrumentId::from("BTCUSDT.BINANCE")
555 );
556 assert_eq!(depths[0].bids.len(), 10);
557 assert_eq!(depths[0].bids[0].price, Price::from("11657.1"));
558 assert_eq!(depths[0].bids[0].size, Quantity::from("11"));
559 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
560 assert_eq!(depths[0].bids[0].order_id, 0);
561 assert_eq!(depths[0].asks.len(), 10);
562 assert_eq!(depths[0].asks[0].price, Price::from("11657.1"));
563 assert_eq!(depths[0].asks[0].size, Quantity::from("2"));
564 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
565 assert_eq!(depths[0].asks[0].order_id, 0);
566 assert_eq!(depths[0].bid_counts[0], 1);
567 assert_eq!(depths[0].ask_counts[0], 1);
568 assert_eq!(depths[0].flags, 128);
569 assert_eq!(depths[0].ts_event, 1598918403696000000);
570 assert_eq!(depths[0].ts_init, 1598918403810979000);
571 assert_eq!(depths[0].sequence, 0);
572 }
573
574 #[rstest]
575 pub fn test_read_quotes() {
576 let filepath = ensure_data_exists_tardis_huobi_quotes();
577 let quotes = load_quote_ticks(filepath, 1, 0, None, Some(100_000)).unwrap();
578
579 assert_eq!(quotes.len(), 100_000);
580 assert_eq!(quotes[0].instrument_id, InstrumentId::from("BTC-USD.HUOBI"));
581 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
582 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
583 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
584 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
585 assert_eq!(quotes[0].ts_event, 1588291201099000000);
586 assert_eq!(quotes[0].ts_init, 1588291201234268000);
587 }
588
589 #[rstest]
590 pub fn test_read_trades() {
591 let filepath = ensure_data_exists_tardis_bitmex_trades();
592 let trades = load_trade_ticks(filepath, 1, 0, None, Some(100_000)).unwrap();
593
594 assert_eq!(trades.len(), 100_000);
595 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
596 assert_eq!(trades[0].price, Price::from("8531.5"));
597 assert_eq!(trades[0].size, Quantity::from("2152"));
598 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
599 assert_eq!(
600 trades[0].trade_id,
601 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
602 );
603 assert_eq!(trades[0].ts_event, 1583020803145000000);
604 assert_eq!(trades[0].ts_init, 1583020803307160000);
605 }
606}