nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, RwLock},
18    time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38    types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41    decode::{decode_instrument_def_msg, decode_record},
42    types::PublisherId,
43};
44
/// Commands accepted by [`DatabentoFeedHandler::run`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Forward the subscription to the live client. A subscription carrying a
    /// `start` value switches the handler into replay mode.
    Subscribe(Subscription),
    /// Start the live session; records are only awaited after this command.
    Start,
    /// Emit [`LiveMessage::Close`], close the inner client if the session was
    /// started, and leave the main loop.
    Close,
}
51
/// Messages emitted by [`DatabentoFeedHandler`] to the message processing task.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
)]
pub enum LiveMessage {
    /// Market data decoded from a generic record (including aggregated book deltas).
    Data(Data),
    /// Instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// Instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// Imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// Error raised while awaiting the next record; the session stops afterwards.
    Error(anyhow::Error),
    /// Sent when a close command is handled or the initial connection fails.
    Close,
}
66
/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
///
/// [`LiveCommand`] messages are received synchronously across a channel,
/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
/// back to a message processing task.
///
/// # Crash Policy
///
/// This handler intentionally crashes on catastrophic feed issues rather than
/// attempting recovery. If excessive buffering occurs (indicating severe feed
/// misbehavior), the process will run out of memory and terminate. This is by
/// design - such scenarios indicate fundamental problems that require external
/// intervention.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key handed to the Databento live client builder.
    key: String,
    /// Dataset handed to the Databento live client builder.
    dataset: String,
    /// Receiver polled for [`LiveCommand`]s inside the main loop.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Sender used to deliver [`LiveMessage`]s to the processing task.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Fallback venue lookup keyed by publisher ID, used when a symbol has no
    /// entry in `symbol_venue_map`.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol-to-venue map, consulted before `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription carries a `start`; on start this makes the
    /// handler record the time from which replayed book deltas are buffered.
    replay: bool,
    /// When true, a non-empty exchange on an instrument definition is used as
    /// the venue for that instrument.
    use_exchange_as_venue: bool,
    /// Forwarded to `decode_record`; presumably selects close-time timestamps
    /// for bars - TODO confirm against the decode module.
    bars_timestamp_on_close: bool,
}
92
93impl DatabentoFeedHandler {
94    /// Creates a new [`DatabentoFeedHandler`] instance.
95    #[must_use]
96    #[allow(clippy::too_many_arguments)]
97    pub const fn new(
98        key: String,
99        dataset: String,
100        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
101        tx: tokio::sync::mpsc::Sender<LiveMessage>,
102        publisher_venue_map: IndexMap<PublisherId, Venue>,
103        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
104        use_exchange_as_venue: bool,
105        bars_timestamp_on_close: bool,
106    ) -> Self {
107        Self {
108            key,
109            dataset,
110            cmd_rx: rx,
111            msg_tx: tx,
112            publisher_venue_map,
113            symbol_venue_map,
114            replay: false,
115            use_exchange_as_venue,
116            bars_timestamp_on_close,
117        }
118    }
119
120    /// Runs the feed handler main loop, processing commands and streaming market data.
121    ///
122    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
123    /// and continuously processes incoming market data messages until shutdown.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if any client operation or message handling fails.
128    #[allow(clippy::blocks_in_conditions)]
129    pub async fn run(&mut self) -> anyhow::Result<()> {
130        tracing::debug!("Running feed handler");
131        let clock = get_atomic_clock_realtime();
132        let mut symbol_map = PitSymbolMap::new();
133        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
134
135        let mut buffering_start = None;
136        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
137        let mut initialized_books = HashSet::new();
138        let timeout = Duration::from_secs(5); // Hardcoded timeout for now
139
140        let result = tokio::time::timeout(
141            timeout,
142            databento::LiveClient::builder()
143                .user_agent_extension(NAUTILUS_USER_AGENT.into())
144                .key(self.key.clone())?
145                .dataset(self.dataset.clone())
146                .build(),
147        )
148        .await?;
149
150        tracing::info!("Connected");
151
152        let mut client = match result {
153            Ok(client) => client,
154            Err(e) => {
155                self.msg_tx.send(LiveMessage::Close).await?;
156                self.cmd_rx.close();
157                anyhow::bail!("Failed to connect to Databento LSG: {e}");
158            }
159        };
160
161        // Timeout awaiting the next record before checking for a command
162        let timeout = Duration::from_millis(10);
163
164        // Flag to control whether to continue to await next record
165        let mut running = false;
166
167        loop {
168            if self.msg_tx.is_closed() {
169                tracing::debug!("Message channel was closed: stopping");
170                break;
171            }
172
173            match self.cmd_rx.try_recv() {
174                Ok(cmd) => {
175                    tracing::debug!("Received command: {cmd:?}");
176                    match cmd {
177                        LiveCommand::Subscribe(sub) => {
178                            if !self.replay && sub.start.is_some() {
179                                self.replay = true;
180                            }
181                            client.subscribe(sub).await?;
182                        }
183                        LiveCommand::Start => {
184                            buffering_start = if self.replay {
185                                Some(clock.get_time_ns())
186                            } else {
187                                None
188                            };
189                            client.start().await?;
190                            running = true;
191                            tracing::debug!("Started");
192                        }
193                        LiveCommand::Close => {
194                            self.msg_tx.send(LiveMessage::Close).await?;
195                            if running {
196                                client.close().await?;
197                                tracing::debug!("Closed inner client");
198                            }
199                            break;
200                        }
201                    }
202                }
203                Err(TryRecvError::Empty) => {} // No command yet
204                Err(TryRecvError::Disconnected) => {
205                    tracing::debug!("Disconnected");
206                    break;
207                }
208            }
209
210            if !running {
211                continue;
212            }
213
214            // Await the next record with a timeout
215            let result = tokio::time::timeout(timeout, client.next_record()).await;
216            let record_opt = match result {
217                Ok(record_opt) => record_opt,
218                Err(_) => continue, // Timeout
219            };
220
221            let record = match record_opt {
222                Ok(Some(record)) => record,
223                Ok(None) => break, // Session ended normally
224                Err(e) => {
225                    // Fail the session entirely for now. Consider refining
226                    // this strategy to handle specific errors more gracefully.
227                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
228                    break;
229                }
230            };
231
232            let ts_init = clock.get_time_ns();
233
234            // Decode record
235            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
236                handle_error_msg(msg);
237            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
238                handle_system_msg(msg);
239            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
240                // Remove instrument ID index as the raw symbol may have changed
241                instrument_id_map.remove(&msg.hd.instrument_id);
242                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
243            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
244                if self.use_exchange_as_venue {
245                    let exchange = msg.exchange()?;
246                    if !exchange.is_empty() {
247                        update_instrument_id_map_with_exchange(
248                            &symbol_map,
249                            &self.symbol_venue_map,
250                            &mut instrument_id_map,
251                            msg.hd.instrument_id,
252                            exchange,
253                        )?;
254                    }
255                }
256                let data = {
257                    let sym_map = self.read_symbol_venue_map()?;
258                    handle_instrument_def_msg(
259                        msg,
260                        &record,
261                        &symbol_map,
262                        &self.publisher_venue_map,
263                        &sym_map,
264                        &mut instrument_id_map,
265                        ts_init,
266                    )?
267                };
268                self.send_msg(LiveMessage::Instrument(data)).await;
269            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
270                let data = {
271                    let sym_map = self.read_symbol_venue_map()?;
272                    handle_status_msg(
273                        msg,
274                        &record,
275                        &symbol_map,
276                        &self.publisher_venue_map,
277                        &sym_map,
278                        &mut instrument_id_map,
279                        ts_init,
280                    )?
281                };
282                self.send_msg(LiveMessage::Status(data)).await;
283            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
284                let data = {
285                    let sym_map = self.read_symbol_venue_map()?;
286                    handle_imbalance_msg(
287                        msg,
288                        &record,
289                        &symbol_map,
290                        &self.publisher_venue_map,
291                        &sym_map,
292                        &mut instrument_id_map,
293                        ts_init,
294                    )?
295                };
296                self.send_msg(LiveMessage::Imbalance(data)).await;
297            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
298                let data = {
299                    let sym_map = self.read_symbol_venue_map()?;
300                    handle_statistics_msg(
301                        msg,
302                        &record,
303                        &symbol_map,
304                        &self.publisher_venue_map,
305                        &sym_map,
306                        &mut instrument_id_map,
307                        ts_init,
308                    )?
309                };
310                self.send_msg(LiveMessage::Statistics(data)).await;
311            } else {
312                // Decode a generic record with possible errors
313                let (mut data1, data2) = match {
314                    let sym_map = self.read_symbol_venue_map()?;
315                    handle_record(
316                        record,
317                        &symbol_map,
318                        &self.publisher_venue_map,
319                        &sym_map,
320                        &mut instrument_id_map,
321                        ts_init,
322                        &initialized_books,
323                        self.bars_timestamp_on_close,
324                    )
325                } {
326                    Ok(decoded) => decoded,
327                    Err(e) => {
328                        tracing::error!("Error decoding record: {e}");
329                        continue;
330                    }
331                };
332
333                if let Some(msg) = record.get::<dbn::MboMsg>() {
334                    // Check if should mark book initialized
335                    if let Some(Data::Delta(delta)) = &data1 {
336                        initialized_books.insert(delta.instrument_id);
337                    } else {
338                        continue; // No delta yet
339                    }
340
341                    if let Some(Data::Delta(delta)) = &data1 {
342                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
343                        buffer.push(*delta);
344
345                        tracing::trace!(
346                            "Buffering delta: {} {buffering_start:?} flags={}",
347                            delta.ts_event,
348                            msg.flags.raw(),
349                        );
350
351                        // Check if last message in the book event
352                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
353                            continue; // NOT last message
354                        }
355
356                        // Check if snapshot
357                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
358                            continue; // Buffer snapshot
359                        }
360
361                        // Check if buffering a replay
362                        if let Some(start_ns) = buffering_start {
363                            if delta.ts_event <= start_ns {
364                                continue; // Continue buffering replay
365                            }
366                            buffering_start = None;
367                        }
368
369                        // SAFETY: We can guarantee a deltas vec exists
370                        let buffer =
371                            buffered_deltas
372                                .remove(&delta.instrument_id)
373                                .ok_or_else(|| {
374                                    anyhow::anyhow!(
375                                        "Internal error: no buffered deltas for instrument {id}",
376                                        id = delta.instrument_id
377                                    )
378                                })?;
379                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
380                        let deltas = OrderBookDeltas_API::new(deltas);
381                        data1 = Some(Data::Deltas(deltas));
382                    }
383                }
384
385                if let Some(data) = data1 {
386                    self.send_msg(LiveMessage::Data(data)).await;
387                }
388
389                if let Some(data) = data2 {
390                    self.send_msg(LiveMessage::Data(data)).await;
391                }
392            }
393        }
394
395        self.cmd_rx.close();
396        tracing::debug!("Closed command receiver");
397
398        Ok(())
399    }
400
401    /// Sends a message to the message processing task.
402    async fn send_msg(&mut self, msg: LiveMessage) {
403        tracing::trace!("Sending {msg:?}");
404        match self.msg_tx.send(msg).await {
405            Ok(()) => {}
406            Err(e) => tracing::error!("Error sending message: {e}"),
407        }
408    }
409
410    /// Acquires a read lock on the symbol-venue map with exponential backoff and timeout.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the read lock cannot be acquired within the deadline.
415    fn read_symbol_venue_map(
416        &self,
417    ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
418        // Try to acquire the lock with exponential backoff and deadline
419        const MAX_WAIT_MS: u64 = 500; // Total maximum wait time
420        const INITIAL_DELAY_MICROS: u64 = 10;
421        const MAX_DELAY_MICROS: u64 = 1000;
422
423        let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
424        let mut delay = INITIAL_DELAY_MICROS;
425
426        loop {
427            match self.symbol_venue_map.try_read() {
428                Ok(guard) => return Ok(guard),
429                Err(std::sync::TryLockError::WouldBlock) => {
430                    if std::time::Instant::now() >= deadline {
431                        break;
432                    }
433
434                    // Yield to other threads first
435                    std::thread::yield_now();
436
437                    // Then sleep with exponential backoff if still blocked
438                    if std::time::Instant::now() < deadline {
439                        let remaining = deadline - std::time::Instant::now();
440                        let sleep_duration = StdDuration::from_micros(delay).min(remaining);
441                        std::thread::sleep(sleep_duration);
442                        // Exponential backoff with cap and jitter
443                        delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
444                    }
445                }
446                Err(std::sync::TryLockError::Poisoned(e)) => {
447                    anyhow::bail!("symbol_venue_map lock poisoned: {e}");
448                }
449            }
450        }
451
452        anyhow::bail!(
453            "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
454        )
455    }
456}
457
458/// Handles Databento error messages by logging them.
459fn handle_error_msg(msg: &dbn::ErrorMsg) {
460    tracing::error!("{msg:?}");
461}
462
463/// Handles Databento system messages by logging them.
464fn handle_system_msg(msg: &dbn::SystemMsg) {
465    tracing::info!("{msg:?}");
466}
467
468/// Handles symbol mapping messages and updates the instrument ID map.
469///
470/// # Errors
471///
472/// Returns an error if symbol mapping fails.
473fn handle_symbol_mapping_msg(
474    msg: &dbn::SymbolMappingMsg,
475    symbol_map: &mut PitSymbolMap,
476    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
477) -> anyhow::Result<()> {
478    symbol_map
479        .on_symbol_mapping(msg)
480        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
481    instrument_id_map.remove(&msg.header().instrument_id);
482    Ok(())
483}
484
485/// Updates the instrument ID map using exchange information from the symbol map.
486fn update_instrument_id_map_with_exchange(
487    symbol_map: &PitSymbolMap,
488    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
489    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
490    raw_instrument_id: u32,
491    exchange: &str,
492) -> anyhow::Result<InstrumentId> {
493    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
494        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
495    })?;
496    let symbol = Symbol::from(raw_symbol.as_str());
497    let venue = Venue::from_code(exchange)
498        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
499    let instrument_id = InstrumentId::new(symbol, venue);
500    let mut map = symbol_venue_map
501        .write()
502        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
503    map.entry(symbol).or_insert(venue);
504    instrument_id_map.insert(raw_instrument_id, instrument_id);
505    Ok(instrument_id)
506}
507
508fn update_instrument_id_map(
509    record: &dbn::RecordRef,
510    symbol_map: &PitSymbolMap,
511    publisher_venue_map: &IndexMap<PublisherId, Venue>,
512    symbol_venue_map: &AHashMap<Symbol, Venue>,
513    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
514) -> anyhow::Result<InstrumentId> {
515    let header = record.header();
516
517    // Check if instrument ID is already in the map
518    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
519        return Ok(instrument_id);
520    }
521
522    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
523        anyhow::anyhow!(
524            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
525            header.instrument_id
526        )
527    })?;
528
529    let symbol = Symbol::from_str_unchecked(raw_symbol);
530
531    let publisher_id = header.publisher_id;
532    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
533        *venue
534    } else {
535        let venue = publisher_venue_map
536            .get(&publisher_id)
537            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
538        *venue
539    };
540    let instrument_id = InstrumentId::new(symbol, venue);
541
542    instrument_id_map.insert(header.instrument_id, instrument_id);
543    Ok(instrument_id)
544}
545
546/// Handles instrument definition messages and decodes them into Nautilus instruments.
547///
548/// # Errors
549///
550/// Returns an error if instrument decoding fails.
551fn handle_instrument_def_msg(
552    msg: &dbn::InstrumentDefMsg,
553    record: &dbn::RecordRef,
554    symbol_map: &PitSymbolMap,
555    publisher_venue_map: &IndexMap<PublisherId, Venue>,
556    symbol_venue_map: &AHashMap<Symbol, Venue>,
557    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
558    ts_init: UnixNanos,
559) -> anyhow::Result<InstrumentAny> {
560    let instrument_id = update_instrument_id_map(
561        record,
562        symbol_map,
563        publisher_venue_map,
564        symbol_venue_map,
565        instrument_id_map,
566    )?;
567
568    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
569}
570
571fn handle_status_msg(
572    msg: &dbn::StatusMsg,
573    record: &dbn::RecordRef,
574    symbol_map: &PitSymbolMap,
575    publisher_venue_map: &IndexMap<PublisherId, Venue>,
576    symbol_venue_map: &AHashMap<Symbol, Venue>,
577    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
578    ts_init: UnixNanos,
579) -> anyhow::Result<InstrumentStatus> {
580    let instrument_id = update_instrument_id_map(
581        record,
582        symbol_map,
583        publisher_venue_map,
584        symbol_venue_map,
585        instrument_id_map,
586    )?;
587
588    decode_status_msg(msg, instrument_id, Some(ts_init))
589}
590
591fn handle_imbalance_msg(
592    msg: &dbn::ImbalanceMsg,
593    record: &dbn::RecordRef,
594    symbol_map: &PitSymbolMap,
595    publisher_venue_map: &IndexMap<PublisherId, Venue>,
596    symbol_venue_map: &AHashMap<Symbol, Venue>,
597    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
598    ts_init: UnixNanos,
599) -> anyhow::Result<DatabentoImbalance> {
600    let instrument_id = update_instrument_id_map(
601        record,
602        symbol_map,
603        publisher_venue_map,
604        symbol_venue_map,
605        instrument_id_map,
606    )?;
607
608    let price_precision = 2; // Hardcoded for now
609
610    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
611}
612
613fn handle_statistics_msg(
614    msg: &dbn::StatMsg,
615    record: &dbn::RecordRef,
616    symbol_map: &PitSymbolMap,
617    publisher_venue_map: &IndexMap<PublisherId, Venue>,
618    symbol_venue_map: &AHashMap<Symbol, Venue>,
619    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
620    ts_init: UnixNanos,
621) -> anyhow::Result<DatabentoStatistics> {
622    let instrument_id = update_instrument_id_map(
623        record,
624        symbol_map,
625        publisher_venue_map,
626        symbol_venue_map,
627        instrument_id_map,
628    )?;
629
630    let price_precision = 2; // Hardcoded for now
631
632    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
633}
634
635#[allow(clippy::too_many_arguments)]
636fn handle_record(
637    record: dbn::RecordRef,
638    symbol_map: &PitSymbolMap,
639    publisher_venue_map: &IndexMap<PublisherId, Venue>,
640    symbol_venue_map: &AHashMap<Symbol, Venue>,
641    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
642    ts_init: UnixNanos,
643    initialized_books: &HashSet<InstrumentId>,
644    bars_timestamp_on_close: bool,
645) -> anyhow::Result<(Option<Data>, Option<Data>)> {
646    let instrument_id = update_instrument_id_map(
647        &record,
648        symbol_map,
649        publisher_venue_map,
650        symbol_venue_map,
651        instrument_id_map,
652    )?;
653
654    let price_precision = 2; // Hardcoded for now
655
656    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
657    // For MBO, only include trades after the book is initialized to maintain consistency
658    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
659        || record.get::<dbn::TbboMsg>().is_some()
660        || record.get::<dbn::Cmbp1Msg>().is_some()
661    {
662        true // These schemas include trade information directly
663    } else {
664        initialized_books.contains(&instrument_id) // MBO requires initialized book
665    };
666
667    decode_record(
668        &record,
669        instrument_id,
670        price_precision,
671        Some(ts_init),
672        include_trades,
673        bars_timestamp_on_close,
674    )
675}