nautilus_live/
runner.rs
1use std::{
17 cell::{OnceCell, RefCell},
18 collections::VecDeque,
19 rc::Rc,
20};
21
22use futures::StreamExt;
23use nautilus_common::{
24 clock::{Clock, LiveClock},
25 messages::data::{DataCommand, DataResponse, SubscribeCommand},
26 runner::{DataEvent, DataQueue, GlobalDataQueue, RunnerEvent, SyncDataQueue},
27 runtime::get_runtime,
28};
29use nautilus_data::engine::DataEngine;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31
32pub struct AsyncDataQueue(UnboundedSender<DataEvent>);
33
34impl DataQueue for AsyncDataQueue {
35 fn push(&mut self, event: DataEvent) {
36 if let Err(e) = self.0.send(event) {
37 log::error!("Unable to send data event to async data channel: {e}");
38 }
39 }
40}
41
42#[must_use]
43pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
44 DATA_QUEUE
45 .try_with(|dq| {
46 dq.get()
47 .expect("Data queue should be initialized by runner")
48 .clone()
49 })
50 .expect("Should be able to access thread local storage")
51}
52
53pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
54 DATA_QUEUE
55 .try_with(|deque| {
56 assert!(deque.set(dq).is_ok(), "Global data queue already set");
57 })
58 .expect("Should be able to access thread local storage");
59}
60
61pub type GlobalClock = Rc<RefCell<dyn Clock>>;
62
63#[must_use]
64pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
65 CLOCK
66 .try_with(|clock| {
67 clock
68 .get()
69 .expect("Clock should be initialized by runner")
70 .clone()
71 })
72 .expect("Should be able to access thread local storage")
73}
74
75pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
76 CLOCK
77 .try_with(|clock| {
78 assert!(clock.set(c).is_ok(), "Global clock already set");
79 })
80 .expect("Should be able to access thread local clock");
81}
82
83pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscribeCommand>>>;
84
85#[must_use]
87pub fn get_msgbus_cmd() -> MessageBusCommands {
88 MSGBUS_CMD
89 .try_with(std::clone::Clone::clone)
90 .expect("Should be able to access thread local storage")
91}
92
93thread_local! {
94 static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
95 static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
96 static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
97}
98
99pub trait Runner {
101 fn new() -> Self;
102 fn run(&mut self, engine: &mut DataEngine);
103}
104
105pub trait SendResponse {
106 fn send(&self, resp: DataResponse);
107}
108
109pub type DataResponseQueue = Rc<RefCell<SyncDataQueue>>;
110
111pub struct LiveRunner {
112 resp_rx: UnboundedReceiver<DataEvent>,
113 pub clock: Rc<RefCell<LiveClock>>,
114}
115
116impl Runner for LiveRunner {
117 fn new() -> Self {
118 let (resp_tx, resp_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
119 set_data_queue(Rc::new(RefCell::new(AsyncDataQueue(resp_tx))));
120
121 let clock = Rc::new(RefCell::new(LiveClock::new()));
122 set_clock(clock.clone());
123
124 Self { resp_rx, clock }
125 }
126
127 fn run(&mut self, engine: &mut DataEngine) {
128 let mut time_event_stream = self.clock.borrow().get_event_stream();
129 let msgbus_cmd = get_msgbus_cmd();
130
131 loop {
132 while let Some(cmd) = msgbus_cmd.borrow_mut().pop_front() {
133 engine.execute(DataCommand::Subscribe(cmd)); }
135
136 let next_event = get_runtime().block_on(async {
138 tokio::select! {
139 Some(resp) = self.resp_rx.recv() => Some(RunnerEvent::Data(resp)),
140 Some(event) = time_event_stream.next() => Some(RunnerEvent::Timer(event)),
141 else => None,
142 }
143 });
144
145 match next_event {
147 Some(RunnerEvent::Data(resp)) => match resp {
148 DataEvent::Response(resp) => engine.response(resp),
149 DataEvent::Data(data) => engine.process_data(data),
150 },
151 Some(RunnerEvent::Timer(event)) => self.clock.borrow().get_handler(event).run(),
152 None => break,
153 }
154 }
155 }
156}
157
158#[cfg(test)]
159#[cfg(feature = "clock_v2")]
160mod tests {
161 use std::{cell::RefCell, rc::Rc};
162
163 use futures::StreamExt;
164 use nautilus_common::{
165 clock::LiveClock,
166 timer::{TimeEvent, TimeEventCallback},
167 };
168
169 use super::{get_clock, set_clock};
170
171 #[tokio::test]
172 async fn test_global_live_clock() {
173 let live_clock = Rc::new(RefCell::new(LiveClock::new()));
174 set_clock(live_clock.clone());
175 let alert_time = live_clock.borrow().get_time_ns() + 100;
176
177 get_clock().borrow_mut().set_time_alert_ns(
179 "hola",
180 alert_time,
181 Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
182 );
183
184 assert!(
186 live_clock
187 .borrow()
188 .get_event_stream()
189 .next()
190 .await
191 .is_some()
192 );
193 }
194}