nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{Arc, RwLock},
19};
20
21use ahash::{HashSet, HashSetExt};
22use databento::{
23    dbn::{self, PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy},
24    live::Subscription,
25};
26use indexmap::IndexMap;
27use nautilus_core::{UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
30    enums::RecordFlag,
31    identifiers::{InstrumentId, Symbol, Venue},
32    instruments::InstrumentAny,
33};
34use tokio::{sync::mpsc::error::TryRecvError, time::Duration};
35
36use super::{
37    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
38    types::{DatabentoImbalance, DatabentoStatistics},
39};
40use crate::{
41    decode::{decode_instrument_def_msg, decode_record},
42    types::PublisherId,
43};
44
45#[derive(Debug)]
46pub enum LiveCommand {
47    Subscribe(Subscription),
48    Start,
49    Close,
50}
51
52#[derive(Debug)]
53#[allow(clippy::large_enum_variant)] // TODO: Optimize this (largest variant 1096 vs 80 bytes)
54pub enum LiveMessage {
55    Data(Data),
56    Instrument(InstrumentAny),
57    Status(InstrumentStatus),
58    Imbalance(DatabentoImbalance),
59    Statistics(DatabentoStatistics),
60    Error(anyhow::Error),
61    Close,
62}
63
64/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
65///
66/// [`LiveCommand`] messages are recieved synchronously across a channel,
67/// decoded records are sent asynchronously on a tokio channel as [`LiveMessage`]s
68/// back to a message processing task.
69#[derive(Debug)]
70pub struct DatabentoFeedHandler {
71    key: String,
72    dataset: String,
73    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
74    msg_tx: tokio::sync::mpsc::Sender<LiveMessage>,
75    publisher_venue_map: IndexMap<PublisherId, Venue>,
76    symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
77    replay: bool,
78    use_exchange_as_venue: bool,
79}
80
81impl DatabentoFeedHandler {
82    /// Creates a new [`DatabentoFeedHandler`] instance.
83    #[must_use]
84    pub const fn new(
85        key: String,
86        dataset: String,
87        rx: tokio::sync::mpsc::UnboundedReceiver<LiveCommand>,
88        tx: tokio::sync::mpsc::Sender<LiveMessage>,
89        publisher_venue_map: IndexMap<PublisherId, Venue>,
90        symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
91        use_exchange_as_venue: bool,
92    ) -> Self {
93        Self {
94            key,
95            dataset,
96            cmd_rx: rx,
97            msg_tx: tx,
98            publisher_venue_map,
99            symbol_venue_map,
100            replay: false,
101            use_exchange_as_venue,
102        }
103    }
104
105    /// Run the feed handler to begin listening for commands and processing messages.
106    pub async fn run(&mut self) -> anyhow::Result<()> {
107        tracing::debug!("Running feed handler");
108        let clock = get_atomic_clock_realtime();
109        let mut symbol_map = PitSymbolMap::new();
110        let mut instrument_id_map: HashMap<u32, InstrumentId> = HashMap::new();
111
112        let mut buffering_start = None;
113        let mut buffered_deltas: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
114        let mut deltas_count = 0_u64;
115        let timeout = Duration::from_secs(5); // Hard-coded timeout for now
116
117        let result = tokio::time::timeout(
118            timeout,
119            databento::LiveClient::builder()
120                .key(self.key.clone())?
121                .dataset(self.dataset.clone())
122                .upgrade_policy(VersionUpgradePolicy::UpgradeToV2)
123                .build(),
124        )
125        .await?;
126        tracing::info!("Connected");
127
128        let mut client = if let Ok(client) = result {
129            client
130        } else {
131            self.msg_tx.send(LiveMessage::Close).await?;
132            self.cmd_rx.close();
133            anyhow::bail!("Timeout connecting to LSG");
134        };
135
136        // Timeout awaiting the next record before checking for a command
137        let timeout = Duration::from_millis(10);
138
139        // Flag to control whether to continue to await next record
140        let mut running = false;
141
142        loop {
143            if self.msg_tx.is_closed() {
144                tracing::debug!("Message channel was closed: stopping");
145                break;
146            }
147
148            match self.cmd_rx.try_recv() {
149                Ok(cmd) => {
150                    tracing::debug!("Received command: {cmd:?}");
151                    match cmd {
152                        LiveCommand::Subscribe(sub) => {
153                            if !self.replay & sub.start.is_some() {
154                                self.replay = true;
155                            }
156                            client.subscribe(sub).await.map_err(to_pyruntime_err)?;
157                        }
158                        LiveCommand::Start => {
159                            buffering_start = if self.replay {
160                                Some(clock.get_time_ns())
161                            } else {
162                                None
163                            };
164                            client.start().await.map_err(to_pyruntime_err)?;
165                            running = true;
166                            tracing::debug!("Started");
167                        }
168                        LiveCommand::Close => {
169                            self.msg_tx.send(LiveMessage::Close).await?;
170                            if running {
171                                client.close().await.map_err(to_pyruntime_err)?;
172                                tracing::debug!("Closed inner client");
173                            }
174                            break;
175                        }
176                    }
177                }
178                Err(TryRecvError::Empty) => {} // No command yet
179                Err(TryRecvError::Disconnected) => {
180                    tracing::debug!("Disconnected");
181                    break;
182                }
183            }
184
185            if !running {
186                continue;
187            }
188
189            // Await the next record with a timeout
190            let result = tokio::time::timeout(timeout, client.next_record()).await;
191            let record_opt = match result {
192                Ok(record_opt) => record_opt,
193                Err(_) => continue, // Timeout
194            };
195
196            let record = match record_opt {
197                Ok(Some(record)) => record,
198                Ok(None) => break, // Session ended normally
199                Err(e) => {
200                    // Fail the session entirely for now. Consider refining
201                    // this strategy to handle specific errors more gracefully.
202                    self.send_msg(LiveMessage::Error(anyhow::anyhow!(e))).await;
203                    break;
204                }
205            };
206
207            let ts_init = clock.get_time_ns();
208            let mut initialized_books = HashSet::new();
209
210            // Decode record
211            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
212                handle_error_msg(msg);
213            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
214                handle_system_msg(msg);
215            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
216                // Remove instrument ID index as the raw symbol may have changed
217                instrument_id_map.remove(&msg.hd.instrument_id);
218                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
219            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
220                if self.use_exchange_as_venue {
221                    update_instrument_id_map_with_exchange(
222                        &symbol_map,
223                        &self.symbol_venue_map,
224                        &mut instrument_id_map,
225                        msg.hd.instrument_id,
226                        msg.exchange()?,
227                    );
228                }
229                let data = handle_instrument_def_msg(
230                    msg,
231                    &record,
232                    &symbol_map,
233                    &self.publisher_venue_map,
234                    &self.symbol_venue_map.read().unwrap(),
235                    &mut instrument_id_map,
236                    ts_init,
237                )?;
238                self.send_msg(LiveMessage::Instrument(data)).await;
239            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
240                let data = handle_status_msg(
241                    msg,
242                    &record,
243                    &symbol_map,
244                    &self.publisher_venue_map,
245                    &self.symbol_venue_map.read().unwrap(),
246                    &mut instrument_id_map,
247                    ts_init,
248                )?;
249                self.send_msg(LiveMessage::Status(data)).await;
250            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
251                let data = handle_imbalance_msg(
252                    msg,
253                    &record,
254                    &symbol_map,
255                    &self.publisher_venue_map,
256                    &self.symbol_venue_map.read().unwrap(),
257                    &mut instrument_id_map,
258                    ts_init,
259                )?;
260                self.send_msg(LiveMessage::Imbalance(data)).await;
261            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
262                let data = handle_statistics_msg(
263                    msg,
264                    &record,
265                    &symbol_map,
266                    &self.publisher_venue_map,
267                    &self.symbol_venue_map.read().unwrap(),
268                    &mut instrument_id_map,
269                    ts_init,
270                )?;
271                self.send_msg(LiveMessage::Statistics(data)).await;
272            } else {
273                let (mut data1, data2) = match handle_record(
274                    record,
275                    &symbol_map,
276                    &self.publisher_venue_map,
277                    &self.symbol_venue_map.read().unwrap(),
278                    &mut instrument_id_map,
279                    ts_init,
280                    &initialized_books,
281                ) {
282                    Ok(decoded) => decoded,
283                    Err(e) => {
284                        tracing::error!("Error decoding record: {e}");
285                        continue;
286                    }
287                };
288
289                if let Some(msg) = record.get::<dbn::MboMsg>() {
290                    // Check if should mark book initialized
291                    if let Some(Data::Delta(delta)) = &data1 {
292                        initialized_books.insert(delta.instrument_id);
293                    } else {
294                        continue; // No delta yet
295                    }
296
297                    if let Data::Delta(delta) = data1.clone().expect("MBO should decode a delta") {
298                        let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
299                        buffer.push(delta);
300
301                        deltas_count += 1;
302                        tracing::trace!(
303                            "Buffering delta: {deltas_count} {} {buffering_start:?} flags={}",
304                            delta.ts_event,
305                            msg.flags.raw(),
306                        );
307
308                        // Check if last message in the book event
309                        if !RecordFlag::F_LAST.matches(msg.flags.raw()) {
310                            continue; // NOT last message
311                        }
312
313                        // Check if snapshot
314                        if RecordFlag::F_SNAPSHOT.matches(msg.flags.raw()) {
315                            continue; // Buffer snapshot
316                        }
317
318                        // Check if buffering a replay
319                        if let Some(start_ns) = buffering_start {
320                            if delta.ts_event <= start_ns {
321                                continue; // Continue buffering replay
322                            }
323                            buffering_start = None;
324                        }
325
326                        // SAFETY: We can guarantee a deltas vec exists
327                        let buffer = buffered_deltas.remove(&delta.instrument_id).unwrap();
328                        let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
329                        let deltas = OrderBookDeltas_API::new(deltas);
330                        data1 = Some(Data::Deltas(deltas));
331                    }
332                }
333
334                if let Some(data) = data1 {
335                    self.send_msg(LiveMessage::Data(data)).await;
336                }
337
338                if let Some(data) = data2 {
339                    self.send_msg(LiveMessage::Data(data)).await;
340                }
341            }
342        }
343
344        self.cmd_rx.close();
345        tracing::debug!("Closed command receiver");
346
347        Ok(())
348    }
349
350    async fn send_msg(&mut self, msg: LiveMessage) {
351        tracing::trace!("Sending {msg:?}");
352        match self.msg_tx.send(msg).await {
353            Ok(()) => {}
354            Err(e) => tracing::error!("Error sending message: {e}"),
355        }
356    }
357}
358
359fn handle_error_msg(msg: &dbn::ErrorMsg) {
360    tracing::error!("{msg:?}");
361}
362
363fn handle_system_msg(msg: &dbn::SystemMsg) {
364    tracing::info!("{msg:?}");
365}
366
367fn handle_symbol_mapping_msg(
368    msg: &dbn::SymbolMappingMsg,
369    symbol_map: &mut PitSymbolMap,
370    instrument_id_map: &mut HashMap<u32, InstrumentId>,
371) {
372    // Update the symbol map
373    symbol_map
374        .on_symbol_mapping(msg)
375        .unwrap_or_else(|_| panic!("Error updating `symbol_map` with {msg:?}"));
376
377    // Remove current entry for instrument
378    instrument_id_map.remove(&msg.header().instrument_id);
379}
380
381fn update_instrument_id_map_with_exchange(
382    symbol_map: &PitSymbolMap,
383    symbol_venue_map: &RwLock<HashMap<Symbol, Venue>>,
384    instrument_id_map: &mut HashMap<u32, InstrumentId>,
385    raw_instrument_id: u32,
386    exchange: &str,
387) -> InstrumentId {
388    let raw_symbol = symbol_map
389        .get(raw_instrument_id)
390        .expect("Cannot resolve `raw_symbol` from `symbol_map`");
391    let symbol = Symbol::from(raw_symbol.as_str());
392    let venue = Venue::from(exchange);
393    let instrument_id = InstrumentId::new(symbol, venue);
394    symbol_venue_map
395        .write()
396        .unwrap()
397        .entry(symbol)
398        .or_insert(venue);
399    instrument_id_map.insert(raw_instrument_id, instrument_id);
400    instrument_id
401}
402
403fn update_instrument_id_map(
404    record: &dbn::RecordRef,
405    symbol_map: &PitSymbolMap,
406    publisher_venue_map: &IndexMap<PublisherId, Venue>,
407    symbol_venue_map: &HashMap<Symbol, Venue>,
408    instrument_id_map: &mut HashMap<u32, InstrumentId>,
409) -> InstrumentId {
410    let header = record.header();
411
412    // Check if instrument ID is already in the map
413    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
414        return instrument_id;
415    }
416
417    let raw_symbol = symbol_map
418        .get_for_rec(record)
419        .expect("Cannot resolve `raw_symbol` from `symbol_map`");
420
421    let symbol = Symbol::from_str_unchecked(raw_symbol);
422
423    let publisher_id = header.publisher_id;
424    let venue = match symbol_venue_map.get(&symbol) {
425        Some(venue) => venue,
426        None => publisher_venue_map
427            .get(&publisher_id)
428            .unwrap_or_else(|| panic!("No venue found for `publisher_id` {publisher_id}")),
429    };
430    let instrument_id = InstrumentId::new(symbol, *venue);
431
432    instrument_id_map.insert(header.instrument_id, instrument_id);
433    instrument_id
434}
435
436fn handle_instrument_def_msg(
437    msg: &dbn::InstrumentDefMsg,
438    record: &dbn::RecordRef,
439    symbol_map: &PitSymbolMap,
440    publisher_venue_map: &IndexMap<PublisherId, Venue>,
441    symbol_venue_map: &HashMap<Symbol, Venue>,
442    instrument_id_map: &mut HashMap<u32, InstrumentId>,
443    ts_init: UnixNanos,
444) -> anyhow::Result<InstrumentAny> {
445    let instrument_id = update_instrument_id_map(
446        record,
447        symbol_map,
448        publisher_venue_map,
449        symbol_venue_map,
450        instrument_id_map,
451    );
452
453    decode_instrument_def_msg(msg, instrument_id, ts_init)
454}
455
456fn handle_status_msg(
457    msg: &dbn::StatusMsg,
458    record: &dbn::RecordRef,
459    symbol_map: &PitSymbolMap,
460    publisher_venue_map: &IndexMap<PublisherId, Venue>,
461    symbol_venue_map: &HashMap<Symbol, Venue>,
462    instrument_id_map: &mut HashMap<u32, InstrumentId>,
463    ts_init: UnixNanos,
464) -> anyhow::Result<InstrumentStatus> {
465    let instrument_id = update_instrument_id_map(
466        record,
467        symbol_map,
468        publisher_venue_map,
469        symbol_venue_map,
470        instrument_id_map,
471    );
472
473    decode_status_msg(msg, instrument_id, ts_init)
474}
475
476fn handle_imbalance_msg(
477    msg: &dbn::ImbalanceMsg,
478    record: &dbn::RecordRef,
479    symbol_map: &PitSymbolMap,
480    publisher_venue_map: &IndexMap<PublisherId, Venue>,
481    symbol_venue_map: &HashMap<Symbol, Venue>,
482    instrument_id_map: &mut HashMap<u32, InstrumentId>,
483    ts_init: UnixNanos,
484) -> anyhow::Result<DatabentoImbalance> {
485    let instrument_id = update_instrument_id_map(
486        record,
487        symbol_map,
488        publisher_venue_map,
489        symbol_venue_map,
490        instrument_id_map,
491    );
492
493    let price_precision = 2; // Hard-coded for now
494
495    decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
496}
497
498fn handle_statistics_msg(
499    msg: &dbn::StatMsg,
500    record: &dbn::RecordRef,
501    symbol_map: &PitSymbolMap,
502    publisher_venue_map: &IndexMap<PublisherId, Venue>,
503    symbol_venue_map: &HashMap<Symbol, Venue>,
504    instrument_id_map: &mut HashMap<u32, InstrumentId>,
505    ts_init: UnixNanos,
506) -> anyhow::Result<DatabentoStatistics> {
507    let instrument_id = update_instrument_id_map(
508        record,
509        symbol_map,
510        publisher_venue_map,
511        symbol_venue_map,
512        instrument_id_map,
513    );
514
515    let price_precision = 2; // Hard-coded for now
516
517    decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
518}
519
520fn handle_record(
521    record: dbn::RecordRef,
522    symbol_map: &PitSymbolMap,
523    publisher_venue_map: &IndexMap<PublisherId, Venue>,
524    symbol_venue_map: &HashMap<Symbol, Venue>,
525    instrument_id_map: &mut HashMap<u32, InstrumentId>,
526    ts_init: UnixNanos,
527    initialized_books: &HashSet<InstrumentId>,
528) -> anyhow::Result<(Option<Data>, Option<Data>)> {
529    let instrument_id = update_instrument_id_map(
530        &record,
531        symbol_map,
532        publisher_venue_map,
533        symbol_venue_map,
534        instrument_id_map,
535    );
536
537    let price_precision = 2; // Hard-coded for now
538    let include_trades = initialized_books.contains(&instrument_id);
539
540    decode_record(
541        &record,
542        instrument_id,
543        price_precision,
544        Some(ts_init),
545        include_trades,
546    )
547}