nautilus_persistence/python/backend/session.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_core::{
19    ffi::cvec::CVec,
20    python::{IntoPyObjectNautilusExt, to_pyruntime_err},
21};
22use nautilus_model::data::{
23    Bar, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24};
25use pyo3::{prelude::*, types::PyCapsule};
26
27use crate::backend::session::{DataBackendSession, DataQueryResult};
28
29#[repr(C)]
30#[pyclass(eq, eq_int)]
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
32pub enum NautilusDataType {
33    // Custom = 0,  # First slot reserved for custom data
34    OrderBookDelta = 1,
35    OrderBookDepth10 = 2,
36    QuoteTick = 3,
37    TradeTick = 4,
38    Bar = 5,
39    MarkPriceUpdate = 6,
40}
41
42#[pymethods]
43impl DataBackendSession {
44    #[new]
45    #[pyo3(signature=(chunk_size=10_000))]
46    fn new_session(chunk_size: usize) -> Self {
47        Self::new(chunk_size)
48    }
49
50    /// Query a file for its records. the caller must specify `T` to indicate
51    /// the kind of data expected from this query.
52    ///
53    /// `table_name`: Logical `table_name` assigned to this file. Queries to this file should address the
54    /// file by its table name.
55    /// `file_path`: Path to file
56    /// `sql_query`: A custom sql query to retrieve records from file. If no query is provided a default
57    /// query "SELECT * FROM <`table_name`>" is run.
58    ///
59    /// # Safety
60    ///
61    /// The file data must be ordered by the `ts_init` in ascending order for this
62    /// to work correctly.
63    #[pyo3(name = "add_file")]
64    #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
65    fn py_add_file(
66        mut slf: PyRefMut<'_, Self>,
67        data_type: NautilusDataType,
68        table_name: &str,
69        file_path: &str,
70        sql_query: Option<&str>,
71    ) -> PyResult<()> {
72        let _guard = slf.runtime.enter();
73
74        match data_type {
75            NautilusDataType::OrderBookDelta => slf
76                .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
77                .map_err(to_pyruntime_err),
78            NautilusDataType::OrderBookDepth10 => slf
79                .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
80                .map_err(to_pyruntime_err),
81            NautilusDataType::QuoteTick => slf
82                .add_file::<QuoteTick>(table_name, file_path, sql_query)
83                .map_err(to_pyruntime_err),
84            NautilusDataType::TradeTick => slf
85                .add_file::<TradeTick>(table_name, file_path, sql_query)
86                .map_err(to_pyruntime_err),
87            NautilusDataType::Bar => slf
88                .add_file::<Bar>(table_name, file_path, sql_query)
89                .map_err(to_pyruntime_err),
90            NautilusDataType::MarkPriceUpdate => slf
91                .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query)
92                .map_err(to_pyruntime_err),
93        }
94    }
95
96    fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
97        let query_result = slf.get_query_result();
98        DataQueryResult::new(query_result, slf.chunk_size)
99    }
100
101    /// Register an object store with the session context from a URI with optional storage options
102    #[pyo3(name = "register_object_store_from_uri")]
103    #[pyo3(signature = (uri, storage_options=None))]
104    fn py_register_object_store_from_uri(
105        mut slf: PyRefMut<'_, Self>,
106        uri: &str,
107        storage_options: Option<HashMap<String, String>>,
108    ) -> PyResult<()> {
109        // Convert HashMap to AHashMap for internal use
110        let storage_options = storage_options.map(|m| m.into_iter().collect());
111        slf.register_object_store_from_uri(uri, storage_options)
112            .map_err(to_pyruntime_err)
113    }
114}
115
116#[pymethods]
117impl DataQueryResult {
118    /// The reader implements an iterator.
119    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
120        slf
121    }
122
123    /// Each iteration returns a chunk of values read from the parquet file.
124    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
125        match slf.next() {
126            Some(acc) if !acc.is_empty() => {
127                let cvec = slf.set_chunk(acc);
128                Python::attach(|py| {
129                    match PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {}) {
130                        Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
131                        Err(e) => Err(to_pyruntime_err(e)),
132                    }
133                })
134            }
135            _ => Ok(None),
136        }
137    }
138}