Skip to main content

nautilus_common/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Global runtime machinery and thread-local storage.
17//!
18//! This module provides global access to shared runtime resources including clocks,
19//! message queues, and time event channels. It manages thread-local storage for
20//! system-wide components that need to be accessible across threads.
21
22use std::{
23    cell::{OnceCell, RefCell},
24    fmt::Debug,
25    sync::Arc,
26};
27
28use crate::{
29    messages::{data::DataCommand, execution::TradingCommand},
30    msgbus::{self, MessagingSwitchboard},
31    timer::TimeEventHandler,
32};
33
34/// Trait for data command sending that can be implemented for both sync and async runners.
35pub trait DataCommandSender {
36    /// Executes a data command.
37    ///
38    /// - **Sync runners** send the command to a queue for synchronous execution.
39    /// - **Async runners** send the command to a channel for asynchronous execution.
40    fn execute(&self, command: DataCommand);
41}
42
43/// Synchronous [`DataCommandSender`] for backtest environments.
44///
45/// Buffers commands in a thread-local queue for deferred execution,
46/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
47#[derive(Debug)]
48pub struct SyncDataCommandSender;
49
50impl DataCommandSender for SyncDataCommandSender {
51    fn execute(&self, command: DataCommand) {
52        DATA_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
53    }
54}
55
56/// Drain all buffered data commands, dispatching each to the data engine.
57pub fn drain_data_cmd_queue() {
58    DATA_CMD_QUEUE.with(|q| {
59        let commands: Vec<DataCommand> = q.borrow_mut().drain(..).collect();
60        let endpoint = MessagingSwitchboard::data_engine_execute();
61        for cmd in commands {
62            msgbus::send_data_command(endpoint, cmd);
63        }
64    });
65}
66
67/// Returns `true` if the data command queue is empty.
68pub fn data_cmd_queue_is_empty() -> bool {
69    DATA_CMD_QUEUE.with(|q| q.borrow().is_empty())
70}
71
72/// Gets the global data command sender.
73///
74/// # Panics
75///
76/// Panics if the sender is uninitialized.
77#[must_use]
78pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
79    DATA_CMD_SENDER.with(|sender| {
80        sender
81            .get()
82            .expect("Data command sender should be initialized by runner")
83            .clone()
84    })
85}
86
87/// Sets the global data command sender.
88///
89/// This should be called by the runner when it initializes.
90/// Can only be called once per thread.
91///
92/// # Panics
93///
94/// Panics if a sender has already been set.
95pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
96    DATA_CMD_SENDER.with(|s| {
97        assert!(
98            s.set(sender).is_ok(),
99            "Data command sender can only be set once"
100        );
101    });
102}
103
104/// Sets the global data command sender if not already set (idempotent).
105pub fn init_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
106    DATA_CMD_SENDER.with(|s| {
107        let _ = s.set(sender); // Ignore if already set
108    });
109}
110
111/// Trait for time event sending that can be implemented for both sync and async runners.
112pub trait TimeEventSender: Debug + Send + Sync {
113    /// Sends a time event handler.
114    fn send(&self, handler: TimeEventHandler);
115}
116
117/// Gets the global time event sender.
118///
119/// # Panics
120///
121/// Panics if the sender is uninitialized.
122#[must_use]
123pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
124    TIME_EVENT_SENDER.with(|sender| {
125        sender
126            .get()
127            .expect("Time event sender should be initialized by runner")
128            .clone()
129    })
130}
131
132/// Attempts to get the global time event sender without panicking.
133///
134/// Returns `None` if the sender is not initialized (e.g., in test environments).
135#[must_use]
136pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
137    TIME_EVENT_SENDER.with(|sender| sender.get().cloned())
138}
139
140/// Sets the global time event sender.
141///
142/// Can only be called once per thread.
143///
144/// # Panics
145///
146/// Panics if a sender has already been set.
147pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
148    TIME_EVENT_SENDER.with(|s| {
149        assert!(
150            s.set(sender).is_ok(),
151            "Time event sender can only be set once"
152        );
153    });
154}
155
156/// Trait for trading command sending that can be implemented for both sync and async runners.
157pub trait TradingCommandSender {
158    /// Executes a trading command.
159    ///
160    /// - **Sync runners** send the command to a queue for synchronous execution.
161    /// - **Async runners** send the command to a channel for asynchronous execution.
162    fn execute(&self, command: TradingCommand);
163}
164
165/// Synchronous [`TradingCommandSender`] for backtest environments.
166///
167/// Buffers commands in a thread-local queue for deferred execution,
168/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
169#[derive(Debug)]
170pub struct SyncTradingCommandSender;
171
172impl TradingCommandSender for SyncTradingCommandSender {
173    fn execute(&self, command: TradingCommand) {
174        TRADING_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
175    }
176}
177
178/// Drain all buffered trading commands, dispatching each to the exec engine.
179pub fn drain_trading_cmd_queue() {
180    TRADING_CMD_QUEUE.with(|q| {
181        let commands: Vec<TradingCommand> = q.borrow_mut().drain(..).collect();
182        let endpoint = MessagingSwitchboard::exec_engine_execute();
183        for cmd in commands {
184            msgbus::send_trading_command(endpoint, cmd);
185        }
186    });
187}
188
189/// Returns `true` if the trading command queue is empty.
190pub fn trading_cmd_queue_is_empty() -> bool {
191    TRADING_CMD_QUEUE.with(|q| q.borrow().is_empty())
192}
193
194/// Gets the global trading command sender.
195///
196/// # Panics
197///
198/// Panics if the sender is uninitialized.
199#[must_use]
200pub fn get_trading_cmd_sender() -> Arc<dyn TradingCommandSender> {
201    EXEC_CMD_SENDER.with(|sender| {
202        sender
203            .get()
204            .expect("Trading command sender should be initialized by runner")
205            .clone()
206    })
207}
208
209/// Attempts to get the global trading command sender without panicking.
210///
211/// Returns `None` if the sender is not initialized (e.g., in test environments).
212#[must_use]
213pub fn try_get_trading_cmd_sender() -> Option<Arc<dyn TradingCommandSender>> {
214    EXEC_CMD_SENDER.with(|sender| sender.get().cloned())
215}
216
217/// Sets the global trading command sender.
218///
219/// This should be called by the runner when it initializes.
220/// Can only be called once per thread.
221///
222/// # Panics
223///
224/// Panics if a sender has already been set.
225pub fn set_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
226    EXEC_CMD_SENDER.with(|s| {
227        assert!(
228            s.set(sender).is_ok(),
229            "Trading command sender can only be set once"
230        );
231    });
232}
233
234/// Sets the global trading command sender if not already set (idempotent).
235pub fn init_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
236    EXEC_CMD_SENDER.with(|s| {
237        let _ = s.set(sender); // Ignore if already set
238    });
239}
240
241thread_local! {
242    static TIME_EVENT_SENDER: OnceCell<Arc<dyn TimeEventSender>> = const { OnceCell::new() };
243    static DATA_CMD_SENDER: OnceCell<Arc<dyn DataCommandSender>> = const { OnceCell::new() };
244    static EXEC_CMD_SENDER: OnceCell<Arc<dyn TradingCommandSender>> = const { OnceCell::new() };
245    static DATA_CMD_QUEUE: RefCell<Vec<DataCommand>> = const { RefCell::new(Vec::new()) };
246    static TRADING_CMD_QUEUE: RefCell<Vec<TradingCommand>> = const { RefCell::new(Vec::new()) };
247}