1use std::{
17 sync::{Arc, RwLock},
18 time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use nautilus_network::backoff::ExponentialBackoff;
35use tokio::{
36 sync::mpsc::error::TryRecvError,
37 time::{Duration, Instant},
38};
39
40use super::{
41 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
42 types::{DatabentoImbalance, DatabentoStatistics},
43};
44use crate::{
45 decode::{decode_instrument_def_msg, decode_record},
46 types::PublisherId,
47};
48
/// Commands accepted by [`DatabentoFeedHandler`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Registers a subscription with the live client; the handler also retains it so it
    /// can be re-sent after a reconnect.
    Subscribe(Subscription),
    /// Starts the live session so that records begin to be read from the gateway.
    Start,
    /// Closes the session and stops the feed handler.
    Close,
}
55
/// Messages emitted by [`DatabentoFeedHandler`] on its outbound message channel.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
)]
pub enum LiveMessage {
    /// Decoded market data (including batched order book deltas).
    Data(Data),
    /// Instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// Instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// Imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// Error reported by the handler (sent when reconnection is abandoned).
    Error(anyhow::Error),
    /// Sent when the handler processes a `LiveCommand::Close` during a session.
    Close,
}
70
/// Handles a Databento live data feed: connects a live client, applies commands received
/// on a channel, decodes incoming records and forwards them as [`LiveMessage`]s,
/// reconnecting with backoff when a session fails.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key handed to the live client builder.
    key: String,
    /// Dataset handed to the live client builder.
    dataset: String,
    /// Inbound command channel.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Outbound message channel.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Venue lookup by publisher ID, used when a symbol has no entry in `symbol_venue_map`.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol-to-venue map, consulted before `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription carrying a `start` value has been received.
    replay: bool,
    /// When true, the exchange on instrument definitions is used as the venue.
    use_exchange_as_venue: bool,
    /// Passed through to record decoding.
    bars_timestamp_on_close: bool,
    /// Minutes after which reconnection attempts are abandoned; `None` retries indefinitely.
    reconnect_timeout_mins: Option<u64>,
    /// Supplies the delay between reconnection attempts.
    backoff: ExponentialBackoff,
    /// Subscriptions retained for resubscription on reconnect.
    subscriptions: Vec<Subscription>,
    /// Commands received while waiting to reconnect, applied at the start of the next session.
    buffered_commands: Vec<LiveCommand>,
}
100
101impl DatabentoFeedHandler {
102 #[must_use]
108 #[allow(clippy::too_many_arguments)]
109 pub fn new(
110 key: String,
111 dataset: String,
112 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
113 tx: tokio::sync::mpsc::Sender<LiveMessage>,
114 publisher_venue_map: IndexMap<PublisherId, Venue>,
115 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
116 use_exchange_as_venue: bool,
117 bars_timestamp_on_close: bool,
118 reconnect_timeout_mins: Option<u64>,
119 ) -> Self {
120 let delay_max = if reconnect_timeout_mins.is_some() {
124 Duration::from_secs(60)
125 } else {
126 Duration::from_secs(600)
127 };
128
129 let backoff =
131 ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false).unwrap();
132
133 Self {
134 key,
135 dataset,
136 cmd_rx: rx,
137 msg_tx: tx,
138 publisher_venue_map,
139 symbol_venue_map,
140 replay: false,
141 use_exchange_as_venue,
142 bars_timestamp_on_close,
143 reconnect_timeout_mins,
144 backoff,
145 subscriptions: Vec::new(),
146 buffered_commands: Vec::new(),
147 }
148 }
149
150 #[allow(clippy::blocks_in_conditions)]
163 pub async fn run(&mut self) -> anyhow::Result<()> {
164 tracing::debug!("Running feed handler");
165
166 let mut reconnect_start: Option<Instant> = None;
167 let mut attempt = 0;
168
169 loop {
170 attempt += 1;
171
172 match self.run_session(attempt).await {
173 Ok(ran_successfully) => {
174 if ran_successfully {
175 tracing::info!("Resetting reconnection cycle after successful session");
176 reconnect_start = None;
177 attempt = 0;
178 self.backoff.reset();
179 continue;
180 } else {
181 tracing::info!("Session ended normally");
182 break Ok(());
183 }
184 }
185 Err(e) => {
186 let cycle_start = reconnect_start.get_or_insert_with(Instant::now);
187
188 if let Some(timeout_mins) = self.reconnect_timeout_mins {
189 let elapsed = cycle_start.elapsed();
190 let timeout = Duration::from_secs(timeout_mins * 60);
191
192 if elapsed >= timeout {
193 tracing::error!(
194 "Giving up reconnection after {} minutes",
195 timeout_mins
196 );
197 self.send_msg(LiveMessage::Error(anyhow::anyhow!(
198 "Reconnection timeout after {timeout_mins} minutes: {e}"
199 )))
200 .await;
201 break Err(e);
202 }
203 }
204
205 let delay = self.backoff.next_duration();
206
207 tracing::warn!(
208 "Connection lost (attempt {}): {}. Reconnecting in {}s...",
209 attempt,
210 e,
211 delay.as_secs()
212 );
213
214 tokio::select! {
215 _ = tokio::time::sleep(delay) => {}
216 cmd = self.cmd_rx.recv() => {
217 match cmd {
218 Some(LiveCommand::Close) => {
219 tracing::info!("Close received during backoff");
220 return Ok(());
221 }
222 None => {
223 tracing::debug!("Command channel closed during backoff");
224 return Ok(());
225 }
226 Some(cmd) => {
227 tracing::debug!("Buffering command received during backoff: {:?}", cmd);
228 self.buffered_commands.push(cmd);
229 }
230 }
231 }
232 }
233 }
234 }
235 }
236 }
237
238 async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
247 if attempt > 1 {
248 tracing::info!("Reconnecting (attempt {})...", attempt);
249 }
250
251 let session_start = Instant::now();
252 let clock = get_atomic_clock_realtime();
253 let mut symbol_map = PitSymbolMap::new();
254 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
255
256 let mut buffering_start = None;
257 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
258 let mut initialized_books = HashSet::new();
259 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
262 timeout,
263 databento::LiveClient::builder()
264 .user_agent_extension(NAUTILUS_USER_AGENT.into())
265 .key(self.key.clone())?
266 .dataset(self.dataset.clone())
267 .build(),
268 )
269 .await?;
270
271 let mut client = match result {
272 Ok(client) => {
273 if attempt > 1 {
274 tracing::info!("Reconnected successfully");
275 } else {
276 tracing::info!("Connected");
277 }
278 client
279 }
280 Err(e) => {
281 anyhow::bail!("Failed to connect to Databento LSG: {e}");
282 }
283 };
284
285 let mut start_buffered = false;
287 if !self.buffered_commands.is_empty() {
288 tracing::info!(
289 "Processing {} buffered commands",
290 self.buffered_commands.len()
291 );
292 for cmd in self.buffered_commands.drain(..) {
293 match cmd {
294 LiveCommand::Subscribe(sub) => {
295 if !self.replay && sub.start.is_some() {
296 self.replay = true;
297 }
298 self.subscriptions.push(sub);
299 }
300 LiveCommand::Start => {
301 start_buffered = true;
302 }
303 LiveCommand::Close => {
304 tracing::warn!("Close command was buffered, shutting down");
305 return Ok(false);
306 }
307 }
308 }
309 }
310
311 let timeout = Duration::from_millis(10);
312 let mut running = false;
313
314 if !self.subscriptions.is_empty() {
315 tracing::info!(
316 "Resubscribing to {} subscriptions",
317 self.subscriptions.len()
318 );
319 for sub in self.subscriptions.clone() {
320 client.subscribe(sub).await?;
321 }
322 for sub in &mut self.subscriptions {
324 sub.start = None;
325 }
326 client.start().await?;
327 running = true;
328 tracing::info!("Resubscription complete");
329 } else if start_buffered {
330 tracing::info!("Starting session from buffered Start command");
331 buffering_start = if self.replay {
332 Some(clock.get_time_ns())
333 } else {
334 None
335 };
336 client.start().await?;
337 running = true;
338 }
339
340 loop {
341 if self.msg_tx.is_closed() {
342 tracing::debug!("Message channel was closed: stopping");
343 return Ok(false);
344 }
345
346 match self.cmd_rx.try_recv() {
347 Ok(cmd) => {
348 tracing::debug!("Received command: {cmd:?}");
349 match cmd {
350 LiveCommand::Subscribe(sub) => {
351 if !self.replay && sub.start.is_some() {
352 self.replay = true;
353 }
354 client.subscribe(sub.clone()).await?;
355 let mut sub_for_reconnect = sub;
357 sub_for_reconnect.start = None;
358 self.subscriptions.push(sub_for_reconnect);
359 }
360 LiveCommand::Start => {
361 buffering_start = if self.replay {
362 Some(clock.get_time_ns())
363 } else {
364 None
365 };
366 client.start().await?;
367 running = true;
368 tracing::debug!("Started");
369 }
370 LiveCommand::Close => {
371 self.msg_tx.send(LiveMessage::Close).await?;
372 if running {
373 client.close().await?;
374 tracing::debug!("Closed inner client");
375 }
376 return Ok(false);
377 }
378 }
379 }
380 Err(TryRecvError::Empty) => {}
381 Err(TryRecvError::Disconnected) => {
382 tracing::debug!("Command channel disconnected");
383 return Ok(false);
384 }
385 }
386
387 if !running {
388 continue;
389 }
390
391 let result = tokio::time::timeout(timeout, client.next_record()).await;
392 let record_opt = match result {
393 Ok(record_opt) => record_opt,
394 Err(_) => continue,
395 };
396
397 let record = match record_opt {
398 Ok(Some(record)) => record,
399 Ok(None) => {
400 const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
401 if session_start.elapsed() >= SUCCESS_THRESHOLD {
402 tracing::info!("Session ended after successful run");
403 return Ok(true);
404 }
405 anyhow::bail!("Session ended by gateway");
406 }
407 Err(e) => {
408 const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
409 if session_start.elapsed() >= SUCCESS_THRESHOLD {
410 tracing::info!("Connection error after successful run: {e}");
411 return Ok(true);
412 }
413 anyhow::bail!("Connection error: {e}");
414 }
415 };
416
417 let ts_init = clock.get_time_ns();
418
419 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
421 handle_error_msg(msg);
422 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
423 handle_system_msg(msg);
424 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
425 instrument_id_map.remove(&msg.hd.instrument_id);
427 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
428 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
429 if self.use_exchange_as_venue {
430 let exchange = msg.exchange()?;
431 if !exchange.is_empty() {
432 update_instrument_id_map_with_exchange(
433 &symbol_map,
434 &self.symbol_venue_map,
435 &mut instrument_id_map,
436 msg.hd.instrument_id,
437 exchange,
438 )?;
439 }
440 }
441 let data = {
442 let sym_map = self.read_symbol_venue_map()?;
443 handle_instrument_def_msg(
444 msg,
445 &record,
446 &symbol_map,
447 &self.publisher_venue_map,
448 &sym_map,
449 &mut instrument_id_map,
450 ts_init,
451 )?
452 };
453 self.send_msg(LiveMessage::Instrument(data)).await;
454 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
455 let data = {
456 let sym_map = self.read_symbol_venue_map()?;
457 handle_status_msg(
458 msg,
459 &record,
460 &symbol_map,
461 &self.publisher_venue_map,
462 &sym_map,
463 &mut instrument_id_map,
464 ts_init,
465 )?
466 };
467 self.send_msg(LiveMessage::Status(data)).await;
468 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
469 let data = {
470 let sym_map = self.read_symbol_venue_map()?;
471 handle_imbalance_msg(
472 msg,
473 &record,
474 &symbol_map,
475 &self.publisher_venue_map,
476 &sym_map,
477 &mut instrument_id_map,
478 ts_init,
479 )?
480 };
481 self.send_msg(LiveMessage::Imbalance(data)).await;
482 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
483 let data = {
484 let sym_map = self.read_symbol_venue_map()?;
485 handle_statistics_msg(
486 msg,
487 &record,
488 &symbol_map,
489 &self.publisher_venue_map,
490 &sym_map,
491 &mut instrument_id_map,
492 ts_init,
493 )?
494 };
495 self.send_msg(LiveMessage::Statistics(data)).await;
496 } else {
497 let res = {
499 let sym_map = self.read_symbol_venue_map()?;
500 handle_record(
501 record,
502 &symbol_map,
503 &self.publisher_venue_map,
504 &sym_map,
505 &mut instrument_id_map,
506 ts_init,
507 &initialized_books,
508 self.bars_timestamp_on_close,
509 )
510 };
511 let (mut data1, data2) = match res {
512 Ok(decoded) => decoded,
513 Err(e) => {
514 tracing::error!("Error decoding record: {e}");
515 continue;
516 }
517 };
518
519 if let Some(msg) = record.get::<dbn::MboMsg>() {
520 if let Some(Data::Delta(delta)) = &data1 {
522 initialized_books.insert(delta.instrument_id);
523 } else {
524 continue; }
526
527 if let Some(Data::Delta(delta)) = &data1 {
528 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
529 buffer.push(*delta);
530
531 tracing::trace!(
532 "Buffering delta: {} {buffering_start:?} flags={}",
533 delta.ts_event,
534 msg.flags.raw(),
535 );
536
537 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
539 continue; }
541
542 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
544 continue; }
546
547 if let Some(start_ns) = buffering_start {
549 if delta.ts_event <= start_ns {
550 continue; }
552 buffering_start = None;
553 }
554
555 let buffer =
557 buffered_deltas
558 .remove(&delta.instrument_id)
559 .ok_or_else(|| {
560 anyhow::anyhow!(
561 "Internal error: no buffered deltas for instrument {id}",
562 id = delta.instrument_id
563 )
564 })?;
565 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
566 let deltas = OrderBookDeltas_API::new(deltas);
567 data1 = Some(Data::Deltas(deltas));
568 }
569 }
570
571 if let Some(data) = data1 {
572 self.send_msg(LiveMessage::Data(data)).await;
573 }
574
575 if let Some(data) = data2 {
576 self.send_msg(LiveMessage::Data(data)).await;
577 }
578 }
579 }
580 }
581
582 async fn send_msg(&mut self, msg: LiveMessage) {
584 tracing::trace!("Sending {msg:?}");
585 match self.msg_tx.send(msg).await {
586 Ok(()) => {}
587 Err(e) => tracing::error!("Error sending message: {e}"),
588 }
589 }
590
591 fn read_symbol_venue_map(
597 &self,
598 ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
599 const MAX_WAIT_MS: u64 = 500; const INITIAL_DELAY_MICROS: u64 = 10;
602 const MAX_DELAY_MICROS: u64 = 1000;
603
604 let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
605 let mut delay = INITIAL_DELAY_MICROS;
606
607 loop {
608 match self.symbol_venue_map.try_read() {
609 Ok(guard) => return Ok(guard),
610 Err(std::sync::TryLockError::WouldBlock) => {
611 if std::time::Instant::now() >= deadline {
612 break;
613 }
614
615 std::thread::yield_now();
617
618 if std::time::Instant::now() < deadline {
620 let remaining = deadline - std::time::Instant::now();
621 let sleep_duration = StdDuration::from_micros(delay).min(remaining);
622 std::thread::sleep(sleep_duration);
623 delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
625 }
626 }
627 Err(std::sync::TryLockError::Poisoned(e)) => {
628 anyhow::bail!("symbol_venue_map lock poisoned: {e}");
629 }
630 }
631 }
632
633 anyhow::bail!(
634 "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
635 )
636 }
637}
638
/// Logs an error record received from the gateway at error level.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    tracing::error!("{msg:?}");
}
643
/// Logs a system record received from the gateway at debug level.
fn handle_system_msg(msg: &dbn::SystemMsg) {
    tracing::debug!("{msg:?}");
}
648
649fn handle_symbol_mapping_msg(
655 msg: &dbn::SymbolMappingMsg,
656 symbol_map: &mut PitSymbolMap,
657 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
658) -> anyhow::Result<()> {
659 symbol_map
660 .on_symbol_mapping(msg)
661 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
662 instrument_id_map.remove(&msg.header().instrument_id);
663 Ok(())
664}
665
666fn update_instrument_id_map_with_exchange(
668 symbol_map: &PitSymbolMap,
669 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
670 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
671 raw_instrument_id: u32,
672 exchange: &str,
673) -> anyhow::Result<InstrumentId> {
674 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
675 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
676 })?;
677 let symbol = Symbol::from(raw_symbol.as_str());
678 let venue = Venue::from_code(exchange)
679 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
680 let instrument_id = InstrumentId::new(symbol, venue);
681 let mut map = symbol_venue_map
682 .write()
683 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
684 map.entry(symbol).or_insert(venue);
685 instrument_id_map.insert(raw_instrument_id, instrument_id);
686 Ok(instrument_id)
687}
688
689fn update_instrument_id_map(
690 record: &dbn::RecordRef,
691 symbol_map: &PitSymbolMap,
692 publisher_venue_map: &IndexMap<PublisherId, Venue>,
693 symbol_venue_map: &AHashMap<Symbol, Venue>,
694 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
695) -> anyhow::Result<InstrumentId> {
696 let header = record.header();
697
698 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
700 return Ok(instrument_id);
701 }
702
703 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
704 anyhow::anyhow!(
705 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
706 header.instrument_id
707 )
708 })?;
709
710 let symbol = Symbol::from_str_unchecked(raw_symbol);
711
712 let publisher_id = header.publisher_id;
713 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
714 *venue
715 } else {
716 let venue = publisher_venue_map
717 .get(&publisher_id)
718 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
719 *venue
720 };
721 let instrument_id = InstrumentId::new(symbol, venue);
722
723 instrument_id_map.insert(header.instrument_id, instrument_id);
724 Ok(instrument_id)
725}
726
727fn handle_instrument_def_msg(
733 msg: &dbn::InstrumentDefMsg,
734 record: &dbn::RecordRef,
735 symbol_map: &PitSymbolMap,
736 publisher_venue_map: &IndexMap<PublisherId, Venue>,
737 symbol_venue_map: &AHashMap<Symbol, Venue>,
738 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
739 ts_init: UnixNanos,
740) -> anyhow::Result<InstrumentAny> {
741 let instrument_id = update_instrument_id_map(
742 record,
743 symbol_map,
744 publisher_venue_map,
745 symbol_venue_map,
746 instrument_id_map,
747 )?;
748
749 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
750}
751
752fn handle_status_msg(
753 msg: &dbn::StatusMsg,
754 record: &dbn::RecordRef,
755 symbol_map: &PitSymbolMap,
756 publisher_venue_map: &IndexMap<PublisherId, Venue>,
757 symbol_venue_map: &AHashMap<Symbol, Venue>,
758 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
759 ts_init: UnixNanos,
760) -> anyhow::Result<InstrumentStatus> {
761 let instrument_id = update_instrument_id_map(
762 record,
763 symbol_map,
764 publisher_venue_map,
765 symbol_venue_map,
766 instrument_id_map,
767 )?;
768
769 decode_status_msg(msg, instrument_id, Some(ts_init))
770}
771
772fn handle_imbalance_msg(
773 msg: &dbn::ImbalanceMsg,
774 record: &dbn::RecordRef,
775 symbol_map: &PitSymbolMap,
776 publisher_venue_map: &IndexMap<PublisherId, Venue>,
777 symbol_venue_map: &AHashMap<Symbol, Venue>,
778 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
779 ts_init: UnixNanos,
780) -> anyhow::Result<DatabentoImbalance> {
781 let instrument_id = update_instrument_id_map(
782 record,
783 symbol_map,
784 publisher_venue_map,
785 symbol_venue_map,
786 instrument_id_map,
787 )?;
788
789 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
792}
793
794fn handle_statistics_msg(
795 msg: &dbn::StatMsg,
796 record: &dbn::RecordRef,
797 symbol_map: &PitSymbolMap,
798 publisher_venue_map: &IndexMap<PublisherId, Venue>,
799 symbol_venue_map: &AHashMap<Symbol, Venue>,
800 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
801 ts_init: UnixNanos,
802) -> anyhow::Result<DatabentoStatistics> {
803 let instrument_id = update_instrument_id_map(
804 record,
805 symbol_map,
806 publisher_venue_map,
807 symbol_venue_map,
808 instrument_id_map,
809 )?;
810
811 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
814}
815
816#[allow(clippy::too_many_arguments)]
817fn handle_record(
818 record: dbn::RecordRef,
819 symbol_map: &PitSymbolMap,
820 publisher_venue_map: &IndexMap<PublisherId, Venue>,
821 symbol_venue_map: &AHashMap<Symbol, Venue>,
822 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
823 ts_init: UnixNanos,
824 initialized_books: &HashSet<InstrumentId>,
825 bars_timestamp_on_close: bool,
826) -> anyhow::Result<(Option<Data>, Option<Data>)> {
827 let instrument_id = update_instrument_id_map(
828 &record,
829 symbol_map,
830 publisher_venue_map,
831 symbol_venue_map,
832 instrument_id_map,
833 )?;
834
835 let price_precision = 2; let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
840 || record.get::<dbn::TbboMsg>().is_some()
841 || record.get::<dbn::Cmbp1Msg>().is_some()
842 {
843 true } else {
845 initialized_books.contains(&instrument_id) };
847
848 decode_record(
849 &record,
850 instrument_id,
851 price_precision,
852 Some(ts_init),
853 include_trades,
854 bars_timestamp_on_close,
855 )
856}
857
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a handler for the `GLBX.MDP3` dataset with empty venue maps and both boolean
    /// options disabled. The opposite channel ends are dropped when this function returns.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, _msg_rx) = tokio::sync::mpsc::channel(100);

        let publisher_venue_map = IndexMap::new();
        let symbol_venue_map = Arc::new(RwLock::new(AHashMap::new()));
        let use_exchange_as_venue = false;
        let bars_timestamp_on_close = false;

        DatabentoFeedHandler::new(
            "test_key".to_string(),
            "GLBX.MDP3".to_string(),
            cmd_rx,
            msg_tx,
            publisher_venue_map,
            symbol_venue_map,
            use_exchange_as_venue,
            bars_timestamp_on_close,
            reconnect_timeout_mins,
        )
    }

    /// The configured timeout is stored and the command/subscription buffers start empty.
    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let feed_handler = create_test_handler(reconnect_timeout_mins);

        assert!(feed_handler.buffered_commands.is_empty());
        assert!(feed_handler.subscriptions.is_empty());
        assert_eq!(feed_handler.reconnect_timeout_mins, reconnect_timeout_mins);
    }

    /// Clearing `start` on a cloned subscription leaves its schema and symbols unchanged.
    #[rstest]
    fn test_subscription_with_and_without_start() {
        let start_time = datetime!(2024-01-01 00:00:00 UTC);
        let with_start = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(start_time)
            .build();

        let mut without_start = with_start.clone();
        without_start.start = None;

        assert!(with_start.start.is_some());
        assert!(without_start.start.is_none());
        assert_eq!(with_start.schema, without_start.schema);
        assert_eq!(with_start.symbols, without_start.symbols);
    }

    /// A new handler keeps the key and dataset it was given and starts outside replay mode.
    #[rstest]
    fn test_handler_initialization_state() {
        let feed_handler = create_test_handler(Some(10));

        assert_eq!(feed_handler.key, "test_key");
        assert_eq!(feed_handler.dataset, "GLBX.MDP3");
        assert!(!feed_handler.replay);
        assert!(feed_handler.subscriptions.is_empty());
        assert!(feed_handler.buffered_commands.is_empty());
    }

    /// `None` is stored as is for the reconnection timeout.
    #[rstest]
    fn test_handler_with_no_timeout() {
        let feed_handler = create_test_handler(None);

        assert!(!feed_handler.replay);
        assert_eq!(feed_handler.reconnect_timeout_mins, None);
    }

    /// A zero-minute timeout is accepted and stored as is.
    #[rstest]
    fn test_handler_with_zero_timeout() {
        let feed_handler = create_test_handler(Some(0));

        assert!(!feed_handler.replay);
        assert_eq!(feed_handler.reconnect_timeout_mins, Some(0));
    }
}