// nautilus_persistence/python/catalog.rs
use std::path::PathBuf;

use nautilus_model::data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick};
use nautilus_serialization::enums::ParquetWriteMode;
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

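/// A Parquet-backed data catalog exposed to Python, wrapping the Rust
/// `ParquetDataCatalog` backend.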
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
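    /// Creates a new catalog rooted at `base_path`, forwarding the optional
    /// `batch_size` to the backend `ParquetDataCatalog`.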
    #[new]
    #[pyo3(signature = (base_path, batch_size=None))]
    #[must_use]
    pub fn new(base_path: String, batch_size: Option<usize>) -> Self {
        Self {
            inner: ParquetDataCatalog::new(PathBuf::from(base_path), batch_size),
        }
    }

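    /// Writes quote ticks to the catalog as Parquet, returning the written file path.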
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

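    /// Writes trade ticks to the catalog as Parquet, returning the written file path.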
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

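    /// Writes order book deltas to the catalog as Parquet, returning the written file path.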
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

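    /// Writes bars to the catalog as Parquet, returning the written file path.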
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

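    /// Writes depth-10 order book snapshots to the catalog as Parquet, returning the written file path.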
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

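    /// Consolidates the catalog by delegating to the backend `consolidate_catalog`.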
    #[pyo3(signature = ())]
    pub fn consolidate_catalog(&self) -> PyResult<()> {
        self.inner
            .consolidate_catalog()
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

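    /// Consolidates data files for the given type name, optionally filtered by instrument ID.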
    #[pyo3(signature = (type_name, instrument_id=None))]
    pub fn consolidate_data(&self, type_name: &str, instrument_id: Option<String>) -> PyResult<()> {
        self.inner
            .consolidate_data(type_name, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

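    /// Queries the last (default) or first timestamp for the given data class,
    /// optionally filtered by instrument ID.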
    #[pyo3(signature = (data_cls, instrument_id=None, is_last=Some(true)))]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> PyResult<Option<i64>> {
        self.inner
            .query_timestamp_bound(data_cls, instrument_id, is_last)
            .map_err(|e| PyIOError::new_err(format!("Failed to compute timestamp bound: {e}")))
    }

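    /// Returns the Parquet file paths for the given type name, optionally
    /// filtered by instrument ID.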
    #[pyo3(signature = (type_name, instrument_id=None))]
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<String>> {
        self.inner
            .query_parquet_files(type_name, instrument_id)
            .map(|paths| {
                paths
                    .iter()
                    .map(|p| p.to_string_lossy().to_string())
                    .collect()
            })
            .map_err(|e| PyIOError::new_err(format!("Failed to query parquet files: {e}")))
    }
}