1use std::{
17 sync::{Arc, RwLock},
18 time::Duration as StdDuration,
19};
20
21use ahash::{AHashMap, HashSet, HashSetExt};
22use databento::{
23 dbn::{self, PitSymbolMap, Record, SymbolIndex},
24 live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30 enums::RecordFlag,
31 identifiers::{InstrumentId, Symbol, Venue},
32 instruments::InstrumentAny,
33};
34use nautilus_network::backoff::ExponentialBackoff;
35use tokio::{
36 sync::mpsc::error::TryRecvError,
37 time::{Duration, Instant},
38};
39
40use super::{
41 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
42 types::{DatabentoImbalance, DatabentoStatistics, SubscriptionAckEvent},
43};
44use crate::{
45 decode::{decode_instrument_def_msg, decode_record},
46 types::PublisherId,
47};
48
/// Commands accepted by [`DatabentoFeedHandler`] over its command channel.
#[derive(Debug)]
pub enum LiveCommand {
    /// Add a subscription to the live session (also remembered for resubscription on reconnect).
    Subscribe(Subscription),
    /// Start the live session so that records begin to be read.
    Start,
    /// Shut the handler down.
    Close,
}
55
/// Messages emitted by [`DatabentoFeedHandler`] over its message channel.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "TODO: Optimize this (largest variant 1096 vs 80 bytes)"
)]
pub enum LiveMessage {
    /// Decoded market data (including batched order book deltas for MBO records).
    Data(Data),
    /// Instrument decoded from an instrument definition record.
    Instrument(InstrumentAny),
    /// Instrument status decoded from a status record.
    Status(InstrumentStatus),
    /// Imbalance decoded from an imbalance record.
    Imbalance(DatabentoImbalance),
    /// Statistics decoded from a statistics record.
    Statistics(DatabentoStatistics),
    /// Subscription acknowledgement reported by a gateway system message.
    SubscriptionAck(SubscriptionAckEvent),
    /// Error surfaced to the consumer (sent when the reconnection timeout is exceeded).
    Error(anyhow::Error),
    /// Sent when a `Close` command is handled during an active session.
    Close,
}
71
/// Drives a Databento live session: receives [`LiveCommand`]s, reads records from the
/// gateway, decodes them and forwards the results as [`LiveMessage`]s, reconnecting with
/// exponential backoff when a session fails.
#[derive(Debug)]
pub struct DatabentoFeedHandler {
    /// API key passed to the live client builder.
    key: String,
    /// Dataset passed to the live client builder.
    dataset: String,
    /// Incoming commands (subscribe / start / close).
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
    /// Outgoing decoded messages.
    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
    /// Fallback venue lookup by publisher ID, used when a symbol has no entry in `symbol_venue_map`.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Shared symbol -> venue overrides; consulted before `publisher_venue_map`.
    symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
    /// Set once any subscription carrying a `start` has been seen.
    replay: bool,
    /// When set, instrument definitions register the record's exchange as the symbol's venue.
    use_exchange_as_venue: bool,
    /// Forwarded to record decoding.
    bars_timestamp_on_close: bool,
    /// Maximum time to keep retrying a failed connection, in minutes (`None` = retry indefinitely).
    reconnect_timeout_mins: Option<u64>,
    /// Backoff state producing the delay between reconnection attempts.
    backoff: ExponentialBackoff,
    /// Subscriptions replayed against the gateway after a reconnect.
    subscriptions: Vec<Subscription>,
    /// Commands received while disconnected, applied at the start of the next session.
    buffered_commands: Vec<LiveCommand>,
}
101
102impl DatabentoFeedHandler {
103 #[must_use]
109 #[allow(clippy::too_many_arguments)]
110 pub fn new(
111 key: String,
112 dataset: String,
113 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
114 tx: tokio::sync::mpsc::Sender<LiveMessage>,
115 publisher_venue_map: IndexMap<PublisherId, Venue>,
116 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
117 use_exchange_as_venue: bool,
118 bars_timestamp_on_close: bool,
119 reconnect_timeout_mins: Option<u64>,
120 ) -> Self {
121 let delay_max = if reconnect_timeout_mins.is_some() {
125 Duration::from_secs(60)
126 } else {
127 Duration::from_secs(600)
128 };
129
130 let backoff =
132 ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false).unwrap();
133
134 Self {
135 key,
136 dataset,
137 cmd_rx: rx,
138 msg_tx: tx,
139 publisher_venue_map,
140 symbol_venue_map,
141 replay: false,
142 use_exchange_as_venue,
143 bars_timestamp_on_close,
144 reconnect_timeout_mins,
145 backoff,
146 subscriptions: Vec::new(),
147 buffered_commands: Vec::new(),
148 }
149 }
150
151 #[allow(clippy::blocks_in_conditions)]
164 pub async fn run(&mut self) -> anyhow::Result<()> {
165 log::debug!("Running feed handler");
166
167 let mut reconnect_start: Option<Instant> = None;
168 let mut attempt = 0;
169
170 loop {
171 attempt += 1;
172
173 match self.run_session(attempt).await {
174 Ok(ran_successfully) => {
175 if ran_successfully {
176 log::info!("Resetting reconnection cycle after successful session");
177 reconnect_start = None;
178 attempt = 0;
179 self.backoff.reset();
180 continue;
181 } else {
182 log::info!("Session ended normally");
183 break Ok(());
184 }
185 }
186 Err(e) => {
187 let cycle_start = reconnect_start.get_or_insert_with(Instant::now);
188
189 if let Some(timeout_mins) = self.reconnect_timeout_mins {
190 let elapsed = cycle_start.elapsed();
191 let timeout = Duration::from_secs(timeout_mins * 60);
192
193 if elapsed >= timeout {
194 log::error!("Giving up reconnection after {timeout_mins} minutes");
195 self.send_msg(LiveMessage::Error(anyhow::anyhow!(
196 "Reconnection timeout after {timeout_mins} minutes: {e}"
197 )))
198 .await;
199 break Err(e);
200 }
201 }
202
203 let delay = self.backoff.next_duration();
204
205 log::warn!(
206 "Connection lost (attempt {}): {}. Reconnecting in {}s...",
207 attempt,
208 e,
209 delay.as_secs()
210 );
211
212 tokio::select! {
213 () = tokio::time::sleep(delay) => {}
214 cmd = self.cmd_rx.recv() => {
215 match cmd {
216 Some(LiveCommand::Close) => {
217 log::info!("Close received during backoff");
218 return Ok(());
219 }
220 None => {
221 log::debug!("Command channel closed during backoff");
222 return Ok(());
223 }
224 Some(cmd) => {
225 log::debug!("Buffering command received during backoff: {cmd:?}");
226 self.buffered_commands.push(cmd);
227 }
228 }
229 }
230 }
231 }
232 }
233 }
234 }
235
236 async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
245 if attempt > 1 {
246 log::info!("Reconnecting (attempt {attempt})...");
247 }
248
249 let session_start = Instant::now();
250 let clock = get_atomic_clock_realtime();
251 let mut symbol_map = PitSymbolMap::new();
252 let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
253
254 let mut buffering_start = None;
255 let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
256 let mut initialized_books = HashSet::new();
257 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
260 timeout,
261 databento::LiveClient::builder()
262 .user_agent_extension(NAUTILUS_USER_AGENT.into())
263 .key(self.key.clone())?
264 .dataset(self.dataset.clone())
265 .build(),
266 )
267 .await?;
268
269 let mut client = match result {
270 Ok(client) => {
271 if attempt > 1 {
272 log::info!("Reconnected successfully");
273 } else {
274 log::info!("Connected");
275 }
276 client
277 }
278 Err(e) => {
279 anyhow::bail!("Failed to connect to Databento LSG: {e}");
280 }
281 };
282
283 let mut start_buffered = false;
285 if !self.buffered_commands.is_empty() {
286 log::info!(
287 "Processing {} buffered commands",
288 self.buffered_commands.len()
289 );
290 for cmd in self.buffered_commands.drain(..) {
291 match cmd {
292 LiveCommand::Subscribe(sub) => {
293 if !self.replay && sub.start.is_some() {
294 self.replay = true;
295 }
296 self.subscriptions.push(sub);
297 }
298 LiveCommand::Start => {
299 start_buffered = true;
300 }
301 LiveCommand::Close => {
302 log::warn!("Close command was buffered, shutting down");
303 return Ok(false);
304 }
305 }
306 }
307 }
308
309 let timeout = Duration::from_millis(10);
310 let mut running = false;
311
312 if !self.subscriptions.is_empty() {
313 log::info!(
314 "Resubscribing to {} subscriptions",
315 self.subscriptions.len()
316 );
317 for sub in self.subscriptions.clone() {
318 client.subscribe(sub).await?;
319 }
320 for sub in &mut self.subscriptions {
322 sub.start = None;
323 }
324 client.start().await?;
325 running = true;
326 log::info!("Resubscription complete");
327 } else if start_buffered {
328 log::info!("Starting session from buffered Start command");
329 buffering_start = if self.replay {
330 Some(clock.get_time_ns())
331 } else {
332 None
333 };
334 client.start().await?;
335 running = true;
336 }
337
338 loop {
339 if self.msg_tx.is_closed() {
340 log::debug!("Message channel was closed: stopping");
341 return Ok(false);
342 }
343
344 match self.cmd_rx.try_recv() {
345 Ok(cmd) => {
346 log::debug!("Received command: {cmd:?}");
347 match cmd {
348 LiveCommand::Subscribe(sub) => {
349 if !self.replay && sub.start.is_some() {
350 self.replay = true;
351 }
352 client.subscribe(sub.clone()).await?;
353 let mut sub_for_reconnect = sub;
355 sub_for_reconnect.start = None;
356 self.subscriptions.push(sub_for_reconnect);
357 }
358 LiveCommand::Start => {
359 buffering_start = if self.replay {
360 Some(clock.get_time_ns())
361 } else {
362 None
363 };
364 client.start().await?;
365 running = true;
366 log::debug!("Started");
367 }
368 LiveCommand::Close => {
369 self.msg_tx.send(LiveMessage::Close).await?;
370 if running {
371 client.close().await?;
372 log::debug!("Closed inner client");
373 }
374 return Ok(false);
375 }
376 }
377 }
378 Err(TryRecvError::Empty) => {}
379 Err(TryRecvError::Disconnected) => {
380 log::debug!("Command channel disconnected");
381 return Ok(false);
382 }
383 }
384
385 if !running {
386 continue;
387 }
388
389 let result = tokio::time::timeout(timeout, client.next_record()).await;
390 let record_opt = match result {
391 Ok(record_opt) => record_opt,
392 Err(_) => continue,
393 };
394
395 let record = match record_opt {
396 Ok(Some(record)) => record,
397 Ok(None) => {
398 const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
399 if session_start.elapsed() >= SUCCESS_THRESHOLD {
400 log::info!("Session ended after successful run");
401 return Ok(true);
402 }
403 anyhow::bail!("Session ended by gateway");
404 }
405 Err(e) => {
406 const SUCCESS_THRESHOLD: Duration = Duration::from_secs(60);
407 if session_start.elapsed() >= SUCCESS_THRESHOLD {
408 log::info!("Connection error after successful run: {e}");
409 return Ok(true);
410 }
411 anyhow::bail!("Connection error: {e}");
412 }
413 };
414
415 let ts_init = clock.get_time_ns();
416
417 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
419 handle_error_msg(msg);
420 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
421 if let Some(ack) = handle_system_msg(msg, ts_init) {
422 self.send_msg(LiveMessage::SubscriptionAck(ack)).await;
423 }
424 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
425 instrument_id_map.remove(&msg.hd.instrument_id);
427 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
428 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
429 if self.use_exchange_as_venue {
430 let exchange = msg.exchange()?;
431 if !exchange.is_empty() {
432 update_instrument_id_map_with_exchange(
433 &symbol_map,
434 &self.symbol_venue_map,
435 &mut instrument_id_map,
436 msg.hd.instrument_id,
437 exchange,
438 )?;
439 }
440 }
441 let data = {
442 let sym_map = self.read_symbol_venue_map()?;
443 handle_instrument_def_msg(
444 msg,
445 &record,
446 &symbol_map,
447 &self.publisher_venue_map,
448 &sym_map,
449 &mut instrument_id_map,
450 ts_init,
451 )?
452 };
453 self.send_msg(LiveMessage::Instrument(data)).await;
454 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
455 let data = {
456 let sym_map = self.read_symbol_venue_map()?;
457 handle_status_msg(
458 msg,
459 &record,
460 &symbol_map,
461 &self.publisher_venue_map,
462 &sym_map,
463 &mut instrument_id_map,
464 ts_init,
465 )?
466 };
467 self.send_msg(LiveMessage::Status(data)).await;
468 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
469 let data = {
470 let sym_map = self.read_symbol_venue_map()?;
471 handle_imbalance_msg(
472 msg,
473 &record,
474 &symbol_map,
475 &self.publisher_venue_map,
476 &sym_map,
477 &mut instrument_id_map,
478 ts_init,
479 )?
480 };
481 self.send_msg(LiveMessage::Imbalance(data)).await;
482 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
483 let data = {
484 let sym_map = self.read_symbol_venue_map()?;
485 handle_statistics_msg(
486 msg,
487 &record,
488 &symbol_map,
489 &self.publisher_venue_map,
490 &sym_map,
491 &mut instrument_id_map,
492 ts_init,
493 )?
494 };
495 self.send_msg(LiveMessage::Statistics(data)).await;
496 } else {
497 let res = {
499 let sym_map = self.read_symbol_venue_map()?;
500 handle_record(
501 record,
502 &symbol_map,
503 &self.publisher_venue_map,
504 &sym_map,
505 &mut instrument_id_map,
506 ts_init,
507 &initialized_books,
508 self.bars_timestamp_on_close,
509 )
510 };
511 let (mut data1, data2) = match res {
512 Ok(decoded) => decoded,
513 Err(e) => {
514 log::error!("Error decoding record: {e}");
515 continue;
516 }
517 };
518
519 if let Some(msg) = record.get::<dbn::MboMsg>() {
520 if let Some(Data::Delta(delta)) = &data1 {
522 initialized_books.insert(delta.instrument_id);
523 } else {
524 continue; }
526
527 if let Some(Data::Delta(delta)) = &data1 {
528 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
529 buffer.push(*delta);
530
531 log::trace!(
532 "Buffering delta: {} {buffering_start:?} flags={}",
533 delta.ts_event,
534 msg.flags.raw(),
535 );
536
537 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
539 continue; }
541
542 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
544 continue; }
546
547 if let Some(start_ns) = buffering_start {
549 if delta.ts_event <= start_ns {
550 continue; }
552 buffering_start = None;
553 }
554
555 let buffer =
557 buffered_deltas
558 .remove(&delta.instrument_id)
559 .ok_or_else(|| {
560 anyhow::anyhow!(
561 "Internal error: no buffered deltas for instrument {id}",
562 id = delta.instrument_id
563 )
564 })?;
565 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
566 let deltas = OrderBookDeltas_API::new(deltas);
567 data1 = Some(Data::Deltas(deltas));
568 }
569 }
570
571 if let Some(data) = data1 {
572 self.send_msg(LiveMessage::Data(data)).await;
573 }
574
575 if let Some(data) = data2 {
576 self.send_msg(LiveMessage::Data(data)).await;
577 }
578 }
579 }
580 }
581
582 async fn send_msg(&mut self, msg: LiveMessage) {
584 log::trace!("Sending {msg:?}");
585 match self.msg_tx.send(msg).await {
586 Ok(()) => {}
587 Err(e) => log::error!("Error sending message: {e}"),
588 }
589 }
590
591 fn read_symbol_venue_map(
597 &self,
598 ) -> anyhow::Result<std::sync::RwLockReadGuard<'_, AHashMap<Symbol, Venue>>> {
599 const MAX_WAIT_MS: u64 = 500; const INITIAL_DELAY_MICROS: u64 = 10;
602 const MAX_DELAY_MICROS: u64 = 1000;
603
604 let deadline = std::time::Instant::now() + StdDuration::from_millis(MAX_WAIT_MS);
605 let mut delay = INITIAL_DELAY_MICROS;
606
607 loop {
608 match self.symbol_venue_map.try_read() {
609 Ok(guard) => return Ok(guard),
610 Err(std::sync::TryLockError::WouldBlock) => {
611 if std::time::Instant::now() >= deadline {
612 break;
613 }
614
615 std::thread::yield_now();
617
618 if std::time::Instant::now() < deadline {
620 let remaining = deadline - std::time::Instant::now();
621 let sleep_duration = StdDuration::from_micros(delay).min(remaining);
622 std::thread::sleep(sleep_duration);
623 delay = ((delay * 2) + delay / 4).min(MAX_DELAY_MICROS);
625 }
626 }
627 Err(std::sync::TryLockError::Poisoned(e)) => {
628 anyhow::bail!("symbol_venue_map lock poisoned: {e}");
629 }
630 }
631 }
632
633 anyhow::bail!(
634 "Failed to acquire read lock on symbol_venue_map after {MAX_WAIT_MS}ms deadline"
635 )
636 }
637}
638
/// Logs an error message received from the gateway at error level.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    log::error!("{msg:?}");
}
643
644fn handle_system_msg(msg: &dbn::SystemMsg, ts_received: UnixNanos) -> Option<SubscriptionAckEvent> {
646 match msg.code() {
647 Ok(dbn::SystemCode::SubscriptionAck) => {
648 let message = msg.msg().unwrap_or("<invalid utf-8>");
649 log::info!("Subscription acknowledged: {message}");
650
651 let schema = parse_ack_message(message);
652
653 Some(SubscriptionAckEvent {
654 schema,
655 message: message.to_string(),
656 ts_received,
657 })
658 }
659 Ok(dbn::SystemCode::Heartbeat) => {
660 log::trace!("Heartbeat received");
661 None
662 }
663 Ok(dbn::SystemCode::SlowReaderWarning) => {
664 let message = msg.msg().unwrap_or("<invalid utf-8>");
665 log::warn!("Slow reader warning: {message}");
666 None
667 }
668 Ok(dbn::SystemCode::ReplayCompleted) => {
669 let message = msg.msg().unwrap_or("<invalid utf-8>");
670 log::info!("Replay completed: {message}");
671 None
672 }
673 _ => {
674 log::debug!("{msg:?}");
675 None
676 }
677 }
678}
679
/// Extracts the schema name from a subscription acknowledgement message.
///
/// Expects the form `Subscription request <n> for <schema> data succeeded` and returns
/// the trimmed `<schema>` part, or an empty string when the message has any other shape.
fn parse_ack_message(message: &str) -> String {
    let Some(rest) = message.strip_prefix("Subscription request ") else {
        return String::new();
    };

    // Everything before the first " for " (the request number) is discarded
    let Some((_, tail)) = rest.split_once(" for ") else {
        return String::new();
    };

    match tail.strip_suffix(" data succeeded") {
        Some(schema) => schema.trim().to_string(),
        None => String::new(),
    }
}
690
691fn handle_symbol_mapping_msg(
697 msg: &dbn::SymbolMappingMsg,
698 symbol_map: &mut PitSymbolMap,
699 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
700) -> anyhow::Result<()> {
701 symbol_map
702 .on_symbol_mapping(msg)
703 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
704 instrument_id_map.remove(&msg.header().instrument_id);
705 Ok(())
706}
707
708fn update_instrument_id_map_with_exchange(
710 symbol_map: &PitSymbolMap,
711 symbol_venue_map: &RwLock<AHashMap<Symbol, Venue>>,
712 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
713 raw_instrument_id: u32,
714 exchange: &str,
715) -> anyhow::Result<InstrumentId> {
716 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
717 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
718 })?;
719 let symbol = Symbol::from(raw_symbol.as_str());
720 let venue = Venue::from_code(exchange)
721 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
722 let instrument_id = InstrumentId::new(symbol, venue);
723 let mut map = symbol_venue_map
724 .write()
725 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
726 map.entry(symbol).or_insert(venue);
727 instrument_id_map.insert(raw_instrument_id, instrument_id);
728 Ok(instrument_id)
729}
730
731fn update_instrument_id_map(
732 record: &dbn::RecordRef,
733 symbol_map: &PitSymbolMap,
734 publisher_venue_map: &IndexMap<PublisherId, Venue>,
735 symbol_venue_map: &AHashMap<Symbol, Venue>,
736 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
737) -> anyhow::Result<InstrumentId> {
738 let header = record.header();
739
740 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
742 return Ok(instrument_id);
743 }
744
745 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
746 anyhow::anyhow!(
747 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
748 header.instrument_id
749 )
750 })?;
751
752 let symbol = Symbol::from_str_unchecked(raw_symbol);
753
754 let publisher_id = header.publisher_id;
755 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
756 *venue
757 } else {
758 let venue = publisher_venue_map
759 .get(&publisher_id)
760 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
761 *venue
762 };
763 let instrument_id = InstrumentId::new(symbol, venue);
764
765 instrument_id_map.insert(header.instrument_id, instrument_id);
766 Ok(instrument_id)
767}
768
769fn handle_instrument_def_msg(
775 msg: &dbn::InstrumentDefMsg,
776 record: &dbn::RecordRef,
777 symbol_map: &PitSymbolMap,
778 publisher_venue_map: &IndexMap<PublisherId, Venue>,
779 symbol_venue_map: &AHashMap<Symbol, Venue>,
780 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
781 ts_init: UnixNanos,
782) -> anyhow::Result<InstrumentAny> {
783 let instrument_id = update_instrument_id_map(
784 record,
785 symbol_map,
786 publisher_venue_map,
787 symbol_venue_map,
788 instrument_id_map,
789 )?;
790
791 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
792}
793
794fn handle_status_msg(
795 msg: &dbn::StatusMsg,
796 record: &dbn::RecordRef,
797 symbol_map: &PitSymbolMap,
798 publisher_venue_map: &IndexMap<PublisherId, Venue>,
799 symbol_venue_map: &AHashMap<Symbol, Venue>,
800 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
801 ts_init: UnixNanos,
802) -> anyhow::Result<InstrumentStatus> {
803 let instrument_id = update_instrument_id_map(
804 record,
805 symbol_map,
806 publisher_venue_map,
807 symbol_venue_map,
808 instrument_id_map,
809 )?;
810
811 decode_status_msg(msg, instrument_id, Some(ts_init))
812}
813
814fn handle_imbalance_msg(
815 msg: &dbn::ImbalanceMsg,
816 record: &dbn::RecordRef,
817 symbol_map: &PitSymbolMap,
818 publisher_venue_map: &IndexMap<PublisherId, Venue>,
819 symbol_venue_map: &AHashMap<Symbol, Venue>,
820 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
821 ts_init: UnixNanos,
822) -> anyhow::Result<DatabentoImbalance> {
823 let instrument_id = update_instrument_id_map(
824 record,
825 symbol_map,
826 publisher_venue_map,
827 symbol_venue_map,
828 instrument_id_map,
829 )?;
830
831 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
834}
835
836fn handle_statistics_msg(
837 msg: &dbn::StatMsg,
838 record: &dbn::RecordRef,
839 symbol_map: &PitSymbolMap,
840 publisher_venue_map: &IndexMap<PublisherId, Venue>,
841 symbol_venue_map: &AHashMap<Symbol, Venue>,
842 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
843 ts_init: UnixNanos,
844) -> anyhow::Result<DatabentoStatistics> {
845 let instrument_id = update_instrument_id_map(
846 record,
847 symbol_map,
848 publisher_venue_map,
849 symbol_venue_map,
850 instrument_id_map,
851 )?;
852
853 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
856}
857
858#[allow(clippy::too_many_arguments)]
859fn handle_record(
860 record: dbn::RecordRef,
861 symbol_map: &PitSymbolMap,
862 publisher_venue_map: &IndexMap<PublisherId, Venue>,
863 symbol_venue_map: &AHashMap<Symbol, Venue>,
864 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
865 ts_init: UnixNanos,
866 initialized_books: &HashSet<InstrumentId>,
867 bars_timestamp_on_close: bool,
868) -> anyhow::Result<(Option<Data>, Option<Data>)> {
869 let instrument_id = update_instrument_id_map(
870 &record,
871 symbol_map,
872 publisher_venue_map,
873 symbol_venue_map,
874 instrument_id_map,
875 )?;
876
877 let price_precision = 2; let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
882 || record.get::<dbn::TbboMsg>().is_some()
883 || record.get::<dbn::Cmbp1Msg>().is_some()
884 {
885 true } else {
887 initialized_books.contains(&instrument_id) };
889
890 decode_record(
891 &record,
892 instrument_id,
893 price_precision,
894 Some(ts_init),
895 include_trades,
896 bars_timestamp_on_close,
897 )
898}
899
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a handler wired to fresh channels and empty venue maps.
    ///
    /// The peer ends of both channels are dropped when this function returns.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, _msg_rx) = tokio::sync::mpsc::channel(100);

        let publisher_venue_map = IndexMap::new();
        let symbol_venue_map = Arc::new(RwLock::new(AHashMap::new()));
        let use_exchange_as_venue = false;
        let bars_timestamp_on_close = false;

        DatabentoFeedHandler::new(
            String::from("test_key"),
            String::from("GLBX.MDP3"),
            cmd_rx,
            msg_tx,
            publisher_venue_map,
            symbol_venue_map,
            use_exchange_as_venue,
            bars_timestamp_on_close,
            reconnect_timeout_mins,
        )
    }

    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let handler = create_test_handler(reconnect_timeout_mins);

        // The timeout is stored as given and no session state exists yet
        assert_eq!(handler.reconnect_timeout_mins, reconnect_timeout_mins);
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    fn test_subscription_with_and_without_start() {
        let sub_with_start = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(datetime!(2024-01-01 00:00:00 UTC))
            .build();

        // Clearing `start` mirrors what the handler stores for resubscription
        let mut sub_without_start = sub_with_start.clone();
        sub_without_start.start = None;

        assert!(sub_with_start.start.is_some());
        assert!(sub_without_start.start.is_none());
        assert_eq!(sub_with_start.schema, sub_without_start.schema);
        assert_eq!(sub_with_start.symbols, sub_without_start.symbols);
    }

    #[rstest]
    fn test_handler_initialization_state() {
        let handler = create_test_handler(Some(10));

        assert!(!handler.replay);
        assert_eq!(handler.dataset, "GLBX.MDP3");
        assert_eq!(handler.key, "test_key");
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    fn test_handler_with_no_timeout() {
        let handler = create_test_handler(None);

        assert_eq!(handler.reconnect_timeout_mins, None);
        assert!(!handler.replay);
    }

    #[rstest]
    fn test_handler_with_zero_timeout() {
        let handler = create_test_handler(Some(0));

        assert_eq!(handler.reconnect_timeout_mins, Some(0));
        assert!(!handler.replay);
    }
}