1use std::{fmt::Debug, path::PathBuf};
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 data::{FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
21 identifiers::InstrumentId,
22};
23use pyo3::prelude::*;
24
25use crate::csv::{
26 load::{
27 load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
28 load_quotes, load_trades,
29 },
30 stream::{
31 stream_batched_deltas, stream_deltas, stream_depth10_from_snapshot5,
32 stream_depth10_from_snapshot25, stream_funding_rates, stream_quotes, stream_trades,
33 },
34};
35
36macro_rules! impl_tardis_stream_iterator {
37 ($struct_name:ident, $data_type:ty, $type_name:expr) => {
38 #[pyclass(unsendable)]
39 pub struct $struct_name {
40 stream: Box<dyn Iterator<Item = anyhow::Result<Vec<$data_type>>>>,
41 }
42
43 impl Debug for $struct_name {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "{} {{ stream: ... }}", $type_name)
46 }
47 }
48
49 #[pymethods]
50 impl $struct_name {
51 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
52 slf
53 }
54
55 fn __next__(&mut self) -> PyResult<Option<Vec<$data_type>>> {
56 match self.stream.next() {
57 Some(Ok(chunk)) => Ok(Some(chunk)),
58 Some(Err(e)) => Err(to_pyvalue_err(e)),
59 None => Ok(None),
60 }
61 }
62 }
63 };
64}
65
66#[pyfunction(name = "load_tardis_deltas")]
70#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
71pub fn py_load_tardis_deltas(
72 filepath: PathBuf,
73 price_precision: Option<u8>,
74 size_precision: Option<u8>,
75 instrument_id: Option<InstrumentId>,
76 limit: Option<usize>,
77) -> PyResult<Vec<OrderBookDelta>> {
78 load_deltas(
79 filepath,
80 price_precision,
81 size_precision,
82 instrument_id,
83 limit,
84 )
85 .map_err(to_pyvalue_err)
86}
87
88#[pyfunction(name = "load_tardis_depth10_from_snapshot5")]
92#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
93pub fn py_load_tardis_depth10_from_snapshot5(
94 filepath: PathBuf,
95 price_precision: Option<u8>,
96 size_precision: Option<u8>,
97 instrument_id: Option<InstrumentId>,
98 limit: Option<usize>,
99) -> PyResult<Vec<OrderBookDepth10>> {
100 load_depth10_from_snapshot5(
101 filepath,
102 price_precision,
103 size_precision,
104 instrument_id,
105 limit,
106 )
107 .map_err(to_pyvalue_err)
108}
109
110#[pyfunction(name = "load_tardis_depth10_from_snapshot25")]
114#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
115pub fn py_load_tardis_depth10_from_snapshot25(
116 filepath: PathBuf,
117 price_precision: Option<u8>,
118 size_precision: Option<u8>,
119 instrument_id: Option<InstrumentId>,
120 limit: Option<usize>,
121) -> PyResult<Vec<OrderBookDepth10>> {
122 load_depth10_from_snapshot25(
123 filepath,
124 price_precision,
125 size_precision,
126 instrument_id,
127 limit,
128 )
129 .map_err(to_pyvalue_err)
130}
131
132#[pyfunction(name = "load_tardis_quotes")]
136#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
137pub fn py_load_tardis_quotes(
138 filepath: PathBuf,
139 price_precision: Option<u8>,
140 size_precision: Option<u8>,
141 instrument_id: Option<InstrumentId>,
142 limit: Option<usize>,
143) -> PyResult<Vec<QuoteTick>> {
144 load_quotes(
145 filepath,
146 price_precision,
147 size_precision,
148 instrument_id,
149 limit,
150 )
151 .map_err(to_pyvalue_err)
152}
153
154#[pyfunction(name = "load_tardis_trades")]
158#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
159pub fn py_load_tardis_trades(
160 filepath: PathBuf,
161 price_precision: Option<u8>,
162 size_precision: Option<u8>,
163 instrument_id: Option<InstrumentId>,
164 limit: Option<usize>,
165) -> PyResult<Vec<TradeTick>> {
166 load_trades(
167 filepath,
168 price_precision,
169 size_precision,
170 instrument_id,
171 limit,
172 )
173 .map_err(to_pyvalue_err)
174}
175
176#[pyfunction(name = "load_tardis_funding_rates")]
180#[pyo3(signature = (filepath, instrument_id=None, limit=None))]
181pub fn py_load_tardis_funding_rates(
182 filepath: PathBuf,
183 instrument_id: Option<InstrumentId>,
184 limit: Option<usize>,
185) -> PyResult<Vec<FundingRateUpdate>> {
186 load_funding_rates(filepath, instrument_id, limit).map_err(to_pyvalue_err)
187}
188
189impl_tardis_stream_iterator!(
190 TardisDeltaStreamIterator,
191 OrderBookDelta,
192 "TardisDeltasStreamIterator"
193);
194
195#[pyfunction(name = "stream_tardis_deltas")]
201#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
202pub fn py_stream_tardis_deltas(
203 filepath: PathBuf,
204 chunk_size: usize,
205 price_precision: Option<u8>,
206 size_precision: Option<u8>,
207 instrument_id: Option<InstrumentId>,
208 limit: Option<usize>,
209) -> PyResult<TardisDeltaStreamIterator> {
210 let stream = stream_deltas(
211 filepath,
212 chunk_size,
213 price_precision,
214 size_precision,
215 instrument_id,
216 limit,
217 )
218 .map_err(to_pyvalue_err)?;
219
220 Ok(TardisDeltaStreamIterator {
221 stream: Box::new(stream),
222 })
223}
224
225#[pyclass(unsendable)]
226pub struct TardisBatchedDeltasStreamIterator {
227 stream: Box<dyn Iterator<Item = anyhow::Result<Vec<PyObject>>>>,
228}
229
230impl Debug for TardisBatchedDeltasStreamIterator {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 write!(f, "TardisBatchedDeltasStreamIterator {{ stream: ... }}")
233 }
234}
235
236#[pymethods]
237impl TardisBatchedDeltasStreamIterator {
238 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
239 slf
240 }
241
242 fn __next__(&mut self) -> PyResult<Option<Vec<PyObject>>> {
243 match self.stream.next() {
244 Some(Ok(batch)) => Ok(Some(batch)),
245 Some(Err(e)) => Err(to_pyvalue_err(e)),
246 None => Ok(None),
247 }
248 }
249}
250
251#[pyfunction(name = "stream_tardis_batched_deltas")]
257#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
258pub fn py_stream_tardis_batched_deltas(
259 filepath: PathBuf,
260 chunk_size: usize,
261 price_precision: Option<u8>,
262 size_precision: Option<u8>,
263 instrument_id: Option<InstrumentId>,
264 limit: Option<usize>,
265) -> PyResult<TardisBatchedDeltasStreamIterator> {
266 let stream = stream_batched_deltas(
267 filepath,
268 chunk_size,
269 price_precision,
270 size_precision,
271 instrument_id,
272 limit,
273 )
274 .map_err(to_pyvalue_err)?;
275
276 Ok(TardisBatchedDeltasStreamIterator {
277 stream: Box::new(stream),
278 })
279}
280
281impl_tardis_stream_iterator!(
282 TardisQuoteStreamIterator,
283 QuoteTick,
284 "TardisQuoteStreamIterator"
285);
286
287#[pyfunction(name = "stream_tardis_quotes")]
293#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
294pub fn py_stream_tardis_quotes(
295 filepath: PathBuf,
296 chunk_size: usize,
297 price_precision: Option<u8>,
298 size_precision: Option<u8>,
299 instrument_id: Option<InstrumentId>,
300 limit: Option<usize>,
301) -> PyResult<TardisQuoteStreamIterator> {
302 let stream = stream_quotes(
303 filepath,
304 chunk_size,
305 price_precision,
306 size_precision,
307 instrument_id,
308 limit,
309 )
310 .map_err(to_pyvalue_err)?;
311
312 Ok(TardisQuoteStreamIterator {
313 stream: Box::new(stream),
314 })
315}
316
317impl_tardis_stream_iterator!(
318 TardisTradeStreamIterator,
319 TradeTick,
320 "TardisTradeStreamIterator"
321);
322
323#[pyfunction(name = "stream_tardis_trades")]
329#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
330pub fn py_stream_tardis_trades(
331 filepath: PathBuf,
332 chunk_size: usize,
333 price_precision: Option<u8>,
334 size_precision: Option<u8>,
335 instrument_id: Option<InstrumentId>,
336 limit: Option<usize>,
337) -> PyResult<TardisTradeStreamIterator> {
338 let stream = stream_trades(
339 filepath,
340 chunk_size,
341 price_precision,
342 size_precision,
343 instrument_id,
344 limit,
345 )
346 .map_err(to_pyvalue_err)?;
347
348 Ok(TardisTradeStreamIterator {
349 stream: Box::new(stream),
350 })
351}
352
353impl_tardis_stream_iterator!(
354 TardisDepth10StreamIterator,
355 OrderBookDepth10,
356 "TardisDepth10StreamIterator"
357);
358
359#[pyfunction(name = "stream_tardis_depth10_from_snapshot5")]
365#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
366pub fn py_stream_tardis_depth10_from_snapshot5(
367 filepath: PathBuf,
368 chunk_size: usize,
369 price_precision: Option<u8>,
370 size_precision: Option<u8>,
371 instrument_id: Option<InstrumentId>,
372 limit: Option<usize>,
373) -> PyResult<TardisDepth10StreamIterator> {
374 let stream = stream_depth10_from_snapshot5(
375 filepath,
376 chunk_size,
377 price_precision,
378 size_precision,
379 instrument_id,
380 limit,
381 )
382 .map_err(to_pyvalue_err)?;
383
384 Ok(TardisDepth10StreamIterator {
385 stream: Box::new(stream),
386 })
387}
388
389#[pyfunction(name = "stream_tardis_depth10_from_snapshot25")]
395#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
396pub fn py_stream_tardis_depth10_from_snapshot25(
397 filepath: PathBuf,
398 chunk_size: usize,
399 price_precision: Option<u8>,
400 size_precision: Option<u8>,
401 instrument_id: Option<InstrumentId>,
402 limit: Option<usize>,
403) -> PyResult<TardisDepth10StreamIterator> {
404 let stream = stream_depth10_from_snapshot25(
405 filepath,
406 chunk_size,
407 price_precision,
408 size_precision,
409 instrument_id,
410 limit,
411 )
412 .map_err(to_pyvalue_err)?;
413
414 Ok(TardisDepth10StreamIterator {
415 stream: Box::new(stream),
416 })
417}
418
419impl_tardis_stream_iterator!(
420 TardisFundingRateStreamIterator,
421 FundingRateUpdate,
422 "TardisFundingRateStreamIterator"
423);
424
425#[pyfunction(name = "stream_tardis_funding_rates")]
431#[pyo3(signature = (filepath, chunk_size=100_000, instrument_id=None, limit=None))]
432pub fn py_stream_tardis_funding_rates(
433 filepath: PathBuf,
434 chunk_size: usize,
435 instrument_id: Option<InstrumentId>,
436 limit: Option<usize>,
437) -> PyResult<TardisFundingRateStreamIterator> {
438 let stream =
439 stream_funding_rates(filepath, chunk_size, instrument_id, limit).map_err(to_pyvalue_err)?;
440
441 Ok(TardisFundingRateStreamIterator {
442 stream: Box::new(stream),
443 })
444}