nautilus_databento/python/
live.rs
1use std::{
17 collections::HashMap,
18 fs,
19 path::PathBuf,
20 str::FromStr,
21 sync::{Arc, RwLock},
22};
23
24use databento::{dbn, live::Subscription};
25use indexmap::IndexMap;
26use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
27use nautilus_model::{
28 identifiers::{InstrumentId, Symbol, Venue},
29 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
30};
31use pyo3::prelude::*;
32use time::OffsetDateTime;
33
34use crate::{
35 live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
36 symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
37 types::DatabentoPublisher,
38};
39
40#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoLiveClient {
46 #[pyo3(get)]
47 pub key: String,
48 #[pyo3(get)]
49 pub dataset: String,
50 is_running: bool,
51 is_closed: bool,
52 cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
53 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
54 buffer_size: usize,
55 publisher_venue_map: IndexMap<u16, Venue>,
56 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
57 use_exchange_as_venue: bool,
58}
59
60impl DatabentoLiveClient {
61 #[must_use]
62 pub fn is_closed(&self) -> bool {
63 self.cmd_tx.is_closed()
64 }
65
66 async fn process_messages(
67 mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
68 callback: PyObject,
69 callback_pyo3: PyObject,
70 ) -> PyResult<()> {
71 tracing::debug!("Processing messages...");
72 while let Some(msg) = msg_rx.recv().await {
74 tracing::trace!("Received message: {msg:?}");
75 match msg {
76 LiveMessage::Data(data) => Python::with_gil(|py| {
77 let py_obj = data_to_pycapsule(py, data);
78 call_python(py, &callback, py_obj);
79 }),
80 LiveMessage::Instrument(data) => Python::with_gil(|py| {
81 let py_obj =
82 instrument_any_to_pyobject(py, data).expect("Failed creating instrument");
83 call_python(py, &callback, py_obj);
84 }),
85 LiveMessage::Status(data) => Python::with_gil(|py| {
86 let py_obj = data.into_py_any_unwrap(py);
87 call_python(py, &callback_pyo3, py_obj);
88 }),
89 LiveMessage::Imbalance(data) => Python::with_gil(|py| {
90 let py_obj = data.into_py_any_unwrap(py);
91 call_python(py, &callback_pyo3, py_obj);
92 }),
93 LiveMessage::Statistics(data) => Python::with_gil(|py| {
94 let py_obj = data.into_py_any_unwrap(py);
95 call_python(py, &callback_pyo3, py_obj);
96 }),
97 LiveMessage::Close => {
98 break;
100 }
101 LiveMessage::Error(e) => {
102 return Err(to_pyruntime_err(e));
104 }
105 }
106 }
107
108 msg_rx.close();
109 tracing::debug!("Closed message receiver");
110
111 Ok(())
112 }
113
114 fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
115 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
116 }
117}
118
119fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
120 if let Err(e) = callback.call1(py, (py_obj,)) {
121 if !e.to_string().contains("CancelledError") {
123 tracing::error!("Error calling Python: {e}");
124 }
125 }
126}
127
128#[pymethods]
129impl DatabentoLiveClient {
130 #[new]
131 pub fn py_new(
132 key: String,
133 dataset: String,
134 publishers_filepath: PathBuf,
135 use_exchange_as_venue: bool,
136 ) -> PyResult<Self> {
137 let publishers_json = fs::read_to_string(publishers_filepath)?;
138 let publishers_vec: Vec<DatabentoPublisher> =
139 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
140 let publisher_venue_map = publishers_vec
141 .into_iter()
142 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
143 .collect::<IndexMap<u16, Venue>>();
144
145 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
146
147 let buffer_size = 100_000;
149
150 Ok(Self {
151 key,
152 dataset,
153 cmd_tx,
154 cmd_rx: Some(cmd_rx),
155 buffer_size,
156 is_running: false,
157 is_closed: false,
158 publisher_venue_map,
159 symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
160 use_exchange_as_venue,
161 })
162 }
163
164 #[pyo3(name = "is_running")]
165 const fn py_is_running(&self) -> bool {
166 self.is_running
167 }
168
169 #[pyo3(name = "is_closed")]
170 const fn py_is_closed(&self) -> bool {
171 self.is_closed
172 }
173
174 #[pyo3(name = "subscribe")]
175 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
176 fn py_subscribe(
177 &mut self,
178 schema: String,
179 instrument_ids: Vec<InstrumentId>,
180 start: Option<u64>,
181 snapshot: Option<bool>,
182 ) -> PyResult<()> {
183 let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
184 let symbols: Vec<String> = instrument_ids
185 .iter()
186 .map(|instrument_id| {
187 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
188 })
189 .collect();
190 let stype_in = infer_symbology_type(symbols.first().unwrap());
191 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
192 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
193 let mut sub = Subscription::builder()
194 .symbols(symbols)
195 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
196 .stype_in(stype_in)
197 .build();
198
199 if let Some(start) = start {
200 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
201 .map_err(to_pyvalue_err)?;
202 sub.start = Some(start);
203 }
204 sub.use_snapshot = snapshot.unwrap_or(false);
205
206 self.send_command(LiveCommand::Subscribe(sub))
207 }
208
209 #[pyo3(name = "start")]
210 fn py_start<'py>(
211 &mut self,
212 py: Python<'py>,
213 callback: PyObject,
214 callback_pyo3: PyObject,
215 ) -> PyResult<Bound<'py, PyAny>> {
216 if self.is_closed {
217 return Err(to_pyruntime_err("Client already closed"));
218 }
219 if self.is_running {
220 return Err(to_pyruntime_err("Client already running"));
221 }
222
223 tracing::debug!("Starting client");
224
225 self.is_running = true;
226
227 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
228
229 let cmd_rx = self.cmd_rx.take().unwrap();
233
234 let mut feed_handler = DatabentoFeedHandler::new(
235 self.key.clone(),
236 self.dataset.clone(),
237 cmd_rx,
238 msg_tx,
239 self.publisher_venue_map.clone(),
240 self.symbol_venue_map.clone(),
241 self.use_exchange_as_venue,
242 );
243
244 self.send_command(LiveCommand::Start)?;
245
246 pyo3_async_runtimes::tokio::future_into_py(py, async move {
247 let (proc_handle, feed_handle) = tokio::join!(
248 Self::process_messages(msg_rx, callback, callback_pyo3),
249 feed_handler.run(),
250 );
251
252 match proc_handle {
253 Ok(()) => tracing::debug!("Message processor completed"),
254 Err(e) => tracing::error!("Message processor error: {e}"),
255 }
256
257 match feed_handle {
258 Ok(()) => tracing::debug!("Feed handler completed"),
259 Err(e) => tracing::error!("Feed handler error: {e}"),
260 }
261
262 Ok(())
263 })
264 }
265
266 #[pyo3(name = "close")]
267 fn py_close(&mut self) -> PyResult<()> {
268 if !self.is_running {
269 return Err(to_pyruntime_err("Client never started"));
270 }
271 if self.is_closed {
272 return Err(to_pyruntime_err("Client already closed"));
273 }
274
275 tracing::debug!("Closing client");
276
277 if !self.is_closed() {
278 self.send_command(LiveCommand::Close)?;
279 }
280
281 self.is_running = false;
282 self.is_closed = true;
283
284 Ok(())
285 }
286}