nautilus_tardis/csv/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod load;
17mod record;
18pub mod stream;
19
20use std::{
21    ffi::OsStr,
22    fs::File,
23    io::{BufReader, Read, Seek, SeekFrom},
24    path::Path,
25    time::Duration,
26};
27
28use csv::{Reader, ReaderBuilder};
29use flate2::read::GzDecoder;
30pub use load::{
31    load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
32    load_quotes, load_trades,
33};
34use nautilus_model::{
35    data::{BookOrder, FundingRateUpdate, NULL_ORDER, OrderBookDelta, QuoteTick, TradeTick},
36    enums::{BookAction, OrderSide},
37    identifiers::{InstrumentId, TradeId},
38    types::Quantity,
39};
40use rust_decimal::Decimal;
41pub use stream::{
42    stream_deltas, stream_depth10_from_snapshot5, stream_depth10_from_snapshot25,
43    stream_funding_rates, stream_quotes, stream_trades,
44};
45
46use super::{
47    csv::record::{
48        TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisQuoteRecord, TardisTradeRecord,
49    },
50    parse::{
51        parse_aggressor_side, parse_book_action, parse_instrument_id, parse_order_side,
52        parse_timestamp,
53    },
54};
55use crate::parse::parse_price;
56
57fn infer_precision(value: f64) -> u8 {
58    let mut buf = ryu::Buffer::new(); // Stack allocation
59    let s = buf.format(value);
60
61    match s.rsplit_once('.') {
62        Some((_, frac)) if frac != "0" => frac.len() as u8,
63        _ => 0,
64    }
65}
66
67fn create_csv_reader<P: AsRef<Path>>(
68    filepath: P,
69) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
70    let filepath_ref = filepath.as_ref();
71    const MAX_RETRIES: u8 = 3;
72    const DELAY_MS: u64 = 100;
73    const BUFFER_SIZE: usize = 8 * 1024 * 1024; // 8MB buffer for large files
74
75    fn open_file_with_retry<P: AsRef<Path>>(
76        path: P,
77        max_retries: u8,
78        delay_ms: u64,
79    ) -> anyhow::Result<File> {
80        let path_ref = path.as_ref();
81        for attempt in 1..=max_retries {
82            match File::open(path_ref) {
83                Ok(file) => return Ok(file),
84                Err(e) => {
85                    if attempt == max_retries {
86                        anyhow::bail!(
87                            "Failed to open file '{path_ref:?}' after {max_retries} attempts: {e}"
88                        );
89                    }
90                    eprintln!(
91                        "Attempt {attempt}/{max_retries} failed to open file '{path_ref:?}': {e}. Retrying after {delay_ms}ms..."
92                    );
93                    std::thread::sleep(Duration::from_millis(delay_ms));
94                }
95            }
96        }
97        unreachable!("Loop should return either Ok or Err");
98    }
99
100    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
101
102    let is_gzipped = filepath_ref
103        .extension()
104        .and_then(OsStr::to_str)
105        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
106
107    if !is_gzipped {
108        let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
109        return Ok(ReaderBuilder::new()
110            .has_headers(true)
111            .buffer_capacity(1024 * 1024) // 1MB CSV buffer
112            .from_reader(Box::new(buf_reader)));
113    }
114
115    let file_size = file.metadata()?.len();
116    if file_size < 2 {
117        anyhow::bail!("File too small to be a valid gzip file");
118    }
119
120    let mut header_buf = [0u8; 2];
121    for attempt in 1..=MAX_RETRIES {
122        match file.read_exact(&mut header_buf) {
123            Ok(()) => break,
124            Err(e) => {
125                if attempt == MAX_RETRIES {
126                    anyhow::bail!(
127                        "Failed to read gzip header from '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
128                    );
129                }
130                eprintln!(
131                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
132                );
133                std::thread::sleep(Duration::from_millis(DELAY_MS));
134            }
135        }
136    }
137
138    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
139        anyhow::bail!("File '{filepath_ref:?}' has .gz extension but invalid gzip header");
140    }
141
142    for attempt in 1..=MAX_RETRIES {
143        match file.seek(SeekFrom::Start(0)) {
144            Ok(_) => break,
145            Err(e) => {
146                if attempt == MAX_RETRIES {
147                    anyhow::bail!(
148                        "Failed to reset file position for '{filepath_ref:?}' after {MAX_RETRIES} attempts: {e}"
149                    );
150                }
151                eprintln!(
152                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{filepath_ref:?}': {e}. Retrying after {DELAY_MS}ms..."
153                );
154                std::thread::sleep(Duration::from_millis(DELAY_MS));
155            }
156        }
157    }
158
159    let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
160    let decoder = GzDecoder::new(buf_reader);
161
162    Ok(ReaderBuilder::new()
163        .has_headers(true)
164        .buffer_capacity(1024 * 1024) // 1MB CSV buffer
165        .from_reader(Box::new(decoder)))
166}
167
168fn create_book_order(
169    side: OrderSide,
170    price: Option<f64>,
171    amount: Option<f64>,
172    price_precision: u8,
173    size_precision: u8,
174) -> (BookOrder, u32) {
175    match price {
176        Some(price) => (
177            BookOrder::new(
178                side,
179                parse_price(price, price_precision),
180                Quantity::new(amount.unwrap_or(0.0), size_precision),
181                0,
182            ),
183            1, // Count set to 1 if order exists
184        ),
185        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
186    }
187}
188
189fn parse_delta_record(
190    data: &TardisBookUpdateRecord,
191    price_precision: u8,
192    size_precision: u8,
193    instrument_id: Option<InstrumentId>,
194) -> OrderBookDelta {
195    let instrument_id = match instrument_id {
196        Some(id) => id,
197        None => parse_instrument_id(&data.exchange, data.symbol),
198    };
199
200    let side = parse_order_side(&data.side);
201    let price = parse_price(data.price, price_precision);
202    let size = Quantity::new(data.amount, size_precision);
203    let order_id = 0; // Not applicable for L2 data
204    let order = BookOrder::new(side, price, size, order_id);
205
206    let action = parse_book_action(data.is_snapshot, size.as_f64());
207    let flags = 0; // Will be set later if needed
208    let sequence = 0; // Sequence not available
209    let ts_event = parse_timestamp(data.timestamp);
210    let ts_init = parse_timestamp(data.local_timestamp);
211
212    assert!(
213        !(action != BookAction::Delete && size.is_zero()),
214        "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {data:?}"
215    );
216
217    OrderBookDelta::new(
218        instrument_id,
219        action,
220        order,
221        flags,
222        sequence,
223        ts_event,
224        ts_init,
225    )
226}
227
228fn parse_quote_record(
229    data: &TardisQuoteRecord,
230    price_precision: u8,
231    size_precision: u8,
232    instrument_id: Option<InstrumentId>,
233) -> QuoteTick {
234    let instrument_id = match instrument_id {
235        Some(id) => id,
236        None => parse_instrument_id(&data.exchange, data.symbol),
237    };
238
239    let bid_price = parse_price(data.bid_price.unwrap_or(0.0), price_precision);
240    let ask_price = parse_price(data.ask_price.unwrap_or(0.0), price_precision);
241    let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), size_precision);
242    let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), size_precision);
243    let ts_event = parse_timestamp(data.timestamp);
244    let ts_init = parse_timestamp(data.local_timestamp);
245
246    QuoteTick::new(
247        instrument_id,
248        bid_price,
249        ask_price,
250        bid_size,
251        ask_size,
252        ts_event,
253        ts_init,
254    )
255}
256
257fn parse_trade_record(
258    data: &TardisTradeRecord,
259    size: Quantity,
260    price_precision: u8,
261    instrument_id: Option<InstrumentId>,
262) -> TradeTick {
263    let instrument_id = match instrument_id {
264        Some(id) => id,
265        None => parse_instrument_id(&data.exchange, data.symbol),
266    };
267
268    let price = parse_price(data.price, price_precision);
269    let aggressor_side = parse_aggressor_side(&data.side);
270    let trade_id = TradeId::new(&data.id);
271    let ts_event = parse_timestamp(data.timestamp);
272    let ts_init = parse_timestamp(data.local_timestamp);
273
274    TradeTick::new(
275        instrument_id,
276        price,
277        size,
278        aggressor_side,
279        trade_id,
280        ts_event,
281        ts_init,
282    )
283}
284
285fn parse_derivative_ticker_record(
286    data: &TardisDerivativeTickerRecord,
287    instrument_id: Option<InstrumentId>,
288) -> Option<FundingRateUpdate> {
289    // Only create funding rate update if we have funding rate data
290    let funding_rate = data.funding_rate?;
291
292    let instrument_id = match instrument_id {
293        Some(id) => id,
294        None => parse_instrument_id(&data.exchange, data.symbol),
295    };
296
297    let rate = Decimal::try_from(funding_rate).ok()?.normalize();
298    let next_funding_ns = if data.predicted_funding_rate.is_some() {
299        data.funding_timestamp.map(parse_timestamp)
300    } else {
301        None
302    };
303    let ts_event = parse_timestamp(data.timestamp);
304    let ts_init = parse_timestamp(data.local_timestamp);
305
306    Some(FundingRateUpdate::new(
307        instrument_id,
308        rate,
309        next_funding_ns,
310        ts_event,
311        ts_init,
312    ))
313}