nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, RwLock},
18    time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38    types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41    decode::{decode_instrument_def_msg, decode_record},
42    types::PublisherId,
43};
44
/// Commands accepted by the [`DatabentoFeedHandler`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Forward the given subscription to the live client.
    ///
    /// A subscription with a `start` set switches the handler into replay mode.
    Subscribe(Subscription),
    /// Start the live session; records are only awaited after this command.
    Start,
    /// Notify the message consumer, close the inner client if started, and stop the handler.
    Close,
}
51
/// Messages emitted by the [`DatabentoFeedHandler`] on its message channel.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)] // TODO: Optimize this (largest variant 1096 vs 80 bytes)
pub enum LiveMessage {
    /// Market data decoded from a generic record (including batched order book deltas).
    Data(Data),
    /// An instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// An instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// An imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// An error raised while awaiting the next record from the client.
    Error(anyhow::Error),
    /// The handler is shutting down (close command received or connection failed).
    Close,
}
63
/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
///
/// [`LiveCommand`] messages are received synchronously across a channel,
/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
/// back to a message processing task.
///
/// # Crash Policy
///
/// This handler intentionally crashes on catastrophic feed issues rather than
/// attempting recovery. If excessive buffering occurs (indicating severe feed
/// misbehavior), the process will run out of memory and terminate. This is by
/// design - such scenarios indicate fundamental problems that require external
/// intervention.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key passed to the live client builder.
    key: String,
    /// Dataset passed to the live client builder.
    dataset: String,
    /// Receiving end of the command channel.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Sending end of the message channel.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Fallback venue lookup by publisher ID, used when a symbol has no explicit venue.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol-to-venue map; takes precedence over `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription with a `start` is received; enables replay buffering on start.
    replay: bool,
    /// When `true`, the exchange from instrument definitions is used as the venue.
    use_exchange_as_venue: bool,
    /// Passed through to record decoding (bar timestamping).
    bars_timestamp_on_close: bool,
}
89
90impl DatabentoFeedHandler {
91    /// Creates a new [`DatabentoFeedHandler`] instance.
92    #[must_use]
93    #[allow(clippy::too_many_arguments)]
94    pub const fn new(
95        key: String,
96        dataset: String,
97        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
98        tx: tokio::sync::mpsc::Sender<LiveMessage>,
99        publisher_venue_map: IndexMap<PublisherId, Venue>,
100        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
101        use_exchange_as_venue: bool,
102        bars_timestamp_on_close: bool,
103    ) -> Self {
104        Self {
105            key,
106            dataset,
107            cmd_rx: rx,
108            msg_tx: tx,
109            publisher_venue_map,
110            symbol_venue_map,
111            replay: false,
112            use_exchange_as_venue,
113            bars_timestamp_on_close,
114        }
115    }
116
117    /// Runs the feed handler main loop, processing commands and streaming market data.
118    ///
119    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
120    /// and continuously processes incoming market data messages until shutdown.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if any client operation or message handling fails.
125    #[allow(clippy::blocks_in_conditions)]
126    pub async fn run(&mut self) -> anyhow::Result<()> {
127        tracing::debug!("Running feed handler");
128        let clock = get_atomic_clock_realtime();
129        let mut symbol_map = PitSymbolMap::new();
130        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
131
132        let mut buffering_start = None;
133        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
134        let mut initialized_books = HashSet::new();
135        let timeout = Duration::from_secs(5); // Hardcoded timeout for now
136
137        let result = tokio::time::timeout(
138            timeout,
139            databento::LiveClient::builder()
140                .user_agent_extension(NAUTILUS_USER_AGENT.into())
141                .key(self.key.clone())?
142                .dataset(self.dataset.clone())
143                .build(),
144        )
145        .await?;
146
147        tracing::info!("Connected");
148
149        let mut client = match result {
150            Ok(client) => client,
151            Err(e) => {
152                self.msg_tx.send(LiveMessage::Close).await?;
153                self.cmd_rx.close();
154                anyhow::bail!("Failed to connect to Databento LSG: {e}");
155            }
156        };
157
158        // Timeout awaiting the next record before checking for a command
159        let timeout = Duration::from_millis(10);
160
161        // Flag to control whether to continue to await next record
162        let mut running = false;
163
164        loop {
165            if self.msg_tx.is_closed() {
166                tracing::debug!("Message channel was closed: stopping");
167                break;
168            }
169
170            match self.cmd_rx.try_recv() {
171                Ok(cmd) => {
172                    tracing::debug!("Received command: {cmd:?}");
173                    match cmd {
174                        LiveCommand::Subscribe(sub) => {
175                            if !self.replay && sub.start.is_some() {
176                                self.replay = true;
177                            }
178                            client.subscribe(sub).await?;
179                        }
180                        LiveCommand::Start => {
181                            buffering_start = if self.replay {
182                                Some(clock.get_time_ns())
183                            } else {
184                                None
185                            };
186                            client.start().await?;
187                            running = true;
188                            tracing::debug!("Started");
189                        }
190                        LiveCommand::Close => {
191                            self.msg_tx.send(LiveMessage::Close).await?;
192                            if running {
193                                client.close().await?;
194                                tracing::debug!("Closed inner client");
195                            }
196                            break;
197                        }
198                    }
199                }
200                Err(TryRecvError::Empty) => {} // No command yet
201                Err(TryRecvError::Disconnected) => {
202                    tracing::debug!("Disconnected");
203                    break;
204                }
205            }
206
207            if !running {
208                continue;
209            }
210
211            // Await the next record with a timeout
212            let result = tokio::time::timeout(timeout, client.next_record()).await;
213            let record_opt = match result {
214                Ok(record_opt) => record_opt,
215                Err(_) => continue, // Timeout
216            };
217
218            let record = match record_opt {
219                Ok(Some(record)) => record,
220                Ok(None) => break, // Session ended normally
221                Err(e) => {
222                    // Fail the session entirely for now. Consider refining
223                    // this strategy to handle specific errors more gracefully.
224                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
225                    break;
226                }
227            };
228
229            let ts_init = clock.get_time_ns();
230
231            // Decode record
232            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
233                handle_error_msg(msg);
234            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
235                handle_system_msg(msg);
236            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
237                // Remove instrument ID index as the raw symbol may have changed
238                instrument_id_map.remove(&msg.hd.instrument_id);
239                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
240            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
241                if self.use_exchange_as_venue {
242                    let exchange = msg.exchange()?;
243                    if !exchange.is_empty() {
244                        update_instrument_id_map_with_exchange(
245                            &symbol_map,
246                            &self.symbol_venue_map,
247                            &mut instrument_id_map,
248                            msg.hd.instrument_id,
249                            exchange,
250                        )?;
251                    }
252                }
253                let data = {
254                    let sym_map = self.read_symbol_venue_map()?;
255                    handle_instrument_def_msg(
256                        msg,
257                        &record,
258                        &symbol_map,
259                        &self.publisher_venue_map,
260                        &sym_map,
261                        &mut instrument_id_map,
262                        ts_init,
263                    )?
264                };
265                self.send_msg(LiveMessage::Instrument(data)).await;
266            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
267                let data = {
268                    let sym_map = self.read_symbol_venue_map()?;
269                    handle_status_msg(
270                        msg,
271                        &record,
272                        &symbol_map,
273                        &self.publisher_venue_map,
274                        &sym_map,
275                        &mut instrument_id_map,
276                        ts_init,
277                    )?
278                };
279                self.send_msg(LiveMessage::Status(data)).await;
280            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
281                let data = {
282                    let sym_map = self.read_symbol_venue_map()?;
283                    handle_imbalance_msg(
284                        msg,
285                        &record,
286                        &symbol_map,
287                        &self.publisher_venue_map,
288                        &sym_map,
289                        &mut instrument_id_map,
290                        ts_init,
291                    )?
292                };
293                self.send_msg(LiveMessage::Imbalance(data)).await;
294            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
295                let data = {
296                    let sym_map = self.read_symbol_venue_map()?;
297                    handle_statistics_msg(
298                        msg,
299                        &record,
300                        &symbol_map,
301                        &self.publisher_venue_map,
302                        &sym_map,
303                        &mut instrument_id_map,
304                        ts_init,
305                    )?
306                };
307                self.send_msg(LiveMessage::Statistics(data)).await;
308            } else {
309                // Decode a generic record with possible errors
310                let (mut data1, data2) = match {
311                    let sym_map = self.read_symbol_venue_map()?;
312                    handle_record(
313                        record,
314                        &symbol_map,
315                        &self.publisher_venue_map,
316                        &sym_map,
317                        &mut instrument_id_map,
318                        ts_init,
319                        &initialized_books,
320                        self.bars_timestamp_on_close,
321                    )
322                } {
323                    Ok(decoded) => decoded,
324                    Err(e) => {
325                        tracing::error!("Error decoding record: {e}");
326                        continue;
327                    }
328                };
329
330                if let Some(msg) = record.get::<dbn::MboMsg>() {
331                    // Check if should mark book initialized
332                    if let Some(Data::Delta(delta)) = &data1 {
333                        initialized_books.insert(delta.instrument_id);
334                    } else {
335                        continue; // No delta yet
336                    }
337
338                    if let Some(Data::Delta(delta)) = &data1 {
339                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
340                        buffer.push(*delta);
341
342                        tracing::trace!(
343                            "Buffering delta: {} {buffering_start:?} flags={}",
344                            delta.ts_event,
345                            msg.flags.raw(),
346                        );
347
348                        // Check if last message in the book event
349                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
350                            continue; // NOT last message
351                        }
352
353                        // Check if snapshot
354                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
355                            continue; // Buffer snapshot
356                        }
357
358                        // Check if buffering a replay
359                        if let Some(start_ns) = buffering_start {
360                            if delta.ts_event <= start_ns {
361                                continue; // Continue buffering replay
362                            }
363                            buffering_start = None;
364                        }
365
366                        // SAFETY: We can guarantee a deltas vec exists
367                        let buffer =
368                            buffered_deltas
369                                .remove(&delta.instrument_id)
370                                .ok_or_else(|| {
371                                    anyhow::anyhow!(
372                                        "Internal error: no buffered deltas for instrument {id}",
373                                        id = delta.instrument_id
374                                    )
375                                })?;
376                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
377                        let deltas = OrderBookDeltas_API::new(deltas);
378                        data1 = Some(Data::Deltas(deltas));
379                    }
380                }
381
382                if let Some(data) = data1 {
383                    self.send_msg(LiveMessage::Data(data)).await;
384                }
385
386                if let Some(data) = data2 {
387                    self.send_msg(LiveMessage::Data(data)).await;
388                }
389            }
390        }
391
392        self.cmd_rx.close();
393        tracing::debug!("Closed command receiver");
394
395        Ok(())
396    }
397
398    /// Sends a message to the message processing task.
399    async fn send_msg(&mut self, msg: LiveMessage) {
400        tracing::trace!("Sending {msg:?}");
401        match self.msg_tx.send(msg).await {
402            Ok(()) => {}
403            Err(e) => tracing::error!("Error sending message: {e}"),
404        }
405    }
406
407    /// Acquires a read lock on the symbol-venue map with exponential backoff and timeout.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the read lock cannot be acquired within the deadline.
412    fn read_symbol_venue_map(
413        &self,
414    ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
415        // Try to acquire the lock with exponential backoff and deadline
416        const MAX_WAIT_MS: u64 = 500; // Total maximum wait time
417        const INITIAL_DELAY_MICROS: u64 = 10;
418        const MAX_DELAY_MICROS: u64 = 1000;
419
420        let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
421        let mut delay = INITIAL_DELAY_MICROS;
422
423        loop {
424            match self.symbol_venue_map.try_read() {
425                Ok(guard) => return Ok(guard),
426                Err(std::sync::TryLockError::WouldBlock) => {
427                    if std::time::Instant::now() >= deadline {
428                        break;
429                    }
430
431                    // Yield to other threads first
432                    std::thread::yield_now();
433
434                    // Then sleep with exponential backoff if still blocked
435                    if std::time::Instant::now() < deadline {
436                        let remaining = deadline - std::time::Instant::now();
437                        let sleep_duration = StdDuration::from_micros(delay).min(remaining);
438                        std::thread::sleep(sleep_duration);
439                        // Exponential backoff with cap and jitter
440                        delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
441                    }
442                }
443                Err(std::sync::TryLockError::Poisoned(e)) => {
444                    anyhow::bail!("symbol_venue_map lock poisoned: {e}");
445                }
446            }
447        }
448
449        anyhow::bail!(
450            "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
451        )
452    }
453}
454
/// Handles a Databento error message by logging its `Debug` representation at error level.
///
/// The message is only logged here; nothing is returned to the caller.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    tracing::error!("{msg:?}");
}
459
/// Handles a Databento system message by logging its `Debug` representation at info level.
///
/// The message is only logged here; nothing is returned to the caller.
fn handle_system_msg(msg: &dbn::SystemMsg) {
    tracing::info!("{msg:?}");
}
464
465/// Handles symbol mapping messages and updates the instrument ID map.
466///
467/// # Errors
468///
469/// Returns an error if symbol mapping fails.
470fn handle_symbol_mapping_msg(
471    msg: &dbn::SymbolMappingMsg,
472    symbol_map: &mut PitSymbolMap,
473    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
474) -> anyhow::Result<()> {
475    symbol_map
476        .on_symbol_mapping(msg)
477        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
478    instrument_id_map.remove(&msg.header().instrument_id);
479    Ok(())
480}
481
482/// Updates the instrument ID map using exchange information from the symbol map.
483fn update_instrument_id_map_with_exchange(
484    symbol_map: &PitSymbolMap,
485    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
486    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
487    raw_instrument_id: u32,
488    exchange: &str,
489) -> anyhow::Result<InstrumentId> {
490    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
491        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
492    })?;
493    let symbol = Symbol::from(raw_symbol.as_str());
494    let venue = Venue::from_code(exchange)
495        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
496    let instrument_id = InstrumentId::new(symbol, venue);
497    let mut map = symbol_venue_map
498        .write()
499        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
500    map.entry(symbol).or_insert(venue);
501    instrument_id_map.insert(raw_instrument_id, instrument_id);
502    Ok(instrument_id)
503}
504
505fn update_instrument_id_map(
506    record: &dbn::RecordRef,
507    symbol_map: &PitSymbolMap,
508    publisher_venue_map: &IndexMap<PublisherId, Venue>,
509    symbol_venue_map: &AHashMap<Symbol, Venue>,
510    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
511) -> anyhow::Result<InstrumentId> {
512    let header = record.header();
513
514    // Check if instrument ID is already in the map
515    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
516        return Ok(instrument_id);
517    }
518
519    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
520        anyhow::anyhow!(
521            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
522            header.instrument_id
523        )
524    })?;
525
526    let symbol = Symbol::from_str_unchecked(raw_symbol);
527
528    let publisher_id = header.publisher_id;
529    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
530        *venue
531    } else {
532        let venue = publisher_venue_map
533            .get(&publisher_id)
534            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
535        *venue
536    };
537    let instrument_id = InstrumentId::new(symbol, venue);
538
539    instrument_id_map.insert(header.instrument_id, instrument_id);
540    Ok(instrument_id)
541}
542
543/// Handles instrument definition messages and decodes them into Nautilus instruments.
544///
545/// # Errors
546///
547/// Returns an error if instrument decoding fails.
548fn handle_instrument_def_msg(
549    msg: &dbn::InstrumentDefMsg,
550    record: &dbn::RecordRef,
551    symbol_map: &PitSymbolMap,
552    publisher_venue_map: &IndexMap<PublisherId, Venue>,
553    symbol_venue_map: &AHashMap<Symbol, Venue>,
554    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
555    ts_init: UnixNanos,
556) -> anyhow::Result<InstrumentAny> {
557    let instrument_id = update_instrument_id_map(
558        record,
559        symbol_map,
560        publisher_venue_map,
561        symbol_venue_map,
562        instrument_id_map,
563    )?;
564
565    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
566}
567
568fn handle_status_msg(
569    msg: &dbn::StatusMsg,
570    record: &dbn::RecordRef,
571    symbol_map: &PitSymbolMap,
572    publisher_venue_map: &IndexMap<PublisherId, Venue>,
573    symbol_venue_map: &AHashMap<Symbol, Venue>,
574    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
575    ts_init: UnixNanos,
576) -> anyhow::Result<InstrumentStatus> {
577    let instrument_id = update_instrument_id_map(
578        record,
579        symbol_map,
580        publisher_venue_map,
581        symbol_venue_map,
582        instrument_id_map,
583    )?;
584
585    decode_status_msg(msg, instrument_id, Some(ts_init))
586}
587
588fn handle_imbalance_msg(
589    msg: &dbn::ImbalanceMsg,
590    record: &dbn::RecordRef,
591    symbol_map: &PitSymbolMap,
592    publisher_venue_map: &IndexMap<PublisherId, Venue>,
593    symbol_venue_map: &AHashMap<Symbol, Venue>,
594    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
595    ts_init: UnixNanos,
596) -> anyhow::Result<DatabentoImbalance> {
597    let instrument_id = update_instrument_id_map(
598        record,
599        symbol_map,
600        publisher_venue_map,
601        symbol_venue_map,
602        instrument_id_map,
603    )?;
604
605    let price_precision = 2; // Hardcoded for now
606
607    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
608}
609
610fn handle_statistics_msg(
611    msg: &dbn::StatMsg,
612    record: &dbn::RecordRef,
613    symbol_map: &PitSymbolMap,
614    publisher_venue_map: &IndexMap<PublisherId, Venue>,
615    symbol_venue_map: &AHashMap<Symbol, Venue>,
616    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
617    ts_init: UnixNanos,
618) -> anyhow::Result<DatabentoStatistics> {
619    let instrument_id = update_instrument_id_map(
620        record,
621        symbol_map,
622        publisher_venue_map,
623        symbol_venue_map,
624        instrument_id_map,
625    )?;
626
627    let price_precision = 2; // Hardcoded for now
628
629    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
630}
631
632#[allow(clippy::too_many_arguments)]
633fn handle_record(
634    record: dbn::RecordRef,
635    symbol_map: &PitSymbolMap,
636    publisher_venue_map: &IndexMap<PublisherId, Venue>,
637    symbol_venue_map: &AHashMap<Symbol, Venue>,
638    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
639    ts_init: UnixNanos,
640    initialized_books: &HashSet<InstrumentId>,
641    bars_timestamp_on_close: bool,
642) -> anyhow::Result<(Option<Data>, Option<Data>)> {
643    let instrument_id = update_instrument_id_map(
644        &record,
645        symbol_map,
646        publisher_venue_map,
647        symbol_venue_map,
648        instrument_id_map,
649    )?;
650
651    let price_precision = 2; // Hardcoded for now
652
653    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
654    // For MBO, only include trades after the book is initialized to maintain consistency
655    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
656        || record.get::<dbn::TbboMsg>().is_some()
657        || record.get::<dbn::Cmbp1Msg>().is_some()
658    {
659        true // These schemas include trade information directly
660    } else {
661        initialized_books.contains(&instrument_id) // MBO requires initialized book
662    };
663
664    decode_record(
665        &record,
666        instrument_id,
667        price_precision,
668        Some(ts_init),
669        include_trades,
670        bars_timestamp_on_close,
671    )
672}