1use std::{
17 collections::HashMap,
18 sync::{Arc, RwLock},
19};
20
21use databento::{
22 dbn::{self, PitSymbolMap, Publisher, Record, SymbolIndex, VersionUpgradePolicy},
23 live::Subscription,
24};
25use indexmap::IndexMap;
26use nautilus_core::{UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
29 enums::RecordFlag,
30 identifiers::{InstrumentId, Symbol, Venue},
31 instruments::InstrumentAny,
32};
33use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
34
35use super::{
36 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
37 types::{DatabentoImbalance, DatabentoStatistics},
38};
39use crate::{
40 decode::{decode_instrument_def_msg, decode_record},
41 types::PublisherId,
42};
43
44#[derive(Debug)]
45pub enum LiveCommand {
46 Subscribe(Subscription),
47 Start,
48 Close,
49}
50
51#[derive(Debug)]
52#[allow(clippy::large_enum_variant)] pub enum LiveMessage {
54 Data(Data),
55 Instrument(InstrumentAny),
56 Status(InstrumentStatus),
57 Imbalance(DatabentoImbalance),
58 Statistics(DatabentoStatistics),
59 Error(anyhow::Error),
60 Close,
61}
62
63#[derive(Debug)]
69pub struct DatabentoFeedHandler {
70 key: String,
71 dataset: String,
72 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
73 msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
74 publisher_venue_map: IndexMap<PublisherId, Venue>,
75 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
76 replay: bool,
77}
78
79impl DatabentoFeedHandler {
80 #[must_use]
82 pub const fn new(
83 key: String,
84 dataset: String,
85 rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
86 tx: tokio::sync::mpsc::Sender<LiveMessage>,
87 publisher_venue_map: IndexMap<PublisherId, Venue>,
88 symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
89 ) -> Self {
90 Self {
91 key,
92 dataset,
93 cmd_rx: rx,
94 msg_tx: tx,
95 publisher_venue_map,
96 symbol_venue_map,
97 replay: false,
98 }
99 }
100
101 pub async fn run(&mut self) -> anyhow::Result<()> {
103 tracing::debug!("Running feed handler");
104 let clock = get_atomic_clock_realtime();
105 let mut symbol_map = PitSymbolMap::new();
106 let mut instrument_id_map: HashMap<u32, InstrumentId> = HashMap::new();
107
108 let mut buffering_start = None;
109 let mut buffered_deltas: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
110 let mut deltas_count = 0_u64;
111 let timeout = Duration::from_secs(5); let result = tokio::time::timeout(
114 timeout,
115 databento::LiveClient::builder()
116 .key(self.key.clone())?
117 .dataset(self.dataset.clone())
118 .upgrade_policy(VersionUpgradePolicy::UpgradeToV2)
119 .build(),
120 )
121 .await?;
122 tracing::info!("Connected");
123
124 let mut client = if let Ok(client) = result {
125 client
126 } else {
127 self.msg_tx.send(LiveMessage::Close).await?;
128 self.cmd_rx.close();
129 return Err(anyhow::anyhow!("Timeout connecting to LSG"));
130 };
131
132 let timeout = Duration::from_millis(10);
134
135 let mut running = false;
137
138 loop {
139 if self.msg_tx.is_closed() {
140 tracing::debug!("Message channel was closed: stopping");
141 break;
142 }
143
144 match self.cmd_rx.try_recv() {
145 Ok(cmd) => {
146 tracing::debug!("Received command: {cmd:?}");
147 match cmd {
148 LiveCommand::Subscribe(sub) => {
149 if !self.replay & sub.start.is_some() {
150 self.replay = true;
151 }
152 client.subscribe(sub).await.map_err(to_pyruntime_err)?;
153 }
154 LiveCommand::Start => {
155 buffering_start = if self.replay {
156 Some(clock.get_time_ns())
157 } else {
158 None
159 };
160 client.start().await.map_err(to_pyruntime_err)?;
161 running = true;
162 tracing::debug!("Started");
163 }
164 LiveCommand::Close => {
165 self.msg_tx.send(LiveMessage::Close).await?;
166 if running {
167 client.close().await.map_err(to_pyruntime_err)?;
168 tracing::debug!("Closed inner client");
169 }
170 break;
171 }
172 }
173 }
174 Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => {
176 tracing::debug!("Disconnected");
177 break;
178 }
179 }
180
181 if !running {
182 continue;
183 }
184
185 let result = tokio::time::timeout(timeout, client.next_record()).await;
187 let record_opt = match result {
188 Ok(record_opt) => record_opt,
189 Err(_) => continue, };
191
192 let record = match record_opt {
193 Ok(Some(record)) => record,
194 Ok(None) => break, Err(e) => {
196 self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
199 break;
200 }
201 };
202
203 let ts_init = clock.get_time_ns();
204
205 if let Some(msg) = record.get::<dbn::ErrorMsg>() {
207 handle_error_msg(msg);
208 } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
209 handle_system_msg(msg);
210 } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
211 instrument_id_map.remove(&msg.hd.instrument_id);
213 handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
214 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
215 let use_exchange_as_venue = false;
217 if use_exchange_as_venue && msg.publisher()? == Publisher::GlbxMdp3Glbx {
218 update_instrument_id_map_with_exchange(
219 &symbol_map,
220 &self.symbol_venue_map,
221 &mut instrument_id_map,
222 msg.hd.instrument_id,
223 msg.exchange()?,
224 );
225 }
226 let data = handle_instrument_def_msg(
227 msg,
228 &record,
229 &symbol_map,
230 &self.publisher_venue_map,
231 &self.symbol_venue_map.read().unwrap(),
232 &mut instrument_id_map,
233 ts_init,
234 )?;
235 self.send_msg(LiveMessage::Instrument(data)).await;
236 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
237 let data = handle_status_msg(
238 msg,
239 &record,
240 &symbol_map,
241 &self.publisher_venue_map,
242 &self.symbol_venue_map.read().unwrap(),
243 &mut instrument_id_map,
244 ts_init,
245 )?;
246 self.send_msg(LiveMessage::Status(data)).await;
247 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
248 let data = handle_imbalance_msg(
249 msg,
250 &record,
251 &symbol_map,
252 &self.publisher_venue_map,
253 &self.symbol_venue_map.read().unwrap(),
254 &mut instrument_id_map,
255 ts_init,
256 )?;
257 self.send_msg(LiveMessage::Imbalance(data)).await;
258 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
259 let data = handle_statistics_msg(
260 msg,
261 &record,
262 &symbol_map,
263 &self.publisher_venue_map,
264 &self.symbol_venue_map.read().unwrap(),
265 &mut instrument_id_map,
266 ts_init,
267 )?;
268 self.send_msg(LiveMessage::Statistics(data)).await;
269 } else {
270 let (mut data1, data2) = match handle_record(
271 record,
272 &symbol_map,
273 &self.publisher_venue_map,
274 &self.symbol_venue_map.read().unwrap(),
275 &mut instrument_id_map,
276 ts_init,
277 ) {
278 Ok(decoded) => decoded,
279 Err(e) => {
280 tracing::error!("Error decoding record: {e}");
281 continue;
282 }
283 };
284
285 if let Some(msg) = record.get::<dbn::MboMsg>() {
286 if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
287 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
288 buffer.push(delta);
289
290 deltas_count += 1;
291 tracing::trace!(
292 "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
293 delta.ts_event,
294 msg.flags.raw(),
295 );
296
297 if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
299 continue; }
301
302 if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
304 continue; }
306
307 if let Some(start_ns) = buffering_start {
309 if delta.ts_event <= start_ns {
310 continue; }
312 buffering_start = None;
313 }
314
315 let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap();
317 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
318 let deltas = OrderBookDeltas_API::new(deltas);
319 data1 = Some(Data::Deltas(deltas));
320 }
321 }
322
323 if let Some(data) = data1 {
324 self.send_msg(LiveMessage::Data(data)).await;
325 }
326
327 if let Some(data) = data2 {
328 self.send_msg(LiveMessage::Data(data)).await;
329 }
330 }
331 }
332
333 self.cmd_rx.close();
334 tracing::debug!("Closed command receiver");
335
336 Ok(())
337 }
338
339 async fn send_msg(&mut self, msg: LiveMessage) {
340 tracing::trace!("Sending {msg:?}");
341 match self.msg_tx.send(msg).await {
342 Ok(()) => {}
343 Err(e) => tracing::error!("Error sending message: {e}"),
344 }
345 }
346}
347
348fn handle_error_msg(msg: &dbn::ErrorMsg) {
349 tracing::error!("{msg:?}");
350}
351
352fn handle_system_msg(msg: &dbn::SystemMsg) {
353 tracing::info!("{msg:?}");
354}
355
356fn handle_symbol_mapping_msg(
357 msg: &dbn::SymbolMappingMsg,
358 symbol_map: &mut PitSymbolMap,
359 instrument_id_map: &mut HashMap<u32, InstrumentId>,
360) {
361 symbol_map
363 .on_symbol_mapping(msg)
364 .unwrap_or_else(|_| panic!("Error updating `symbol_map` with {msg:?}"));
365
366 instrument_id_map.remove(&msg.header().instrument_id);
368}
369
370fn update_instrument_id_map_with_exchange(
371 symbol_map: &PitSymbolMap,
372 symbol_venue_map: &RwLock<HashMap<Symbol, Venue>>,
373 instrument_id_map: &mut HashMap<u32, InstrumentId>,
374 raw_instrument_id: u32,
375 exchange: &str,
376) -> InstrumentId {
377 let raw_symbol = symbol_map
378 .get(raw_instrument_id)
379 .expect("Cannot resolve `raw_symbol` from `symbol_map`");
380 let symbol = Symbol::from(raw_symbol.as_str());
381 let venue = Venue::from(exchange);
382 let instrument_id = InstrumentId::new(symbol, venue);
383 symbol_venue_map.write().unwrap().insert(symbol, venue);
384 instrument_id_map.insert(raw_instrument_id, instrument_id);
385 instrument_id
386}
387
388fn update_instrument_id_map(
389 record: &dbn::RecordRef,
390 symbol_map: &PitSymbolMap,
391 publisher_venue_map: &IndexMap<PublisherId, Venue>,
392 symbol_venue_map: &HashMap<Symbol, Venue>,
393 instrument_id_map: &mut HashMap<u32, InstrumentId>,
394) -> InstrumentId {
395 let header = record.header();
396
397 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
399 return instrument_id;
400 }
401
402 let raw_symbol = symbol_map
403 .get_for_rec(record)
404 .expect("Cannot resolve `raw_symbol` from `symbol_map`");
405
406 let symbol = Symbol::from_str_unchecked(raw_symbol);
407
408 let publisher_id = header.publisher_id;
409 let venue = match symbol_venue_map.get(&symbol) {
410 Some(venue) => venue,
411 None => publisher_venue_map
412 .get(&publisher_id)
413 .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
414 };
415 let instrument_id = InstrumentId::new(symbol, *venue);
416
417 instrument_id_map.insert(header.instrument_id, instrument_id);
418 instrument_id
419}
420
421fn handle_instrument_def_msg(
422 msg: &dbn::InstrumentDefMsg,
423 record: &dbn::RecordRef,
424 symbol_map: &PitSymbolMap,
425 publisher_venue_map: &IndexMap<PublisherId, Venue>,
426 symbol_venue_map: &HashMap<Symbol, Venue>,
427 instrument_id_map: &mut HashMap<u32, InstrumentId>,
428 ts_init: UnixNanos,
429) -> anyhow::Result<InstrumentAny> {
430 let instrument_id = update_instrument_id_map(
431 record,
432 symbol_map,
433 publisher_venue_map,
434 symbol_venue_map,
435 instrument_id_map,
436 );
437
438 decode_instrument_def_msg(msg, instrument_id, ts_init)
439}
440
441fn handle_status_msg(
442 msg: &dbn::StatusMsg,
443 record: &dbn::RecordRef,
444 symbol_map: &PitSymbolMap,
445 publisher_venue_map: &IndexMap<PublisherId, Venue>,
446 symbol_venue_map: &HashMap<Symbol, Venue>,
447 instrument_id_map: &mut HashMap<u32, InstrumentId>,
448 ts_init: UnixNanos,
449) -> anyhow::Result<InstrumentStatus> {
450 let instrument_id = update_instrument_id_map(
451 record,
452 symbol_map,
453 publisher_venue_map,
454 symbol_venue_map,
455 instrument_id_map,
456 );
457
458 decode_status_msg(msg, instrument_id, ts_init)
459}
460
461fn handle_imbalance_msg(
462 msg: &dbn::ImbalanceMsg,
463 record: &dbn::RecordRef,
464 symbol_map: &PitSymbolMap,
465 publisher_venue_map: &IndexMap<PublisherId, Venue>,
466 symbol_venue_map: &HashMap<Symbol, Venue>,
467 instrument_id_map: &mut HashMap<u32, InstrumentId>,
468 ts_init: UnixNanos,
469) -> anyhow::Result<DatabentoImbalance> {
470 let instrument_id = update_instrument_id_map(
471 record,
472 symbol_map,
473 publisher_venue_map,
474 symbol_venue_map,
475 instrument_id_map,
476 );
477
478 let price_precision = 2; decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
481}
482
483fn handle_statistics_msg(
484 msg: &dbn::StatMsg,
485 record: &dbn::RecordRef,
486 symbol_map: &PitSymbolMap,
487 publisher_venue_map: &IndexMap<PublisherId, Venue>,
488 symbol_venue_map: &HashMap<Symbol, Venue>,
489 instrument_id_map: &mut HashMap<u32, InstrumentId>,
490 ts_init: UnixNanos,
491) -> anyhow::Result<DatabentoStatistics> {
492 let instrument_id = update_instrument_id_map(
493 record,
494 symbol_map,
495 publisher_venue_map,
496 symbol_venue_map,
497 instrument_id_map,
498 );
499
500 let price_precision = 2; decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
503}
504
505fn handle_record(
506 record: dbn::RecordRef,
507 symbol_map: &PitSymbolMap,
508 publisher_venue_map: &IndexMap<PublisherId, Venue>,
509 symbol_venue_map: &HashMap<Symbol, Venue>,
510 instrument_id_map: &mut HashMap<u32, InstrumentId>,
511 ts_init: UnixNanos,
512) -> anyhow::Result<(Option<Data>, Option<Data>)> {
513 let instrument_id = update_instrument_id_map(
514 &record,
515 symbol_map,
516 publisher_venue_map,
517 symbol_venue_map,
518 instrument_id_map,
519 );
520
521 let price_precision = 2; decode_record(
524 &record,
525 instrument_id,
526 price_precision,
527 Some(ts_init),
528 true, )
530}