nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, RwLock},
18    time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38    types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41    decode::{decode_instrument_def_msg, decode_record},
42    types::PublisherId,
43};
44
/// Commands consumed by [`DatabentoFeedHandler::run`] from its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Forwards the subscription to the live client. A subscription with a `start`
    /// time set switches the handler into replay mode.
    Subscribe(Subscription),
    /// Starts the live client session, after which records are read from the feed.
    Start,
    /// Sends [`LiveMessage::Close`], closes the live client if it was started, and
    /// stops the handler loop.
    Close,
}
51
/// Messages sent by [`DatabentoFeedHandler`] to the message processing task.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)] // TODO: Optimize this (largest variant 1096 vs 80 bytes)
pub enum LiveMessage {
    /// Market data decoded from a generic record (including aggregated order book deltas).
    Data(Data),
    /// An instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// An instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// An imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// An error raised while reading the next record from the live client.
    Error(anyhow::Error),
    /// Sent when a [`LiveCommand::Close`] is processed or the initial connection fails.
    Close,
}
63
/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
///
/// [`LiveCommand`] messages are received synchronously across a channel,
/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
/// back to a message processing task.
///
/// # Crash Policy
///
/// This handler intentionally crashes on catastrophic feed issues rather than
/// attempting recovery. If excessive buffering occurs (indicating severe feed
/// misbehavior), the process will run out of memory and terminate. This is by
/// design - such scenarios indicate fundamental problems that require external
/// intervention.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    // API key passed to the live client builder
    key: String,
    // Dataset passed to the live client builder
    dataset: String,
    // Receiver for commands, polled without blocking inside the run loop
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    // Sender for decoded messages
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    // Fallback venue lookup by publisher ID, used when a symbol has no venue entry
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    // Shared symbol -> venue map, consulted first when resolving instrument IDs
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    // Set once any subscription carries a `start` time
    replay: bool,
    // When true, the exchange of an instrument definition is used as its venue
    use_exchange_as_venue: bool,
    // Forwarded to `decode_record`; presumably selects close-time bar timestamps (confirm in decode)
    bars_timestamp_on_close: bool,
}
89
90impl DatabentoFeedHandler {
91    /// Creates a new [`DatabentoFeedHandler`] instance.
92    #[must_use]
93    #[allow(clippy::too_many_arguments)]
94    pub const fn new(
95        key: String,
96        dataset: String,
97        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
98        tx: tokio::sync::mpsc::Sender<LiveMessage>,
99        publisher_venue_map: IndexMap<PublisherId, Venue>,
100        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
101        use_exchange_as_venue: bool,
102        bars_timestamp_on_close: bool,
103    ) -> Self {
104        Self {
105            key,
106            dataset,
107            cmd_rx: rx,
108            msg_tx: tx,
109            publisher_venue_map,
110            symbol_venue_map,
111            replay: false,
112            use_exchange_as_venue,
113            bars_timestamp_on_close,
114        }
115    }
116
117    /// Runs the feed handler main loop, processing commands and streaming market data.
118    ///
119    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
120    /// and continuously processes incoming market data messages until shutdown.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if any client operation or message handling fails.
125    #[allow(clippy::blocks_in_conditions)]
126    pub async fn run(&mut self) -> anyhow::Result<()> {
127        tracing::debug!("Running feed handler");
128        let clock = get_atomic_clock_realtime();
129        let mut symbol_map = PitSymbolMap::new();
130        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
131
132        let mut buffering_start = None;
133        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
134        let mut initialized_books = HashSet::new();
135        let timeout = Duration::from_secs(5); // Hard-coded timeout for now
136
137        let result = tokio::time::timeout(
138            timeout,
139            databento::LiveClient::builder()
140                .user_agent_extension(NAUTILUS_USER_AGENT.into())
141                .key(self.key.clone())?
142                .dataset(self.dataset.clone())
143                .build(),
144        )
145        .await?;
146
147        tracing::info!("Connected");
148
149        let mut client = match result {
150            Ok(client) => client,
151            Err(e) => {
152                self.msg_tx.send(LiveMessage::Close).await?;
153                self.cmd_rx.close();
154                anyhow::bail!("Failed to connect to Databento LSG: {e}");
155            }
156        };
157
158        // Timeout awaiting the next record before checking for a command
159        let timeout = Duration::from_millis(10);
160
161        // Flag to control whether to continue to await next record
162        let mut running = false;
163
164        loop {
165            if self.msg_tx.is_closed() {
166                tracing::debug!("Message channel was closed: stopping");
167                break;
168            }
169
170            match self.cmd_rx.try_recv() {
171                Ok(cmd) => {
172                    tracing::debug!("Received command: {cmd:?}");
173                    match cmd {
174                        LiveCommand::Subscribe(sub) => {
175                            if !self.replay && sub.start.is_some() {
176                                self.replay = true;
177                            }
178                            client.subscribe(sub).await?;
179                        }
180                        LiveCommand::Start => {
181                            buffering_start = if self.replay {
182                                Some(clock.get_time_ns())
183                            } else {
184                                None
185                            };
186                            client.start().await?;
187                            running = true;
188                            tracing::debug!("Started");
189                        }
190                        LiveCommand::Close => {
191                            self.msg_tx.send(LiveMessage::Close).await?;
192                            if running {
193                                client.close().await?;
194                                tracing::debug!("Closed inner client");
195                            }
196                            break;
197                        }
198                    }
199                }
200                Err(TryRecvError::Empty) => {} // No command yet
201                Err(TryRecvError::Disconnected) => {
202                    tracing::debug!("Disconnected");
203                    break;
204                }
205            }
206
207            if !running {
208                continue;
209            }
210
211            // Await the next record with a timeout
212            let result = tokio::time::timeout(timeout, client.next_record()).await;
213            let record_opt = match result {
214                Ok(record_opt) => record_opt,
215                Err(_) => continue, // Timeout
216            };
217
218            let record = match record_opt {
219                Ok(Some(record)) => record,
220                Ok(None) => break, // Session ended normally
221                Err(e) => {
222                    // Fail the session entirely for now. Consider refining
223                    // this strategy to handle specific errors more gracefully.
224                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
225                    break;
226                }
227            };
228
229            let ts_init = clock.get_time_ns();
230
231            // Decode record
232            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
233                handle_error_msg(msg);
234            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
235                handle_system_msg(msg);
236            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
237                // Remove instrument ID index as the raw symbol may have changed
238                instrument_id_map.remove(&msg.hd.instrument_id);
239                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
240            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
241                if self.use_exchange_as_venue {
242                    let exchange = msg.exchange()?;
243                    if !exchange.is_empty() {
244                        update_instrument_id_map_with_exchange(
245                            &symbol_map,
246                            &self.symbol_venue_map,
247                            &mut instrument_id_map,
248                            msg.hd.instrument_id,
249                            exchange,
250                        )?;
251                    }
252                }
253                let data = {
254                    let sym_map = self.read_symbol_venue_map()?;
255                    handle_instrument_def_msg(
256                        msg,
257                        &record,
258                        &symbol_map,
259                        &self.publisher_venue_map,
260                        &sym_map,
261                        &mut instrument_id_map,
262                        ts_init,
263                    )?
264                };
265                self.send_msg(LiveMessage::Instrument(data)).await;
266            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
267                let data = {
268                    let sym_map = self.read_symbol_venue_map()?;
269                    handle_status_msg(
270                        msg,
271                        &record,
272                        &symbol_map,
273                        &self.publisher_venue_map,
274                        &sym_map,
275                        &mut instrument_id_map,
276                        ts_init,
277                    )?
278                };
279                self.send_msg(LiveMessage::Status(data)).await;
280            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
281                let data = {
282                    let sym_map = self.read_symbol_venue_map()?;
283                    handle_imbalance_msg(
284                        msg,
285                        &record,
286                        &symbol_map,
287                        &self.publisher_venue_map,
288                        &sym_map,
289                        &mut instrument_id_map,
290                        ts_init,
291                    )?
292                };
293                self.send_msg(LiveMessage::Imbalance(data)).await;
294            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
295                let data = {
296                    let sym_map = self.read_symbol_venue_map()?;
297                    handle_statistics_msg(
298                        msg,
299                        &record,
300                        &symbol_map,
301                        &self.publisher_venue_map,
302                        &sym_map,
303                        &mut instrument_id_map,
304                        ts_init,
305                    )?
306                };
307                self.send_msg(LiveMessage::Statistics(data)).await;
308            } else {
309                // Decode a generic record with possible errors
310                let (mut data1, data2) = match {
311                    let sym_map = self.read_symbol_venue_map()?;
312                    handle_record(
313                        record,
314                        &symbol_map,
315                        &self.publisher_venue_map,
316                        &sym_map,
317                        &mut instrument_id_map,
318                        ts_init,
319                        &initialized_books,
320                        self.bars_timestamp_on_close,
321                    )
322                } {
323                    Ok(decoded) => decoded,
324                    Err(e) => {
325                        tracing::error!("Error decoding record: {e}");
326                        continue;
327                    }
328                };
329
330                if let Some(msg) = record.get::<dbn::MboMsg>() {
331                    // Check if should mark book initialized
332                    if let Some(Data::Delta(delta)) = &data1 {
333                        initialized_books.insert(delta.instrument_id);
334                    } else {
335                        continue; // No delta yet
336                    }
337
338                    if let Some(Data::Delta(delta)) = &data1 {
339                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
340                        buffer.push(*delta);
341
342                        tracing::trace!(
343                            "Buffering delta: {} {buffering_start:?} flags={}",
344                            delta.ts_event,
345                            msg.flags.raw(),
346                        );
347
348                        // Check if last message in the book event
349                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
350                            continue; // NOT last message
351                        }
352
353                        // Check if snapshot
354                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
355                            continue; // Buffer snapshot
356                        }
357
358                        // Check if buffering a replay
359                        if let Some(start_ns) = buffering_start {
360                            if delta.ts_event <= start_ns {
361                                continue; // Continue buffering replay
362                            }
363                            buffering_start = None;
364                        }
365
366                        // SAFETY: We can guarantee a deltas vec exists
367                        let buffer =
368                            buffered_deltas
369                                .remove(&delta.instrument_id)
370                                .ok_or_else(|| {
371                                    anyhow::anyhow!(
372                                        "Internal error: no buffered deltas for instrument {id}",
373                                        id = delta.instrument_id
374                                    )
375                                })?;
376                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
377                        let deltas = OrderBookDeltas_API::new(deltas);
378                        data1 = Some(Data::Deltas(deltas));
379                    }
380                }
381
382                if let Some(data) = data1 {
383                    self.send_msg(LiveMessage::Data(data)).await;
384                }
385
386                if let Some(data) = data2 {
387                    self.send_msg(LiveMessage::Data(data)).await;
388                }
389            }
390        }
391
392        self.cmd_rx.close();
393        tracing::debug!("Closed command receiver");
394
395        Ok(())
396    }
397
398    /// Sends a message to the message processing task.
399    async fn send_msg(&mut self, msg: LiveMessage) {
400        tracing::trace!("Sending {msg:?}");
401        match self.msg_tx.send(msg).await {
402            Ok(()) => {}
403            Err(e) => tracing::error!("Error sending message: {e}"),
404        }
405    }
406
407    /// Acquires a read lock on the symbol-venue map with exponential backoff and timeout.
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the read lock cannot be acquired within the deadline.
412    fn read_symbol_venue_map(
413        &self,
414    ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
415        // Try to acquire the lock with exponential backoff and deadline
416        const MAX_WAIT_MS: u64 = 500; // Total maximum wait time
417        const INITIAL_DELAY_MICROS: u64 = 10;
418        const MAX_DELAY_MICROS: u64 = 1000;
419
420        let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
421        let mut delay = INITIAL_DELAY_MICROS;
422
423        loop {
424            match self.symbol_venue_map.try_read() {
425                Ok(guard) => return Ok(guard),
426                Err(std::sync::TryLockError::WouldBlock) => {
427                    if std::time::Instant::now() >= deadline {
428                        break;
429                    }
430                    // Yield to other threads first
431                    std::thread::yield_now();
432
433                    // Then sleep with exponential backoff if still blocked
434                    if std::time::Instant::now() < deadline {
435                        let remaining = deadline - std::time::Instant::now();
436                        let sleep_duration = StdDuration::from_micros(delay).min(remaining);
437                        std::thread::sleep(sleep_duration);
438                        // Exponential backoff with cap and jitter
439                        delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
440                    }
441                }
442                Err(std::sync::TryLockError::Poisoned(e)) => {
443                    return Err(anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"));
444                }
445            }
446        }
447
448        Err(anyhow::anyhow!(
449            "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
450        ))
451    }
452}
453
/// Handles Databento error messages by logging them.
///
/// The message is written at `error` level using its `Debug` representation.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    tracing::error!("{msg:?}");
}
458
/// Handles Databento system messages by logging them.
///
/// The message is written at `info` level using its `Debug` representation.
fn handle_system_msg(msg: &dbn::SystemMsg) {
    tracing::info!("{msg:?}");
}
463
464/// Handles symbol mapping messages and updates the instrument ID map.
465///
466/// # Errors
467///
468/// Returns an error if symbol mapping fails.
469fn handle_symbol_mapping_msg(
470    msg: &dbn::SymbolMappingMsg,
471    symbol_map: &mut PitSymbolMap,
472    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
473) -> anyhow::Result<()> {
474    symbol_map
475        .on_symbol_mapping(msg)
476        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
477    instrument_id_map.remove(&msg.header().instrument_id);
478    Ok(())
479}
480
481/// Updates the instrument ID map using exchange information from the symbol map.
482fn update_instrument_id_map_with_exchange(
483    symbol_map: &PitSymbolMap,
484    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
485    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
486    raw_instrument_id: u32,
487    exchange: &str,
488) -> anyhow::Result<InstrumentId> {
489    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
490        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
491    })?;
492    let symbol = Symbol::from(raw_symbol.as_str());
493    let venue = Venue::from_code(exchange)
494        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
495    let instrument_id = InstrumentId::new(symbol, venue);
496    let mut map = symbol_venue_map
497        .write()
498        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
499    map.entry(symbol).or_insert(venue);
500    instrument_id_map.insert(raw_instrument_id, instrument_id);
501    Ok(instrument_id)
502}
503
504fn update_instrument_id_map(
505    record: &dbn::RecordRef,
506    symbol_map: &PitSymbolMap,
507    publisher_venue_map: &IndexMap<PublisherId, Venue>,
508    symbol_venue_map: &AHashMap<Symbol, Venue>,
509    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
510) -> anyhow::Result<InstrumentId> {
511    let header = record.header();
512
513    // Check if instrument ID is already in the map
514    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
515        return Ok(instrument_id);
516    }
517
518    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
519        anyhow::anyhow!(
520            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
521            header.instrument_id
522        )
523    })?;
524
525    let symbol = Symbol::from_str_unchecked(raw_symbol);
526
527    let publisher_id = header.publisher_id;
528    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
529        *venue
530    } else {
531        let venue = publisher_venue_map
532            .get(&publisher_id)
533            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
534        *venue
535    };
536    let instrument_id = InstrumentId::new(symbol, venue);
537
538    instrument_id_map.insert(header.instrument_id, instrument_id);
539    Ok(instrument_id)
540}
541
542/// Handles instrument definition messages and decodes them into Nautilus instruments.
543///
544/// # Errors
545///
546/// Returns an error if instrument decoding fails.
547fn handle_instrument_def_msg(
548    msg: &dbn::InstrumentDefMsg,
549    record: &dbn::RecordRef,
550    symbol_map: &PitSymbolMap,
551    publisher_venue_map: &IndexMap<PublisherId, Venue>,
552    symbol_venue_map: &AHashMap<Symbol, Venue>,
553    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
554    ts_init: UnixNanos,
555) -> anyhow::Result<InstrumentAny> {
556    let instrument_id = update_instrument_id_map(
557        record,
558        symbol_map,
559        publisher_venue_map,
560        symbol_venue_map,
561        instrument_id_map,
562    )?;
563
564    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
565}
566
567fn handle_status_msg(
568    msg: &dbn::StatusMsg,
569    record: &dbn::RecordRef,
570    symbol_map: &PitSymbolMap,
571    publisher_venue_map: &IndexMap<PublisherId, Venue>,
572    symbol_venue_map: &AHashMap<Symbol, Venue>,
573    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
574    ts_init: UnixNanos,
575) -> anyhow::Result<InstrumentStatus> {
576    let instrument_id = update_instrument_id_map(
577        record,
578        symbol_map,
579        publisher_venue_map,
580        symbol_venue_map,
581        instrument_id_map,
582    )?;
583
584    decode_status_msg(msg, instrument_id, Some(ts_init))
585}
586
587fn handle_imbalance_msg(
588    msg: &dbn::ImbalanceMsg,
589    record: &dbn::RecordRef,
590    symbol_map: &PitSymbolMap,
591    publisher_venue_map: &IndexMap<PublisherId, Venue>,
592    symbol_venue_map: &AHashMap<Symbol, Venue>,
593    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
594    ts_init: UnixNanos,
595) -> anyhow::Result<DatabentoImbalance> {
596    let instrument_id = update_instrument_id_map(
597        record,
598        symbol_map,
599        publisher_venue_map,
600        symbol_venue_map,
601        instrument_id_map,
602    )?;
603
604    let price_precision = 2; // Hard-coded for now
605
606    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
607}
608
609fn handle_statistics_msg(
610    msg: &dbn::StatMsg,
611    record: &dbn::RecordRef,
612    symbol_map: &PitSymbolMap,
613    publisher_venue_map: &IndexMap<PublisherId, Venue>,
614    symbol_venue_map: &AHashMap<Symbol, Venue>,
615    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
616    ts_init: UnixNanos,
617) -> anyhow::Result<DatabentoStatistics> {
618    let instrument_id = update_instrument_id_map(
619        record,
620        symbol_map,
621        publisher_venue_map,
622        symbol_venue_map,
623        instrument_id_map,
624    )?;
625
626    let price_precision = 2; // Hard-coded for now
627
628    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
629}
630
631#[allow(clippy::too_many_arguments)]
632fn handle_record(
633    record: dbn::RecordRef,
634    symbol_map: &PitSymbolMap,
635    publisher_venue_map: &IndexMap<PublisherId, Venue>,
636    symbol_venue_map: &AHashMap<Symbol, Venue>,
637    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
638    ts_init: UnixNanos,
639    initialized_books: &HashSet<InstrumentId>,
640    bars_timestamp_on_close: bool,
641) -> anyhow::Result<(Option<Data>, Option<Data>)> {
642    let instrument_id = update_instrument_id_map(
643        &record,
644        symbol_map,
645        publisher_venue_map,
646        symbol_venue_map,
647        instrument_id_map,
648    )?;
649
650    let price_precision = 2; // Hard-coded for now
651
652    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
653    // For MBO, only include trades after the book is initialized to maintain consistency
654    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
655        || record.get::<dbn::TbboMsg>().is_some()
656        || record.get::<dbn::Cmbp1Msg>().is_some()
657    {
658        true // These schemas include trade information directly
659    } else {
660        initialized_books.contains(&instrument_id) // MBO requires initialized book
661    };
662
663    decode_record(
664        &record,
665        instrument_id,
666        price_precision,
667        Some(ts_init),
668        include_trades,
669        bars_timestamp_on_close,
670    )
671}