nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, RwLock},
18    time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use nautilus_network::backoff::ExponentialBackoff;
35use tokio::{
36    sync::mpsc::error::TryRecvError,
37    time::{Duration, Instant},
38};
39
40use super::{
41    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
42    types::{DatabentoImbalance, DatabentoStatistics},
43};
44use crate::{
45    decode::{decode_instrument_def_msg, decode_record},
46    types::PublisherId,
47};
48
49#[derive(Debug)]
50pub enum LiveCommand {
51    Subscribe(Subscription),
52    Start,
53    Close,
54}
55
56#[derive(Debug)]
57#[allow(
58    clippy::large_enum_variant,
59    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
60)]
61pub enum LiveMessage {
62    Data(Data),
63    Instrument(InstrumentAny),
64    Status(InstrumentStatus),
65    Imbalance(DatabentoImbalance),
66    Statistics(DatabentoStatistics),
67    Error(anyhow::Error),
68    Close,
69}
70
71/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
72///
73/// [`LiveCommand`] messages are received synchronously across a channel,
74/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
75/// back to a message processing task.
76///
77/// # Crash Policy
78///
79/// This handler intentionally crashes on catastrophic feed issues rather than
80/// attempting recovery. If excessive buffering occurs (indicating severe feed
81/// misbehavior), the process will run out of memory and terminate. This is by
82/// design - such scenarios indicate fundamental problems that require external
83/// intervention.
84#[derive(Debug)]
85pub struct DatabentoFeedHandler {
86    key: String,
87    dataset: String,
88    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
89    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
90    publisher_venue_map: IndexMap<PublisherId, Venue>,
91    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
92    replay: bool,
93    use_exchange_as_venue: bool,
94    bars_timestamp_on_close: bool,
95    reconnect_timeout_mins: Option<u64>,
96    backoff: ExponentialBackoff,
97    subscriptions: Vec<Subscription>,
98    buffered_commands: Vec<LiveCommand>,
99}
100
101impl DatabentoFeedHandler {
102    /// Creates a new [`DatabentoFeedHandler`] instance.
103    ///
104    /// # Panics
105    ///
106    /// Panics if exponential backoff creation fails (should never happen with valid hardcoded parameters).
107    #[must_use]
108    #[allow(clippy::too_many_arguments)]
109    pub fn new(
110        key: String,
111        dataset: String,
112        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
113        tx: tokio::sync::mpsc::Sender<LiveMessage>,
114        publisher_venue_map: IndexMap<PublisherId, Venue>,
115        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
116        use_exchange_as_venue: bool,
117        bars_timestamp_on_close: bool,
118        reconnect_timeout_mins: Option<u64>,
119    ) -> Self {
120        // Choose max delay based on timeout configuration:
121        // - With timeout: 60s max (quick recovery to reconnect within window)
122        // - Without timeout (None): 600s max (patient recovery, respectful of infrastructure)
123        let delay_max = if reconnect_timeout_mins.is_some() {
124            Duration::from_secs(60)
125        } else {
126            Duration::from_secs(600)
127        };
128
129        // SAFETY: Hardcoded parameters are all valid
130        let backoff =
131            ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false).unwrap();
132
133        Self {
134            key,
135            dataset,
136            cmd_rx: rx,
137            msg_tx: tx,
138            publisher_venue_map,
139            symbol_venue_map,
140            replay: false,
141            use_exchange_as_venue,
142            bars_timestamp_on_close,
143            reconnect_timeout_mins,
144            backoff,
145            subscriptions: Vec::new(),
146            buffered_commands: Vec::new(),
147        }
148    }
149
150    /// Runs the feed handler main loop, processing commands and streaming market data.
151    ///
152    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
153    /// and continuously processes incoming market data messages until shutdown.
154    ///
155    /// Implements automatic reconnection with exponential backoff (1s to 60s with jitter).
156    /// Each successful session resets the reconnection cycle, giving the next disconnect
157    /// a fresh timeout window. Gives up after `reconnect_timeout_mins` if configured.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if any client operation or message handling fails.
162    #[allow(clippy::blocks_in_conditions)]
163    pub async fn run(&mut self) -> anyhow::Result<()> {
164        tracing::debug!("Running feed handler");
165
166        let mut reconnect_start: Option<Instant> = None;
167        let mut attempt = 0;
168
169        loop {
170            attempt += 1;
171
172            match self.run_session(attempt).await {
173                Ok(ran_successfully) => {
174                    if ran_successfully {
175                        tracing::info!("Resetting reconnection cycle after successful session");
176                        reconnect_start = None;
177                        attempt = 0;
178                        self.backoff.reset();
179                        continue;
180                    } else {
181                        tracing::info!("Session ended normally");
182                        break Ok(());
183                    }
184                }
185                Err(e) => {
186                    let cycle_start = reconnect_start.get_or_insert_with(Instant::now);
187
188                    if let Some(timeout_mins) = self.reconnect_timeout_mins {
189                        let elapsed = cycle_start.elapsed();
190                        let timeout = Duration::from_secs(timeout_mins * 60);
191
192                        if elapsed >= timeout {
193                            tracing::error!(
194                                "Giving up reconnection after {} minutes",
195                                timeout_mins
196                            );
197                            self.send_msg(LiveMessage::Error(anyhow::anyhow!(
198                                "Reconnection timeout after {timeout_mins} minutes: {e}"
199                            )))
200                            .await;
201                            break Err(e);
202                        }
203                    }
204
205                    let delay = self.backoff.next_duration();
206
207                    tracing::warn!(
208                        "Connection lost (attempt {}): {}. Reconnecting in {}s...",
209                        attempt,
210                        e,
211                        delay.as_secs()
212                    );
213
214                    tokio::select! {
215                        _ = tokio::time::sleep(delay) => {}
216                        cmd = self.cmd_rx.recv() => {
217                            match cmd {
218                                Some(LiveCommand::Close) => {
219                                    tracing::info!("Close received during backoff");
220                                    return Ok(());
221                                }
222                                None => {
223                                    tracing::debug!("Command channel closed during backoff");
224                                    return Ok(());
225                                }
226                                Some(cmd) => {
227                                    tracing::debug!("Buffering command received during backoff: {:?}", cmd);
228                                    self.buffered_commands.push(cmd);
229                                }
230                            }
231                        }
232                    }
233                }
234            }
235        }
236    }
237
238    /// Runs a single session, handling connection, subscriptions, and data streaming.
239    ///
240    /// Returns `Ok(bool)` where the bool indicates if the session ran successfully
241    /// for a meaningful duration (true) or was intentionally closed (false).
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if connection fails, subscription fails, or data streaming encounters an error.
246    async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
247        if attempt > 1 {
248            tracing::info!("Reconnecting (attempt {})...", attempt);
249        }
250
251        let session_start = Instant::now();
252        let clock = get_atomic_clock_realtime();
253        let mut symbol_map = PitSymbolMap::new();
254        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
255
256        let mut buffering_start = None;
257        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
258        let mut initialized_books = HashSet::new();
259        let timeout = Duration::from_secs(5); // Hardcoded timeout for now
260
261        let result = tokio::time::timeout(
262            timeout,
263            databento::LiveClient::builder()
264                .user_agent_extension(NAUTILUS_USER_AGENT.into())
265                .key(self.key.clone())?
266                .dataset(self.dataset.clone())
267                .build(),
268        )
269        .await?;
270
271        let mut client = match result {
272            Ok(client) => {
273                if attempt > 1 {
274                    tracing::info!("Reconnected successfully");
275                } else {
276                    tracing::info!("Connected");
277                }
278                client
279            }
280            Err(e) => {
281                anyhow::bail!("Failed to connect to Databento LSG: {e}");
282            }
283        };
284
285        // Process any commands buffered during reconnection backoff
286        let mut start_buffered = false;
287        if !self.buffered_commands.is_empty() {
288            tracing::info!(
289                "Processing {} buffered commands",
290                self.buffered_commands.len()
291            );
292            for cmd in self.buffered_commands.drain(..) {
293                match cmd {
294                    LiveCommand::Subscribe(sub) => {
295                        if !self.replay && sub.start.is_some() {
296                            self.replay = true;
297                        }
298                        self.subscriptions.push(sub);
299                    }
300                    LiveCommand::Start => {
301                        start_buffered = true;
302                    }
303                    LiveCommand::Close => {
304                        tracing::warn!("Close command was buffered, shutting down");
305                        return Ok(false);
306                    }
307                }
308            }
309        }
310
311        let timeout = Duration::from_millis(10);
312        let mut running = false;
313
314        if !self.subscriptions.is_empty() {
315            tracing::info!(
316                "Resubscribing to {} subscriptions",
317                self.subscriptions.len()
318            );
319            for sub in self.subscriptions.clone() {
320                client.subscribe(sub).await?;
321            }
322            // Strip start timestamps after successful subscription to avoid replaying history on future reconnects
323            for sub in &mut self.subscriptions {
324                sub.start = None;
325            }
326            client.start().await?;
327            running = true;
328            tracing::info!("Resubscription complete");
329        } else if start_buffered {
330            tracing::info!("Starting session from buffered Start command");
331            buffering_start = if self.replay {
332                Some(clock.get_time_ns())
333            } else {
334                None
335            };
336            client.start().await?;
337            running = true;
338        }
339
340        loop {
341            if self.msg_tx.is_closed() {
342                tracing::debug!("Message channel was closed: stopping");
343                return Ok(false);
344            }
345
346            match self.cmd_rx.try_recv() {
347                Ok(cmd) => {
348                    tracing::debug!("Received command: {cmd:?}");
349                    match cmd {
350                        LiveCommand::Subscribe(sub) => {
351                            if !self.replay && sub.start.is_some() {
352                                self.replay = true;
353                            }
354                            client.subscribe(sub.clone()).await?;
355                            // Store without start to avoid replaying history on reconnect
356                            let mut sub_for_reconnect = sub;
357                            sub_for_reconnect.start = None;
358                            self.subscriptions.push(sub_for_reconnect);
359                        }
360                        LiveCommand::Start => {
361                            buffering_start = if self.replay {
362                                Some(clock.get_time_ns())
363                            } else {
364                                None
365                            };
366                            client.start().await?;
367                            running = true;
368                            tracing::debug!("Started");
369                        }
370                        LiveCommand::Close => {
371                            self.msg_tx.send(LiveMessage::Close).await?;
372                            if running {
373                                client.close().await?;
374                                tracing::debug!("Closed inner client");
375                            }
376                            return Ok(false);
377                        }
378                    }
379                }
380                Err(TryRecvError::Empty) => {}
381                Err(TryRecvError::Disconnected) => {
382                    tracing::debug!("Command channel disconnected");
383                    return Ok(false);
384                }
385            }
386
387            if !running {
388                continue;
389            }
390
391            let result = tokio::time::timeout(timeout, client.next_record()).await;
392            let record_opt = match result {
393                Ok(record_opt) => record_opt,
394                Err(_) => continue,
395            };
396
397            let record = match record_opt {
398                Ok(Some(record)) => record,
399                Ok(None) => {
400                    const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
401                    if session_start.elapsed() >= SUCCESS_THRESHOLD {
402                        tracing::info!("Session ended after successful run");
403                        return Ok(true);
404                    }
405                    anyhow::bail!("Session ended by gateway");
406                }
407                Err(e) => {
408                    const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
409                    if session_start.elapsed() >= SUCCESS_THRESHOLD {
410                        tracing::info!("Connection error after successful run: {e}");
411                        return Ok(true);
412                    }
413                    anyhow::bail!("Connection error: {e}");
414                }
415            };
416
417            let ts_init = clock.get_time_ns();
418
419            // Decode record
420            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
421                handle_error_msg(msg);
422            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
423                handle_system_msg(msg);
424            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
425                // Remove instrument ID index as the raw symbol may have changed
426                instrument_id_map.remove(&msg.hd.instrument_id);
427                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
428            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
429                if self.use_exchange_as_venue {
430                    let exchange = msg.exchange()?;
431                    if !exchange.is_empty() {
432                        update_instrument_id_map_with_exchange(
433                            &symbol_map,
434                            &self.symbol_venue_map,
435                            &mut instrument_id_map,
436                            msg.hd.instrument_id,
437                            exchange,
438                        )?;
439                    }
440                }
441                let data = {
442                    let sym_map = self.read_symbol_venue_map()?;
443                    handle_instrument_def_msg(
444                        msg,
445                        &record,
446                        &symbol_map,
447                        &self.publisher_venue_map,
448                        &sym_map,
449                        &mut instrument_id_map,
450                        ts_init,
451                    )?
452                };
453                self.send_msg(LiveMessage::Instrument(data)).await;
454            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
455                let data = {
456                    let sym_map = self.read_symbol_venue_map()?;
457                    handle_status_msg(
458                        msg,
459                        &record,
460                        &symbol_map,
461                        &self.publisher_venue_map,
462                        &sym_map,
463                        &mut instrument_id_map,
464                        ts_init,
465                    )?
466                };
467                self.send_msg(LiveMessage::Status(data)).await;
468            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
469                let data = {
470                    let sym_map = self.read_symbol_venue_map()?;
471                    handle_imbalance_msg(
472                        msg,
473                        &record,
474                        &symbol_map,
475                        &self.publisher_venue_map,
476                        &sym_map,
477                        &mut instrument_id_map,
478                        ts_init,
479                    )?
480                };
481                self.send_msg(LiveMessage::Imbalance(data)).await;
482            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
483                let data = {
484                    let sym_map = self.read_symbol_venue_map()?;
485                    handle_statistics_msg(
486                        msg,
487                        &record,
488                        &symbol_map,
489                        &self.publisher_venue_map,
490                        &sym_map,
491                        &mut instrument_id_map,
492                        ts_init,
493                    )?
494                };
495                self.send_msg(LiveMessage::Statistics(data)).await;
496            } else {
497                // Decode a generic record with possible errors
498                let res = {
499                    let sym_map = self.read_symbol_venue_map()?;
500                    handle_record(
501                        record,
502                        &symbol_map,
503                        &self.publisher_venue_map,
504                        &sym_map,
505                        &mut instrument_id_map,
506                        ts_init,
507                        &initialized_books,
508                        self.bars_timestamp_on_close,
509                    )
510                };
511                let (mut data1, data2) = match res {
512                    Ok(decoded) => decoded,
513                    Err(e) => {
514                        tracing::error!("Error decoding record: {e}");
515                        continue;
516                    }
517                };
518
519                if let Some(msg) = record.get::<dbn::MboMsg>() {
520                    // Check if should mark book initialized
521                    if let Some(Data::Delta(delta)) = &data1 {
522                        initialized_books.insert(delta.instrument_id);
523                    } else {
524                        continue; // No delta yet
525                    }
526
527                    if let Some(Data::Delta(delta)) = &data1 {
528                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
529                        buffer.push(*delta);
530
531                        tracing::trace!(
532                            "Buffering delta: {} {buffering_start:?} flags={}",
533                            delta.ts_event,
534                            msg.flags.raw(),
535                        );
536
537                        // Check if last message in the book event
538                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
539                            continue; // NOT last message
540                        }
541
542                        // Check if snapshot
543                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
544                            continue; // Buffer snapshot
545                        }
546
547                        // Check if buffering a replay
548                        if let Some(start_ns) = buffering_start {
549                            if delta.ts_event <= start_ns {
550                                continue; // Continue buffering replay
551                            }
552                            buffering_start = None;
553                        }
554
555                        // SAFETY: We can guarantee a deltas vec exists
556                        let buffer =
557                            buffered_deltas
558                                .remove(&delta.instrument_id)
559                                .ok_or_else(|| {
560                                    anyhow::anyhow!(
561                                        "Internal error: no buffered deltas for instrument {id}",
562                                        id = delta.instrument_id
563                                    )
564                                })?;
565                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
566                        let deltas = OrderBookDeltas_API::new(deltas);
567                        data1 = Some(Data::Deltas(deltas));
568                    }
569                }
570
571                if let Some(data) = data1 {
572                    self.send_msg(LiveMessage::Data(data)).await;
573                }
574
575                if let Some(data) = data2 {
576                    self.send_msg(LiveMessage::Data(data)).await;
577                }
578            }
579        }
580    }
581
582    /// Sends a message to the message processing task.
583    async fn send_msg(&mut self, msg: LiveMessage) {
584        tracing::trace!("Sending {msg:?}");
585        match self.msg_tx.send(msg).await {
586            Ok(()) => {}
587            Err(e) => tracing::error!("Error sending message: {e}"),
588        }
589    }
590
591    /// Acquires a read lock on the symbol-venue map with exponential backoff and timeout.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if the read lock cannot be acquired within the deadline.
596    fn read_symbol_venue_map(
597        &self,
598    ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
599        // Try to acquire the lock with exponential backoff and deadline
600        const MAX_WAIT_MS: u64 = 500; // Total maximum wait time
601        const INITIAL_DELAY_MICROS: u64 = 10;
602        const MAX_DELAY_MICROS: u64 = 1000;
603
604        let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
605        let mut delay = INITIAL_DELAY_MICROS;
606
607        loop {
608            match self.symbol_venue_map.try_read() {
609                Ok(guard) => return Ok(guard),
610                Err(std::sync::TryLockError::WouldBlock) => {
611                    if std::time::Instant::now() >= deadline {
612                        break;
613                    }
614
615                    // Yield to other threads first
616                    std::thread::yield_now();
617
618                    // Then sleep with exponential backoff if still blocked
619                    if std::time::Instant::now() < deadline {
620                        let remaining = deadline - std::time::Instant::now();
621                        let sleep_duration = StdDuration::from_micros(delay).min(remaining);
622                        std::thread::sleep(sleep_duration);
623                        // Exponential backoff with cap and jitter
624                        delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
625                    }
626                }
627                Err(std::sync::TryLockError::Poisoned(e)) => {
628                    anyhow::bail!("symbol_venue_map lock poisoned: {e}");
629                }
630            }
631        }
632
633        anyhow::bail!(
634            "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
635        )
636    }
637}
638
639/// Handles Databento error messages by logging them.
640fn handle_error_msg(msg: &dbn::ErrorMsg) {
641    tracing::error!("{msg:?}");
642}
643
644/// Handles Databento system messages by logging them.
645fn handle_system_msg(msg: &dbn::SystemMsg) {
646    tracing::debug!("{msg:?}");
647}
648
649/// Handles symbol mapping messages and updates the instrument ID map.
650///
651/// # Errors
652///
653/// Returns an error if symbol mapping fails.
654fn handle_symbol_mapping_msg(
655    msg: &dbn::SymbolMappingMsg,
656    symbol_map: &mut PitSymbolMap,
657    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
658) -> anyhow::Result<()> {
659    symbol_map
660        .on_symbol_mapping(msg)
661        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
662    instrument_id_map.remove(&msg.header().instrument_id);
663    Ok(())
664}
665
666/// Updates the instrument ID map using exchange information from the symbol map.
667fn update_instrument_id_map_with_exchange(
668    symbol_map: &PitSymbolMap,
669    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
670    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
671    raw_instrument_id: u32,
672    exchange: &str,
673) -> anyhow::Result<InstrumentId> {
674    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
675        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
676    })?;
677    let symbol = Symbol::from(raw_symbol.as_str());
678    let venue = Venue::from_code(exchange)
679        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
680    let instrument_id = InstrumentId::new(symbol, venue);
681    let mut map = symbol_venue_map
682        .write()
683        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
684    map.entry(symbol).or_insert(venue);
685    instrument_id_map.insert(raw_instrument_id, instrument_id);
686    Ok(instrument_id)
687}
688
689fn update_instrument_id_map(
690    record: &dbn::RecordRef,
691    symbol_map: &PitSymbolMap,
692    publisher_venue_map: &IndexMap<PublisherId, Venue>,
693    symbol_venue_map: &AHashMap<Symbol, Venue>,
694    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
695) -> anyhow::Result<InstrumentId> {
696    let header = record.header();
697
698    // Check if instrument ID is already in the map
699    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
700        return Ok(instrument_id);
701    }
702
703    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
704        anyhow::anyhow!(
705            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
706            header.instrument_id
707        )
708    })?;
709
710    let symbol = Symbol::from_str_unchecked(raw_symbol);
711
712    let publisher_id = header.publisher_id;
713    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
714        *venue
715    } else {
716        let venue = publisher_venue_map
717            .get(&publisher_id)
718            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
719        *venue
720    };
721    let instrument_id = InstrumentId::new(symbol, venue);
722
723    instrument_id_map.insert(header.instrument_id, instrument_id);
724    Ok(instrument_id)
725}
726
727/// Handles instrument definition messages and decodes them into Nautilus instruments.
728///
729/// # Errors
730///
731/// Returns an error if instrument decoding fails.
732fn handle_instrument_def_msg(
733    msg: &dbn::InstrumentDefMsg,
734    record: &dbn::RecordRef,
735    symbol_map: &PitSymbolMap,
736    publisher_venue_map: &IndexMap<PublisherId, Venue>,
737    symbol_venue_map: &AHashMap<Symbol, Venue>,
738    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
739    ts_init: UnixNanos,
740) -> anyhow::Result<InstrumentAny> {
741    let instrument_id = update_instrument_id_map(
742        record,
743        symbol_map,
744        publisher_venue_map,
745        symbol_venue_map,
746        instrument_id_map,
747    )?;
748
749    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
750}
751
752fn handle_status_msg(
753    msg: &dbn::StatusMsg,
754    record: &dbn::RecordRef,
755    symbol_map: &PitSymbolMap,
756    publisher_venue_map: &IndexMap<PublisherId, Venue>,
757    symbol_venue_map: &AHashMap<Symbol, Venue>,
758    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
759    ts_init: UnixNanos,
760) -> anyhow::Result<InstrumentStatus> {
761    let instrument_id = update_instrument_id_map(
762        record,
763        symbol_map,
764        publisher_venue_map,
765        symbol_venue_map,
766        instrument_id_map,
767    )?;
768
769    decode_status_msg(msg, instrument_id, Some(ts_init))
770}
771
772fn handle_imbalance_msg(
773    msg: &dbn::ImbalanceMsg,
774    record: &dbn::RecordRef,
775    symbol_map: &PitSymbolMap,
776    publisher_venue_map: &IndexMap<PublisherId, Venue>,
777    symbol_venue_map: &AHashMap<Symbol, Venue>,
778    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
779    ts_init: UnixNanos,
780) -> anyhow::Result<DatabentoImbalance> {
781    let instrument_id = update_instrument_id_map(
782        record,
783        symbol_map,
784        publisher_venue_map,
785        symbol_venue_map,
786        instrument_id_map,
787    )?;
788
789    let price_precision = 2; // Hardcoded for now
790
791    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
792}
793
794fn handle_statistics_msg(
795    msg: &dbn::StatMsg,
796    record: &dbn::RecordRef,
797    symbol_map: &PitSymbolMap,
798    publisher_venue_map: &IndexMap<PublisherId, Venue>,
799    symbol_venue_map: &AHashMap<Symbol, Venue>,
800    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
801    ts_init: UnixNanos,
802) -> anyhow::Result<DatabentoStatistics> {
803    let instrument_id = update_instrument_id_map(
804        record,
805        symbol_map,
806        publisher_venue_map,
807        symbol_venue_map,
808        instrument_id_map,
809    )?;
810
811    let price_precision = 2; // Hardcoded for now
812
813    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
814}
815
816#[allow(clippy::too_many_arguments)]
817fn handle_record(
818    record: dbn::RecordRef,
819    symbol_map: &PitSymbolMap,
820    publisher_venue_map: &IndexMap<PublisherId, Venue>,
821    symbol_venue_map: &AHashMap<Symbol, Venue>,
822    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
823    ts_init: UnixNanos,
824    initialized_books: &HashSet<InstrumentId>,
825    bars_timestamp_on_close: bool,
826) -> anyhow::Result<(Option<Data>, Option<Data>)> {
827    let instrument_id = update_instrument_id_map(
828        &record,
829        symbol_map,
830        publisher_venue_map,
831        symbol_venue_map,
832        instrument_id_map,
833    )?;
834
835    let price_precision = 2; // Hardcoded for now
836
837    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
838    // For MBO, only include trades after the book is initialized to maintain consistency
839    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
840        || record.get::<dbn::TbboMsg>().is_some()
841        || record.get::<dbn::Cmbp1Msg>().is_some()
842    {
843        true // These schemas include trade information directly
844    } else {
845        initialized_books.contains(&instrument_id) // MBO requires initialized book
846    };
847
848    decode_record(
849        &record,
850        instrument_id,
851        price_precision,
852        Some(ts_init),
853        include_trades,
854        bars_timestamp_on_close,
855    )
856}
857
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a `GLBX.MDP3` feed handler backed by throwaway channels and empty maps.
    ///
    /// The command sender and message receiver are dropped when this returns. The tests
    /// only inspect the handler's freshly-constructed field values.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_unused_cmd_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
        let (message_tx, _unused_msg_rx) = tokio::sync::mpsc::channel(100);
        let shared_map = Arc::new(RwLock::new(AHashMap::new()));

        DatabentoFeedHandler::new(
            "test_key".to_string(),
            "GLBX.MDP3".to_string(),
            command_rx,
            message_tx,
            IndexMap::new(),
            shared_map,
            false,
            false,
            reconnect_timeout_mins,
        )
    }

    /// Asserts the handler has neither active subscriptions nor buffered commands.
    fn assert_nothing_pending(handler: &DatabentoFeedHandler) {
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let fresh = create_test_handler(reconnect_timeout_mins);

        assert_eq!(fresh.reconnect_timeout_mins, reconnect_timeout_mins);
        assert_nothing_pending(&fresh);
    }

    #[rstest]
    fn test_subscription_with_and_without_start() {
        let original = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(datetime!(2024-01-01 00:00:00 UTC))
            .build();
        let stripped = {
            let mut copy = original.clone();
            copy.start = None;
            copy
        };

        assert!(original.start.is_some());
        assert!(stripped.start.is_none());
        assert_eq!(original.schema, stripped.schema);
        assert_eq!(original.symbols, stripped.symbols);
    }

    #[rstest]
    fn test_handler_initialization_state() {
        let fresh = create_test_handler(Some(10));

        assert!(!fresh.replay);
        assert_eq!(fresh.dataset, "GLBX.MDP3");
        assert_eq!(fresh.key, "test_key");
        assert_nothing_pending(&fresh);
    }

    #[rstest]
    fn test_handler_with_no_timeout() {
        let fresh = create_test_handler(None);

        assert_eq!(fresh.reconnect_timeout_mins, None);
        assert!(!fresh.replay);
    }

    #[rstest]
    fn test_handler_with_zero_timeout() {
        let fresh = create_test_handler(Some(0));

        assert_eq!(fresh.reconnect_timeout_mins, Some(0));
        assert!(!fresh.replay);
    }
}