nautilus_databento/python/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento live client.
17
18use std::{
19    fs,
20    path::PathBuf,
21    str::FromStr,
22    sync::{Arc, RwLock},
23};
24
25use ahash::AHashMap;
26use databento::{dbn, live::Subscription};
27use indexmap::IndexMap;
28use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
29use nautilus_model::{
30    identifiers::{InstrumentId, Symbol, Venue},
31    python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
32};
33use pyo3::prelude::*;
34use time::OffsetDateTime;
35
36use super::types::DatabentoSubscriptionAck;
37use crate::{
38    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
39    symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
40    types::DatabentoPublisher,
41};
42
/// Client handle for a Databento live data session, exposed to Python as a `pyclass`
/// when the `python` feature is enabled.
///
/// The struct stores the session configuration together with both halves of the
/// command channel; the receiving half is handed over to the feed handler when the
/// client is started.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[derive(Debug)]
pub struct DatabentoLiveClient {
    /// Key passed to the feed handler (presumably the Databento API key); readable from Python.
    // NOTE(review): `#[derive(Debug)]` on this struct prints this value verbatim in `{:?}`
    // output — confirm the client is never debug-logged, or consider a redacting `Debug` impl.
    #[pyo3(get)]
    pub key: String,
    /// Dataset identifier passed to the feed handler; readable from Python.
    #[pyo3(get)]
    pub dataset: String,
    /// Set when the client has been started, cleared again when it is closed.
    is_running: bool,
    /// Set once the client has been closed.
    is_closed: bool,
    /// Sending half of the command channel used to control the feed handler.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
    /// Receiving half of the command channel; `Some` until it is taken on start.
    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
    /// Capacity of the bounded message channel created on start.
    buffer_size: usize,
    /// Publisher ID to venue mapping loaded from the publishers file.
    publisher_venue_map: IndexMap<u16, Venue>,
    /// Symbol to venue mapping, shared with the feed handler and updated on subscribe.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Forwarded unchanged to the feed handler.
    use_exchange_as_venue: bool,
    /// Forwarded unchanged to the feed handler (defaults to `true` in the constructor).
    bars_timestamp_on_close: bool,
    /// Reconnect timeout in minutes: `None` = infinite retries, `Some(0)` = no retries.
    reconnect_timeout_mins: Option<u64>,
}
64
65impl DatabentoLiveClient {
    /// Returns `true` if the command channel is closed, as reported by the sender.
    ///
    /// This reflects the state of `cmd_tx` and is distinct from the `is_closed`
    /// field, which is only set by an explicit close.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.cmd_tx.is_closed()
    }
