1pub mod load;
17mod record;
18pub mod stream;
19
20use std::{
21 ffi::OsStr,
22 fs::File,
23 io::{BufReader, Read, Seek, SeekFrom},
24 path::Path,
25 time::Duration,
26};
27
28use csv::{Reader, ReaderBuilder};
29use flate2::read::GzDecoder;
30pub use load::{
31 load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
32 load_quotes, load_trades,
33};
34use nautilus_model::{
35 data::{BookOrder, FundingRateUpdate, NULL_ORDER, OrderBookDelta, QuoteTick, TradeTick},
36 enums::{BookAction, OrderSide},
37 identifiers::{InstrumentId, TradeId},
38 types::Quantity,
39};
40use rust_decimal::Decimal;
41pub use stream::{
42 stream_deltas, stream_depth10_from_snapshot5, stream_depth10_from_snapshot25,
43 stream_funding_rates, stream_quotes, stream_trades,
44};
45
46use super::{
47 csv::record::{
48 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisQuoteRecord, TardisTradeRecord,
49 },
50 parse::{
51 parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
52 parse_timestamp,
53 },
54};
55use crate::parse::parse_price;
56
57fn infer_precision(value: f64) -> u8 {
58 let mut buf = ryu::Buffer::new(); let s = buf.format(value);
60
61 match s.rsplit_once('.') {
62 Some((_, frac)) if frac != "0" => frac.len() as u8,
63 _ => 0,
64 }
65}
66
67fn create_csv_reader<P: AsRef<Path>>(
68 filepath: P,
69) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
70 let filepath_ref = filepath.as_ref();
71 const MAX_RETRIES: u8 = 3;
72 const DELAY_MS: u64 = 100;
73 const BUFFER_SIZE: usize = 8 * 1024 * 1024; fn open_file_with_retry<P: AsRef<Path>>(
76 path: P,
77 max_retries: u8,
78 delay_ms: u64,
79 ) -> anyhow::Result<File> {
80 let path_ref = path.as_ref();
81 for attempt in 1..=max_retries {
82 match File::open(path_ref) {
83 Ok(file) => return Ok(file),
84 Err(e) => {
85 if attempt == max_retries {
86 anyhow::bail!(
87 "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
88 );
89 }
90 eprintln!(
91 "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
92 );
93 std::thread::sleep(Duration::from_millis(delay_ms));
94 }
95 }
96 }
97 unreachable!("Loop should return either Ok or Err");
98 }
99
100 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
101
102 let is_gzipped = filepath_ref
103 .extension()
104 .and_then(OsStr::to_str)
105 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
106
107 if !is_gzipped {
108 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
109 return Ok(ReaderBuilder::new()
110 .has_headers(true)
111 .buffer_capacity(1024 * 1024) .from_reader(Box::new(buf_reader)));
113 }
114
115 let file_size = file.metadata()?.len();
116 if file_size < 2 {
117 anyhow::bail!("File too small to be a valid gzip file");
118 }
119
120 let mut header_buf = [0u8; 2];
121 for attempt in 1..=MAX_RETRIES {
122 match file.read_exact(&mut header_buf) {
123 Ok(()) => break,
124 Err(e) => {
125 if attempt == MAX_RETRIES {
126 anyhow::bail!(
127 "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
128 );
129 }
130 eprintln!(
131 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
132 );
133 std::thread::sleep(Duration::from_millis(DELAY_MS));
134 }
135 }
136 }
137
138 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
139 anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
140 }
141
142 for attempt in 1..=MAX_RETRIES {
143 match file.seek(SeekFrom::Start(0)) {
144 Ok(_) => break,
145 Err(e) => {
146 if attempt == MAX_RETRIES {
147 anyhow::bail!(
148 "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
149 );
150 }
151 eprintln!(
152 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
153 );
154 std::thread::sleep(Duration::from_millis(DELAY_MS));
155 }
156 }
157 }
158
159 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
160 let decoder = GzDecoder::new(buf_reader);
161
162 Ok(ReaderBuilder::new()
163 .has_headers(true)
164 .buffer_capacity(1024 * 1024) .from_reader(Box::new(decoder)))
166}
167
168fn create_book_order(
169 side: OrderSide,
170 price: Option<f64>,
171 amount: Option<f64>,
172 price_precision: u8,
173 size_precision: u8,
174) -> (BookOrder, u32) {
175 match price {
176 Some(price) => (
177 BookOrder::new(
178 side,
179 parse_price(price, price_precision),
180 Quantity::new(amount.unwrap_or(0.0), size_precision),
181 0,
182 ),
183 1, ),
185 None => (NULL_ORDER, 0), }
187}
188
189fn parse_delta_record(
190 data: &TardisBookUpdateRecord,
191 price_precision: u8,
192 size_precision: u8,
193 instrument_id: Option<InstrumentId>,
194) -> OrderBookDelta {
195 let instrument_id = match instrument_id {
196 Some(id) => id,
197 None => parse_instrument_id(&data.exchange, data.symbol),
198 };
199
200 let side = parse_order_side(&data.side);
201 let price = parse_price(data.price, price_precision);
202 let size = Quantity::new(data.amount, size_precision);
203 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
205
206 let action = parse_book_action(data.is_snapshot, size.as_f64());
207 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
210 let ts_init = parse_timestamp(data.local_timestamp);
211
212 assert!(
213 !(action != BookAction::Delete && size.is_zero()),
214 "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {data:?}"
215 );
216
217 OrderBookDelta::new(
218 instrument_id,
219 action,
220 order,
221 flags,
222 sequence,
223 ts_event,
224 ts_init,
225 )
226}
227
228fn parse_quote_record(
229 data: &TardisQuoteRecord,
230 price_precision: u8,
231 size_precision: u8,
232 instrument_id: Option<InstrumentId>,
233) -> QuoteTick {
234 let instrument_id = match instrument_id {
235 Some(id) => id,
236 None => parse_instrument_id(&data.exchange, data.symbol),
237 };
238
239 let bid_price = parse_price(data.bid_price.unwrap_or(0.0), price_precision);
240 let ask_price = parse_price(data.ask_price.unwrap_or(0.0), price_precision);
241 let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), size_precision);
242 let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), size_precision);
243 let ts_event = parse_timestamp(data.timestamp);
244 let ts_init = parse_timestamp(data.local_timestamp);
245
246 QuoteTick::new(
247 instrument_id,
248 bid_price,
249 ask_price,
250 bid_size,
251 ask_size,
252 ts_event,
253 ts_init,
254 )
255}
256
257fn parse_trade_record(
258 data: &TardisTradeRecord,
259 size: Quantity,
260 price_precision: u8,
261 instrument_id: Option<InstrumentId>,
262) -> TradeTick {
263 let instrument_id = match instrument_id {
264 Some(id) => id,
265 None => parse_instrument_id(&data.exchange, data.symbol),
266 };
267
268 let price = parse_price(data.price, price_precision);
269 let aggressor_side = parse_aggressor_side(&data.side);
270 let trade_id = TradeId::new(&data.id);
271 let ts_event = parse_timestamp(data.timestamp);
272 let ts_init = parse_timestamp(data.local_timestamp);
273
274 TradeTick::new(
275 instrument_id,
276 price,
277 size,
278 aggressor_side,
279 trade_id,
280 ts_event,
281 ts_init,
282 )
283}
284
285fn parse_derivative_ticker_record(
286 data: &TardisDerivativeTickerRecord,
287 instrument_id: Option<InstrumentId>,
288) -> Option<FundingRateUpdate> {
289 let funding_rate = data.funding_rate?;
291
292 let instrument_id = match instrument_id {
293 Some(id) => id,
294 None => parse_instrument_id(&data.exchange, data.symbol),
295 };
296
297 let rate = Decimal::try_from(funding_rate).ok()?.normalize();
298 let next_funding_ns = if data.predicted_funding_rate.is_some() {
299 data.funding_timestamp.map(parse_timestamp)
300 } else {
301 None
302 };
303 let ts_event = parse_timestamp(data.timestamp);
304 let ts_init = parse_timestamp(data.local_timestamp);
305
306 Some(FundingRateUpdate::new(
307 instrument_id,
308 rate,
309 next_funding_ns,
310 ts_event,
311 ts_init,
312 ))
313}