nautilus_databento/python/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    path::PathBuf,
20    str::FromStr,
21    sync::{Arc, RwLock},
22};
23
24use databento::{dbn, live::Subscription};
25use indexmap::IndexMap;
26use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
27use nautilus_model::{
28    identifiers::{InstrumentId, Symbol, Venue},
29    python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
30};
31use pyo3::prelude::*;
32use time::OffsetDateTime;
33
34use crate::{
35    live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
36    symbology::{check_consistent_symbology, infer_symbology_type, instrument_id_to_symbol_string},
37    types::DatabentoPublisher,
38};
39
40#[cfg_attr(
41    feature = "python",
42    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[derive(Debug)]
45pub struct DatabentoLiveClient {
46    #[pyo3(get)]
47    pub key: String,
48    #[pyo3(get)]
49    pub dataset: String,
50    is_running: bool,
51    is_closed: bool,
52    cmd_tx: tokio::sync::mpsc::UnboundedSender<LiveCommand>,
53    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<LiveCommand>>,
54    buffer_size: usize,
55    publisher_venue_map: IndexMap<u16, Venue>,
56    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
57}
58
59impl DatabentoLiveClient {
60    #[must_use]
61    pub fn is_closed(&self) -> bool {
62        self.cmd_tx.is_closed()
63    }
64
65    async fn process_messages(
66        mut msg_rx: tokio::sync::mpsc::Receiver<LiveMessage>,
67        callback: PyObject,
68        callback_pyo3: PyObject,
69    ) -> PyResult<()> {
70        tracing::debug!("Processing messages...");
71        // Continue to process messages until channel is hung up
72        while let Some(msg) = msg_rx.recv().await {
73            tracing::trace!("Received message: {msg:?}");
74            match msg {
75                LiveMessage::Data(data) => Python::with_gil(|py| {
76                    let py_obj = data_to_pycapsule(py, data);
77                    call_python(py, &callback, py_obj);
78                }),
79                LiveMessage::Instrument(data) => Python::with_gil(|py| {
80                    let py_obj =
81                        instrument_any_to_pyobject(py, data).expect("Failed creating instrument");
82                    call_python(py, &callback, py_obj);
83                }),
84                LiveMessage::Status(data) => Python::with_gil(|py| {
85                    let py_obj = data.into_py_any_unwrap(py);
86                    call_python(py, &callback_pyo3, py_obj);
87                }),
88                LiveMessage::Imbalance(data) => Python::with_gil(|py| {
89                    let py_obj = data.into_py_any_unwrap(py);
90                    call_python(py, &callback_pyo3, py_obj);
91                }),
92                LiveMessage::Statistics(data) => Python::with_gil(|py| {
93                    let py_obj = data.into_py_any_unwrap(py);
94                    call_python(py, &callback_pyo3, py_obj);
95                }),
96                LiveMessage::Close => {
97                    // Graceful close
98                    break;
99                }
100                LiveMessage::Error(e) => {
101                    // Return error to Python
102                    return Err(to_pyruntime_err(e));
103                }
104            }
105        }
106
107        msg_rx.close();
108        tracing::debug!("Closed message receiver");
109
110        Ok(())
111    }
112
113    fn send_command(&self, cmd: LiveCommand) -> PyResult<()> {
114        self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
115    }
116}
117
118fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
119    if let Err(e) = callback.call1(py, (py_obj,)) {
120        tracing::error!("Error calling Python: {e}");
121    }
122}
123
124#[pymethods]
125impl DatabentoLiveClient {
126    #[new]
127    pub fn py_new(key: String, dataset: String, publishers_filepath: PathBuf) -> PyResult<Self> {
128        let publishers_json = fs::read_to_string(publishers_filepath)?;
129        let publishers_vec: Vec<DatabentoPublisher> =
130            serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
131        let publisher_venue_map = publishers_vec
132            .into_iter()
133            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
134            .collect::<IndexMap<u16, Venue>>();
135
136        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<LiveCommand>();
137
138        // Hard-coded to a reasonable size for now
139        let buffer_size = 100_000;
140
141        Ok(Self {
142            key,
143            dataset,
144            cmd_tx,
145            cmd_rx: Some(cmd_rx),
146            buffer_size,
147            is_running: false,
148            is_closed: false,
149            publisher_venue_map,
150            symbol_venue_map: Arc::new(RwLock::new(HashMap::new())),
151        })
152    }
153
154    #[pyo3(name = "is_running")]
155    const fn py_is_running(&self) -> bool {
156        self.is_running
157    }
158
159    #[pyo3(name = "is_closed")]
160    const fn py_is_closed(&self) -> bool {
161        self.is_closed
162    }
163
164    #[pyo3(name = "subscribe")]
165    #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
166    fn py_subscribe(
167        &mut self,
168        schema: String,
169        instrument_ids: Vec<InstrumentId>,
170        start: Option<u64>,
171        snapshot: Option<bool>,
172    ) -> PyResult<()> {
173        let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
174        let symbols: Vec<String> = instrument_ids
175            .iter()
176            .map(|instrument_id| {
177                instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
178            })
179            .collect();
180        let stype_in = infer_symbology_type(symbols.first().unwrap());
181        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
182        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
183        let mut sub = Subscription::builder()
184            .symbols(symbols)
185            .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
186            .stype_in(stype_in)
187            .build();
188
189        if let Some(start) = start {
190            let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
191                .map_err(to_pyvalue_err)?;
192            sub.start = Some(start);
193        }
194        sub.use_snapshot = snapshot.unwrap_or(false);
195
196        self.send_command(LiveCommand::Subscribe(sub))
197    }
198
199    #[pyo3(name = "start")]
200    fn py_start<'py>(
201        &mut self,
202        py: Python<'py>,
203        callback: PyObject,
204        callback_pyo3: PyObject,
205    ) -> PyResult<Bound<'py, PyAny>> {
206        if self.is_closed {
207            return Err(to_pyruntime_err("Client already closed"));
208        }
209        if self.is_running {
210            return Err(to_pyruntime_err("Client already running"));
211        }
212
213        tracing::debug!("Starting client");
214
215        self.is_running = true;
216
217        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<LiveMessage>(self.buffer_size);
218
219        // Consume the receiver
220        // SAFETY: We guard the client from being started more than once with the
221        // `is_running` flag, so here it is safe to unwrap the command receiver.
222        let cmd_rx = self.cmd_rx.take().unwrap();
223
224        let mut feed_handler = DatabentoFeedHandler::new(
225            self.key.clone(),
226            self.dataset.clone(),
227            cmd_rx,
228            msg_tx,
229            self.publisher_venue_map.clone(),
230            self.symbol_venue_map.clone(),
231        );
232
233        self.send_command(LiveCommand::Start)?;
234
235        pyo3_async_runtimes::tokio::future_into_py(py, async move {
236            let (proc_handle, feed_handle) = tokio::join!(
237                Self::process_messages(msg_rx, callback, callback_pyo3),
238                feed_handler.run(),
239            );
240
241            match proc_handle {
242                Ok(()) => tracing::debug!("Message processor completed"),
243                Err(e) => tracing::error!("Message processor error: {e}"),
244            }
245
246            match feed_handle {
247                Ok(()) => tracing::debug!("Feed handler completed"),
248                Err(e) => tracing::error!("Feed handler error: {e}"),
249            }
250
251            Ok(())
252        })
253    }
254
255    #[pyo3(name = "close")]
256    fn py_close(&mut self) -> PyResult<()> {
257        if !self.is_running {
258            return Err(to_pyruntime_err("Client never started"));
259        }
260        if self.is_closed {
261            return Err(to_pyruntime_err("Client already closed"));
262        }
263
264        tracing::debug!("Closing client");
265
266        if !self.is_closed() {
267            self.send_command(LiveCommand::Close)?;
268        }
269
270        self.is_running = false;
271        self.is_closed = true;
272
273        Ok(())
274    }
275}