70
71    async fn process_messages(
72        mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
73        callback: Py<PyAny>,
74        callback_pyo3: Py<PyAny>,
75    ) -> PyResult<()> {
76        log::debug!("Processing messages...");
77        // Continue to process messages until channel is hung up
78        while let Some(msg) = msg_rx.recv().await {
79            log::trace!("Received message: {msg:?}");
80
81            match msg {
82                LiveMessage::Data(data) => Python::attach(|py| {
83                    let py_obj = data_to_pycapsule(py, data);
84                    call_python(py, &callback, py_obj);
85                }),
86                LiveMessage::Instrument(data) => {
87                    Python::attach(|py| match instrument_any_to_pyobject(py, data) {
88                        Ok(py_obj) => call_python(py, &callback, py_obj),
89                        Err(e) => log::error!("Failed creating instrument: {e}"),
90                    });
91                }
92                LiveMessage::Status(data) => Python::attach(|py| {
93                    let py_obj = data.into_py_any_unwrap(py);
94                    call_python(py, &callback_pyo3, py_obj);
95                }),
96                LiveMessage::Imbalance(data) => Python::attach(|py| {
97                    let py_obj = data.into_py_any_unwrap(py);
98                    call_python(py, &callback_pyo3, py_obj);
99                }),
100                LiveMessage::Statistics(data) => Python::attach(|py| {
101                    let py_obj = data.into_py_any_unwrap(py);
102                    call_python(py, &callback_pyo3, py_obj);
103                }),
104                LiveMessage::SubscriptionAck(ack) => Python::attach(|py| {
105                    let py_obj: DatabentoSubscriptionAck = ack.into();
106                    let py_obj = py_obj.into_py_any_unwrap(py);
107                    call_python(py, &callback_pyo3, py_obj);
108                }),
109                LiveMessage::Close => {
110                    // Graceful close
111                    break;
112                }
113                LiveMessage::Error(e) => {
114                    // Return error to Python
115                    return Err(to_pyruntime_err(e));
116                }
117            }
118        }
119
120        msg_rx.close();
121        log::debug!("Closed message receiver");
122
123        Ok(())
124    }
125
126    fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
127        self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
128    }
129}
130
131fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
132    if let Err(e) = callback.call1(py, (py_obj,)) {
133        // TODO: Improve this by checking for the actual exception type
134        if !e.to_string().contains("CancelledError") {
135            log::error!("Error calling Python: {e}");
136        }
137    }
138}
139
140#[pymethods]
141impl DatabentoLiveClient {
142    /// # Errors
143    ///
144    /// Returns a `PyErr` if reading or parsing the publishers file fails.
145    #[new]
146    #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
147    pub fn py_new(
148        key: String,
149        dataset: String,
150        publishers_filepath: PathBuf,
151        use_exchange_as_venue: bool,
152        bars_timestamp_on_close: Option<bool>,
153        reconnect_timeout_mins: Option<i64>,
154    ) -> PyResult<Self> {
155        let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
156        let publishers_vec: Vec<DatabentoPublisher> =
157            serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
158        let publisher_venue_map = publishers_vec
159            .into_iter()
160            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
161            .collect::<IndexMap<u16, Venue>>();
162
163        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
164
165        // Hardcoded to a reasonable size for now
166        let buffer_size = 100_000;
167
168        // Convert i64 to u64: None/negative = infinite retries, 0 = no retries, positive = timeout in minutes
169        let reconnect_timeout_mins = reconnect_timeout_mins
170            .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
171
172        Ok(Self {
173            key,
174            dataset,
175            cmd_tx,
176            cmd_rx: Some(cmd_rx),
177            buffer_size,
178            is_running: false,
179            is_closed: false,
180            publisher_venue_map,
181            symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
182            use_exchange_as_venue,
183            bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
184            reconnect_timeout_mins,
185        })
186    }
187
    /// Returns the stored `is_running` flag.
    #[pyo3(name = "is_running")]
    const fn py_is_running(&self) -> bool {
        self.is_running
    }
192
    /// Returns the stored `is_closed` flag.
    ///
    /// This is the field value, not the state of the command channel.
    #[pyo3(name = "is_closed")]
    const fn py_is_closed(&self) -> bool {
        self.is_closed
    }
197
198    #[pyo3(name = "subscribe")]
199    #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
200    fn py_subscribe(
201        &mut self,
202        schema: String,
203        instrument_ids: Vec<InstrumentId>,
204        start: Option<u64>,
205        snapshot: Option<bool>,
206    ) -> PyResult<()> {
207        let mut symbol_venue_map = self
208            .symbol_venue_map
209            .write()
210            .map_err(|e| to_pyruntime_err(format!("symbol_venue_map lock poisoned: {e}")))?;
211        let symbols: Vec<String> = instrument_ids
212            .iter()
213            .map(|instrument_id| {
214                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
215            })
216            .collect();
217        let first_symbol = symbols
218            .first()
219            .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
220        let stype_in = infer_symbology_type(first_symbol);
221        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
222        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
223        let mut sub = Subscription::builder()
224            .symbols(symbols)
225            .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
226            .stype_in(stype_in)
227            .build();
228
229        if let Some(start) = start {
230            let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
231                .map_err(to_pyvalue_err)?;
232            sub.start = Some(start);
233        }
234        sub.use_snapshot = snapshot.unwrap_or(false);
235
236        self.send_command(LiveCommand::Subscribe(sub))
237    }
238
239    #[pyo3(name = "start")]
240    fn py_start<'py>(
241        &mut self,
242        py: Python<'py>,
243        callback: Py<PyAny>,
244        callback_pyo3: Py<PyAny>,
245    ) -> PyResult<Bound<'py, PyAny>> {
246        if self.is_closed {
247            return Err(to_pyruntime_err("Client already closed"));
248        }
249        if self.is_running {
250            return Err(to_pyruntime_err("Client already running"));
251        }
252
253        log::debug!("Starting client");
254
255        self.is_running = true;
256
257        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
258
259        // Consume the receiver
260        // SAFETY: We guard the client from being started more than once with the
261        // `is_running` flag, so here it is safe to unwrap the command receiver.
262        let cmd_rx = self
263            .cmd_rx
264            .take()
265            .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
266
267        let mut feed_handler = DatabentoFeedHandler::new(
268            self.key.clone(),
269            self.dataset.clone(),
270            cmd_rx,
271            msg_tx,
272            self.publisher_venue_map.clone(),
273            self.symbol_venue_map.clone(),
274            self.use_exchange_as_venue,
275            self.bars_timestamp_on_close,
276            self.reconnect_timeout_mins,
277        );
278
279        self.send_command(LiveCommand::Start)?;
280
281        pyo3_async_runtimes::tokio::future_into_py(py, async move {
282            let (proc_handle, feed_handle) = tokio::join!(
283                Self::process_messages(msg_rx, callback, callback_pyo3),
284                feed_handler.run(),
285            );
286
287            match proc_handle {
288                Ok(()) => log::debug!("Message processor completed"),
289                Err(e) => log::error!("Message processor error: {e}"),
290            }
291
292            match feed_handle {
293                Ok(()) => log::debug!("Feed handler completed"),
294                Err(e) => log::error!("Feed handler error: {e}"),
295            }
296
297            Ok(())
298        })
299    }
300
301    #[pyo3(name = "close")]
302    fn py_close(&mut self) -> PyResult<()> {
303        if !self.is_running {
304            return Err(to_pyruntime_err("Client never started"));
305        }
306        if self.is_closed {
307            return Err(to_pyruntime_err("Client already closed"));
308        }
309
310        log::debug!("Closing client");
311
312        if !self.is_closed() {
313            self.send_command(LiveCommand::Close)?;
314        }
315
316        self.is_running = false;
317        self.is_closed = true;
318
319        Ok(())
320    }
321}