nautilus_persistence/python/backend/
session.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_core::{ffi::cvec::CVec, python::to_pyruntime_err};
17use nautilus_model::data::{Bar, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick};
18use pyo3::{prelude::*, types::PyCapsule};
19
20use crate::backend::session::{DataBackendSession, DataQueryResult};
21
22#[repr(C)]
23#[pyclass(eq, eq_int)]
24#[derive(Clone, Copy, Debug, PartialEq, Eq)]
25pub enum NautilusDataType {
26    // Custom = 0,  # First slot reserved for custom data
27    OrderBookDelta = 1,
28    OrderBookDepth10 = 2,
29    QuoteTick = 3,
30    TradeTick = 4,
31    Bar = 5,
32}
33
34#[pymethods]
35impl DataBackendSession {
36    #[new]
37    #[pyo3(signature=(chunk_size=10_000))]
38    fn new_session(chunk_size: usize) -> Self {
39        Self::new(chunk_size)
40    }
41
42    /// Query a file for its records. the caller must specify `T` to indicate
43    /// the kind of data expected from this query.
44    ///
45    /// table_name: Logical table_name assigned to this file. Queries to this file should address the
46    /// file by its table name.
47    /// file_path: Path to file
48    /// sql_query: A custom sql query to retrieve records from file. If no query is provided a default
49    /// query "SELECT * FROM <table_name>" is run.
50    ///
51    /// # Safety
52    ///
53    /// The file data must be ordered by the ts_init in ascending order for this
54    /// to work correctly.
55    #[pyo3(name = "add_file")]
56    #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
57    fn add_file_py(
58        mut slf: PyRefMut<'_, Self>,
59        data_type: NautilusDataType,
60        table_name: &str,
61        file_path: &str,
62        sql_query: Option<&str>,
63    ) -> PyResult<()> {
64        let _guard = slf.runtime.enter();
65
66        match data_type {
67            NautilusDataType::OrderBookDelta => slf
68                .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
69                .map_err(to_pyruntime_err),
70            NautilusDataType::OrderBookDepth10 => slf
71                .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
72                .map_err(to_pyruntime_err),
73            NautilusDataType::QuoteTick => slf
74                .add_file::<QuoteTick>(table_name, file_path, sql_query)
75                .map_err(to_pyruntime_err),
76            NautilusDataType::TradeTick => slf
77                .add_file::<TradeTick>(table_name, file_path, sql_query)
78                .map_err(to_pyruntime_err),
79            NautilusDataType::Bar => slf
80                .add_file::<Bar>(table_name, file_path, sql_query)
81                .map_err(to_pyruntime_err),
82        }
83    }
84
85    fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
86        let query_result = slf.get_query_result();
87        DataQueryResult::new(query_result, slf.chunk_size)
88    }
89}
90
91#[pymethods]
92impl DataQueryResult {
93    /// The reader implements an iterator.
94    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
95        slf
96    }
97
98    /// Each iteration returns a chunk of values read from the parquet file.
99    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
100        match slf.next() {
101            Some(acc) if !acc.is_empty() => {
102                let cvec = slf.set_chunk(acc);
103                Python::with_gil(|py| match PyCapsule::new::<CVec>(py, cvec, None) {
104                    Ok(capsule) => Ok(Some(capsule.into_py(py))),
105                    Err(e) => Err(to_pyruntime_err(e)),
106                })
107            }
108            _ => Ok(None),
109        }
110    }
111}