nautilus_databento/python/
live.rs1use std::{
19 fs,
20 path::PathBuf,
21 str::FromStr,
22 sync::{Arc, RwLock},
23};
24
25use ahash::AHashMap;
26use databento::{dbn, live::Subscription};
27use indexmap::IndexMap;
28use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
29use nautilus_model::{
30 identifiers::{InstrumentId, Symbol, Venue},
31 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
32};
33use pyo3::prelude::*;
34use time::OffsetDateTime;
35
36use super::types::DatabentoSubscriptionAck;
37use crate::{
38 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
39 symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
40 types::DatabentoPublisher,
41};
42
43#[cfg_attr(
44 feature = "python",
45 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
46)]
47#[derive(Debug)]
48pub struct DatabentoLiveClient {
49 #[pyo3(get)]
50 pub key: String,
51 #[pyo3(get)]
52 pub dataset: String,
53 is_running: bool,
54 is_closed: bool,
55 cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
56 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
57 buffer_size: usize,
58 publisher_venue_map: IndexMap<u16, Venue>,
59 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
60 use_exchange_as_venue: bool,
61 bars_timestamp_on_close: bool,
62 reconnect_timeout_mins: Option<u64>,
63}
64
65impl DatabentoLiveClient {
66 #[must_use]
67 pub fn is_closed(&self) -> bool {
68 self.cmd_tx.is_closed()
69 }
70
71 async fn process_messages(
72 mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
73 callback: Py<PyAny>,
74 callback_pyo3: Py<PyAny>,
75 ) -> PyResult<()> {
76 log::debug!("Processing messages...");
77 while let Some(msg) = msg_rx.recv().await {
79 log::trace!("Received message: {msg:?}");
80
81 match msg {
82 LiveMessage::Data(data) => Python::attach(|py| {
83 let py_obj = data_to_pycapsule(py, data);
84 call_python(py, &callback, py_obj);
85 }),
86 LiveMessage::Instrument(data) => {
87 Python::attach(|py| match instrument_any_to_pyobject(py, data) {
88 Ok(py_obj) => call_python(py, &callback, py_obj),
89 Err(e) => log::error!("Failed creating instrument: {e}"),
90 });
91 }
92 LiveMessage::Status(data) => Python::attach(|py| {
93 let py_obj = data.into_py_any_unwrap(py);
94 call_python(py, &callback_pyo3, py_obj);
95 }),
96 LiveMessage::Imbalance(data) => Python::attach(|py| {
97 let py_obj = data.into_py_any_unwrap(py);
98 call_python(py, &callback_pyo3, py_obj);
99 }),
100 LiveMessage::Statistics(data) => Python::attach(|py| {
101 let py_obj = data.into_py_any_unwrap(py);
102 call_python(py, &callback_pyo3, py_obj);
103 }),
104 LiveMessage::SubscriptionAck(ack) => Python::attach(|py| {
105 let py_obj: DatabentoSubscriptionAck = ack.into();
106 let py_obj = py_obj.into_py_any_unwrap(py);
107 call_python(py, &callback_pyo3, py_obj);
108 }),
109 LiveMessage::Close => {
110 break;
112 }
113 LiveMessage::Error(e) => {
114 return Err(to_pyruntime_err(e));
116 }
117 }
118 }
119
120 msg_rx.close();
121 log::debug!("Closed message receiver");
122
123 Ok(())
124 }
125
126 fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
127 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
128 }
129}
130
131fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
132 if let Err(e) = callback.call1(py, (py_obj,)) {
133 if !e.to_string().contains("CancelledError") {
135 log::error!("Error calling Python: {e}");
136 }
137 }
138}
139
140#[pymethods]
141impl DatabentoLiveClient {
142 #[new]
146 #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
147 pub fn py_new(
148 key: String,
149 dataset: String,
150 publishers_filepath: PathBuf,
151 use_exchange_as_venue: bool,
152 bars_timestamp_on_close: Option<bool>,
153 reconnect_timeout_mins: Option<i64>,
154 ) -> PyResult<Self> {
155 let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
156 let publishers_vec: Vec<DatabentoPublisher> =
157 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
158 let publisher_venue_map = publishers_vec
159 .into_iter()
160 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
161 .collect::<IndexMap<u16, Venue>>();
162
163 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
164
165 let buffer_size = 100_000;
167
168 let reconnect_timeout_mins = reconnect_timeout_mins
170 .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
171
172 Ok(Self {
173 key,
174 dataset,
175 cmd_tx,
176 cmd_rx: Some(cmd_rx),
177 buffer_size,
178 is_running: false,
179 is_closed: false,
180 publisher_venue_map,
181 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
182 use_exchange_as_venue,
183 bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
184 reconnect_timeout_mins,
185 })
186 }
187
188 #[pyo3(name = "is_running")]
189 const fn py_is_running(&self) -> bool {
190 self.is_running
191 }
192
193 #[pyo3(name = "is_closed")]
194 const fn py_is_closed(&self) -> bool {
195 self.is_closed
196 }
197
198 #[pyo3(name = "subscribe")]
199 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
200 fn py_subscribe(
201 &mut self,
202 schema: String,
203 instrument_ids: Vec<InstrumentId>,
204 start: Option<u64>,
205 snapshot: Option<bool>,
206 ) -> PyResult<()> {
207 let mut symbol_venue_map = self
208 .symbol_venue_map
209 .write()
210 .map_err(|e| to_pyruntime_err(format!("symbol_venue_map lock poisoned: {e}")))?;
211 let symbols: Vec<String> = instrument_ids
212 .iter()
213 .map(|instrument_id| {
214 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
215 })
216 .collect();
217 let first_symbol = symbols
218 .first()
219 .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
220 let stype_in = infer_symbology_type(first_symbol);
221 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
222 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
223 let mut sub = Subscription::builder()
224 .symbols(symbols)
225 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
226 .stype_in(stype_in)
227 .build();
228
229 if let Some(start) = start {
230 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
231 .map_err(to_pyvalue_err)?;
232 sub.start = Some(start);
233 }
234 sub.use_snapshot = snapshot.unwrap_or(false);
235
236 self.send_command(LiveCommand::Subscribe(sub))
237 }
238
239 #[pyo3(name = "start")]
240 fn py_start<'py>(
241 &mut self,
242 py: Python<'py>,
243 callback: Py<PyAny>,
244 callback_pyo3: Py<PyAny>,
245 ) -> PyResult<Bound<'py, PyAny>> {
246 if self.is_closed {
247 return Err(to_pyruntime_err("Client already closed"));
248 }
249 if self.is_running {
250 return Err(to_pyruntime_err("Client already running"));
251 }
252
253 log::debug!("Starting client");
254
255 self.is_running = true;
256
257 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
258
259 let cmd_rx = self
263 .cmd_rx
264 .take()
265 .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
266
267 let mut feed_handler = DatabentoFeedHandler::new(
268 self.key.clone(),
269 self.dataset.clone(),
270 cmd_rx,
271 msg_tx,
272 self.publisher_venue_map.clone(),
273 self.symbol_venue_map.clone(),
274 self.use_exchange_as_venue,
275 self.bars_timestamp_on_close,
276 self.reconnect_timeout_mins,
277 );
278
279 self.send_command(LiveCommand::Start)?;
280
281 pyo3_async_runtimes::tokio::future_into_py(py, async move {
282 let (proc_handle, feed_handle) = tokio::join!(
283 Self::process_messages(msg_rx, callback, callback_pyo3),
284 feed_handler.run(),
285 );
286
287 match proc_handle {
288 Ok(()) => log::debug!("Message processor completed"),
289 Err(e) => log::error!("Message processor error: {e}"),
290 }
291
292 match feed_handle {
293 Ok(()) => log::debug!("Feed handler completed"),
294 Err(e) => log::error!("Feed handler error: {e}"),
295 }
296
297 Ok(())
298 })
299 }
300
301 #[pyo3(name = "close")]
302 fn py_close(&mut self) -> PyResult<()> {
303 if !self.is_running {
304 return Err(to_pyruntime_err("Client never started"));
305 }
306 if self.is_closed {
307 return Err(to_pyruntime_err("Client already closed"));
308 }
309
310 log::debug!("Closing client");
311
312 if !self.is_closed() {
313 self.send_command(LiveCommand::Close)?;
314 }
315
316 self.is_running = false;
317 self.is_closed = true;
318
319 Ok(())
320 }
321}