nautilus_persistence/python/backend/
session.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_core::{
17    ffi::cvec::CVec,
18    python::{IntoPyObjectNautilusExt, to_pyruntime_err},
19};
20use nautilus_model::data::{
21    Bar, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
22};
23use pyo3::{prelude::*, types::PyCapsule};
24
25use crate::backend::session::{DataBackendSession, DataQueryResult};
26
27#[repr(C)]
28#[pyclass(eq, eq_int)]
29#[derive(Clone, Copy, Debug, PartialEq, Eq)]
30pub enum NautilusDataType {
31    // Custom = 0,  # First slot reserved for custom data
32    OrderBookDelta = 1,
33    OrderBookDepth10 = 2,
34    QuoteTick = 3,
35    TradeTick = 4,
36    Bar = 5,
37    MarkPriceUpdate = 6,
38}
39
40#[pymethods]
41impl DataBackendSession {
42    #[new]
43    #[pyo3(signature=(chunk_size=10_000))]
44    fn new_session(chunk_size: usize) -> Self {
45        Self::new(chunk_size)
46    }
47
48    /// Query a file for its records. the caller must specify `T` to indicate
49    /// the kind of data expected from this query.
50    ///
51    /// `table_name`: Logical `table_name` assigned to this file. Queries to this file should address the
52    /// file by its table name.
53    /// `file_path`: Path to file
54    /// `sql_query`: A custom sql query to retrieve records from file. If no query is provided a default
55    /// query "SELECT * FROM <`table_name`>" is run.
56    ///
57    /// # Safety
58    ///
59    /// The file data must be ordered by the `ts_init` in ascending order for this
60    /// to work correctly.
61    #[pyo3(name = "add_file")]
62    #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
63    fn add_file_py(
64        mut slf: PyRefMut<'_, Self>,
65        data_type: NautilusDataType,
66        table_name: &str,
67        file_path: &str,
68        sql_query: Option<&str>,
69    ) -> PyResult<()> {
70        let _guard = slf.runtime.enter();
71
72        match data_type {
73            NautilusDataType::OrderBookDelta => slf
74                .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
75                .map_err(to_pyruntime_err),
76            NautilusDataType::OrderBookDepth10 => slf
77                .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
78                .map_err(to_pyruntime_err),
79            NautilusDataType::QuoteTick => slf
80                .add_file::<QuoteTick>(table_name, file_path, sql_query)
81                .map_err(to_pyruntime_err),
82            NautilusDataType::TradeTick => slf
83                .add_file::<TradeTick>(table_name, file_path, sql_query)
84                .map_err(to_pyruntime_err),
85            NautilusDataType::Bar => slf
86                .add_file::<Bar>(table_name, file_path, sql_query)
87                .map_err(to_pyruntime_err),
88            NautilusDataType::MarkPriceUpdate => slf
89                .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query)
90                .map_err(to_pyruntime_err),
91        }
92    }
93
94    fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
95        let query_result = slf.get_query_result();
96        DataQueryResult::new(query_result, slf.chunk_size)
97    }
98
99    /// Register an object store with the session context from a URI with optional storage options
100    #[pyo3(name = "register_object_store_from_uri")]
101    #[pyo3(signature = (uri, storage_options=None))]
102    fn register_object_store_from_uri_py(
103        mut slf: PyRefMut<'_, Self>,
104        uri: &str,
105        storage_options: Option<std::collections::HashMap<String, String>>,
106    ) -> PyResult<()> {
107        slf.register_object_store_from_uri(uri, storage_options)
108            .map_err(to_pyruntime_err)
109    }
110}
111
112#[pymethods]
113impl DataQueryResult {
114    /// The reader implements an iterator.
115    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
116        slf
117    }
118
119    /// Each iteration returns a chunk of values read from the parquet file.
120    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
121        match slf.next() {
122            Some(acc) if !acc.is_empty() => {
123                let cvec = slf.set_chunk(acc);
124                Python::with_gil(|py| match PyCapsule::new::<CVec>(py, cvec, None) {
125                    Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
126                    Err(e) => Err(to_pyruntime_err(e)),
127                })
128            }
129            _ => Ok(None),
130        }
131    }
132}