nautilus_persistence/python/backend/
session.rs1use std::collections::HashMap;
17
18use nautilus_core::{
19 ffi::cvec::CVec,
20 python::{IntoPyObjectNautilusExt, to_pyruntime_err},
21};
22use nautilus_model::data::{
23 Bar, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24};
25use pyo3::{prelude::*, types::PyCapsule};
26
27use crate::backend::session::{DataBackendSession, DataQueryResult};
28
29#[repr(C)]
30#[pyclass(eq, eq_int)]
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
32pub enum NautilusDataType {
33 OrderBookDelta = 1,
35 OrderBookDepth10 = 2,
36 QuoteTick = 3,
37 TradeTick = 4,
38 Bar = 5,
39 MarkPriceUpdate = 6,
40}
41
42#[pymethods]
43impl DataBackendSession {
44 #[new]
45 #[pyo3(signature=(chunk_size=10_000))]
46 fn new_session(chunk_size: usize) -> Self {
47 Self::new(chunk_size)
48 }
49
50 #[pyo3(name = "add_file")]
64 #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
65 fn py_add_file(
66 mut slf: PyRefMut<'_, Self>,
67 data_type: NautilusDataType,
68 table_name: &str,
69 file_path: &str,
70 sql_query: Option<&str>,
71 ) -> PyResult<()> {
72 let _guard = slf.runtime.enter();
73
74 match data_type {
75 NautilusDataType::OrderBookDelta => slf
76 .add_file::<OrderBookDelta>(table_name, file_path, sql_query)
77 .map_err(to_pyruntime_err),
78 NautilusDataType::OrderBookDepth10 => slf
79 .add_file::<OrderBookDepth10>(table_name, file_path, sql_query)
80 .map_err(to_pyruntime_err),
81 NautilusDataType::QuoteTick => slf
82 .add_file::<QuoteTick>(table_name, file_path, sql_query)
83 .map_err(to_pyruntime_err),
84 NautilusDataType::TradeTick => slf
85 .add_file::<TradeTick>(table_name, file_path, sql_query)
86 .map_err(to_pyruntime_err),
87 NautilusDataType::Bar => slf
88 .add_file::<Bar>(table_name, file_path, sql_query)
89 .map_err(to_pyruntime_err),
90 NautilusDataType::MarkPriceUpdate => slf
91 .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query)
92 .map_err(to_pyruntime_err),
93 }
94 }
95
96 fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
97 let query_result = slf.get_query_result();
98 DataQueryResult::new(query_result, slf.chunk_size)
99 }
100
101 #[pyo3(name = "register_object_store_from_uri")]
103 #[pyo3(signature = (uri, storage_options=None))]
104 fn py_register_object_store_from_uri(
105 mut slf: PyRefMut<'_, Self>,
106 uri: &str,
107 storage_options: Option<HashMap<String, String>>,
108 ) -> PyResult<()> {
109 let storage_options = storage_options.map(|m| m.into_iter().collect());
111 slf.register_object_store_from_uri(uri, storage_options)
112 .map_err(to_pyruntime_err)
113 }
114}
115
116#[pymethods]
117impl DataQueryResult {
118 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
120 slf
121 }
122
123 fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
125 match slf.next() {
126 Some(acc) if !acc.is_empty() => {
127 let cvec = slf.set_chunk(acc);
128 Python::attach(|py| {
129 match PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {}) {
130 Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
131 Err(e) => Err(to_pyruntime_err(e)),
132 }
133 })
134 }
135 _ => Ok(None),
136 }
137 }
138}