nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{Arc, RwLock};
17
18use ahash::{AHashMap, HashSet, HashSetExt};
19use databento::{
20    dbn::{self, PitSymbolMap, Record, SymbolIndex},
21    live::Subscription,
22};
23use indexmap::IndexMap;
24use nautilus_core::{
25    UnixNanos, consts::NAUTILUS_USER_AGENT, python::to_pyruntime_err,
26    time::get_atomic_clock_realtime,
27};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38    types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41    decode::{decode_instrument_def_msg, decode_record},
42    types::PublisherId,
43};
44
/// Commands accepted by [`DatabentoFeedHandler::run`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Forward the subscription to the live client. A subscription carrying a
    /// `start` value switches the handler into replay mode.
    Subscribe(Subscription),
    /// Start the live client session; records are only awaited once started.
    Start,
    /// Emit [`LiveMessage::Close`], close the inner client if it was started,
    /// and stop the run loop.
    Close,
}
51
/// Messages emitted by [`DatabentoFeedHandler`] on its outbound message channel.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)] // TODO: Optimize this (largest variant 1096 vs 80 bytes)
pub enum LiveMessage {
    /// Data decoded from a generic record (including batched order book deltas).
    Data(Data),
    /// Instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// Instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// Imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// Error raised by the live client while awaiting the next record.
    Error(anyhow::Error),
    /// The feed handler is shutting down.
    Close,
}
63
/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
///
/// [`LiveCommand`] messages are received synchronously across a channel,
/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
/// back to a message processing task.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key passed to the live client builder.
    key: String,
    /// Dataset passed to the live client builder.
    dataset: String,
    /// Inbound command channel, polled without blocking by the run loop.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Outbound channel for decoded messages.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Venue lookup by publisher ID, used when a symbol has no entry in `symbol_venue_map`.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol-to-venue map; takes precedence over `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription carrying a `start` value has been received.
    replay: bool,
    /// When set, the exchange on instrument definitions is used as the venue.
    use_exchange_as_venue: bool,
    /// Forwarded to `decode_record` when decoding generic records.
    bars_timestamp_on_close: bool,
}
81
82impl DatabentoFeedHandler {
83    /// Creates a new [`DatabentoFeedHandler`] instance.
84    #[must_use]
85    #[allow(clippy::too_many_arguments)]
86    pub const fn new(
87        key: String,
88        dataset: String,
89        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
90        tx: tokio::sync::mpsc::Sender<LiveMessage>,
91        publisher_venue_map: IndexMap<PublisherId, Venue>,
92        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
93        use_exchange_as_venue: bool,
94        bars_timestamp_on_close: bool,
95    ) -> Self {
96        Self {
97            key,
98            dataset,
99            cmd_rx: rx,
100            msg_tx: tx,
101            publisher_venue_map,
102            symbol_venue_map,
103            replay: false,
104            use_exchange_as_venue,
105            bars_timestamp_on_close,
106        }
107    }
108
109    /// Run the feed handler to begin listening for commands and processing messages.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if any client operation or message handling fails.
114    ///
115    /// # Panics
116    ///
117    /// Panics if an MBO message does not decode into a delta variant (via `expect`).
118    #[allow(clippy::blocks_in_conditions)]
119    pub async fn run(&mut self) -> anyhow::Result<()> {
120        tracing::debug!("Running feed handler");
121        let clock = get_atomic_clock_realtime();
122        let mut symbol_map = PitSymbolMap::new();
123        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
124
125        let mut buffering_start = None;
126        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
127        let mut deltas_count = 0_u64;
128        let timeout = Duration::from_secs(5); // Hard-coded timeout for now
129
130        let result = tokio::time::timeout(
131            timeout,
132            databento::LiveClient::builder()
133                .user_agent_extension(NAUTILUS_USER_AGENT.into())
134                .key(self.key.clone())?
135                .dataset(self.dataset.clone())
136                .build(),
137        )
138        .await?;
139
140        tracing::info!("Connected");
141
142        let mut client = if let Ok(client) = result {
143            client
144        } else {
145            self.msg_tx.send(LiveMessage::Close).await?;
146            self.cmd_rx.close();
147            anyhow::bail!("Timeout connecting to LSG");
148        };
149
150        // Timeout awaiting the next record before checking for a command
151        let timeout = Duration::from_millis(10);
152
153        // Flag to control whether to continue to await next record
154        let mut running = false;
155
156        loop {
157            if self.msg_tx.is_closed() {
158                tracing::debug!("Message channel was closed: stopping");
159                break;
160            }
161
162            match self.cmd_rx.try_recv() {
163                Ok(cmd) => {
164                    tracing::debug!("Received command: {cmd:?}");
165                    match cmd {
166                        LiveCommand::Subscribe(sub) => {
167                            if !self.replay & sub.start.is_some() {
168                                self.replay = true;
169                            }
170                            client.subscribe(sub).await.map_err(to_pyruntime_err)?;
171                        }
172                        LiveCommand::Start => {
173                            buffering_start = if self.replay {
174                                Some(clock.get_time_ns())
175                            } else {
176                                None
177                            };
178                            client.start().await.map_err(to_pyruntime_err)?;
179                            running = true;
180                            tracing::debug!("Started");
181                        }
182                        LiveCommand::Close => {
183                            self.msg_tx.send(LiveMessage::Close).await?;
184                            if running {
185                                client.close().await.map_err(to_pyruntime_err)?;
186                                tracing::debug!("Closed inner client");
187                            }
188                            break;
189                        }
190                    }
191                }
192                Err(TryRecvError::Empty) => {} // No command yet
193                Err(TryRecvError::Disconnected) => {
194                    tracing::debug!("Disconnected");
195                    break;
196                }
197            }
198
199            if !running {
200                continue;
201            }
202
203            // Await the next record with a timeout
204            let result = tokio::time::timeout(timeout, client.next_record()).await;
205            let record_opt = match result {
206                Ok(record_opt) => record_opt,
207                Err(_) => continue, // Timeout
208            };
209
210            let record = match record_opt {
211                Ok(Some(record)) => record,
212                Ok(None) => break, // Session ended normally
213                Err(e) => {
214                    // Fail the session entirely for now. Consider refining
215                    // this strategy to handle specific errors more gracefully.
216                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
217                    break;
218                }
219            };
220
221            let ts_init = clock.get_time_ns();
222            let mut initialized_books = HashSet::new();
223
224            // Decode record
225            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
226                handle_error_msg(msg);
227            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
228                handle_system_msg(msg);
229            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
230                // Remove instrument ID index as the raw symbol may have changed
231                instrument_id_map.remove(&msg.hd.instrument_id);
232                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
233            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
234                if self.use_exchange_as_venue {
235                    let exchange = msg.exchange()?;
236                    if !exchange.is_empty() {
237                        update_instrument_id_map_with_exchange(
238                            &symbol_map,
239                            &self.symbol_venue_map,
240                            &mut instrument_id_map,
241                            msg.hd.instrument_id,
242                            exchange,
243                        )?;
244                    }
245                }
246                let data = {
247                    let sym_map = self
248                        .symbol_venue_map
249                        .read()
250                        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
251                    handle_instrument_def_msg(
252                        msg,
253                        &record,
254                        &symbol_map,
255                        &self.publisher_venue_map,
256                        &sym_map,
257                        &mut instrument_id_map,
258                        ts_init,
259                    )?
260                };
261                self.send_msg(LiveMessage::Instrument(data)).await;
262            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
263                let data = {
264                    let sym_map = self
265                        .symbol_venue_map
266                        .read()
267                        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
268                    handle_status_msg(
269                        msg,
270                        &record,
271                        &symbol_map,
272                        &self.publisher_venue_map,
273                        &sym_map,
274                        &mut instrument_id_map,
275                        ts_init,
276                    )?
277                };
278                self.send_msg(LiveMessage::Status(data)).await;
279            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
280                let data = {
281                    let sym_map = self
282                        .symbol_venue_map
283                        .read()
284                        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
285                    handle_imbalance_msg(
286                        msg,
287                        &record,
288                        &symbol_map,
289                        &self.publisher_venue_map,
290                        &sym_map,
291                        &mut instrument_id_map,
292                        ts_init,
293                    )?
294                };
295                self.send_msg(LiveMessage::Imbalance(data)).await;
296            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
297                let data = {
298                    let sym_map = self
299                        .symbol_venue_map
300                        .read()
301                        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
302                    handle_statistics_msg(
303                        msg,
304                        &record,
305                        &symbol_map,
306                        &self.publisher_venue_map,
307                        &sym_map,
308                        &mut instrument_id_map,
309                        ts_init,
310                    )?
311                };
312                self.send_msg(LiveMessage::Statistics(data)).await;
313            } else {
314                // Decode a generic record with possible errors
315                let (mut data1, data2) = match {
316                    let sym_map = self
317                        .symbol_venue_map
318                        .read()
319                        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
320                    handle_record(
321                        record,
322                        &symbol_map,
323                        &self.publisher_venue_map,
324                        &sym_map,
325                        &mut instrument_id_map,
326                        ts_init,
327                        &initialized_books,
328                        self.bars_timestamp_on_close,
329                    )
330                } {
331                    Ok(decoded) => decoded,
332                    Err(e) => {
333                        tracing::error!("Error decoding record: {e}");
334                        continue;
335                    }
336                };
337
338                if let Some(msg) = record.get::<dbn::MboMsg>() {
339                    // Check if should mark book initialized
340                    if let Some(Data::Delta(delta)) = &data1 {
341                        initialized_books.insert(delta.instrument_id);
342                    } else {
343                        continue; // No delta yet
344                    }
345
346                    if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
347                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
348                        buffer.push(delta);
349
350                        deltas_count += 1;
351                        tracing::trace!(
352                            "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
353                            delta.ts_event,
354                            msg.flags.raw(),
355                        );
356
357                        // Check if last message in the book event
358                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
359                            continue; // NOT last message
360                        }
361
362                        // Check if snapshot
363                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
364                            continue; // Buffer snapshot
365                        }
366
367                        // Check if buffering a replay
368                        if let Some(start_ns) = buffering_start {
369                            if delta.ts_event <= start_ns {
370                                continue; // Continue buffering replay
371                            }
372                            buffering_start = None;
373                        }
374
375                        // SAFETY: We can guarantee a deltas vec exists
376                        let buffer =
377                            buffered_deltas
378                                .remove(&delta.instrument_id)
379                                .ok_or_else(|| {
380                                    anyhow::anyhow!(
381                                        "Internal error: no buffered deltas for instrument {id}",
382                                        id = delta.instrument_id
383                                    )
384                                })?;
385                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
386                        let deltas = OrderBookDeltas_API::new(deltas);
387                        data1 = Some(Data::Deltas(deltas));
388                    }
389                }
390
391                if let Some(data) = data1 {
392                    self.send_msg(LiveMessage::Data(data)).await;
393                }
394
395                if let Some(data) = data2 {
396                    self.send_msg(LiveMessage::Data(data)).await;
397                }
398            }
399        }
400
401        self.cmd_rx.close();
402        tracing::debug!("Closed command receiver");
403
404        Ok(())
405    }
406
407    async fn send_msg(&mut self, msg: LiveMessage) {
408        tracing::trace!("Sending {msg:?}");
409        match self.msg_tx.send(msg).await {
410            Ok(()) => {}
411            Err(e) => tracing::error!("Error sending message: {e}"),
412        }
413    }
414}
415
416fn handle_error_msg(msg: &dbn::ErrorMsg) {
417    tracing::error!("{msg:?}");
418}
419
420fn handle_system_msg(msg: &dbn::SystemMsg) {
421    tracing::info!("{msg:?}");
422}
423
424fn handle_symbol_mapping_msg(
425    msg: &dbn::SymbolMappingMsg,
426    symbol_map: &mut PitSymbolMap,
427    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
428) -> anyhow::Result<()> {
429    symbol_map
430        .on_symbol_mapping(msg)
431        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
432    instrument_id_map.remove(&msg.header().instrument_id);
433    Ok(())
434}
435
436fn update_instrument_id_map_with_exchange(
437    symbol_map: &PitSymbolMap,
438    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
439    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
440    raw_instrument_id: u32,
441    exchange: &str,
442) -> anyhow::Result<InstrumentId> {
443    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
444        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
445    })?;
446    let symbol = Symbol::from(raw_symbol.as_str());
447    let venue = Venue::from(exchange);
448    let instrument_id = InstrumentId::new(symbol, venue);
449    let mut map = symbol_venue_map
450        .write()
451        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
452    map.entry(symbol).or_insert(venue);
453    instrument_id_map.insert(raw_instrument_id, instrument_id);
454    Ok(instrument_id)
455}
456
457fn update_instrument_id_map(
458    record: &dbn::RecordRef,
459    symbol_map: &PitSymbolMap,
460    publisher_venue_map: &IndexMap<PublisherId, Venue>,
461    symbol_venue_map: &AHashMap<Symbol, Venue>,
462    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
463) -> InstrumentId {
464    let header = record.header();
465
466    // Check if instrument ID is already in the map
467    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
468        return instrument_id;
469    }
470
471    let raw_symbol = symbol_map
472        .get_for_rec(record)
473        .expect("Cannot resolve `raw_symbol` from `symbol_map`");
474
475    let symbol = Symbol::from_str_unchecked(raw_symbol);
476
477    let publisher_id = header.publisher_id;
478    let venue = match symbol_venue_map.get(&symbol) {
479        Some(venue) => venue,
480        None => publisher_venue_map
481            .get(&publisher_id)
482            .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
483    };
484    let instrument_id = InstrumentId::new(symbol, *venue);
485
486    instrument_id_map.insert(header.instrument_id, instrument_id);
487    instrument_id
488}
489
490fn handle_instrument_def_msg(
491    msg: &dbn::InstrumentDefMsg,
492    record: &dbn::RecordRef,
493    symbol_map: &PitSymbolMap,
494    publisher_venue_map: &IndexMap<PublisherId, Venue>,
495    symbol_venue_map: &AHashMap<Symbol, Venue>,
496    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
497    ts_init: UnixNanos,
498) -> anyhow::Result<InstrumentAny> {
499    let instrument_id = update_instrument_id_map(
500        record,
501        symbol_map,
502        publisher_venue_map,
503        symbol_venue_map,
504        instrument_id_map,
505    );
506
507    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
508}
509
510fn handle_status_msg(
511    msg: &dbn::StatusMsg,
512    record: &dbn::RecordRef,
513    symbol_map: &PitSymbolMap,
514    publisher_venue_map: &IndexMap<PublisherId, Venue>,
515    symbol_venue_map: &AHashMap<Symbol, Venue>,
516    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
517    ts_init: UnixNanos,
518) -> anyhow::Result<InstrumentStatus> {
519    let instrument_id = update_instrument_id_map(
520        record,
521        symbol_map,
522        publisher_venue_map,
523        symbol_venue_map,
524        instrument_id_map,
525    );
526
527    decode_status_msg(msg, instrument_id, Some(ts_init))
528}
529
530fn handle_imbalance_msg(
531    msg: &dbn::ImbalanceMsg,
532    record: &dbn::RecordRef,
533    symbol_map: &PitSymbolMap,
534    publisher_venue_map: &IndexMap<PublisherId, Venue>,
535    symbol_venue_map: &AHashMap<Symbol, Venue>,
536    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
537    ts_init: UnixNanos,
538) -> anyhow::Result<DatabentoImbalance> {
539    let instrument_id = update_instrument_id_map(
540        record,
541        symbol_map,
542        publisher_venue_map,
543        symbol_venue_map,
544        instrument_id_map,
545    );
546
547    let price_precision = 2; // Hard-coded for now
548
549    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
550}
551
552fn handle_statistics_msg(
553    msg: &dbn::StatMsg,
554    record: &dbn::RecordRef,
555    symbol_map: &PitSymbolMap,
556    publisher_venue_map: &IndexMap<PublisherId, Venue>,
557    symbol_venue_map: &AHashMap<Symbol, Venue>,
558    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
559    ts_init: UnixNanos,
560) -> anyhow::Result<DatabentoStatistics> {
561    let instrument_id = update_instrument_id_map(
562        record,
563        symbol_map,
564        publisher_venue_map,
565        symbol_venue_map,
566        instrument_id_map,
567    );
568
569    let price_precision = 2; // Hard-coded for now
570
571    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
572}
573
574#[allow(clippy::too_many_arguments)]
575fn handle_record(
576    record: dbn::RecordRef,
577    symbol_map: &PitSymbolMap,
578    publisher_venue_map: &IndexMap<PublisherId, Venue>,
579    symbol_venue_map: &AHashMap<Symbol, Venue>,
580    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
581    ts_init: UnixNanos,
582    initialized_books: &HashSet<InstrumentId>,
583    bars_timestamp_on_close: bool,
584) -> anyhow::Result<(Option<Data>, Option<Data>)> {
585    let instrument_id = update_instrument_id_map(
586        &record,
587        symbol_map,
588        publisher_venue_map,
589        symbol_venue_map,
590        instrument_id_map,
591    );
592
593    let price_precision = 2; // Hard-coded for now
594    let include_trades = initialized_books.contains(&instrument_id);
595
596    decode_record(
597        &record,
598        instrument_id,
599        price_precision,
600        Some(ts_init),
601        include_trades,
602        bars_timestamp_on_close,
603    )
604}