nautilus_data/engine/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{OnceCell, RefCell},
18    collections::VecDeque,
19    rc::Rc,
20};
21
22use futures::StreamExt;
23use nautilus_common::{
24    clock::{Clock, LiveClock, TestClock},
25    messages::data::{DataEvent, DataResponse, SubscriptionCommand},
26    runtime::get_runtime,
27    timer::{TimeEvent, TimeEventHandlerV2},
28};
29use nautilus_model::data::GetTsInit;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31
32use super::DataEngine;
33
34pub trait DataQueue {
35    fn push(&mut self, event: DataEvent);
36}
37
38pub type GlobalDataQueue = Rc<RefCell<dyn DataQueue>>;
39
40pub struct SyncDataQueue(VecDeque<DataEvent>);
41pub struct AsyncDataQueue(UnboundedSender<DataEvent>);
42
43impl DataQueue for SyncDataQueue {
44    fn push(&mut self, event: DataEvent) {
45        self.0.push_back(event);
46    }
47}
48
49impl DataQueue for AsyncDataQueue {
50    fn push(&mut self, event: DataEvent) {
51        if let Err(e) = self.0.send(event) {
52            log::error!("Unable to send data event to async data channel: {e}");
53        }
54    }
55}
56
57#[must_use]
58pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
59    DATA_QUEUE
60        .try_with(|dq| {
61            dq.get()
62                .expect("Data queue should be initialized by runner")
63                .clone()
64        })
65        .expect("Should be able to access thread local storage")
66}
67
68pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
69    DATA_QUEUE
70        .try_with(|deque| {
71            assert!(deque.set(dq).is_ok(), "Global data queue already set");
72        })
73        .expect("Should be able to access thread local storage");
74}
75
76pub type GlobalClock = Rc<RefCell<dyn Clock>>;
77
78#[must_use]
79pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
80    CLOCK
81        .try_with(|clock| {
82            clock
83                .get()
84                .expect("Clock should be initialized by runner")
85                .clone()
86        })
87        .expect("Should be able to access thread local storage")
88}
89
90pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
91    CLOCK
92        .try_with(|clock| {
93            assert!(clock.set(c).is_ok(), "Global clock already set");
94        })
95        .expect("Should be able to access thread local clock");
96}
97
98pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscriptionCommand>>>;
99
100/// Get globally shared message bus command queue
101#[must_use]
102pub fn get_msgbus_cmd() -> MessageBusCommands {
103    MSGBUS_CMD
104        .try_with(std::clone::Clone::clone)
105        .expect("Should be able to access thread local storage")
106}
107
108thread_local! {
109    static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
110    static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
111    static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
112}
113
114pub trait Runner {
115    fn new() -> Self;
116    fn run(&mut self, engine: &mut DataEngine);
117}
118
119pub trait SendResponse {
120    fn send(&self, resp: DataResponse);
121}
122
123pub type DataResponseQueue = Rc<RefCell<SyncDataQueue>>;
124
125pub struct BacktestRunner {
126    dq: DataResponseQueue,
127    pub clock: Rc<RefCell<TestClock>>,
128}
129
130impl Runner for BacktestRunner {
131    fn new() -> Self {
132        let clock = Rc::new(RefCell::new(TestClock::new()));
133        set_clock(clock.clone());
134
135        let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
136        set_data_queue(dq.clone());
137        Self { dq, clock }
138    }
139
140    fn run(&mut self, engine: &mut DataEngine) {
141        let msgbus_cmd = get_msgbus_cmd();
142
143        while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
144            match resp {
145                DataEvent::Response(resp) => engine.response(resp),
146                DataEvent::Data(data) => {
147                    while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
148                        engine.execute(sub_cmd);
149                    }
150
151                    // Advance clock time and collect all triggered events and handlers
152                    let handlers: Vec<TimeEventHandlerV2> = {
153                        let mut guard = self.clock.borrow_mut();
154                        guard.advance_to_time_on_heap(data.ts_init());
155                        guard.by_ref().collect()
156                        // drop guard
157                    };
158
159                    // Execute all handlers before processing the data
160                    handlers.into_iter().for_each(TimeEventHandlerV2::run);
161
162                    engine.process_data(data);
163                }
164            }
165        }
166    }
167}
168
169pub struct LiveRunner {
170    resp_rx: UnboundedReceiver<DataEvent>,
171    pub clock: Rc<RefCell<LiveClock>>,
172}
173
174impl Runner for LiveRunner {
175    fn new() -> Self {
176        let (resp_tx, resp_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
177        set_data_queue(Rc::new(RefCell::new(AsyncDataQueue(resp_tx))));
178
179        let clock = Rc::new(RefCell::new(LiveClock::new()));
180        set_clock(clock.clone());
181
182        Self { resp_rx, clock }
183    }
184
185    fn run(&mut self, engine: &mut DataEngine) {
186        let mut time_event_stream = self.clock.borrow().get_event_stream();
187        let msgbus_cmd = get_msgbus_cmd();
188
189        loop {
190            while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
191                engine.execute(sub_cmd);
192            }
193
194            // Collect the next event to process
195            let next_event = get_runtime().block_on(async {
196                tokio::select! {
197                    Some(resp) = self.resp_rx.recv() => Some(RunnerEvent::Data(resp)),
198                    Some(event) = time_event_stream.next() => Some(RunnerEvent::Timer(event)),
199                    else => None,
200                }
201            });
202
203            // Process the event outside of the async context
204            match next_event {
205                Some(RunnerEvent::Data(resp)) => match resp {
206                    DataEvent::Response(resp) => engine.response(resp),
207                    DataEvent::Data(data) => engine.process_data(data),
208                },
209                Some(RunnerEvent::Timer(event)) => self.clock.borrow().get_handler(event).run(),
210                None => break,
211            }
212        }
213    }
214}
215
216// Helper enum to represent different event types
217// TODO: Fix large enum variant problem
218#[allow(clippy::large_enum_variant)]
219enum RunnerEvent {
220    Data(DataEvent),
221    Timer(TimeEvent),
222}
223
224#[cfg(test)]
225#[cfg(feature = "clock_v2")]
226mod tests {
227    use std::{cell::RefCell, rc::Rc};
228
229    use futures::StreamExt;
230    use nautilus_common::{
231        clock::{LiveClock, TestClock},
232        timer::{TimeEvent, TimeEventCallback},
233    };
234
235    use super::{get_clock, set_clock};
236
237    #[test]
238    fn test_global_test_clock() {
239        let test_clock = Rc::new(RefCell::new(TestClock::new()));
240        set_clock(test_clock.clone());
241
242        // component/actor adding an alert
243        get_clock().borrow_mut().set_time_alert_ns(
244            "hola",
245            2.into(),
246            Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
247        );
248
249        // runner pulling advancing and pulling from event stream
250        test_clock.borrow_mut().advance_to_time_on_heap(3.into());
251        assert!(test_clock.borrow_mut().next().is_some());
252    }
253
254    #[tokio::test]
255    async fn test_global_live_clock() {
256        let live_clock = Rc::new(RefCell::new(LiveClock::new()));
257        set_clock(live_clock.clone());
258        let alert_time = live_clock.borrow().get_time_ns() + 100;
259
260        // component/actor adding an alert
261        get_clock().borrow_mut().set_time_alert_ns(
262            "hola",
263            alert_time,
264            Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
265        );
266
267        // runner pulling from event
268        assert!(live_clock
269            .borrow()
270            .get_event_stream()
271            .next()
272            .await
273            .is_some());
274    }
275}