nautilus_data/engine/
runner.rs1use std::{
17 cell::{OnceCell, RefCell},
18 collections::VecDeque,
19 rc::Rc,
20};
21
22use futures::StreamExt;
23use nautilus_common::{
24 clock::{Clock, LiveClock, TestClock},
25 messages::data::{DataEvent, DataResponse, SubscriptionCommand},
26 runtime::get_runtime,
27 timer::{TimeEvent, TimeEventHandlerV2},
28};
29use nautilus_model::data::GetTsInit;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31
32use super::DataEngine;
33
34pub trait DataQueue {
35 fn push(&mut self, event: DataEvent);
36}
37
38pub type GlobalDataQueue = Rc<RefCell<dyn DataQueue>>;
39
40pub struct SyncDataQueue(VecDeque<DataEvent>);
41pub struct AsyncDataQueue(UnboundedSender<DataEvent>);
42
43impl DataQueue for SyncDataQueue {
44 fn push(&mut self, event: DataEvent) {
45 self.0.push_back(event);
46 }
47}
48
49impl DataQueue for AsyncDataQueue {
50 fn push(&mut self, event: DataEvent) {
51 if let Err(e) = self.0.send(event) {
52 log::error!("Unable to send data event to async data channel: {e}");
53 }
54 }
55}
56
57#[must_use]
58pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
59 DATA_QUEUE
60 .try_with(|dq| {
61 dq.get()
62 .expect("Data queue should be initialized by runner")
63 .clone()
64 })
65 .expect("Should be able to access thread local storage")
66}
67
68pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
69 DATA_QUEUE
70 .try_with(|deque| {
71 assert!(deque.set(dq).is_ok(), "Global data queue already set");
72 })
73 .expect("Should be able to access thread local storage");
74}
75
76pub type GlobalClock = Rc<RefCell<dyn Clock>>;
77
78#[must_use]
79pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
80 CLOCK
81 .try_with(|clock| {
82 clock
83 .get()
84 .expect("Clock should be initialized by runner")
85 .clone()
86 })
87 .expect("Should be able to access thread local storage")
88}
89
90pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
91 CLOCK
92 .try_with(|clock| {
93 assert!(clock.set(c).is_ok(), "Global clock already set");
94 })
95 .expect("Should be able to access thread local clock");
96}
97
98pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscriptionCommand>>>;
99
100#[must_use]
102pub fn get_msgbus_cmd() -> MessageBusCommands {
103 MSGBUS_CMD
104 .try_with(std::clone::Clone::clone)
105 .expect("Should be able to access thread local storage")
106}
107
108thread_local! {
109 static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
110 static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
111 static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
112}
113
114pub trait Runner {
115 fn new() -> Self;
116 fn run(&mut self, engine: &mut DataEngine);
117}
118
119pub trait SendResponse {
120 fn send(&self, resp: DataResponse);
121}
122
123pub type DataResponseQueue = Rc<RefCell<SyncDataQueue>>;
124
125pub struct BacktestRunner {
126 dq: DataResponseQueue,
127 pub clock: Rc<RefCell<TestClock>>,
128}
129
130impl Runner for BacktestRunner {
131 fn new() -> Self {
132 let clock = Rc::new(RefCell::new(TestClock::new()));
133 set_clock(clock.clone());
134
135 let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
136 set_data_queue(dq.clone());
137 Self { dq, clock }
138 }
139
140 fn run(&mut self, engine: &mut DataEngine) {
141 let msgbus_cmd = get_msgbus_cmd();
142
143 while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
144 match resp {
145 DataEvent::Response(resp) => engine.response(resp),
146 DataEvent::Data(data) => {
147 while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
148 engine.execute(sub_cmd);
149 }
150
151 let handlers: Vec<TimeEventHandlerV2> = {
153 let mut guard = self.clock.borrow_mut();
154 guard.advance_to_time_on_heap(data.ts_init());
155 guard.by_ref().collect()
156 };
158
159 handlers.into_iter().for_each(TimeEventHandlerV2::run);
161
162 engine.process_data(data);
163 }
164 }
165 }
166 }
167}
168
169pub struct LiveRunner {
170 resp_rx: UnboundedReceiver<DataEvent>,
171 pub clock: Rc<RefCell<LiveClock>>,
172}
173
174impl Runner for LiveRunner {
175 fn new() -> Self {
176 let (resp_tx, resp_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
177 set_data_queue(Rc::new(RefCell::new(AsyncDataQueue(resp_tx))));
178
179 let clock = Rc::new(RefCell::new(LiveClock::new()));
180 set_clock(clock.clone());
181
182 Self { resp_rx, clock }
183 }
184
185 fn run(&mut self, engine: &mut DataEngine) {
186 let mut time_event_stream = self.clock.borrow().get_event_stream();
187 let msgbus_cmd = get_msgbus_cmd();
188
189 loop {
190 while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
191 engine.execute(sub_cmd);
192 }
193
194 let next_event = get_runtime().block_on(async {
196 tokio::select! {
197 Some(resp) = self.resp_rx.recv() => Some(RunnerEvent::Data(resp)),
198 Some(event) = time_event_stream.next() => Some(RunnerEvent::Timer(event)),
199 else => None,
200 }
201 });
202
203 match next_event {
205 Some(RunnerEvent::Data(resp)) => match resp {
206 DataEvent::Response(resp) => engine.response(resp),
207 DataEvent::Data(data) => engine.process_data(data),
208 },
209 Some(RunnerEvent::Timer(event)) => self.clock.borrow().get_handler(event).run(),
210 None => break,
211 }
212 }
213 }
214}
215
216#[allow(clippy::large_enum_variant)]
219enum RunnerEvent {
220 Data(DataEvent),
221 Timer(TimeEvent),
222}
223
224#[cfg(test)]
225#[cfg(feature = "clock_v2")]
226mod tests {
227 use std::{cell::RefCell, rc::Rc};
228
229 use futures::StreamExt;
230 use nautilus_common::{
231 clock::{LiveClock, TestClock},
232 timer::{TimeEvent, TimeEventCallback},
233 };
234
235 use super::{get_clock, set_clock};
236
237 #[test]
238 fn test_global_test_clock() {
239 let test_clock = Rc::new(RefCell::new(TestClock::new()));
240 set_clock(test_clock.clone());
241
242 get_clock().borrow_mut().set_time_alert_ns(
244 "hola",
245 2.into(),
246 Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
247 );
248
249 test_clock.borrow_mut().advance_to_time_on_heap(3.into());
251 assert!(test_clock.borrow_mut().next().is_some());
252 }
253
254 #[tokio::test]
255 async fn test_global_live_clock() {
256 let live_clock = Rc::new(RefCell::new(LiveClock::new()));
257 set_clock(live_clock.clone());
258 let alert_time = live_clock.borrow().get_time_ns() + 100;
259
260 get_clock().borrow_mut().set_time_alert_ns(
262 "hola",
263 alert_time,
264 Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
265 );
266
267 assert!(live_clock
269 .borrow()
270 .get_event_stream()
271 .next()
272 .await
273 .is_some());
274 }
275}