nautilus_databento/python/
live.rs1use std::{
17 collections::HashMap,
18 fs,
19 path::PathBuf,
20 str::FromStr,
21 sync::{Arc, RwLock},
22};
23
24use databento::{dbn, live::Subscription};
25use indexmap::IndexMap;
26use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
27use nautilus_model::{
28 identifiers::{InstrumentId, Symbol, Venue},
29 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
30};
31use pyo3::prelude::*;
32use time::OffsetDateTime;
33
34use crate::{
35 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
36 symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
37 types::DatabentoPublisher,
38};
39
40#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoLiveClient {
46 #[pyo3(get)]
47 pub key: String,
48 #[pyo3(get)]
49 pub dataset: String,
50 is_running: bool,
51 is_closed: bool,
52 cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
53 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
54 buffer_size: usize,
55 publisher_venue_map: IndexMap<u16, Venue>,
56 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
57}
58
59impl DatabentoLiveClient {
60 #[must_use]
61 pub fn is_closed(&self) -> bool {
62 self.cmd_tx.is_closed()
63 }
64
65 async fn process_messages(
66 mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
67 callback: PyObject,
68 callback_pyo3: PyObject,
69 ) -> PyResult<()> {
70 tracing::debug!("Processing messages...");
71 while let Some(msg) = msg_rx.recv().await {
73 tracing::trace!("Received message: {msg:?}");
74 match msg {
75 LiveMessage::Data(data) => Python::with_gil(|py| {
76 let py_obj = data_to_pycapsule(py, data);
77 call_python(py, &callback, py_obj);
78 }),
79 LiveMessage::Instrument(data) => Python::with_gil(|py| {
80 let py_obj =
81 instrument_any_to_pyobject(py, data).expect("Failed creating instrument");
82 call_python(py, &callback, py_obj);
83 }),
84 LiveMessage::Status(data) => Python::with_gil(|py| {
85 let py_obj = data.into_py_any_unwrap(py);
86 call_python(py, &callback_pyo3, py_obj);
87 }),
88 LiveMessage::Imbalance(data) => Python::with_gil(|py| {
89 let py_obj = data.into_py_any_unwrap(py);
90 call_python(py, &callback_pyo3, py_obj);
91 }),
92 LiveMessage::Statistics(data) => Python::with_gil(|py| {
93 let py_obj = data.into_py_any_unwrap(py);
94 call_python(py, &callback_pyo3, py_obj);
95 }),
96 LiveMessage::Close => {
97 break;
99 }
100 LiveMessage::Error(e) => {
101 return Err(to_pyruntime_err(e));
103 }
104 }
105 }
106
107 msg_rx.close();
108 tracing::debug!("Closed message receiver");
109
110 Ok(())
111 }
112
113 fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
114 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
115 }
116}
117
118fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
119 if let Err(e) = callback.call1(py, (py_obj,)) {
120 tracing::error!("Error calling Python: {e}");
121 }
122}
123
124#[pymethods]
125impl DatabentoLiveClient {
126 #[new]
127 pub fn py_new(key: String, dataset: String, publishers_filepath: PathBuf) -> PyResult<Self> {
128 let publishers_json = fs::read_to_string(publishers_filepath)?;
129 let publishers_vec: Vec<DatabentoPublisher> =
130 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
131 let publisher_venue_map = publishers_vec
132 .into_iter()
133 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
134 .collect::<IndexMap<u16, Venue>>();
135
136 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
137
138 let buffer_size = 100_000;
140
141 Ok(Self {
142 key,
143 dataset,
144 cmd_tx,
145 cmd_rx: Some(cmd_rx),
146 buffer_size,
147 is_running: false,
148 is_closed: false,
149 publisher_venue_map,
150 symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
151 })
152 }
153
154 #[pyo3(name = "is_running")]
155 const fn py_is_running(&self) -> bool {
156 self.is_running
157 }
158
159 #[pyo3(name = "is_closed")]
160 const fn py_is_closed(&self) -> bool {
161 self.is_closed
162 }
163
164 #[pyo3(name = "subscribe")]
165 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
166 fn py_subscribe(
167 &mut self,
168 schema: String,
169 instrument_ids: Vec<InstrumentId>,
170 start: Option<u64>,
171 snapshot: Option<bool>,
172 ) -> PyResult<()> {
173 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
174 let symbols: Vec<String> = instrument_ids
175 .iter()
176 .map(|instrument_id| {
177 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
178 })
179 .collect();
180 let stype_in = infer_symbology_type(symbols.first().unwrap());
181 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
182 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
183 let mut sub = Subscription::builder()
184 .symbols(symbols)
185 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
186 .stype_in(stype_in)
187 .build();
188
189 if let Some(start) = start {
190 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
191 .map_err(to_pyvalue_err)?;
192 sub.start = Some(start);
193 }
194 sub.use_snapshot = snapshot.unwrap_or(false);
195
196 self.send_command(LiveCommand::Subscribe(sub))
197 }
198
199 #[pyo3(name = "start")]
200 fn py_start<'py>(
201 &mut self,
202 py: Python<'py>,
203 callback: PyObject,
204 callback_pyo3: PyObject,
205 ) -> PyResult<Bound<'py, PyAny>> {
206 if self.is_closed {
207 return Err(to_pyruntime_err("Client already closed"));
208 }
209 if self.is_running {
210 return Err(to_pyruntime_err("Client already running"));
211 }
212
213 tracing::debug!("Starting client");
214
215 self.is_running = true;
216
217 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
218
219 let cmd_rx = self.cmd_rx.take().unwrap();
223
224 let mut feed_handler = DatabentoFeedHandler::new(
225 self.key.clone(),
226 self.dataset.clone(),
227 cmd_rx,
228 msg_tx,
229 self.publisher_venue_map.clone(),
230 self.symbol_venue_map.clone(),
231 );
232
233 self.send_command(LiveCommand::Start)?;
234
235 pyo3_async_runtimes::tokio::future_into_py(py, async move {
236 let (proc_handle, feed_handle) = tokio::join!(
237 Self::process_messages(msg_rx, callback, callback_pyo3),
238 feed_handler.run(),
239 );
240
241 match proc_handle {
242 Ok(()) => tracing::debug!("Message processor completed"),
243 Err(e) => tracing::error!("Message processor error: {e}"),
244 }
245
246 match feed_handle {
247 Ok(()) => tracing::debug!("Feed handler completed"),
248 Err(e) => tracing::error!("Feed handler error: {e}"),
249 }
250
251 Ok(())
252 })
253 }
254
255 #[pyo3(name = "close")]
256 fn py_close(&mut self) -> PyResult<()> {
257 if !self.is_running {
258 return Err(to_pyruntime_err("Client never started"));
259 }
260 if self.is_closed {
261 return Err(to_pyruntime_err("Client already closed"));
262 }
263
264 tracing::debug!("Closing client");
265
266 if !self.is_closed() {
267 self.send_command(LiveCommand::Close)?;
268 }
269
270 self.is_running = false;
271 self.is_closed = true;
272
273 Ok(())
274 }
275}