nautilus_persistence/python/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::path::PathBuf;

use nautilus_model::data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick};
use nautilus_serialization::enums::ParquetWriteMode;
use pyo3::{exceptions::PyIOError, prelude::*};

use crate::backend::catalog::ParquetDataCatalog;

/// A catalog for writing market data to, and querying it from, Parquet files.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence")
)]
pub struct ParquetDataCatalogV2 {
    inner: ParquetDataCatalog,
}

#[pymethods]
impl ParquetDataCatalogV2 {
    /// Creates a new [`ParquetDataCatalogV2`] with the given base path and optional batch size.
    #[new]
    #[pyo3(signature = (base_path, batch_size=None))]
    #[must_use]
    pub fn new(base_path: String, batch_size: Option<usize>) -> Self {
        Self {
            inner: ParquetDataCatalog::new(PathBuf::from(base_path), batch_size),
        }
    }
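
    // Illustrative construction sketch (not part of the bindings themselves):
    // assumes the base path is writable; `None` defers the batch size to the
    // `ParquetDataCatalog` default.
    //
    //     let catalog = ParquetDataCatalogV2::new("/tmp/catalog".to_string(), None);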

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}

    /// Writes the given quote ticks to a Parquet file and returns the path written.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the underlying write fails.
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }
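
    // Each `write_*` method below follows the same pattern: delegate to
    // `ParquetDataCatalog::write_to_parquet` and surface any failure as a Python
    // `IOError`. Illustrative call (a sketch; assumes `quotes` was built elsewhere
    // and that `None` selects the backend's default write mode):
    //
    //     let path = catalog.write_quote_ticks(quotes, None)?;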

    /// Writes the given trade ticks to a Parquet file and returns the path written.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the underlying write fails.
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Writes the given order book deltas to a Parquet file and returns the path written.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the underlying write fails.
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Writes the given bars to a Parquet file and returns the path written.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the underlying write fails.
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Writes the given order book depths to a Parquet file and returns the path written.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the underlying write fails.
    #[pyo3(signature = (data, write_mode=None))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        write_mode: Option<ParquetWriteMode>,
    ) -> PyResult<String> {
        self.inner
            .write_to_parquet(data, None, None, None, write_mode)
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Consolidates the catalog's data files.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if consolidation fails.
    #[pyo3(signature = ())]
    pub fn consolidate_catalog(&self) -> PyResult<()> {
        self.inner
            .consolidate_catalog()
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidates the data files for the given type name, optionally filtered by instrument ID.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if consolidation fails.
    #[pyo3(signature = (type_name, instrument_id=None))]
    pub fn consolidate_data(&self, type_name: &str, instrument_id: Option<String>) -> PyResult<()> {
        self.inner
            .consolidate_data(type_name, instrument_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Queries the last (or first, when `is_last` is false) timestamp for the given data class,
    /// optionally filtered by instrument ID.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the query fails.
    #[pyo3(signature = (data_cls, instrument_id=None, is_last=Some(true)))]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> PyResult<Option<i64>> {
        self.inner
            .query_timestamp_bound(data_cls, instrument_id, is_last)
            .map_err(|e| PyIOError::new_err(format!("Failed to compute timestamp bound: {e}")))
    }
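
    // Illustrative incremental-update sketch (assumptions: the data-class string
    // follows the catalog's directory naming, e.g. "quote_tick", and `Some(true)`
    // requests the most recent timestamp):
    //
    //     let last_ns = catalog.query_timestamp_bound("quote_tick", None, Some(true))?;
    //     // ...then fetch and write only data newer than `last_ns`.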

    /// Returns the Parquet file paths for the given type name, optionally filtered by
    /// instrument ID.
    ///
    /// # Errors
    ///
    /// Returns an `IOError` if the query fails.
    #[pyo3(signature = (type_name, instrument_id=None))]
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<String>> {
        self.inner
            .query_parquet_files(type_name, instrument_id)
            .map(|paths| {
                paths
                    .iter()
                    .map(|p| p.to_string_lossy().to_string())
                    .collect()
            })
            .map_err(|e| PyIOError::new_err(format!("Failed to query parquet files: {e}")))
    }
}
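
// -------------------------------------------------------------------------------------------------
// Illustrative usage sketch (not from the original source). The test is ignored by
// default: it touches the filesystem, the "quote_tick" type-name string is an
// assumption about the backend's naming, and an empty tick vector may be rejected
// by the writer.
// -------------------------------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[ignore]
    fn write_and_query_roundtrip_sketch() {
        // Construct a catalog rooted at a scratch directory
        let catalog = ParquetDataCatalogV2::new("/tmp/catalog".to_string(), None);

        // In practice, populate this with real `QuoteTick` values
        let quotes: Vec<QuoteTick> = Vec::new();

        // Write the ticks, then confirm the resulting file is discoverable
        let path = catalog.write_quote_ticks(quotes, None).unwrap();
        let files = catalog.query_parquet_files("quote_tick", None).unwrap();
        assert!(files.contains(&path));
    }
}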