nautilus_persistence/python/backend/
session.rs1use nautilus_core::{
17 ffi::cvec::CVec,
18 python::{IntoPyObjectNautilusExt, to_pyruntime_err},
19};
20use nautilus_model::data::{
21 Bar, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
22};
23use pyo3::{prelude::*, types::PyCapsule};
24
25use crate::backend::session::{DataBackendSession, DataQueryResult};
26
27#[repr(C)]
28#[pyclass(eq, eq_int)]
29#[derive(Clone, Copy, Debug, PartialEq, Eq)]
30pub enum NautilusDataType {
31 OrderBookDelta = 1,
33 OrderBookDepth10 = 2,
34 QuoteTick = 3,
35 TradeTick = 4,
36 Bar = 5,
37 MarkPriceUpdate = 6,
38}
39
40#[pymethods]
41impl DataBackendSession {
42 #[new]
43 #[pyo3(signature=(chunk_size=10_000))]
44 fn new_session(chunk_size: usize) -> Self {
45 Self::new(chunk_size)
46 }
47
48 #[pyo3(name = "add_file")]
62 #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
63 fn add_file_py(
64 mut slf: PyRefMut<'_, Self>,
65 data_type: NautilusDataType,
66 table_name: &str,
67 file_path: &str,
68 sql_query: Option<&str>,
69 ) -> PyResult<()> {
70 let _guard = slf.runtime.enter();
71
72 match data_type {
73 NautilusDataType::OrderBookDelta => slf
74 .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
75 .map_err(to_pyruntime_err),
76 NautilusDataType::OrderBookDepth10 => slf
77 .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
78 .map_err(to_pyruntime_err),
79 NautilusDataType::QuoteTick => slf
80 .add_file::<QuoteTick>(table_name, file_path, sql_query)
81 .map_err(to_pyruntime_err),
82 NautilusDataType::TradeTick => slf
83 .add_file::<TradeTick>(table_name, file_path, sql_query)
84 .map_err(to_pyruntime_err),
85 NautilusDataType::Bar => slf
86 .add_file::<Bar>(table_name, file_path, sql_query)
87 .map_err(to_pyruntime_err),
88 NautilusDataType::MarkPriceUpdate => slf
89 .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query)
90 .map_err(to_pyruntime_err),
91 }
92 }
93
94 fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
95 let query_result = slf.get_query_result();
96 DataQueryResult::new(query_result, slf.chunk_size)
97 }
98
99 #[pyo3(name = "register_object_store_from_uri")]
101 #[pyo3(signature = (uri, storage_options=None))]
102 fn register_object_store_from_uri_py(
103 mut slf: PyRefMut<'_, Self>,
104 uri: &str,
105 storage_options: Option<std::collections::HashMap<String, String>>,
106 ) -> PyResult<()> {
107 slf.register_object_store_from_uri(uri, storage_options)
108 .map_err(to_pyruntime_err)
109 }
110}
111
112#[pymethods]
113impl DataQueryResult {
114 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
116 slf
117 }
118
119 fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<PyObject>> {
121 match slf.next() {
122 Some(acc) if !acc.is_empty() => {
123 let cvec = slf.set_chunk(acc);
124 Python::with_gil(|py| match PyCapsule::new::<CVec>(py, cvec, None) {
125 Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
126 Err(e) => Err(to_pyruntime_err(e)),
127 })
128 }
129 _ => Ok(None),
130 }
131 }
132}