// nautilus_data/engine/book.rs
use std::{
any::Any,
cell::{Ref, RefCell},
num::NonZeroU64,
rc::Rc,
};
use nautilus_common::{
cache::Cache,
messages::data::DataResponse,
msgbus::{handler::MessageHandler, MessageBus},
timer::TimeEvent,
};
use nautilus_model::{
data::Data,
identifiers::{InstrumentId, Venue},
};
use ustr::Ustr;
/// Describes what a `BookSnapshotter` should publish and where.
///
/// The snapshotter reads these fields to decide whether to publish a single
/// instrument's order book or the books of every instrument the cache returns
/// for a venue/root pair.
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
/// Instrument whose book is published when `is_composite` is `false`.
/// Also used to build the snapshotter's id and timer name.
pub instrument_id: InstrumentId,
/// Venue passed to the cache's instrument lookup for composite snapshots.
pub venue: Venue,
/// When `true`, a book is published for each instrument the cache returns
/// for `venue` and `root`, instead of for `instrument_id` alone.
pub is_composite: bool,
/// Filter passed (as the "underlying") to the cache's instrument lookup for
/// composite snapshots.
pub root: Ustr,
/// Message bus topic the order books are published on.
pub topic: Ustr,
/// Snapshot interval, embedded in the snapshotter's timer name.
/// NOTE(review): the `_ms` suffix suggests milliseconds — confirm with the
/// code that schedules the timer.
pub interval_ms: NonZeroU64,
}
/// Message handler that applies incoming order book data (deltas and depth
/// updates) to order books held in the shared cache.
pub struct BookUpdater {
/// Handler identifier, formatted as `BookUpdater-{instrument_id}`.
pub id: Ustr,
/// Instrument this updater was created for. Note that `handle_data` looks the
/// book up by the incoming data's instrument id, not by this field.
pub instrument_id: InstrumentId,
/// Shared cache holding the order books that are mutated on each update.
pub cache: Rc<RefCell<Cache>>,
}
impl BookUpdater {
    /// Creates a new [`BookUpdater`] for `instrument_id` that mutates order
    /// books stored in `cache`.
    ///
    /// The handler id is derived from the type name and the instrument id,
    /// i.e. `BookUpdater-{instrument_id}`.
    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
        let handler_id = format!("{}-{}", stringify!(BookUpdater), instrument_id);

        Self {
            id: Ustr::from(&handler_id),
            instrument_id: *instrument_id,
            cache,
        }
    }
}
impl MessageHandler for BookUpdater {
    /// Returns this handler's identifier.
    fn id(&self) -> Ustr {
        self.id
    }

    /// No-op: this handler only acts on data delivered through `handle_data`.
    fn handle(&self, _message: &dyn Any) {}

    /// No-op: responses are not used by this handler.
    fn handle_response(&self, _resp: DataResponse) {}

    /// Applies `data` to the cached order book for `data.instrument_id()`.
    ///
    /// `Delta`, `Deltas` and `Depth10` variants are applied to the book; any
    /// other variant is reported with an error log. If the cache holds no book
    /// for the instrument, the data is dropped without any log.
    ///
    /// # Panics
    ///
    /// Panics if the cache `RefCell` is already borrowed when this is called.
    fn handle_data(&self, data: Data) {
        // Keep the guard in a named local so the mutable borrow of the cache
        // clearly spans the whole update.
        let mut cache = self.cache.borrow_mut();

        if let Some(book) = cache.order_book_mut(&data.instrument_id()) {
            match data {
                Data::Delta(delta) => book.apply_delta(&delta),
                Data::Deltas(deltas) => book.apply_deltas(&deltas),
                Data::Depth10(depth) => book.apply_depth(&depth),
                _ => log::error!("Invalid data type for book update, was {data:?}"),
            }
        }
    }

    /// Exposes the handler as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Publishes order books read from the shared cache onto the message bus,
/// as described by a `BookSnapshotInfo`.
pub struct BookSnapshotter {
/// Identifier, formatted as `BookSnapshotter-{instrument_id}`.
pub id: Ustr,
/// Timer name, formatted as `OrderBook|{instrument_id}|{interval_ms}`.
pub timer_name: Ustr,
/// Describes which book(s) to publish and on which topic.
pub snap_info: BookSnapshotInfo,
/// Shared cache the order books are read from.
pub cache: Rc<RefCell<Cache>>,
/// Shared message bus the order books are published on.
pub msgbus: Rc<RefCell<MessageBus>>,
}
impl BookSnapshotter {
    /// Creates a new [`BookSnapshotter`] for the subscription described by
    /// `snap_info`.
    ///
    /// The id is `BookSnapshotter-{instrument_id}` and the timer name is
    /// `OrderBook|{instrument_id}|{interval_ms}`.
    pub fn new(
        snap_info: BookSnapshotInfo,
        cache: Rc<RefCell<Cache>>,
        msgbus: Rc<RefCell<MessageBus>>,
    ) -> Self {
        let id_str = format!(
            "{}-{}",
            stringify!(BookSnapshotter),
            snap_info.instrument_id
        );
        let timer_name = format!(
            "OrderBook|{}|{}",
            snap_info.instrument_id, snap_info.interval_ms
        );

        Self {
            id: Ustr::from(&id_str),
            timer_name: Ustr::from(&timer_name),
            snap_info,
            cache,
            msgbus,
        }
    }

    /// Publishes the current order book(s) for this snapshotter's subscription.
    ///
    /// For a composite subscription, one book is published per instrument the
    /// cache returns for the configured venue and root; otherwise only the
    /// book for `snap_info.instrument_id` is published. The time event itself
    /// is not inspected.
    ///
    /// # Panics
    ///
    /// Panics if a targeted instrument has no order book in the cache, or if
    /// the cache or message bus `RefCell` is already mutably borrowed.
    pub fn snapshot(&self, _event: TimeEvent) {
        let cache = self.cache.borrow();
        let topic = self.snap_info.topic;

        if self.snap_info.is_composite {
            // Do NOT take `self.msgbus.borrow_mut()` here: `publish_order_book`
            // borrows the bus mutably itself, and holding a second guard
            // across the loop makes that borrow panic at runtime.
            let underlying = self.snap_info.root;
            for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
                self.publish_order_book(&instrument.id(), &topic, &cache);
            }
        } else {
            self.publish_order_book(&self.snap_info.instrument_id, &topic, &cache);
        }
    }

    /// Publishes the cached order book for `instrument_id` on `topic`.
    ///
    /// Books whose `count` is still zero are skipped with a debug log.
    ///
    /// # Panics
    ///
    /// Panics if the cache holds no order book for `instrument_id`, or if the
    /// message bus is already borrowed.
    fn publish_order_book(
        &self,
        instrument_id: &InstrumentId,
        topic: &Ustr,
        cache: &Ref<'_, Cache>,
    ) {
        let book = cache
            .order_book(instrument_id)
            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));

        if book.count == 0 {
            log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
            return;
        }

        // The bus is borrowed only for the duration of this single publish call.
        self.msgbus.borrow_mut().publish(topic, book as &dyn Any);
    }
}