nautilus_common/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Global runtime machinery and thread-local storage.
17//!
18//! This module provides global access to shared runtime resources including clocks,
19//! message queues, and time event channels. It manages thread-local storage for
20//! system-wide components that need to be accessible across threads.
21
22use std::{cell::OnceCell, fmt::Debug, sync::Arc};
23
24use crate::{
25    messages::{DataEvent, data::DataCommand},
26    msgbus::{self, switchboard::MessagingSwitchboard},
27    timer::TimeEventHandlerV2,
28};
29
30/// Trait for data command sending that can be implemented for both sync and async runners.
31pub trait DataCommandSender {
32    /// Executes a data command.
33    ///
34    /// - **Sync runners** send the command to a queue for synchronous execution.
35    /// - **Async runners** send the command to a channel for asynchronous execution.
36    fn execute(&self, command: DataCommand);
37}
38
39/// Synchronous implementation of DataCommandSender for backtest environments.
40#[derive(Debug)]
41pub struct SyncDataCommandSender;
42
43impl DataCommandSender for SyncDataCommandSender {
44    fn execute(&self, command: DataCommand) {
45        // TODO: Placeholder, we still need to queue and drain even for sync
46        let endpoint = MessagingSwitchboard::data_engine_execute();
47        msgbus::send_any(endpoint, &command);
48    }
49}
50
51/// Gets the global data command sender.
52///
53/// # Panics
54///
55/// Panics if the sender is uninitialized.
56#[must_use]
57pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
58    DATA_CMD_SENDER.with(|sender| {
59        sender
60            .get()
61            .expect("Data command sender should be initialized by runner")
62            .clone()
63    })
64}
65
66/// Sets the global data command sender.
67///
68/// This should be called by the runner when it initializes.
69/// Can only be called once per thread.
70///
71/// # Panics
72///
73/// Panics if a sender has already been set.
74pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
75    DATA_CMD_SENDER.with(|s| {
76        if s.set(sender).is_err() {
77            panic!("Data command sender can only be set once");
78        }
79    });
80}
81
82/// Trait for time event sending that can be implemented for both sync and async runners.
83pub trait TimeEventSender: Debug + Send + Sync {
84    /// Sends a time event handler.
85    fn send(&self, handler: TimeEventHandlerV2);
86}
87
88/// Gets the global time event sender.
89///
90/// # Panics
91///
92/// Panics if the sender is uninitialized.
93#[must_use]
94pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
95    TIME_EVENT_SENDER.with(|sender| {
96        sender
97            .get()
98            .expect("Time event sender should be initialized by runner")
99            .clone()
100    })
101}
102
103/// Attempts to get the global time event sender without panicking.
104///
105/// Returns `None` if the sender is not initialized (e.g., in test environments).
106#[must_use]
107pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
108    TIME_EVENT_SENDER.with(|sender| sender.get().cloned())
109}
110
111/// Sets the global time event sender.
112///
113/// Can only be called once per thread.
114///
115/// # Panics
116///
117/// Panics if a sender has already been set.
118pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
119    TIME_EVENT_SENDER.with(|s| {
120        if s.set(sender).is_err() {
121            panic!("Time event sender can only be set once");
122        }
123    });
124}
125
126/// Gets the global data event sender.
127///
128/// # Panics
129///
130/// Panics if the sender is uninitialized.
131#[must_use]
132pub fn get_data_event_sender() -> tokio::sync::mpsc::UnboundedSender<DataEvent> {
133    DATA_EVENT_SENDER.with(|sender| {
134        sender
135            .get()
136            .expect("Data event sender should be initialized by runner")
137            .clone()
138    })
139}
140
141/// Sets the global data event sender.
142///
143/// Can only be called once per thread.
144///
145/// # Panics
146///
147/// Panics if a sender has already been set.
148pub fn set_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
149    DATA_EVENT_SENDER.with(|s| {
150        if s.set(sender).is_err() {
151            panic!("Data event sender can only be set once");
152        }
153    });
154}
155
156// TODO: We can refine this for the synch runner later, data event sender won't be required
157thread_local! {
158    static TIME_EVENT_SENDER: OnceCell<Arc<dyn TimeEventSender>> = const { OnceCell::new() };
159    static DATA_EVENT_SENDER: OnceCell<tokio::sync::mpsc::UnboundedSender<DataEvent>> = const { OnceCell::new() };
160    static DATA_CMD_SENDER: OnceCell<Arc<dyn DataCommandSender>> = const { OnceCell::new() };
161}