nautilus_backtest/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21    cell::{OnceCell, RefCell},
22    collections::VecDeque,
23    rc::Rc,
24};
25
26use nautilus_common::{
27    clock::{Clock, TestClock},
28    messages::data::{DataEvent, SubscriptionCommand},
29    runner::{DataQueue, DataResponseQueue, GlobalDataQueue},
30};
31use nautilus_data::engine::DataEngine;
32
33pub struct SyncDataQueue(VecDeque<DataEvent>);
34
35impl DataQueue for SyncDataQueue {
36    fn push(&mut self, event: DataEvent) {
37        self.0.push_back(event);
38    }
39}
40
41#[must_use]
42pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
43    DATA_QUEUE
44        .try_with(|dq| {
45            dq.get()
46                .expect("Data queue should be initialized by runner")
47                .clone()
48        })
49        .expect("Should be able to access thread local storage")
50}
51
52pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
53    DATA_QUEUE
54        .try_with(|deque| {
55            assert!(deque.set(dq).is_ok(), "Global data queue already set");
56        })
57        .expect("Should be able to access thread local storage");
58}
59
60pub type GlobalClock = Rc<RefCell<dyn Clock>>;
61
62#[must_use]
63pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
64    CLOCK
65        .try_with(|clock| {
66            clock
67                .get()
68                .expect("Clock should be initialized by runner")
69                .clone()
70        })
71        .expect("Should be able to access thread local storage")
72}
73
74pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
75    CLOCK
76        .try_with(|clock| {
77            assert!(clock.set(c).is_ok(), "Global clock already set");
78        })
79        .expect("Should be able to access thread local clock");
80}
81
82pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscriptionCommand>>>;
83
84/// Get globally shared message bus command queue
85#[must_use]
86pub fn get_msgbus_cmd() -> MessageBusCommands {
87    MSGBUS_CMD
88        .try_with(std::clone::Clone::clone)
89        .expect("Should be able to access thread local storage")
90}
91
92thread_local! {
93    static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
94    static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
95    static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
96}
97
98// TODO: Determine how to deduplicate trait
99pub trait Runner {
100    fn new() -> Self;
101    fn run(&mut self, engine: &mut DataEngine);
102}
103
104pub struct BacktestRunner {
105    pub dq: DataResponseQueue,
106    pub clock: Rc<RefCell<TestClock>>,
107}
108
109// TODO: Untangle puzzle later
110// impl Runner for BacktestRunner {
111//     fn new() -> Self {
112//         let clock = Rc::new(RefCell::new(TestClock::new()));
113//         set_clock(clock.clone());
114//
115//         let dq = Rc::new(RefCell::new(SyncDataQueue(VecDeque::new())));
116//         set_data_queue(dq.clone());
117//         Self { dq, clock }
118//     }
119//
120//     fn run(&mut self, engine: &mut DataEngine) {
121//         let msgbus_cmd = get_msgbus_cmd();
122//
123//         while let Some(resp) = self.dq.as_ref().borrow_mut().0.pop_front() {
124//             match resp {
125//                 DataEvent::Response(resp) => engine.response(resp),
126//                 DataEvent::Data(data) => {
127//                     while let Some(sub_cmd) = msgbus_cmd.borrow_mut().pop_front() {
128//                         engine.execute(sub_cmd);
129//                     }
130//
131//                     // Advance clock time and collect all triggered events and handlers
132//                     let handlers: Vec<TimeEventHandlerV2> = {
133//                         let mut guard = self.clock.borrow_mut();
134//                         guard.advance_to_time_on_heap(data.ts_init());
135//                         guard.by_ref().collect()
136//                         // drop guard
137//                     };
138//
139//                     // Execute all handlers before processing the data
140//                     handlers.into_iter().for_each(TimeEventHandlerV2::run);
141//
142//                     engine.process_data(data);
143//                 }
144//             }
145//         }
146//     }
147// }
148
149#[cfg(test)]
150#[cfg(feature = "clock_v2")]
151mod tests {
152    use std::{cell::RefCell, rc::Rc};
153
154    use futures::StreamExt;
155    use nautilus_common::{
156        clock::{LiveClock, TestClock},
157        timer::{TimeEvent, TimeEventCallback},
158    };
159
160    use super::{get_clock, set_clock};
161
162    #[test]
163    fn test_global_test_clock() {
164        let test_clock = Rc::new(RefCell::new(TestClock::new()));
165        set_clock(test_clock.clone());
166
167        // component/actor adding an alert
168        get_clock().borrow_mut().set_time_alert_ns(
169            "hola",
170            2.into(),
171            Some(TimeEventCallback::Rust(Rc::new(|event: TimeEvent| {}))),
172        );
173
174        // runner pulling advancing and pulling from event stream
175        test_clock.borrow_mut().advance_to_time_on_heap(3.into());
176        assert!(test_clock.borrow_mut().next().is_some());
177    }
178}