nautilus_tardis/python/
csv.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, path::PathBuf};
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    data::{FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
21    identifiers::InstrumentId,
22};
23use pyo3::prelude::*;
24
25use crate::csv::{
26    load::{
27        load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
28        load_quotes, load_trades,
29    },
30    stream::{
31        stream_batched_deltas, stream_deltas, stream_depth10_from_snapshot5,
32        stream_depth10_from_snapshot25, stream_funding_rates, stream_quotes, stream_trades,
33    },
34};
35
36macro_rules! impl_tardis_stream_iterator {
37    ($struct_name:ident, $data_type:ty, $type_name:expr) => {
38        #[pyclass(unsendable)]
39        pub struct $struct_name {
40            stream: Box<dyn Iterator<Item = anyhow::Result<Vec<$data_type>>>>,
41        }
42
43        impl Debug for $struct_name {
44            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45                write!(f, "{} {{ stream: ... }}", $type_name)
46            }
47        }
48
49        #[pymethods]
50        impl $struct_name {
51            const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
52                slf
53            }
54
55            fn __next__(&mut self) -> PyResult<Option<Vec<$data_type>>> {
56                match self.stream.next() {
57                    Some(Ok(chunk)) => Ok(Some(chunk)),
58                    Some(Err(e)) => Err(to_pyvalue_err(e)),
59                    None => Ok(None),
60                }
61            }
62        }
63    };
64}
65
66/// # Errors
67///
68/// Returns a Python error if loading or parsing the CSV file fails.
69#[pyfunction(name = "load_tardis_deltas")]
70#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
71pub fn py_load_tardis_deltas(
72    filepath: PathBuf,
73    price_precision: Option<u8>,
74    size_precision: Option<u8>,
75    instrument_id: Option<InstrumentId>,
76    limit: Option<usize>,
77) -> PyResult<Vec<OrderBookDelta>> {
78    load_deltas(
79        filepath,
80        price_precision,
81        size_precision,
82        instrument_id,
83        limit,
84    )
85    .map_err(to_pyvalue_err)
86}
87
88/// # Errors
89///
90/// Returns a Python error if loading or parsing the CSV file fails.
91#[pyfunction(name = "load_tardis_depth10_from_snapshot5")]
92#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
93pub fn py_load_tardis_depth10_from_snapshot5(
94    filepath: PathBuf,
95    price_precision: Option<u8>,
96    size_precision: Option<u8>,
97    instrument_id: Option<InstrumentId>,
98    limit: Option<usize>,
99) -> PyResult<Vec<OrderBookDepth10>> {
100    load_depth10_from_snapshot5(
101        filepath,
102        price_precision,
103        size_precision,
104        instrument_id,
105        limit,
106    )
107    .map_err(to_pyvalue_err)
108}
109
110/// # Errors
111///
112/// Returns a Python error if loading or parsing the CSV file fails.
113#[pyfunction(name = "load_tardis_depth10_from_snapshot25")]
114#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
115pub fn py_load_tardis_depth10_from_snapshot25(
116    filepath: PathBuf,
117    price_precision: Option<u8>,
118    size_precision: Option<u8>,
119    instrument_id: Option<InstrumentId>,
120    limit: Option<usize>,
121) -> PyResult<Vec<OrderBookDepth10>> {
122    load_depth10_from_snapshot25(
123        filepath,
124        price_precision,
125        size_precision,
126        instrument_id,
127        limit,
128    )
129    .map_err(to_pyvalue_err)
130}
131
132/// # Errors
133///
134/// Returns a Python error if loading or parsing the CSV file fails.
135#[pyfunction(name = "load_tardis_quotes")]
136#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
137pub fn py_load_tardis_quotes(
138    filepath: PathBuf,
139    price_precision: Option<u8>,
140    size_precision: Option<u8>,
141    instrument_id: Option<InstrumentId>,
142    limit: Option<usize>,
143) -> PyResult<Vec<QuoteTick>> {
144    load_quotes(
145        filepath,
146        price_precision,
147        size_precision,
148        instrument_id,
149        limit,
150    )
151    .map_err(to_pyvalue_err)
152}
153
154/// # Errors
155///
156/// Returns a Python error if loading or parsing the CSV file fails.
157#[pyfunction(name = "load_tardis_trades")]
158#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
159pub fn py_load_tardis_trades(
160    filepath: PathBuf,
161    price_precision: Option<u8>,
162    size_precision: Option<u8>,
163    instrument_id: Option<InstrumentId>,
164    limit: Option<usize>,
165) -> PyResult<Vec<TradeTick>> {
166    load_trades(
167        filepath,
168        price_precision,
169        size_precision,
170        instrument_id,
171        limit,
172    )
173    .map_err(to_pyvalue_err)
174}
175
176/// # Errors
177///
178/// Returns a Python error if loading or parsing the CSV file fails.
179#[pyfunction(name = "load_tardis_funding_rates")]
180#[pyo3(signature = (filepath, instrument_id=None, limit=None))]
181pub fn py_load_tardis_funding_rates(
182    filepath: PathBuf,
183    instrument_id: Option<InstrumentId>,
184    limit: Option<usize>,
185) -> PyResult<Vec<FundingRateUpdate>> {
186    load_funding_rates(filepath, instrument_id, limit).map_err(to_pyvalue_err)
187}
188
189impl_tardis_stream_iterator!(
190    TardisDeltaStreamIterator,
191    OrderBookDelta,
192    "TardisDeltasStreamIterator"
193);
194
195/// Streams order book deltas from a Tardis CSV file.
196///
197/// # Errors
198///
199/// Returns a Python error if loading or parsing the CSV file fails.
200#[pyfunction(name = "stream_tardis_deltas")]
201#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
202pub fn py_stream_tardis_deltas(
203    filepath: PathBuf,
204    chunk_size: usize,
205    price_precision: Option<u8>,
206    size_precision: Option<u8>,
207    instrument_id: Option<InstrumentId>,
208    limit: Option<usize>,
209) -> PyResult<TardisDeltaStreamIterator> {
210    let stream = stream_deltas(
211        filepath,
212        chunk_size,
213        price_precision,
214        size_precision,
215        instrument_id,
216        limit,
217    )
218    .map_err(to_pyvalue_err)?;
219
220    Ok(TardisDeltaStreamIterator {
221        stream: Box::new(stream),
222    })
223}
224
225#[pyclass(unsendable)]
226pub struct TardisBatchedDeltasStreamIterator {
227    stream: Box<dyn Iterator<Item = anyhow::Result<Vec<PyObject>>>>,
228}
229
230impl Debug for TardisBatchedDeltasStreamIterator {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        write!(f, "TardisBatchedDeltasStreamIterator {{ stream: ... }}")
233    }
234}
235
236#[pymethods]
237impl TardisBatchedDeltasStreamIterator {
238    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
239        slf
240    }
241
242    fn __next__(&mut self) -> PyResult<Option<Vec<PyObject>>> {
243        match self.stream.next() {
244            Some(Ok(batch)) => Ok(Some(batch)),
245            Some(Err(e)) => Err(to_pyvalue_err(e)),
246            None => Ok(None),
247        }
248    }
249}
250
251/// Streams batched order book deltas from a Tardis CSV file.
252///
253/// # Errors
254///
255/// Returns a Python error if loading or parsing the CSV file fails.
256#[pyfunction(name = "stream_tardis_batched_deltas")]
257#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
258pub fn py_stream_tardis_batched_deltas(
259    filepath: PathBuf,
260    chunk_size: usize,
261    price_precision: Option<u8>,
262    size_precision: Option<u8>,
263    instrument_id: Option<InstrumentId>,
264    limit: Option<usize>,
265) -> PyResult<TardisBatchedDeltasStreamIterator> {
266    let stream = stream_batched_deltas(
267        filepath,
268        chunk_size,
269        price_precision,
270        size_precision,
271        instrument_id,
272        limit,
273    )
274    .map_err(to_pyvalue_err)?;
275
276    Ok(TardisBatchedDeltasStreamIterator {
277        stream: Box::new(stream),
278    })
279}
280
281impl_tardis_stream_iterator!(
282    TardisQuoteStreamIterator,
283    QuoteTick,
284    "TardisQuoteStreamIterator"
285);
286
287/// Streams quote ticks from a Tardis CSV file.
288///
289/// # Errors
290///
291/// Returns a Python error if loading or parsing the CSV file fails.
292#[pyfunction(name = "stream_tardis_quotes")]
293#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
294pub fn py_stream_tardis_quotes(
295    filepath: PathBuf,
296    chunk_size: usize,
297    price_precision: Option<u8>,
298    size_precision: Option<u8>,
299    instrument_id: Option<InstrumentId>,
300    limit: Option<usize>,
301) -> PyResult<TardisQuoteStreamIterator> {
302    let stream = stream_quotes(
303        filepath,
304        chunk_size,
305        price_precision,
306        size_precision,
307        instrument_id,
308        limit,
309    )
310    .map_err(to_pyvalue_err)?;
311
312    Ok(TardisQuoteStreamIterator {
313        stream: Box::new(stream),
314    })
315}
316
317impl_tardis_stream_iterator!(
318    TardisTradeStreamIterator,
319    TradeTick,
320    "TardisTradeStreamIterator"
321);
322
323/// Streams trade ticks from a Tardis CSV file.
324///
325/// # Errors
326///
327/// Returns a Python error if loading or parsing the CSV file fails.
328#[pyfunction(name = "stream_tardis_trades")]
329#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
330pub fn py_stream_tardis_trades(
331    filepath: PathBuf,
332    chunk_size: usize,
333    price_precision: Option<u8>,
334    size_precision: Option<u8>,
335    instrument_id: Option<InstrumentId>,
336    limit: Option<usize>,
337) -> PyResult<TardisTradeStreamIterator> {
338    let stream = stream_trades(
339        filepath,
340        chunk_size,
341        price_precision,
342        size_precision,
343        instrument_id,
344        limit,
345    )
346    .map_err(to_pyvalue_err)?;
347
348    Ok(TardisTradeStreamIterator {
349        stream: Box::new(stream),
350    })
351}
352
353impl_tardis_stream_iterator!(
354    TardisDepth10StreamIterator,
355    OrderBookDepth10,
356    "TardisDepth10StreamIterator"
357);
358
359/// Streams order book depth10 from a Tardis snapshot5 CSV file.
360///
361/// # Errors
362///
363/// Returns a Python error if loading or parsing the CSV file fails.
364#[pyfunction(name = "stream_tardis_depth10_from_snapshot5")]
365#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
366pub fn py_stream_tardis_depth10_from_snapshot5(
367    filepath: PathBuf,
368    chunk_size: usize,
369    price_precision: Option<u8>,
370    size_precision: Option<u8>,
371    instrument_id: Option<InstrumentId>,
372    limit: Option<usize>,
373) -> PyResult<TardisDepth10StreamIterator> {
374    let stream = stream_depth10_from_snapshot5(
375        filepath,
376        chunk_size,
377        price_precision,
378        size_precision,
379        instrument_id,
380        limit,
381    )
382    .map_err(to_pyvalue_err)?;
383
384    Ok(TardisDepth10StreamIterator {
385        stream: Box::new(stream),
386    })
387}
388
389/// Streams order book depth10 from a Tardis snapshot25 CSV file.
390///
391/// # Errors
392///
393/// Returns a Python error if loading or parsing the CSV file fails.
394#[pyfunction(name = "stream_tardis_depth10_from_snapshot25")]
395#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
396pub fn py_stream_tardis_depth10_from_snapshot25(
397    filepath: PathBuf,
398    chunk_size: usize,
399    price_precision: Option<u8>,
400    size_precision: Option<u8>,
401    instrument_id: Option<InstrumentId>,
402    limit: Option<usize>,
403) -> PyResult<TardisDepth10StreamIterator> {
404    let stream = stream_depth10_from_snapshot25(
405        filepath,
406        chunk_size,
407        price_precision,
408        size_precision,
409        instrument_id,
410        limit,
411    )
412    .map_err(to_pyvalue_err)?;
413
414    Ok(TardisDepth10StreamIterator {
415        stream: Box::new(stream),
416    })
417}
418
419impl_tardis_stream_iterator!(
420    TardisFundingRateStreamIterator,
421    FundingRateUpdate,
422    "TardisFundingRateStreamIterator"
423);
424
425/// Streams funding rate updates from a Tardis derivative ticker CSV file.
426///
427/// # Errors
428///
429/// Returns a Python error if loading or parsing the CSV file fails.
430#[pyfunction(name = "stream_tardis_funding_rates")]
431#[pyo3(signature = (filepath, chunk_size=100_000, instrument_id=None, limit=None))]
432pub fn py_stream_tardis_funding_rates(
433    filepath: PathBuf,
434    chunk_size: usize,
435    instrument_id: Option<InstrumentId>,
436    limit: Option<usize>,
437) -> PyResult<TardisFundingRateStreamIterator> {
438    let stream =
439        stream_funding_rates(filepath, chunk_size, instrument_id, limit).map_err(to_pyvalue_err)?;
440
441    Ok(TardisFundingRateStreamIterator {
442        stream: Box::new(stream),
443    })
444}