nautilus_backtest/runner.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 cell::{OnceCell, RefCell},
22 collections::VecDeque,
23 rc::Rc,
24};
25
26use nautilus_common::{
27 clock::{Clock, TestClock},
28 messages::data::{DataEvent, SubscriptionCommand},
29 runner::{DataQueue, DataResponseQueue, GlobalDataQueue},
30};
31use nautilus_data::engine::DataEngine;
32
33pub struct SyncDataQueue(VecDeque<DataEvent>);
34
35impl DataQueue for SyncDataQueue {
36 fn push(&mut self, event: DataEvent) {
37 self.0.push_back(event);
38 }
39}
40
41#[must_use]
42pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
43 DATA_QUEUE
44 .try_with(|dq| {
45 dq.get()
46 .expect("Data queue should be initialized by runner")
47 .clone()
48 })
49 .expect("Should be able to access thread local storage")
50}
51
52pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
53 DATA_QUEUE
54 .try_with(|deque| {
55 assert!(deque.set(dq).is_ok(), "Global data queue already set");
56 })
57 .expect("Should be able to access thread local storage");
58}
59
60pub type GlobalClock = Rc<RefCell<dyn Clock>>;
61
62#[must_use]
63pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
64 CLOCK
65 .try_with(|clock| {
66 clock
67 .get()
68 .expect("Clock should be initialized by runner")
69 .clone()
70 })
71 .expect("Should be able to access thread local storage")
72}
73
74pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
75 CLOCK
76 .try_with(|clock| {
77 assert!(clock.set(c).is_ok(), "Global clock already set");
78 })
79 .expect("Should be able to access thread local clock");
80}
81
82pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscriptionCommand>>>;
83
84/// Get globally shared message bus command queue
85#[must_use]
86pub fn get_msgbus_cmd() -> MessageBusCommands {
87 MSGBUS_CMD
88 .try_with(std::clone::Clone::clone)
89 .expect("Should be able to access thread local storage")
90}
91
92thread_local! {
93 static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
94 static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
95 static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
96}
97
98// TODO: Determine how to deduplicate trait
99pub trait Runner {
100 fn new() -> Self;
101 fn run(&mut self, engine: &mut DataEngine);
102}
103
104pub struct BacktestRunner {
105 pub dq: DataResponseQueue,
106 pub clock: Rc<RefCell<TestClock>>,
107}
108
109// TODO: Untangle puzzle later
110// impl Runner for BacktestRunner {
111// fn new() -> Self {
112// let clock = Rc::new(RefCell::new(TestClock::new()));
113// set_clock(clock.clone());
114//
115// let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
116// set_data_queue(dq.clone());
117// Self { dq, clock }
118// }
119//
120// fn run(&mut self, engine: &mut DataEngine) {
121// let msgbus_cmd = get_msgbus_cmd();
122//
123// while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
124// match resp {
125// DataEvent::Response(resp) => engine.response(resp),
126// DataEvent::Data(data) => {
127// while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
128// engine.execute(sub_cmd);
129// }
130//
131// // Advance clock time and collect all triggered events and handlers
132// let handlers: Vec<TimeEventHandlerV2> = {
133// let mut guard = self.clock.borrow_mut();
134// guard.advance_to_time_on_heap(data.ts_init());
135// guard.by_ref().collect()
136// // drop guard
137// };
138//
139// // Execute all handlers before processing the data
140// handlers.into_iter().for_each(TimeEventHandlerV2::run);
141//
142// engine.process_data(data);
143// }
144// }
145// }
146// }
147// }
148
#[cfg(all(test, feature = "clock_v2"))]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use nautilus_common::{
        clock::TestClock,
        timer::{TimeEvent, TimeEventCallback},
    };

    use super::{get_clock, set_clock};

    /// An alert registered through the thread-local clock handle must be visible
    /// on the original `TestClock` handle, because both share the same instance.
    #[test]
    fn test_global_test_clock() {
        let test_clock = Rc::new(RefCell::new(TestClock::new()));
        set_clock(test_clock.clone());

        // A component/actor registers a time alert via the global clock handle
        get_clock().borrow_mut().set_time_alert_ns(
            "hola",
            2.into(),
            Some(TimeEventCallback::Rust(Rc::new(|_event: TimeEvent| {}))),
        );

        // The runner advances the clock past the alert time, then pulls the triggered event
        test_clock.borrow_mut().advance_to_time_on_heap(3.into());
        assert!(test_clock.borrow_mut().next().is_some());
    }
}
178}