nautilus_databento/python/live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento live client.
17
18use std::{
19    fs,
20    path::PathBuf,
21    str::FromStr,
22    sync::{Arc, RwLock},
23};
24
25use ahash::AHashMap;
26use databento::{dbn, live::Subscription};
27use indexmap::IndexMap;
28use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
29use nautilus_model::{
30    identifiers::{InstrumentId, Symbol, Venue},
31    python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
32};
33use pyo3::prelude::*;
34use time::OffsetDateTime;
35
36use crate::{
37    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
38    symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
39    types::DatabentoPublisher,
40};
41
42#[cfg_attr(
43    feature = "python",
44    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
45)]
46#[derive(Debug)]
47pub struct DatabentoLiveClient {
48    #[pyo3(get)]
49    pub key: String,
50    #[pyo3(get)]
51    pub dataset: String,
52    is_running: bool,
53    is_closed: bool,
54    cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
55    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
56    buffer_size: usize,
57    publisher_venue_map: IndexMap<u16, Venue>,
58    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
59    use_exchange_as_venue: bool,
60    bars_timestamp_on_close: bool,
61    reconnect_timeout_mins: Option<u64>,
62}
63
64impl DatabentoLiveClient {
65    #[must_use]
66    pub fn is_closed(&self) -> bool {
67        self.cmd_tx.is_closed()
68    }
69
70    async fn process_messages(
71        mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
72        callback: Py<PyAny>,
73        callback_pyo3: Py<PyAny>,
74    ) -> PyResult<()> {
75        tracing::debug!("Processing messages...");
76        // Continue to process messages until channel is hung up
77        while let Some(msg) = msg_rx.recv().await {
78            tracing::trace!("Received message: {msg:?}");
79
80            match msg {
81                LiveMessage::Data(data) => Python::attach(|py| {
82                    let py_obj = data_to_pycapsule(py, data);
83                    call_python(py, &callback, py_obj);
84                }),
85                LiveMessage::Instrument(data) => {
86                    Python::attach(|py| match instrument_any_to_pyobject(py, data) {
87                        Ok(py_obj) => call_python(py, &callback, py_obj),
88                        Err(e) => tracing::error!("Failed creating instrument: {e}"),
89                    });
90                }
91                LiveMessage::Status(data) => Python::attach(|py| {
92                    let py_obj = data.into_py_any_unwrap(py);
93                    call_python(py, &callback_pyo3, py_obj);
94                }),
95                LiveMessage::Imbalance(data) => Python::attach(|py| {
96                    let py_obj = data.into_py_any_unwrap(py);
97                    call_python(py, &callback_pyo3, py_obj);
98                }),
99                LiveMessage::Statistics(data) => Python::attach(|py| {
100                    let py_obj = data.into_py_any_unwrap(py);
101                    call_python(py, &callback_pyo3, py_obj);
102                }),
103                LiveMessage::Close => {
104                    // Graceful close
105                    break;
106                }
107                LiveMessage::Error(e) => {
108                    // Return error to Python
109                    return Err(to_pyruntime_err(e));
110                }
111            }
112        }
113
114        msg_rx.close();
115        tracing::debug!("Closed message receiver");
116
117        Ok(())
118    }
119
120    fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
121        self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
122    }
123}
124
125fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
126    if let Err(e) = callback.call1(py, (py_obj,)) {
127        // TODO: Improve this by checking for the actual exception type
128        if !e.to_string().contains("CancelledError") {
129            tracing::error!("Error calling Python: {e}");
130        }
131    }
132}
133
134#[pymethods]
135impl DatabentoLiveClient {
136    /// # Errors
137    ///
138    /// Returns a `PyErr` if reading or parsing the publishers file fails.
139    #[new]
140    #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
141    pub fn py_new(
142        key: String,
143        dataset: String,
144        publishers_filepath: PathBuf,
145        use_exchange_as_venue: bool,
146        bars_timestamp_on_close: Option<bool>,
147        reconnect_timeout_mins: Option<i64>,
148    ) -> PyResult<Self> {
149        let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
150        let publishers_vec: Vec<DatabentoPublisher> =
151            serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
152        let publisher_venue_map = publishers_vec
153            .into_iter()
154            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
155            .collect::<IndexMap<u16, Venue>>();
156
157        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
158
159        // Hardcoded to a reasonable size for now
160        let buffer_size = 100_000;
161
162        // Convert i64 to u64: None/negative = infinite retries, 0 = no retries, positive = timeout in minutes
163        let reconnect_timeout_mins = reconnect_timeout_mins
164            .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
165
166        Ok(Self {
167            key,
168            dataset,
169            cmd_tx,
170            cmd_rx: Some(cmd_rx),
171            buffer_size,
172            is_running: false,
173            is_closed: false,
174            publisher_venue_map,
175            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
176            use_exchange_as_venue,
177            bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
178            reconnect_timeout_mins,
179        })
180    }
181
182    #[pyo3(name = "is_running")]
183    const fn py_is_running(&self) -> bool {
184        self.is_running
185    }
186
187    #[pyo3(name = "is_closed")]
188    const fn py_is_closed(&self) -> bool {
189        self.is_closed
190    }
191
192    #[pyo3(name = "subscribe")]
193    #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
194    fn py_subscribe(
195        &mut self,
196        schema: String,
197        instrument_ids: Vec<InstrumentId>,
198        start: Option<u64>,
199        snapshot: Option<bool>,
200    ) -> PyResult<()> {
201        let mut symbol_venue_map = self
202            .symbol_venue_map
203            .write()
204            .map_err(|e| to_pyruntime_err(format!("symbol_venue_map lock poisoned: {e}")))?;
205        let symbols: Vec<String> = instrument_ids
206            .iter()
207            .map(|instrument_id| {
208                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
209            })
210            .collect();
211        let first_symbol = symbols
212            .first()
213            .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
214        let stype_in = infer_symbology_type(first_symbol);
215        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
216        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
217        let mut sub = Subscription::builder()
218            .symbols(symbols)
219            .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
220            .stype_in(stype_in)
221            .build();
222
223        if let Some(start) = start {
224            let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
225                .map_err(to_pyvalue_err)?;
226            sub.start = Some(start);
227        }
228        sub.use_snapshot = snapshot.unwrap_or(false);
229
230        self.send_command(LiveCommand::Subscribe(sub))
231    }
232
233    #[pyo3(name = "start")]
234    fn py_start<'py>(
235        &mut self,
236        py: Python<'py>,
237        callback: Py<PyAny>,
238        callback_pyo3: Py<PyAny>,
239    ) -> PyResult<Bound<'py, PyAny>> {
240        if self.is_closed {
241            return Err(to_pyruntime_err("Client already closed"));
242        }
243        if self.is_running {
244            return Err(to_pyruntime_err("Client already running"));
245        }
246
247        tracing::debug!("Starting client");
248
249        self.is_running = true;
250
251        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
252
253        // Consume the receiver
254        // SAFETY: We guard the client from being started more than once with the
255        // `is_running` flag, so here it is safe to unwrap the command receiver.
256        let cmd_rx = self
257            .cmd_rx
258            .take()
259            .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
260
261        let mut feed_handler = DatabentoFeedHandler::new(
262            self.key.clone(),
263            self.dataset.clone(),
264            cmd_rx,
265            msg_tx,
266            self.publisher_venue_map.clone(),
267            self.symbol_venue_map.clone(),
268            self.use_exchange_as_venue,
269            self.bars_timestamp_on_close,
270            self.reconnect_timeout_mins,
271        );
272
273        self.send_command(LiveCommand::Start)?;
274
275        pyo3_async_runtimes::tokio::future_into_py(py, async move {
276            let (proc_handle, feed_handle) = tokio::join!(
277                Self::process_messages(msg_rx, callback, callback_pyo3),
278                feed_handler.run(),
279            );
280
281            match proc_handle {
282                Ok(()) => tracing::debug!("Message processor completed"),
283                Err(e) => tracing::error!("Message processor error: {e}"),
284            }
285
286            match feed_handle {
287                Ok(()) => tracing::debug!("Feed handler completed"),
288                Err(e) => tracing::error!("Feed handler error: {e}"),
289            }
290
291            Ok(())
292        })
293    }
294
295    #[pyo3(name = "close")]
296    fn py_close(&mut self) -> PyResult<()> {
297        if !self.is_running {
298            return Err(to_pyruntime_err("Client never started"));
299        }
300        if self.is_closed {
301            return Err(to_pyruntime_err("Client already closed"));
302        }
303
304        tracing::debug!("Closing client");
305
306        if !self.is_closed() {
307            self.send_command(LiveCommand::Close)?;
308        }
309
310        self.is_running = false;
311        self.is_closed = true;
312
313        Ok(())
314    }
315}