nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, RwLock},
18    time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use nautilus_network::backoff::ExponentialBackoff;
35use tokio::{
36    sync::mpsc::error::TryRecvError,
37    time::{Duration, Instant},
38};
39
40use super::{
41    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
42    types::{DatabentoImbalance, DatabentoStatistics, SubscriptionAckEvent},
43};
44use crate::{
45    decode::{decode_instrument_def_msg, decode_record},
46    types::PublisherId,
47};
48
49#[derive(Debug)]
50pub enum LiveCommand {
51    Subscribe(Subscription),
52    Start,
53    Close,
54}
55
56#[derive(Debug)]
57#[allow(
58    clippy::large_enum_variant,
59    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
60)]
61pub enum LiveMessage {
62    Data(Data),
63    Instrument(InstrumentAny),
64    Status(InstrumentStatus),
65    Imbalance(DatabentoImbalance),
66    Statistics(DatabentoStatistics),
67    SubscriptionAck(SubscriptionAckEvent),
68    Error(anyhow::Error),
69    Close,
70}
71
72/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
73///
74/// [`LiveCommand`] messages are received synchronously across a channel,
75/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
76/// back to a message processing task.
77///
78/// # Crash Policy
79///
80/// This handler intentionally crashes on catastrophic feed issues rather than
81/// attempting recovery. If excessive buffering occurs (indicating severe feed
82/// misbehavior), the process will run out of memory and terminate. This is by
83/// design - such scenarios indicate fundamental problems that require external
84/// intervention.
85#[derive(Debug)]
86pub struct DatabentoFeedHandler {
87    key: String,
88    dataset: String,
89    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
90    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
91    publisher_venue_map: IndexMap<PublisherId, Venue>,
92    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
93    replay: bool,
94    use_exchange_as_venue: bool,
95    bars_timestamp_on_close: bool,
96    reconnect_timeout_mins: Option<u64>,
97    backoff: ExponentialBackoff,
98    subscriptions: Vec<Subscription>,
99    buffered_commands: Vec<LiveCommand>,
100}
101
102impl DatabentoFeedHandler {
103    /// Creates a new [`DatabentoFeedHandler`] instance.
104    ///
105    /// # Panics
106    ///
107    /// Panics if exponential backoff creation fails (should never happen with valid hardcoded parameters).
108    #[must_use]
109    #[allow(clippy::too_many_arguments)]
110    pub fn new(
111        key: String,
112        dataset: String,
113        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
114        tx: tokio::sync::mpsc::Sender<LiveMessage>,
115        publisher_venue_map: IndexMap<PublisherId, Venue>,
116        symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
117        use_exchange_as_venue: bool,
118        bars_timestamp_on_close: bool,
119        reconnect_timeout_mins: Option<u64>,
120    ) -> Self {
121        // Choose max delay based on timeout configuration:
122        // - With timeout: 60s max (quick recovery to reconnect within window)
123        // - Without timeout (None): 600s max (patient recovery, respectful of infrastructure)
124        let delay_max = if reconnect_timeout_mins.is_some() {
125            Duration::from_secs(60)
126        } else {
127            Duration::from_secs(600)
128        };
129
130        // SAFETY: Hardcoded parameters are all valid
131        let backoff =
132            ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false).unwrap();
133
134        Self {
135            key,
136            dataset,
137            cmd_rx: rx,
138            msg_tx: tx,
139            publisher_venue_map,
140            symbol_venue_map,
141            replay: false,
142            use_exchange_as_venue,
143            bars_timestamp_on_close,
144            reconnect_timeout_mins,
145            backoff,
146            subscriptions: Vec::new(),
147            buffered_commands: Vec::new(),
148        }
149    }
150
151    /// Runs the feed handler main loop, processing commands and streaming market data.
152    ///
153    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
154    /// and continuously processes incoming market data messages until shutdown.
155    ///
156    /// Implements automatic reconnection with exponential backoff (1s to 60s with jitter).
157    /// Each successful session resets the reconnection cycle, giving the next disconnect
158    /// a fresh timeout window. Gives up after `reconnect_timeout_mins` if configured.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if any client operation or message handling fails.
163    #[allow(clippy::blocks_in_conditions)]
164    pub async fn run(&mut self) -> anyhow::Result<()> {
165        log::debug!("Running feed handler");
166
167        let mut reconnect_start: Option<Instant> = None;
168        let mut attempt = 0;
169
170        loop {
171            attempt += 1;
172
173            match self.run_session(attempt).await {
174                Ok(ran_successfully) => {
175                    if ran_successfully {
176                        log::info!("Resetting reconnection cycle after successful session");
177                        reconnect_start = None;
178                        attempt = 0;
179                        self.backoff.reset();
180                        continue;
181                    } else {
182                        log::info!("Session ended normally");
183                        break Ok(());
184                    }
185                }
186                Err(e) => {
187                    let cycle_start = reconnect_start.get_or_insert_with(Instant::now);
188
189                    if let Some(timeout_mins) = self.reconnect_timeout_mins {
190                        let elapsed = cycle_start.elapsed();
191                        let timeout = Duration::from_secs(timeout_mins * 60);
192
193                        if elapsed >= timeout {
194                            log::error!("Giving up reconnection after {timeout_mins} minutes");
195                            self.send_msg(LiveMessage::Error(anyhow::anyhow!(
196                                "Reconnection timeout after {timeout_mins} minutes: {e}"
197                            )))
198                            .await;
199                            break Err(e);
200                        }
201                    }
202
203                    let delay = self.backoff.next_duration();
204
205                    log::warn!(
206                        "Connection lost (attempt {}): {}. Reconnecting in {}s...",
207                        attempt,
208                        e,
209                        delay.as_secs()
210                    );
211
212                    tokio::select! {
213                        () = tokio::time::sleep(delay) => {}
214                        cmd = self.cmd_rx.recv() => {
215                            match cmd {
216                                Some(LiveCommand::Close) => {
217                                    log::info!("Close received during backoff");
218                                    return Ok(());
219                                }
220                                None => {
221                                    log::debug!("Command channel closed during backoff");
222                                    return Ok(());
223                                }
224                                Some(cmd) => {
225                                    log::debug!("Buffering command received during backoff: {cmd:?}");
226                                    self.buffered_commands.push(cmd);
227                                }
228                            }
229                        }
230                    }
231                }
232            }
233        }
234    }
235
236    /// Runs a single session, handling connection, subscriptions, and data streaming.
237    ///
238    /// Returns `Ok(bool)` where the bool indicates if the session ran successfully
239    /// for a meaningful duration (true) or was intentionally closed (false).
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if connection fails, subscription fails, or data streaming encounters an error.
244    async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
245        if attempt > 1 {
246            log::info!("Reconnecting (attempt {attempt})...");
247        }
248
249        let session_start = Instant::now();
250        let clock = get_atomic_clock_realtime();
251        let mut symbol_map = PitSymbolMap::new();
252        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
253
254        let mut buffering_start = None;
255        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
256        let mut initialized_books = HashSet::new();
257        let timeout = Duration::from_secs(5); // Hardcoded timeout for now
258
259        let result = tokio::time::timeout(
260            timeout,
261            databento::LiveClient::builder()
262                .user_agent_extension(NAUTILUS_USER_AGENT.into())
263                .key(self.key.clone())?
264                .dataset(self.dataset.clone())
265                .build(),
266        )
267        .await?;
268
269        let mut client = match result {
270            Ok(client) => {
271                if attempt > 1 {
272                    log::info!("Reconnected successfully");
273                } else {
274                    log::info!("Connected");
275                }
276                client
277            }
278            Err(e) => {
279                anyhow::bail!("Failed to connect to Databento LSG: {e}");
280            }
281        };
282
283        // Process any commands buffered during reconnection backoff
284        let mut start_buffered = false;
285        if !self.buffered_commands.is_empty() {
286            log::info!(
287                "Processing {} buffered commands",
288                self.buffered_commands.len()
289            );
290            for cmd in self.buffered_commands.drain(..) {
291                match cmd {
292                    LiveCommand::Subscribe(sub) => {
293                        if !self.replay && sub.start.is_some() {
294                            self.replay = true;
295                        }
296                        self.subscriptions.push(sub);
297                    }
298                    LiveCommand::Start => {
299                        start_buffered = true;
300                    }
301                    LiveCommand::Close => {
302                        log::warn!("Close command was buffered, shutting down");
303                        return Ok(false);
304                    }
305                }
306            }
307        }
308
309        let timeout = Duration::from_millis(10);
310        let mut running = false;
311
312        if !self.subscriptions.is_empty() {
313            log::info!(
314                "Resubscribing to {} subscriptions",
315                self.subscriptions.len()
316            );
317            for sub in self.subscriptions.clone() {
318                client.subscribe(sub).await?;
319            }
320            // Strip start timestamps after successful subscription to avoid replaying history on future reconnects
321            for sub in &mut self.subscriptions {
322                sub.start = None;
323            }
324            client.start().await?;
325            running = true;
326            log::info!("Resubscription complete");
327        } else if start_buffered {
328            log::info!("Starting session from buffered Start command");
329            buffering_start = if self.replay {
330                Some(clock.get_time_ns())
331            } else {
332                None
333            };
334            client.start().await?;
335            running = true;
336        }
337
338        loop {
339            if self.msg_tx.is_closed() {
340                log::debug!("Message channel was closed: stopping");
341                return Ok(false);
342            }
343
344            match self.cmd_rx.try_recv() {
345                Ok(cmd) => {
346                    log::debug!("Received command: {cmd:?}");
347                    match cmd {
348                        LiveCommand::Subscribe(sub) => {
349                            if !self.replay && sub.start.is_some() {
350                                self.replay = true;
351                            }
352                            client.subscribe(sub.clone()).await?;
353                            // Store without start to avoid replaying history on reconnect
354                            let mut sub_for_reconnect = sub;
355                            sub_for_reconnect.start = None;
356                            self.subscriptions.push(sub_for_reconnect);
357                        }
358                        LiveCommand::Start => {
359                            buffering_start = if self.replay {
360                                Some(clock.get_time_ns())
361                            } else {
362                                None
363                            };
364                            client.start().await?;
365                            running = true;
366                            log::debug!("Started");
367                        }
368                        LiveCommand::Close => {
369                            self.msg_tx.send(LiveMessage::Close).await?;
370                            if running {
371                                client.close().await?;
372                                log::debug!("Closed inner client");
373                            }
374                            return Ok(false);
375                        }
376                    }
377                }
378                Err(TryRecvError::Empty) => {}
379                Err(TryRecvError::Disconnected) => {
380                    log::debug!("Command channel disconnected");
381                    return Ok(false);
382                }
383            }
384
385            if !running {
386                continue;
387            }
388
389            let result = tokio::time::timeout(timeout, client.next_record()).await;
390            let record_opt = match result {
391                Ok(record_opt) => record_opt,
392                Err(_) => continue,
393            };
394
395            let record = match record_opt {
396                Ok(Some(record)) => record,
397                Ok(None) => {
398                    const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
399                    if session_start.elapsed() >= SUCCESS_THRESHOLD {
400                        log::info!("Session ended after successful run");
401                        return Ok(true);
402                    }
403                    anyhow::bail!("Session ended by gateway");
404                }
405                Err(e) => {
406                    const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
407                    if session_start.elapsed() >= SUCCESS_THRESHOLD {
408                        log::info!("Connection error after successful run: {e}");
409                        return Ok(true);
410                    }
411                    anyhow::bail!("Connection error: {e}");
412                }
413            };
414
415            let ts_init = clock.get_time_ns();
416
417            // Decode record
418            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
419                handle_error_msg(msg);
420            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
421                if let Some(ack) = handle_system_msg(msg, ts_init) {
422                    self.send_msg(LiveMessage::SubscriptionAck(ack)).await;
423                }
424            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
425                // Remove instrument ID index as the raw symbol may have changed
426                instrument_id_map.remove(&msg.hd.instrument_id);
427                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
428            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
429                if self.use_exchange_as_venue {
430                    let exchange = msg.exchange()?;
431                    if !exchange.is_empty() {
432                        update_instrument_id_map_with_exchange(
433                            &symbol_map,
434                            &self.symbol_venue_map,
435                            &mut instrument_id_map,
436                            msg.hd.instrument_id,
437                            exchange,
438                        )?;
439                    }
440                }
441                let data = {
442                    let sym_map = self.read_symbol_venue_map()?;
443                    handle_instrument_def_msg(
444                        msg,
445                        &record,
446                        &symbol_map,
447                        &self.publisher_venue_map,
448                        &sym_map,
449                        &mut instrument_id_map,
450                        ts_init,
451                    )?
452                };
453                self.send_msg(LiveMessage::Instrument(data)).await;
454            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
455                let data = {
456                    let sym_map = self.read_symbol_venue_map()?;
457                    handle_status_msg(
458                        msg,
459                        &record,
460                        &symbol_map,
461                        &self.publisher_venue_map,
462                        &sym_map,
463                        &mut instrument_id_map,
464                        ts_init,
465                    )?
466                };
467                self.send_msg(LiveMessage::Status(data)).await;
468            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
469                let data = {
470                    let sym_map = self.read_symbol_venue_map()?;
471                    handle_imbalance_msg(
472                        msg,
473                        &record,
474                        &symbol_map,
475                        &self.publisher_venue_map,
476                        &sym_map,
477                        &mut instrument_id_map,
478                        ts_init,
479                    )?
480                };
481                self.send_msg(LiveMessage::Imbalance(data)).await;
482            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
483                let data = {
484                    let sym_map = self.read_symbol_venue_map()?;
485                    handle_statistics_msg(
486                        msg,
487                        &record,
488                        &symbol_map,
489                        &self.publisher_venue_map,
490                        &sym_map,
491                        &mut instrument_id_map,
492                        ts_init,
493                    )?
494                };
495                self.send_msg(LiveMessage::Statistics(data)).await;
496            } else {
497                // Decode a generic record with possible errors
498                let res = {
499                    let sym_map = self.read_symbol_venue_map()?;
500                    handle_record(
501                        record,
502                        &symbol_map,
503                        &self.publisher_venue_map,
504                        &sym_map,
505                        &mut instrument_id_map,
506                        ts_init,
507                        &initialized_books,
508                        self.bars_timestamp_on_close,
509                    )
510                };
511                let (mut data1, data2) = match res {
512                    Ok(decoded) => decoded,
513                    Err(e) => {
514                        log::error!("Error decoding record: {e}");
515                        continue;
516                    }
517                };
518
519                if let Some(msg) = record.get::<dbn::MboMsg>() {
520                    // Check if should mark book initialized
521                    if let Some(Data::Delta(delta)) = &data1 {
522                        initialized_books.insert(delta.instrument_id);
523                    } else {
524                        continue; // No delta yet
525                    }
526
527                    if let Some(Data::Delta(delta)) = &data1 {
528                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
529                        buffer.push(*delta);
530
531                        log::trace!(
532                            "Buffering delta: {} {buffering_start:?} flags={}",
533                            delta.ts_event,
534                            msg.flags.raw(),
535                        );
536
537                        // Check if last message in the book event
538                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
539                            continue; // NOT last message
540                        }
541
542                        // Check if snapshot
543                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
544                            continue; // Buffer snapshot
545                        }
546
547                        // Check if buffering a replay
548                        if let Some(start_ns) = buffering_start {
549                            if delta.ts_event <= start_ns {
550                                continue; // Continue buffering replay
551                            }
552                            buffering_start = None;
553                        }
554
555                        // SAFETY: We can guarantee a deltas vec exists
556                        let buffer =
557                            buffered_deltas
558                                .remove(&delta.instrument_id)
559                                .ok_or_else(|| {
560                                    anyhow::anyhow!(
561                                        "Internal error: no buffered deltas for instrument {id}",
562                                        id = delta.instrument_id
563                                    )
564                                })?;
565                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
566                        let deltas = OrderBookDeltas_API::new(deltas);
567                        data1 = Some(Data::Deltas(deltas));
568                    }
569                }
570
571                if let Some(data) = data1 {
572                    self.send_msg(LiveMessage::Data(data)).await;
573                }
574
575                if let Some(data) = data2 {
576                    self.send_msg(LiveMessage::Data(data)).await;
577                }
578            }
579        }
580    }
581
582    /// Sends a message to the message processing task.
583    async fn send_msg(&mut self, msg: LiveMessage) {
584        log::trace!("Sending {msg:?}");
585        match self.msg_tx.send(msg).await {
586            Ok(()) => {}
587            Err(e) => log::error!("Error sending message: {e}"),
588        }
589    }
590
591    /// Acquires a read lock on the symbol-venue map with exponential backoff and timeout.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if the read lock cannot be acquired within the deadline.
596    fn read_symbol_venue_map(
597        &self,
598    ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
599        // Try to acquire the lock with exponential backoff and deadline
600        const MAX_WAIT_MS: u64 = 500; // Total maximum wait time
601        const INITIAL_DELAY_MICROS: u64 = 10;
602        const MAX_DELAY_MICROS: u64 = 1000;
603
604        let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
605        let mut delay = INITIAL_DELAY_MICROS;
606
607        loop {
608            match self.symbol_venue_map.try_read() {
609                Ok(guard) => return Ok(guard),
610                Err(std::sync::TryLockError::WouldBlock) => {
611                    if std::time::Instant::now() >= deadline {
612                        break;
613                    }
614
615                    // Yield to other threads first
616                    std::thread::yield_now();
617
618                    // Then sleep with exponential backoff if still blocked
619                    if std::time::Instant::now() < deadline {
620                        let remaining = deadline - std::time::Instant::now();
621                        let sleep_duration = StdDuration::from_micros(delay).min(remaining);
622                        std::thread::sleep(sleep_duration);
623                        // Exponential backoff with cap and jitter
624                        delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
625                    }
626                }
627                Err(std::sync::TryLockError::Poisoned(e)) => {
628                    anyhow::bail!("symbol_venue_map lock poisoned: {e}");
629                }
630            }
631        }
632
633        anyhow::bail!(
634            "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
635        )
636    }
637}
638
639/// Handles Databento error messages by logging them.
640fn handle_error_msg(msg: &dbn::ErrorMsg) {
641    log::error!("{msg:?}");
642}
643
644/// Handles Databento system messages, returning a subscription ack event if applicable.
645fn handle_system_msg(msg: &dbn::SystemMsg, ts_received: UnixNanos) -> Option<SubscriptionAckEvent> {
646    match msg.code() {
647        Ok(dbn::SystemCode::SubscriptionAck) => {
648            let message = msg.msg().unwrap_or("<invalid utf-8>");
649            log::info!("Subscription acknowledged: {message}");
650
651            let schema = parse_ack_message(message);
652
653            Some(SubscriptionAckEvent {
654                schema,
655                message: message.to_string(),
656                ts_received,
657            })
658        }
659        Ok(dbn::SystemCode::Heartbeat) => {
660            log::trace!("Heartbeat received");
661            None
662        }
663        Ok(dbn::SystemCode::SlowReaderWarning) => {
664            let message = msg.msg().unwrap_or("<invalid utf-8>");
665            log::warn!("Slow reader warning: {message}");
666            None
667        }
668        Ok(dbn::SystemCode::ReplayCompleted) => {
669            let message = msg.msg().unwrap_or("<invalid utf-8>");
670            log::info!("Replay completed: {message}");
671            None
672        }
673        _ => {
674            log::debug!("{msg:?}");
675            None
676        }
677    }
678}
679
/// Extracts the schema name from a subscription acknowledgement message.
///
/// Expects the form `"Subscription request N for <schema> data succeeded"` and returns
/// the trimmed `<schema>`; any message not matching this shape yields an empty string.
fn parse_ack_message(message: &str) -> String {
    const PREFIX: &str = "Subscription request ";
    const SUFFIX: &str = " data succeeded";

    let Some(after_prefix) = message.strip_prefix(PREFIX) else {
        return String::new();
    };
    let Some((_request_number, tail)) = after_prefix.split_once(" for ") else {
        return String::new();
    };
    match tail.strip_suffix(SUFFIX) {
        Some(schema) => schema.trim().to_owned(),
        None => String::new(),
    }
}
690
691/// Handles symbol mapping messages and updates the instrument ID map.
692///
693/// # Errors
694///
695/// Returns an error if symbol mapping fails.
696fn handle_symbol_mapping_msg(
697    msg: &dbn::SymbolMappingMsg,
698    symbol_map: &mut PitSymbolMap,
699    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
700) -> anyhow::Result<()> {
701    symbol_map
702        .on_symbol_mapping(msg)
703        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
704    instrument_id_map.remove(&msg.header().instrument_id);
705    Ok(())
706}
707
708/// Updates the instrument ID map using exchange information from the symbol map.
709fn update_instrument_id_map_with_exchange(
710    symbol_map: &PitSymbolMap,
711    symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
712    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
713    raw_instrument_id: u32,
714    exchange: &str,
715) -> anyhow::Result<InstrumentId> {
716    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
717        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
718    })?;
719    let symbol = Symbol::from(raw_symbol.as_str());
720    let venue = Venue::from_code(exchange)
721        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
722    let instrument_id = InstrumentId::new(symbol, venue);
723    let mut map = symbol_venue_map
724        .write()
725        .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
726    map.entry(symbol).or_insert(venue);
727    instrument_id_map.insert(raw_instrument_id, instrument_id);
728    Ok(instrument_id)
729}
730
731fn update_instrument_id_map(
732    record: &dbn::RecordRef,
733    symbol_map: &PitSymbolMap,
734    publisher_venue_map: &IndexMap<PublisherId, Venue>,
735    symbol_venue_map: &AHashMap<Symbol, Venue>,
736    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
737) -> anyhow::Result<InstrumentId> {
738    let header = record.header();
739
740    // Check if instrument ID is already in the map
741    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
742        return Ok(instrument_id);
743    }
744
745    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
746        anyhow::anyhow!(
747            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
748            header.instrument_id
749        )
750    })?;
751
752    let symbol = Symbol::from_str_unchecked(raw_symbol);
753
754    let publisher_id = header.publisher_id;
755    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
756        *venue
757    } else {
758        let venue = publisher_venue_map
759            .get(&publisher_id)
760            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
761        *venue
762    };
763    let instrument_id = InstrumentId::new(symbol, venue);
764
765    instrument_id_map.insert(header.instrument_id, instrument_id);
766    Ok(instrument_id)
767}
768
769/// Handles instrument definition messages and decodes them into Nautilus instruments.
770///
771/// # Errors
772///
773/// Returns an error if instrument decoding fails.
774fn handle_instrument_def_msg(
775    msg: &dbn::InstrumentDefMsg,
776    record: &dbn::RecordRef,
777    symbol_map: &PitSymbolMap,
778    publisher_venue_map: &IndexMap<PublisherId, Venue>,
779    symbol_venue_map: &AHashMap<Symbol, Venue>,
780    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
781    ts_init: UnixNanos,
782) -> anyhow::Result<InstrumentAny> {
783    let instrument_id = update_instrument_id_map(
784        record,
785        symbol_map,
786        publisher_venue_map,
787        symbol_venue_map,
788        instrument_id_map,
789    )?;
790
791    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
792}
793
794fn handle_status_msg(
795    msg: &dbn::StatusMsg,
796    record: &dbn::RecordRef,
797    symbol_map: &PitSymbolMap,
798    publisher_venue_map: &IndexMap<PublisherId, Venue>,
799    symbol_venue_map: &AHashMap<Symbol, Venue>,
800    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
801    ts_init: UnixNanos,
802) -> anyhow::Result<InstrumentStatus> {
803    let instrument_id = update_instrument_id_map(
804        record,
805        symbol_map,
806        publisher_venue_map,
807        symbol_venue_map,
808        instrument_id_map,
809    )?;
810
811    decode_status_msg(msg, instrument_id, Some(ts_init))
812}
813
814fn handle_imbalance_msg(
815    msg: &dbn::ImbalanceMsg,
816    record: &dbn::RecordRef,
817    symbol_map: &PitSymbolMap,
818    publisher_venue_map: &IndexMap<PublisherId, Venue>,
819    symbol_venue_map: &AHashMap<Symbol, Venue>,
820    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
821    ts_init: UnixNanos,
822) -> anyhow::Result<DatabentoImbalance> {
823    let instrument_id = update_instrument_id_map(
824        record,
825        symbol_map,
826        publisher_venue_map,
827        symbol_venue_map,
828        instrument_id_map,
829    )?;
830
831    let price_precision = 2; // Hardcoded for now
832
833    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
834}
835
836fn handle_statistics_msg(
837    msg: &dbn::StatMsg,
838    record: &dbn::RecordRef,
839    symbol_map: &PitSymbolMap,
840    publisher_venue_map: &IndexMap<PublisherId, Venue>,
841    symbol_venue_map: &AHashMap<Symbol, Venue>,
842    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
843    ts_init: UnixNanos,
844) -> anyhow::Result<DatabentoStatistics> {
845    let instrument_id = update_instrument_id_map(
846        record,
847        symbol_map,
848        publisher_venue_map,
849        symbol_venue_map,
850        instrument_id_map,
851    )?;
852
853    let price_precision = 2; // Hardcoded for now
854
855    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
856}
857
858#[allow(clippy::too_many_arguments)]
859fn handle_record(
860    record: dbn::RecordRef,
861    symbol_map: &PitSymbolMap,
862    publisher_venue_map: &IndexMap<PublisherId, Venue>,
863    symbol_venue_map: &AHashMap<Symbol, Venue>,
864    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
865    ts_init: UnixNanos,
866    initialized_books: &HashSet<InstrumentId>,
867    bars_timestamp_on_close: bool,
868) -> anyhow::Result<(Option<Data>, Option<Data>)> {
869    let instrument_id = update_instrument_id_map(
870        &record,
871        symbol_map,
872        publisher_venue_map,
873        symbol_venue_map,
874        instrument_id_map,
875    )?;
876
877    let price_precision = 2; // Hardcoded for now
878
879    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
880    // For MBO, only include trades after the book is initialized to maintain consistency
881    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
882        || record.get::<dbn::TbboMsg>().is_some()
883        || record.get::<dbn::Cmbp1Msg>().is_some()
884    {
885        true // These schemas include trade information directly
886    } else {
887        initialized_books.contains(&instrument_id) // MBO requires initialized book
888    };
889
890    decode_record(
891        &record,
892        instrument_id,
893        price_precision,
894        Some(ts_init),
895        include_trades,
896        bars_timestamp_on_close,
897    )
898}
899
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a handler for the `GLBX.MDP3` dataset with the given reconnect timeout.
    ///
    /// It uses empty venue maps and passes `false` for both boolean constructor flags. The
    /// command sender and message receiver halves are not retained.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
        let (message_tx, _message_rx) = tokio::sync::mpsc::channel(100);

        let publisher_venues = IndexMap::new();
        let symbol_venues = Arc::new(RwLock::new(AHashMap::new()));

        DatabentoFeedHandler::new(
            "test_key".to_string(),
            "GLBX.MDP3".to_string(),
            command_rx,
            message_tx,
            publisher_venues,
            symbol_venues,
            false,
            false,
            reconnect_timeout_mins,
        )
    }

    /// Asserts the handler holds no subscriptions and no buffered commands.
    fn assert_no_pending_state(handler: &DatabentoFeedHandler) {
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let handler = create_test_handler(reconnect_timeout_mins);

        assert_eq!(handler.reconnect_timeout_mins, reconnect_timeout_mins);
        assert_no_pending_state(&handler);
    }

    #[rstest]
    fn test_subscription_with_and_without_start() {
        let start_time = datetime!(2024-01-01 00:00:00 UTC);
        let original = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(start_time)
            .build();

        // Clearing `start` on a clone must leave the other fields untouched
        let mut stripped = original.clone();
        stripped.start = None;

        assert!(original.start.is_some());
        assert!(stripped.start.is_none());
        assert_eq!(original.schema, stripped.schema);
        assert_eq!(original.symbols, stripped.symbols);
    }

    #[rstest]
    fn test_handler_initialization_state() {
        let handler = create_test_handler(Some(10));

        assert!(!handler.replay);
        assert_eq!(handler.dataset, "GLBX.MDP3");
        assert_eq!(handler.key, "test_key");
        assert_no_pending_state(&handler);
    }

    #[rstest]
    fn test_handler_with_no_timeout() {
        let handler = create_test_handler(None);

        assert_eq!(handler.reconnect_timeout_mins, None);
        assert!(!handler.replay);
    }

    #[rstest]
    fn test_handler_with_zero_timeout() {
        let handler = create_test_handler(Some(0));

        assert_eq!(handler.reconnect_timeout_mins, Some(0));
        assert!(!handler.replay);
    }
}