nautilus_common/runner.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Global runtime machinery and thread-local storage.
17//!
18//! This module provides global access to shared runtime resources including clocks,
19//! message queues, and time event channels. It manages thread-local storage for
20//! system-wide components that need to be accessible across threads.
21
22use std::{
23 cell::{OnceCell, RefCell},
24 fmt::Debug,
25 sync::Arc,
26};
27
28use crate::{
29 messages::{data::DataCommand, execution::TradingCommand},
30 msgbus::{self, MessagingSwitchboard},
31 timer::TimeEventHandler,
32};
33
34/// Trait for data command sending that can be implemented for both sync and async runners.
35pub trait DataCommandSender {
36 /// Executes a data command.
37 ///
38 /// - **Sync runners** send the command to a queue for synchronous execution.
39 /// - **Async runners** send the command to a channel for asynchronous execution.
40 fn execute(&self, command: DataCommand);
41}
42
43/// Synchronous [`DataCommandSender`] for backtest environments.
44///
45/// Buffers commands in a thread-local queue for deferred execution,
46/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
47#[derive(Debug)]
48pub struct SyncDataCommandSender;
49
50impl DataCommandSender for SyncDataCommandSender {
51 fn execute(&self, command: DataCommand) {
52 DATA_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
53 }
54}
55
56/// Drain all buffered data commands, dispatching each to the data engine.
57pub fn drain_data_cmd_queue() {
58 DATA_CMD_QUEUE.with(|q| {
59 let commands: Vec<DataCommand> = q.borrow_mut().drain(..).collect();
60 let endpoint = MessagingSwitchboard::data_engine_execute();
61 for cmd in commands {
62 msgbus::send_data_command(endpoint, cmd);
63 }
64 });
65}
66
67/// Returns `true` if the data command queue is empty.
68pub fn data_cmd_queue_is_empty() -> bool {
69 DATA_CMD_QUEUE.with(|q| q.borrow().is_empty())
70}
71
72/// Gets the global data command sender.
73///
74/// # Panics
75///
76/// Panics if the sender is uninitialized.
77#[must_use]
78pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
79 DATA_CMD_SENDER.with(|sender| {
80 sender
81 .get()
82 .expect("Data command sender should be initialized by runner")
83 .clone()
84 })
85}
86
87/// Sets the global data command sender.
88///
89/// This should be called by the runner when it initializes.
90/// Can only be called once per thread.
91///
92/// # Panics
93///
94/// Panics if a sender has already been set.
95pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
96 DATA_CMD_SENDER.with(|s| {
97 assert!(
98 s.set(sender).is_ok(),
99 "Data command sender can only be set once"
100 );
101 });
102}
103
104/// Sets the global data command sender if not already set (idempotent).
105pub fn init_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
106 DATA_CMD_SENDER.with(|s| {
107 let _ = s.set(sender); // Ignore if already set
108 });
109}
110
111/// Trait for time event sending that can be implemented for both sync and async runners.
112pub trait TimeEventSender: Debug + Send + Sync {
113 /// Sends a time event handler.
114 fn send(&self, handler: TimeEventHandler);
115}
116
117/// Gets the global time event sender.
118///
119/// # Panics
120///
121/// Panics if the sender is uninitialized.
122#[must_use]
123pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
124 TIME_EVENT_SENDER.with(|sender| {
125 sender
126 .get()
127 .expect("Time event sender should be initialized by runner")
128 .clone()
129 })
130}
131
132/// Attempts to get the global time event sender without panicking.
133///
134/// Returns `None` if the sender is not initialized (e.g., in test environments).
135#[must_use]
136pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
137 TIME_EVENT_SENDER.with(|sender| sender.get().cloned())
138}
139
140/// Sets the global time event sender.
141///
142/// Can only be called once per thread.
143///
144/// # Panics
145///
146/// Panics if a sender has already been set.
147pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
148 TIME_EVENT_SENDER.with(|s| {
149 assert!(
150 s.set(sender).is_ok(),
151 "Time event sender can only be set once"
152 );
153 });
154}
155
156/// Trait for trading command sending that can be implemented for both sync and async runners.
157pub trait TradingCommandSender {
158 /// Executes a trading command.
159 ///
160 /// - **Sync runners** send the command to a queue for synchronous execution.
161 /// - **Async runners** send the command to a channel for asynchronous execution.
162 fn execute(&self, command: TradingCommand);
163}
164
165/// Synchronous [`TradingCommandSender`] for backtest environments.
166///
167/// Buffers commands in a thread-local queue for deferred execution,
168/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
169#[derive(Debug)]
170pub struct SyncTradingCommandSender;
171
172impl TradingCommandSender for SyncTradingCommandSender {
173 fn execute(&self, command: TradingCommand) {
174 TRADING_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
175 }
176}
177
178/// Drain all buffered trading commands, dispatching each to the exec engine.
179pub fn drain_trading_cmd_queue() {
180 TRADING_CMD_QUEUE.with(|q| {
181 let commands: Vec<TradingCommand> = q.borrow_mut().drain(..).collect();
182 let endpoint = MessagingSwitchboard::exec_engine_execute();
183 for cmd in commands {
184 msgbus::send_trading_command(endpoint, cmd);
185 }
186 });
187}
188
189/// Returns `true` if the trading command queue is empty.
190pub fn trading_cmd_queue_is_empty() -> bool {
191 TRADING_CMD_QUEUE.with(|q| q.borrow().is_empty())
192}
193
194/// Gets the global trading command sender.
195///
196/// # Panics
197///
198/// Panics if the sender is uninitialized.
199#[must_use]
200pub fn get_trading_cmd_sender() -> Arc<dyn TradingCommandSender> {
201 EXEC_CMD_SENDER.with(|sender| {
202 sender
203 .get()
204 .expect("Trading command sender should be initialized by runner")
205 .clone()
206 })
207}
208
209/// Attempts to get the global trading command sender without panicking.
210///
211/// Returns `None` if the sender is not initialized (e.g., in test environments).
212#[must_use]
213pub fn try_get_trading_cmd_sender() -> Option<Arc<dyn TradingCommandSender>> {
214 EXEC_CMD_SENDER.with(|sender| sender.get().cloned())
215}
216
217/// Sets the global trading command sender.
218///
219/// This should be called by the runner when it initializes.
220/// Can only be called once per thread.
221///
222/// # Panics
223///
224/// Panics if a sender has already been set.
225pub fn set_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
226 EXEC_CMD_SENDER.with(|s| {
227 assert!(
228 s.set(sender).is_ok(),
229 "Trading command sender can only be set once"
230 );
231 });
232}
233
234/// Sets the global trading command sender if not already set (idempotent).
235pub fn init_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
236 EXEC_CMD_SENDER.with(|s| {
237 let _ = s.set(sender); // Ignore if already set
238 });
239}
240
241thread_local! {
242 static TIME_EVENT_SENDER: OnceCell<Arc<dyn TimeEventSender>> = const { OnceCell::new() };
243 static DATA_CMD_SENDER: OnceCell<Arc<dyn DataCommandSender>> = const { OnceCell::new() };
244 static EXEC_CMD_SENDER: OnceCell<Arc<dyn TradingCommandSender>> = const { OnceCell::new() };
245 static DATA_CMD_QUEUE: RefCell<Vec<DataCommand>> = const { RefCell::new(Vec::new()) };
246 static TRADING_CMD_QUEUE: RefCell<Vec<TradingCommand>> = const { RefCell::new(Vec::new()) };
247}