nautilus_databento/python/
live.rs1use std::{
19 fs,
20 path::PathBuf,
21 str::FromStr,
22 sync::{Arc, RwLock},
23};
24
25use ahash::AHashMap;
26use databento::{dbn, live::Subscription};
27use indexmap::IndexMap;
28use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
29use nautilus_model::{
30 identifiers::{InstrumentId, Symbol, Venue},
31 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
32};
33use pyo3::prelude::*;
34use time::OffsetDateTime;
35
36use crate::{
37 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
38 symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
39 types::DatabentoPublisher,
40};
41
42#[cfg_attr(
43 feature = "python",
44 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
45)]
46#[derive(Debug)]
47pub struct DatabentoLiveClient {
48 #[pyo3(get)]
49 pub key: String,
50 #[pyo3(get)]
51 pub dataset: String,
52 is_running: bool,
53 is_closed: bool,
54 cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
55 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
56 buffer_size: usize,
57 publisher_venue_map: IndexMap<u16, Venue>,
58 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
59 use_exchange_as_venue: bool,
60 bars_timestamp_on_close: bool,
61 reconnect_timeout_mins: Option<u64>,
62}
63
64impl DatabentoLiveClient {
65 #[must_use]
66 pub fn is_closed(&self) -> bool {
67 self.cmd_tx.is_closed()
68 }
69
70 async fn process_messages(
71 mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
72 callback: Py<PyAny>,
73 callback_pyo3: Py<PyAny>,
74 ) -> PyResult<()> {
75 tracing::debug!("Processing messages...");
76 while let Some(msg) = msg_rx.recv().await {
78 tracing::trace!("Received message: {msg:?}");
79
80 match msg {
81 LiveMessage::Data(data) => Python::attach(|py| {
82 let py_obj = data_to_pycapsule(py, data);
83 call_python(py, &callback, py_obj);
84 }),
85 LiveMessage::Instrument(data) => {
86 Python::attach(|py| match instrument_any_to_pyobject(py, data) {
87 Ok(py_obj) => call_python(py, &callback, py_obj),
88 Err(e) => tracing::error!("Failed creating instrument: {e}"),
89 });
90 }
91 LiveMessage::Status(data) => Python::attach(|py| {
92 let py_obj = data.into_py_any_unwrap(py);
93 call_python(py, &callback_pyo3, py_obj);
94 }),
95 LiveMessage::Imbalance(data) => Python::attach(|py| {
96 let py_obj = data.into_py_any_unwrap(py);
97 call_python(py, &callback_pyo3, py_obj);
98 }),
99 LiveMessage::Statistics(data) => Python::attach(|py| {
100 let py_obj = data.into_py_any_unwrap(py);
101 call_python(py, &callback_pyo3, py_obj);
102 }),
103 LiveMessage::Close => {
104 break;
106 }
107 LiveMessage::Error(e) => {
108 return Err(to_pyruntime_err(e));
110 }
111 }
112 }
113
114 msg_rx.close();
115 tracing::debug!("Closed message receiver");
116
117 Ok(())
118 }
119
120 fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
121 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
122 }
123}
124
125fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
126 if let Err(e) = callback.call1(py, (py_obj,)) {
127 if !e.to_string().contains("CancelledError") {
129 tracing::error!("Error calling Python: {e}");
130 }
131 }
132}
133
134#[pymethods]
135impl DatabentoLiveClient {
136 #[new]
140 #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
141 pub fn py_new(
142 key: String,
143 dataset: String,
144 publishers_filepath: PathBuf,
145 use_exchange_as_venue: bool,
146 bars_timestamp_on_close: Option<bool>,
147 reconnect_timeout_mins: Option<i64>,
148 ) -> PyResult<Self> {
149 let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
150 let publishers_vec: Vec<DatabentoPublisher> =
151 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
152 let publisher_venue_map = publishers_vec
153 .into_iter()
154 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
155 .collect::<IndexMap<u16, Venue>>();
156
157 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
158
159 let buffer_size = 100_000;
161
162 let reconnect_timeout_mins = reconnect_timeout_mins
164 .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
165
166 Ok(Self {
167 key,
168 dataset,
169 cmd_tx,
170 cmd_rx: Some(cmd_rx),
171 buffer_size,
172 is_running: false,
173 is_closed: false,
174 publisher_venue_map,
175 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
176 use_exchange_as_venue,
177 bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
178 reconnect_timeout_mins,
179 })
180 }
181
182 #[pyo3(name = "is_running")]
183 const fn py_is_running(&self) -> bool {
184 self.is_running
185 }
186
187 #[pyo3(name = "is_closed")]
188 const fn py_is_closed(&self) -> bool {
189 self.is_closed
190 }
191
192 #[pyo3(name = "subscribe")]
193 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
194 fn py_subscribe(
195 &mut self,
196 schema: String,
197 instrument_ids: Vec<InstrumentId>,
198 start: Option<u64>,
199 snapshot: Option<bool>,
200 ) -> PyResult<()> {
201 let mut symbol_venue_map = self
202 .symbol_venue_map
203 .write()
204 .map_err(|e| to_pyruntime_err(format!("symbol_venue_map lock poisoned: {e}")))?;
205 let symbols: Vec<String> = instrument_ids
206 .iter()
207 .map(|instrument_id| {
208 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
209 })
210 .collect();
211 let first_symbol = symbols
212 .first()
213 .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
214 let stype_in = infer_symbology_type(first_symbol);
215 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
216 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
217 let mut sub = Subscription::builder()
218 .symbols(symbols)
219 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
220 .stype_in(stype_in)
221 .build();
222
223 if let Some(start) = start {
224 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
225 .map_err(to_pyvalue_err)?;
226 sub.start = Some(start);
227 }
228 sub.use_snapshot = snapshot.unwrap_or(false);
229
230 self.send_command(LiveCommand::Subscribe(sub))
231 }
232
233 #[pyo3(name = "start")]
234 fn py_start<'py>(
235 &mut self,
236 py: Python<'py>,
237 callback: Py<PyAny>,
238 callback_pyo3: Py<PyAny>,
239 ) -> PyResult<Bound<'py, PyAny>> {
240 if self.is_closed {
241 return Err(to_pyruntime_err("Client already closed"));
242 }
243 if self.is_running {
244 return Err(to_pyruntime_err("Client already running"));
245 }
246
247 tracing::debug!("Starting client");
248
249 self.is_running = true;
250
251 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
252
253 let cmd_rx = self
257 .cmd_rx
258 .take()
259 .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
260
261 let mut feed_handler = DatabentoFeedHandler::new(
262 self.key.clone(),
263 self.dataset.clone(),
264 cmd_rx,
265 msg_tx,
266 self.publisher_venue_map.clone(),
267 self.symbol_venue_map.clone(),
268 self.use_exchange_as_venue,
269 self.bars_timestamp_on_close,
270 self.reconnect_timeout_mins,
271 );
272
273 self.send_command(LiveCommand::Start)?;
274
275 pyo3_async_runtimes::tokio::future_into_py(py, async move {
276 let (proc_handle, feed_handle) = tokio::join!(
277 Self::process_messages(msg_rx, callback, callback_pyo3),
278 feed_handler.run(),
279 );
280
281 match proc_handle {
282 Ok(()) => tracing::debug!("Message processor completed"),
283 Err(e) => tracing::error!("Message processor error: {e}"),
284 }
285
286 match feed_handle {
287 Ok(()) => tracing::debug!("Feed handler completed"),
288 Err(e) => tracing::error!("Feed handler error: {e}"),
289 }
290
291 Ok(())
292 })
293 }
294
295 #[pyo3(name = "close")]
296 fn py_close(&mut self) -> PyResult<()> {
297 if !self.is_running {
298 return Err(to_pyruntime_err("Client never started"));
299 }
300 if self.is_closed {
301 return Err(to_pyruntime_err("Client already closed"));
302 }
303
304 tracing::debug!("Closing client");
305
306 if !self.is_closed() {
307 self.send_command(LiveCommand::Close)?;
308 }
309
310 self.is_running = false;
311 self.is_closed = true;
312
313 Ok(())
314 }
315}