nautilus_databento/python/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    path::PathBuf,
20    str::FromStr,
21    sync::{Arc, RwLock},
22};
23
24use databento::{dbn, live::Subscription};
25use indexmap::IndexMap;
26use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
27use nautilus_model::{
28    identifiers::{InstrumentId, Symbol, Venue},
29    python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
30};
31use pyo3::prelude::*;
32use time::OffsetDateTime;
33
34use crate::{
35    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
36    symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
37    types::DatabentoPublisher,
38};
39
/// Python-facing client for a live Databento market data session.
///
/// Commands (subscribe, start, close) are queued on an unbounded channel whose
/// receiving end is handed to a `DatabentoFeedHandler` when the client is started.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[derive(Debug)]
pub struct DatabentoLiveClient {
    /// Key passed to the feed handler (presumably the Databento API key); readable from Python.
    #[pyo3(get)]
    pub key: String,
    /// Dataset passed to the feed handler; readable from Python.
    #[pyo3(get)]
    pub dataset: String,
    // Set when the client is started and cleared when it is closed.
    is_running: bool,
    // Set once the client has been closed; a closed client cannot be started again.
    is_closed: bool,
    // Sending half of the command channel, used by every command-issuing method.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
    // Receiving half of the command channel; `Some` until it is moved into the
    // feed handler on start, `None` afterwards.
    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
    // Capacity of the bounded message channel created on start.
    buffer_size: usize,
    // Publisher ID -> venue, loaded from the publishers JSON file at construction.
    publisher_venue_map: IndexMap<u16, Venue>,
    // Symbol -> venue map shared with the feed handler; updated while subscribing.
    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
    // Forwarded unchanged to the feed handler.
    // NOTE(review): exact effect is defined by `DatabentoFeedHandler` - confirm there.
    use_exchange_as_venue: bool,
}
59
60impl DatabentoLiveClient {
61    #[must_use]
62    pub fn is_closed(&self) -> bool {
63        self.cmd_tx.is_closed()
64    }
65
66    async fn process_messages(
67        mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
68        callback: PyObject,
69        callback_pyo3: PyObject,
70    ) -> PyResult<()> {
71        tracing::debug!("Processing messages...");
72        // Continue to process messages until channel is hung up
73        while let Some(msg) = msg_rx.recv().await {
74            tracing::trace!("Received message: {msg:?}");
75            match msg {
76                LiveMessage::Data(data) => Python::with_gil(|py| {
77                    let py_obj = data_to_pycapsule(py, data);
78                    call_python(py, &callback, py_obj);
79                }),
80                LiveMessage::Instrument(data) => Python::with_gil(|py| {
81                    let py_obj =
82                        instrument_any_to_pyobject(py, data).expect("Failed creating instrument");
83                    call_python(py, &callback, py_obj);
84                }),
85                LiveMessage::Status(data) => Python::with_gil(|py| {
86                    let py_obj = data.into_py_any_unwrap(py);
87                    call_python(py, &callback_pyo3, py_obj);
88                }),
89                LiveMessage::Imbalance(data) => Python::with_gil(|py| {
90                    let py_obj = data.into_py_any_unwrap(py);
91                    call_python(py, &callback_pyo3, py_obj);
92                }),
93                LiveMessage::Statistics(data) => Python::with_gil(|py| {
94                    let py_obj = data.into_py_any_unwrap(py);
95                    call_python(py, &callback_pyo3, py_obj);
96                }),
97                LiveMessage::Close => {
98                    // Graceful close
99                    break;
100                }
101                LiveMessage::Error(e) => {
102                    // Return error to Python
103                    return Err(to_pyruntime_err(e));
104                }
105            }
106        }
107
108        msg_rx.close();
109        tracing::debug!("Closed message receiver");
110
111        Ok(())
112    }
113
114    fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
115        self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
116    }
117}
118
119fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
120    if let Err(e) = callback.call1(py, (py_obj,)) {
121        // TODO: Improve this by checking for the actual exception type
122        if !e.to_string().contains("CancelledError") {
123            tracing::error!("Error calling Python: {e}");
124        }
125    }
126}
127
128#[pymethods]
129impl DatabentoLiveClient {
130    #[new]
131    pub fn py_new(
132        key: String,
133        dataset: String,
134        publishers_filepath: PathBuf,
135        use_exchange_as_venue: bool,
136    ) -> PyResult<Self> {
137        let publishers_json = fs::read_to_string(publishers_filepath)?;
138        let publishers_vec: Vec<DatabentoPublisher> =
139            serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
140        let publisher_venue_map = publishers_vec
141            .into_iter()
142            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
143            .collect::<IndexMap<u16, Venue>>();
144
145        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
146
147        // Hard-coded to a reasonable size for now
148        let buffer_size = 100_000;
149
150        Ok(Self {
151            key,
152            dataset,
153            cmd_tx,
154            cmd_rx: Some(cmd_rx),
155            buffer_size,
156            is_running: false,
157            is_closed: false,
158            publisher_venue_map,
159            symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
160            use_exchange_as_venue,
161        })
162    }
163
164    #[pyo3(name = "is_running")]
165    const fn py_is_running(&self) -> bool {
166        self.is_running
167    }
168
169    #[pyo3(name = "is_closed")]
170    const fn py_is_closed(&self) -> bool {
171        self.is_closed
172    }
173
174    #[pyo3(name = "subscribe")]
175    #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
176    fn py_subscribe(
177        &mut self,
178        schema: String,
179        instrument_ids: Vec<InstrumentId>,
180        start: Option<u64>,
181        snapshot: Option<bool>,
182    ) -> PyResult<()> {
183        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
184        let symbols: Vec<String> = instrument_ids
185            .iter()
186            .map(|instrument_id| {
187                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
188            })
189            .collect();
190        let stype_in = infer_symbology_type(symbols.first().unwrap());
191        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
192        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
193        let mut sub = Subscription::builder()
194            .symbols(symbols)
195            .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
196            .stype_in(stype_in)
197            .build();
198
199        if let Some(start) = start {
200            let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
201                .map_err(to_pyvalue_err)?;
202            sub.start = Some(start);
203        }
204        sub.use_snapshot = snapshot.unwrap_or(false);
205
206        self.send_command(LiveCommand::Subscribe(sub))
207    }
208
209    #[pyo3(name = "start")]
210    fn py_start<'py>(
211        &mut self,
212        py: Python<'py>,
213        callback: PyObject,
214        callback_pyo3: PyObject,
215    ) -> PyResult<Bound<'py, PyAny>> {
216        if self.is_closed {
217            return Err(to_pyruntime_err("Client already closed"));
218        }
219        if self.is_running {
220            return Err(to_pyruntime_err("Client already running"));
221        }
222
223        tracing::debug!("Starting client");
224
225        self.is_running = true;
226
227        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
228
229        // Consume the receiver
230        // SAFETY: We guard the client from being started more than once with the
231        // `is_running` flag, so here it is safe to unwrap the command receiver.
232        let cmd_rx = self.cmd_rx.take().unwrap();
233
234        let mut feed_handler = DatabentoFeedHandler::new(
235            self.key.clone(),
236            self.dataset.clone(),
237            cmd_rx,
238            msg_tx,
239            self.publisher_venue_map.clone(),
240            self.symbol_venue_map.clone(),
241            self.use_exchange_as_venue,
242        );
243
244        self.send_command(LiveCommand::Start)?;
245
246        pyo3_async_runtimes::tokio::future_into_py(py, async move {
247            let (proc_handle, feed_handle) = tokio::join!(
248                Self::process_messages(msg_rx, callback, callback_pyo3),
249                feed_handler.run(),
250            );
251
252            match proc_handle {
253                Ok(()) => tracing::debug!("Message processor completed"),
254                Err(e) => tracing::error!("Message processor error: {e}"),
255            }
256
257            match feed_handle {
258                Ok(()) => tracing::debug!("Feed handler completed"),
259                Err(e) => tracing::error!("Feed handler error: {e}"),
260            }
261
262            Ok(())
263        })
264    }
265
266    #[pyo3(name = "close")]
267    fn py_close(&mut self) -> PyResult<()> {
268        if !self.is_running {
269            return Err(to_pyruntime_err("Client never started"));
270        }
271        if self.is_closed {
272            return Err(to_pyruntime_err("Client already closed"));
273        }
274
275        tracing::debug!("Closing client");
276
277        if !self.is_closed() {
278            self.send_command(LiveCommand::Close)?;
279        }
280
281        self.is_running = false;
282        self.is_closed = true;
283
284        Ok(())
285    }
286}