nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{Arc, RwLock},
19};
20
21use databento::{
22    dbn::{self, PitSymbolMap, Publisher, Record, SymbolIndex, VersionUpgradePolicy},
23    live::Subscription,
24};
25use indexmap::IndexMap;
26use nautilus_core::{UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
29    enums::RecordFlag,
30    identifiers::{InstrumentId, Symbol, Venue},
31    instruments::InstrumentAny,
32};
33use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
34
35use super::{
36    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
37    types::{DatabentoImbalance, DatabentoStatistics},
38};
39use crate::{
40    decode::{decode_instrument_def_msg, decode_record},
41    types::PublisherId,
42};
43
44#[derive(Debug)]
45pub enum LiveCommand {
46    Subscribe(Subscription),
47    Start,
48    Close,
49}
50
51#[derive(Debug)]
52#[allow(clippy::large_enum_variant)] // TODO: Optimize this (largest variant 1096 vs 80 bytes)
53pub enum LiveMessage {
54    Data(Data),
55    Instrument(InstrumentAny),
56    Status(InstrumentStatus),
57    Imbalance(DatabentoImbalance),
58    Statistics(DatabentoStatistics),
59    Error(anyhow::Error),
60    Close,
61}
62
63/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
64///
65/// [`LiveCommand`] messages are recieved synchronously across a channel,
66/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
67/// back to a message processing task.
68#[derive(Debug)]
69pub struct DatabentoFeedHandler {
70    key: String,
71    dataset: String,
72    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
73    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
74    publisher_venue_map: IndexMap<PublisherId, Venue>,
75    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
76    replay: bool,
77}
78
79impl DatabentoFeedHandler {
80    /// Creates a new [`DatabentoFeedHandler`] instance.
81    #[must_use]
82    pub const fn new(
83        key: String,
84        dataset: String,
85        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
86        tx: tokio::sync::mpsc::Sender<LiveMessage>,
87        publisher_venue_map: IndexMap<PublisherId, Venue>,
88        symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
89    ) -> Self {
90        Self {
91            key,
92            dataset,
93            cmd_rx: rx,
94            msg_tx: tx,
95            publisher_venue_map,
96            symbol_venue_map,
97            replay: false,
98        }
99    }
100
101    /// Run the feed handler to begin listening for commands and processing messages.
102    pub async fn run(&mut self) -> anyhow::Result<()> {
103        tracing::debug!("Running feed handler");
104        let clock = get_atomic_clock_realtime();
105        let mut symbol_map = PitSymbolMap::new();
106        let mut instrument_id_map: HashMap<u32, InstrumentId> = HashMap::new();
107
108        let mut buffering_start = None;
109        let mut buffered_deltas: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
110        let mut deltas_count = 0_u64;
111        let timeout = Duration::from_secs(5); // Hard-coded timeout for now
112
113        let result = tokio::time::timeout(
114            timeout,
115            databento::LiveClient::builder()
116                .key(self.key.clone())?
117                .dataset(self.dataset.clone())
118                .upgrade_policy(VersionUpgradePolicy::UpgradeToV2)
119                .build(),
120        )
121        .await?;
122        tracing::info!("Connected");
123
124        let mut client = if let Ok(client) = result {
125            client
126        } else {
127            self.msg_tx.send(LiveMessage::Close).await?;
128            self.cmd_rx.close();
129            return Err(anyhow::anyhow!("Timeout connecting to LSG"));
130        };
131
132        // Timeout awaiting the next record before checking for a command
133        let timeout = Duration::from_millis(10);
134
135        // Flag to control whether to continue to await next record
136        let mut running = false;
137
138        loop {
139            if self.msg_tx.is_closed() {
140                tracing::debug!("Message channel was closed: stopping");
141                break;
142            }
143
144            match self.cmd_rx.try_recv() {
145                Ok(cmd) => {
146                    tracing::debug!("Received command: {cmd:?}");
147                    match cmd {
148                        LiveCommand::Subscribe(sub) => {
149                            if !self.replay & sub.start.is_some() {
150                                self.replay = true;
151                            }
152                            client.subscribe(sub).await.map_err(to_pyruntime_err)?;
153                        }
154                        LiveCommand::Start => {
155                            buffering_start = if self.replay {
156                                Some(clock.get_time_ns())
157                            } else {
158                                None
159                            };
160                            client.start().await.map_err(to_pyruntime_err)?;
161                            running = true;
162                            tracing::debug!("Started");
163                        }
164                        LiveCommand::Close => {
165                            self.msg_tx.send(LiveMessage::Close).await?;
166                            if running {
167                                client.close().await.map_err(to_pyruntime_err)?;
168                                tracing::debug!("Closed inner client");
169                            }
170                            break;
171                        }
172                    }
173                }
174                Err(TryRecvError::Empty) => {} // No command yet
175                Err(TryRecvError::Disconnected) => {
176                    tracing::debug!("Disconnected");
177                    break;
178                }
179            }
180
181            if !running {
182                continue;
183            }
184
185            // Await the next record with a timeout
186            let result = tokio::time::timeout(timeout, client.next_record()).await;
187            let record_opt = match result {
188                Ok(record_opt) => record_opt,
189                Err(_) => continue, // Timeout
190            };
191
192            let record = match record_opt {
193                Ok(Some(record)) => record,
194                Ok(None) => break, // Session ended normally
195                Err(e) => {
196                    // Fail the session entirely for now. Consider refining
197                    // this strategy to handle specific errors more gracefully.
198                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
199                    break;
200                }
201            };
202
203            let ts_init = clock.get_time_ns();
204
205            // Decode record
206            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
207                handle_error_msg(msg);
208            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
209                handle_system_msg(msg);
210            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
211                // Remove instrument ID index as the raw symbol may have changed
212                instrument_id_map.remove(&msg.hd.instrument_id);
213                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
214            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
215                // TODO: Make this configurable so that exchange can be used as venue
216                let use_exchange_as_venue = false;
217                if use_exchange_as_venue && msg.publisher()? == Publisher::GlbxMdp3Glbx {
218                    update_instrument_id_map_with_exchange(
219                        &symbol_map,
220                        &self.symbol_venue_map,
221                        &mut instrument_id_map,
222                        msg.hd.instrument_id,
223                        msg.exchange()?,
224                    );
225                }
226                let data = handle_instrument_def_msg(
227                    msg,
228                    &record,
229                    &symbol_map,
230                    &self.publisher_venue_map,
231                    &self.symbol_venue_map.read().unwrap(),
232                    &mut instrument_id_map,
233                    ts_init,
234                )?;
235                self.send_msg(LiveMessage::Instrument(data)).await;
236            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
237                let data = handle_status_msg(
238                    msg,
239                    &record,
240                    &symbol_map,
241                    &self.publisher_venue_map,
242                    &self.symbol_venue_map.read().unwrap(),
243                    &mut instrument_id_map,
244                    ts_init,
245                )?;
246                self.send_msg(LiveMessage::Status(data)).await;
247            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
248                let data = handle_imbalance_msg(
249                    msg,
250                    &record,
251                    &symbol_map,
252                    &self.publisher_venue_map,
253                    &self.symbol_venue_map.read().unwrap(),
254                    &mut instrument_id_map,
255                    ts_init,
256                )?;
257                self.send_msg(LiveMessage::Imbalance(data)).await;
258            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
259                let data = handle_statistics_msg(
260                    msg,
261                    &record,
262                    &symbol_map,
263                    &self.publisher_venue_map,
264                    &self.symbol_venue_map.read().unwrap(),
265                    &mut instrument_id_map,
266                    ts_init,
267                )?;
268                self.send_msg(LiveMessage::Statistics(data)).await;
269            } else {
270                let (mut data1, data2) = match handle_record(
271                    record,
272                    &symbol_map,
273                    &self.publisher_venue_map,
274                    &self.symbol_venue_map.read().unwrap(),
275                    &mut instrument_id_map,
276                    ts_init,
277                ) {
278                    Ok(decoded) => decoded,
279                    Err(e) => {
280                        tracing::error!("Error decoding record: {e}");
281                        continue;
282                    }
283                };
284
285                if let Some(msg) = record.get::<dbn::MboMsg>() {
286                    if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
287                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
288                        buffer.push(delta);
289
290                        deltas_count += 1;
291                        tracing::trace!(
292                            "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
293                            delta.ts_event,
294                            msg.flags.raw(),
295                        );
296
297                        // Check if last message in the book event
298                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
299                            continue; // NOT last message
300                        }
301
302                        // Check if snapshot
303                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
304                            continue; // Buffer snapshot
305                        }
306
307                        // Check if buffering a replay
308                        if let Some(start_ns) = buffering_start {
309                            if delta.ts_event <= start_ns {
310                                continue; // Continue buffering replay
311                            }
312                            buffering_start = None;
313                        }
314
315                        // SAFETY: We can guarantee a deltas vec exists
316                        let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap();
317                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
318                        let deltas = OrderBookDeltas_API::new(deltas);
319                        data1 = Some(Data::Deltas(deltas));
320                    }
321                }
322
323                if let Some(data) = data1 {
324                    self.send_msg(LiveMessage::Data(data)).await;
325                }
326
327                if let Some(data) = data2 {
328                    self.send_msg(LiveMessage::Data(data)).await;
329                }
330            }
331        }
332
333        self.cmd_rx.close();
334        tracing::debug!("Closed command receiver");
335
336        Ok(())
337    }
338
339    async fn send_msg(&mut self, msg: LiveMessage) {
340        tracing::trace!("Sending {msg:?}");
341        match self.msg_tx.send(msg).await {
342            Ok(()) => {}
343            Err(e) => tracing::error!("Error sending message: {e}"),
344        }
345    }
346}
347
348fn handle_error_msg(msg: &dbn::ErrorMsg) {
349    tracing::error!("{msg:?}");
350}
351
352fn handle_system_msg(msg: &dbn::SystemMsg) {
353    tracing::info!("{msg:?}");
354}
355
356fn handle_symbol_mapping_msg(
357    msg: &dbn::SymbolMappingMsg,
358    symbol_map: &mut PitSymbolMap,
359    instrument_id_map: &mut HashMap<u32, InstrumentId>,
360) {
361    // Update the symbol map
362    symbol_map
363        .on_symbol_mapping(msg)
364        .unwrap_or_else(|_| panic!("Error updating `symbol_map` with {msg:?}"));
365
366    // Remove current entry for instrument
367    instrument_id_map.remove(&msg.header().instrument_id);
368}
369
370fn update_instrument_id_map_with_exchange(
371    symbol_map: &PitSymbolMap,
372    symbol_venue_map: &RwLock<HashMap<Symbol, Venue>>,
373    instrument_id_map: &mut HashMap<u32, InstrumentId>,
374    raw_instrument_id: u32,
375    exchange: &str,
376) -> InstrumentId {
377    let raw_symbol = symbol_map
378        .get(raw_instrument_id)
379        .expect("Cannot resolve `raw_symbol` from `symbol_map`");
380    let symbol = Symbol::from(raw_symbol.as_str());
381    let venue = Venue::from(exchange);
382    let instrument_id = InstrumentId::new(symbol, venue);
383    symbol_venue_map.write().unwrap().insert(symbol, venue);
384    instrument_id_map.insert(raw_instrument_id, instrument_id);
385    instrument_id
386}
387
388fn update_instrument_id_map(
389    record: &dbn::RecordRef,
390    symbol_map: &PitSymbolMap,
391    publisher_venue_map: &IndexMap<PublisherId, Venue>,
392    symbol_venue_map: &HashMap<Symbol, Venue>,
393    instrument_id_map: &mut HashMap<u32, InstrumentId>,
394) -> InstrumentId {
395    let header = record.header();
396
397    // Check if instrument ID is already in the map
398    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
399        return instrument_id;
400    }
401
402    let raw_symbol = symbol_map
403        .get_for_rec(record)
404        .expect("Cannot resolve `raw_symbol` from `symbol_map`");
405
406    let symbol = Symbol::from_str_unchecked(raw_symbol);
407
408    let publisher_id = header.publisher_id;
409    let venue = match symbol_venue_map.get(&symbol) {
410        Some(venue) => venue,
411        None => publisher_venue_map
412            .get(&publisher_id)
413            .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
414    };
415    let instrument_id = InstrumentId::new(symbol, *venue);
416
417    instrument_id_map.insert(header.instrument_id, instrument_id);
418    instrument_id
419}
420
421fn handle_instrument_def_msg(
422    msg: &dbn::InstrumentDefMsg,
423    record: &dbn::RecordRef,
424    symbol_map: &PitSymbolMap,
425    publisher_venue_map: &IndexMap<PublisherId, Venue>,
426    symbol_venue_map: &HashMap<Symbol, Venue>,
427    instrument_id_map: &mut HashMap<u32, InstrumentId>,
428    ts_init: UnixNanos,
429) -> anyhow::Result<InstrumentAny> {
430    let instrument_id = update_instrument_id_map(
431        record,
432        symbol_map,
433        publisher_venue_map,
434        symbol_venue_map,
435        instrument_id_map,
436    );
437
438    decode_instrument_def_msg(msg, instrument_id, ts_init)
439}
440
441fn handle_status_msg(
442    msg: &dbn::StatusMsg,
443    record: &dbn::RecordRef,
444    symbol_map: &PitSymbolMap,
445    publisher_venue_map: &IndexMap<PublisherId, Venue>,
446    symbol_venue_map: &HashMap<Symbol, Venue>,
447    instrument_id_map: &mut HashMap<u32, InstrumentId>,
448    ts_init: UnixNanos,
449) -> anyhow::Result<InstrumentStatus> {
450    let instrument_id = update_instrument_id_map(
451        record,
452        symbol_map,
453        publisher_venue_map,
454        symbol_venue_map,
455        instrument_id_map,
456    );
457
458    decode_status_msg(msg, instrument_id, ts_init)
459}
460
461fn handle_imbalance_msg(
462    msg: &dbn::ImbalanceMsg,
463    record: &dbn::RecordRef,
464    symbol_map: &PitSymbolMap,
465    publisher_venue_map: &IndexMap<PublisherId, Venue>,
466    symbol_venue_map: &HashMap<Symbol, Venue>,
467    instrument_id_map: &mut HashMap<u32, InstrumentId>,
468    ts_init: UnixNanos,
469) -> anyhow::Result<DatabentoImbalance> {
470    let instrument_id = update_instrument_id_map(
471        record,
472        symbol_map,
473        publisher_venue_map,
474        symbol_venue_map,
475        instrument_id_map,
476    );
477
478    let price_precision = 2; // Hard-coded for now
479
480    decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
481}
482
483fn handle_statistics_msg(
484    msg: &dbn::StatMsg,
485    record: &dbn::RecordRef,
486    symbol_map: &PitSymbolMap,
487    publisher_venue_map: &IndexMap<PublisherId, Venue>,
488    symbol_venue_map: &HashMap<Symbol, Venue>,
489    instrument_id_map: &mut HashMap<u32, InstrumentId>,
490    ts_init: UnixNanos,
491) -> anyhow::Result<DatabentoStatistics> {
492    let instrument_id = update_instrument_id_map(
493        record,
494        symbol_map,
495        publisher_venue_map,
496        symbol_venue_map,
497        instrument_id_map,
498    );
499
500    let price_precision = 2; // Hard-coded for now
501
502    decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
503}
504
505fn handle_record(
506    record: dbn::RecordRef,
507    symbol_map: &PitSymbolMap,
508    publisher_venue_map: &IndexMap<PublisherId, Venue>,
509    symbol_venue_map: &HashMap<Symbol, Venue>,
510    instrument_id_map: &mut HashMap<u32, InstrumentId>,
511    ts_init: UnixNanos,
512) -> anyhow::Result<(Option<Data>, Option<Data>)> {
513    let instrument_id = update_instrument_id_map(
514        &record,
515        symbol_map,
516        publisher_venue_map,
517        symbol_venue_map,
518        instrument_id_map,
519    );
520
521    let price_precision = 2; // Hard-coded for now
522
523    decode_record(
524        &record,
525        instrument_id,
526        price_precision,
527        Some(ts_init),
528        true, // Always include trades
529    )
530}