nautilus_databento/python/
live.rs1use std::{
19 fs,
20 path::PathBuf,
21 str::FromStr,
22 sync::{Arc, RwLock},
23};
24
25use ahash::AHashMap;
26use databento::{dbn, live::Subscription};
27use indexmap::IndexMap;
28use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
29use nautilus_model::{
30 identifiers::{InstrumentId, Symbol, Venue},
31 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
32};
33use pyo3::prelude::*;
34use time::OffsetDateTime;
35
36use crate::{
37 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
38 symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
39 types::DatabentoPublisher,
40};
41
42#[cfg_attr(
43 feature = "python",
44 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
45)]
46#[derive(Debug)]
47pub struct DatabentoLiveClient {
48 #[pyo3(get)]
49 pub key: String,
50 #[pyo3(get)]
51 pub dataset: String,
52 is_running: bool,
53 is_closed: bool,
54 cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
55 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
56 buffer_size: usize,
57 publisher_venue_map: IndexMap<u16, Venue>,
58 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
59 use_exchange_as_venue: bool,
60 bars_timestamp_on_close: bool,
61}
62
63impl DatabentoLiveClient {
64 #[must_use]
65 pub fn is_closed(&self) -> bool {
66 self.cmd_tx.is_closed()
67 }
68
69 async fn process_messages(
70 mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
71 callback: PyObject,
72 callback_pyo3: PyObject,
73 ) -> PyResult<()> {
74 tracing::debug!("Processing messages...");
75 while let Some(msg) = msg_rx.recv().await {
77 tracing::trace!("Received message: {msg:?}");
78
79 match msg {
80 LiveMessage::Data(data) => Python::with_gil(|py| {
81 let py_obj = data_to_pycapsule(py, data);
82 call_python(py, &callback, py_obj);
83 }),
84 LiveMessage::Instrument(data) => {
85 Python::with_gil(|py| match instrument_any_to_pyobject(py, data) {
86 Ok(py_obj) => call_python(py, &callback, py_obj),
87 Err(e) => tracing::error!("Failed creating instrument: {e}"),
88 });
89 }
90 LiveMessage::Status(data) => Python::with_gil(|py| {
91 let py_obj = data.into_py_any_unwrap(py);
92 call_python(py, &callback_pyo3, py_obj);
93 }),
94 LiveMessage::Imbalance(data) => Python::with_gil(|py| {
95 let py_obj = data.into_py_any_unwrap(py);
96 call_python(py, &callback_pyo3, py_obj);
97 }),
98 LiveMessage::Statistics(data) => Python::with_gil(|py| {
99 let py_obj = data.into_py_any_unwrap(py);
100 call_python(py, &callback_pyo3, py_obj);
101 }),
102 LiveMessage::Close => {
103 break;
105 }
106 LiveMessage::Error(e) => {
107 return Err(to_pyruntime_err(e));
109 }
110 }
111 }
112
113 msg_rx.close();
114 tracing::debug!("Closed message receiver");
115
116 Ok(())
117 }
118
119 fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
120 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
121 }
122}
123
124fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
125 if let Err(e) = callback.call1(py, (py_obj,)) {
126 if !e.to_string().contains("CancelledError") {
128 tracing::error!("Error calling Python: {e}");
129 }
130 }
131}
132
133#[pymethods]
134impl DatabentoLiveClient {
135 #[new]
139 pub fn py_new(
140 key: String,
141 dataset: String,
142 publishers_filepath: PathBuf,
143 use_exchange_as_venue: bool,
144 bars_timestamp_on_close: Option<bool>,
145 ) -> PyResult<Self> {
146 let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
147 let publishers_vec: Vec<DatabentoPublisher> =
148 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
149 let publisher_venue_map = publishers_vec
150 .into_iter()
151 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
152 .collect::<IndexMap<u16, Venue>>();
153
154 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
155
156 let buffer_size = 100_000;
158
159 Ok(Self {
160 key,
161 dataset,
162 cmd_tx,
163 cmd_rx: Some(cmd_rx),
164 buffer_size,
165 is_running: false,
166 is_closed: false,
167 publisher_venue_map,
168 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
169 use_exchange_as_venue,
170 bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
171 })
172 }
173
174 #[pyo3(name = "is_running")]
175 const fn py_is_running(&self) -> bool {
176 self.is_running
177 }
178
179 #[pyo3(name = "is_closed")]
180 const fn py_is_closed(&self) -> bool {
181 self.is_closed
182 }
183
184 #[pyo3(name = "subscribe")]
185 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
186 fn py_subscribe(
187 &mut self,
188 schema: String,
189 instrument_ids: Vec<InstrumentId>,
190 start: Option<u64>,
191 snapshot: Option<bool>,
192 ) -> PyResult<()> {
193 let mut symbol_venue_map = self
194 .symbol_venue_map
195 .write()
196 .map_err(|e| to_pyruntime_err(format!("symbol_venue_map lock poisoned: {e}")))?;
197 let symbols: Vec<String> = instrument_ids
198 .iter()
199 .map(|instrument_id| {
200 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
201 })
202 .collect();
203 let first_symbol = symbols
204 .first()
205 .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
206 let stype_in = infer_symbology_type(first_symbol);
207 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
208 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
209 let mut sub = Subscription::builder()
210 .symbols(symbols)
211 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
212 .stype_in(stype_in)
213 .build();
214
215 if let Some(start) = start {
216 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
217 .map_err(to_pyvalue_err)?;
218 sub.start = Some(start);
219 }
220 sub.use_snapshot = snapshot.unwrap_or(false);
221
222 self.send_command(LiveCommand::Subscribe(sub))
223 }
224
225 #[pyo3(name = "start")]
226 fn py_start<'py>(
227 &mut self,
228 py: Python<'py>,
229 callback: PyObject,
230 callback_pyo3: PyObject,
231 ) -> PyResult<Bound<'py, PyAny>> {
232 if self.is_closed {
233 return Err(to_pyruntime_err("Client already closed"));
234 }
235 if self.is_running {
236 return Err(to_pyruntime_err("Client already running"));
237 }
238
239 tracing::debug!("Starting client");
240
241 self.is_running = true;
242
243 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
244
245 let cmd_rx = self
249 .cmd_rx
250 .take()
251 .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
252
253 let mut feed_handler = DatabentoFeedHandler::new(
254 self.key.clone(),
255 self.dataset.clone(),
256 cmd_rx,
257 msg_tx,
258 self.publisher_venue_map.clone(),
259 self.symbol_venue_map.clone(),
260 self.use_exchange_as_venue,
261 self.bars_timestamp_on_close,
262 );
263
264 self.send_command(LiveCommand::Start)?;
265
266 pyo3_async_runtimes::tokio::future_into_py(py, async move {
267 let (proc_handle, feed_handle) = tokio::join!(
268 Self::process_messages(msg_rx, callback, callback_pyo3),
269 feed_handler.run(),
270 );
271
272 match proc_handle {
273 Ok(()) => tracing::debug!("Message processor completed"),
274 Err(e) => tracing::error!("Message processor error: {e}"),
275 }
276
277 match feed_handle {
278 Ok(()) => tracing::debug!("Feed handler completed"),
279 Err(e) => tracing::error!("Feed handler error: {e}"),
280 }
281
282 Ok(())
283 })
284 }
285
286 #[pyo3(name = "close")]
287 fn py_close(&mut self) -> PyResult<()> {
288 if !self.is_running {
289 return Err(to_pyruntime_err("Client never started"));
290 }
291 if self.is_closed {
292 return Err(to_pyruntime_err("Client already closed"));
293 }
294
295 tracing::debug!("Closing client");
296
297 if !self.is_closed() {
298 self.send_command(LiveCommand::Close)?;
299 }
300
301 self.is_running = false;
302 self.is_closed = true;
303
304 Ok(())
305 }
306}