nautilus_common/
runner.rs1use std::{
17 cell::{OnceCell, RefCell},
18 collections::VecDeque,
19 rc::Rc,
20};
21
22use nautilus_model::data::Data;
23
24use crate::{
25 clock::Clock,
26 messages::data::{CustomDataResponse, DataResponse, SubscribeCommand},
27 timer::TimeEvent,
28};
29
30pub trait DataQueue {
31 fn push(&mut self, event: DataEvent);
32}
33
34pub type GlobalDataQueue = Rc<RefCell<dyn DataQueue>>;
35
36#[allow(clippy::large_enum_variant)]
38#[derive(Debug)]
39pub enum DataEvent {
40 Response(DataResponse),
41 Data(Data),
42}
43
44#[derive(Debug)]
45pub struct SyncDataQueue(VecDeque<DataEvent>);
46
47impl DataQueue for SyncDataQueue {
48 fn push(&mut self, event: DataEvent) {
49 self.0.push_back(event);
50 }
51}
52
53#[must_use]
54pub fn get_data_queue() -> Rc<RefCell<dyn DataQueue>> {
55 DATA_QUEUE
56 .try_with(|dq| {
57 dq.get()
58 .expect("Data queue should be initialized by runner")
59 .clone()
60 })
61 .expect("Should be able to access thread local storage")
62}
63
64pub fn set_data_queue(dq: Rc<RefCell<dyn DataQueue>>) {
65 DATA_QUEUE
66 .try_with(|deque| {
67 assert!(deque.set(dq).is_ok(), "Global data queue already set");
68 })
69 .expect("Should be able to access thread local storage");
70}
71
72pub type GlobalClock = Rc<RefCell<dyn Clock>>;
73
74#[must_use]
75pub fn get_clock() -> Rc<RefCell<dyn Clock>> {
76 CLOCK
77 .try_with(|clock| {
78 clock
79 .get()
80 .expect("Clock should be initialized by runner")
81 .clone()
82 })
83 .expect("Should be able to access thread local storage")
84}
85
86pub fn set_clock(c: Rc<RefCell<dyn Clock>>) {
87 CLOCK
88 .try_with(|clock| {
89 assert!(clock.set(c).is_ok(), "Global clock already set");
90 })
91 .expect("Should be able to access thread local clock");
92}
93
94pub type MessageBusCommands = Rc<RefCell<VecDeque<SubscribeCommand>>>; #[must_use]
98pub fn get_msgbus_cmd() -> MessageBusCommands {
99 MSGBUS_CMD
100 .try_with(std::clone::Clone::clone)
101 .expect("Should be able to access thread local storage")
102}
103
104thread_local! {
105 static CLOCK: OnceCell<GlobalClock> = OnceCell::new();
106 static DATA_QUEUE: OnceCell<GlobalDataQueue> = OnceCell::new();
107 static MSGBUS_CMD: MessageBusCommands = Rc::new(RefCell::new(VecDeque::new()));
108}
109
110pub trait SendResponse {
111 fn send(&self, resp: CustomDataResponse);
112}
113
114pub type DataResponseQueue = Rc<RefCell<SyncDataQueue>>;
115
116#[allow(clippy::large_enum_variant)]
118#[derive(Debug)]
119pub enum RunnerEvent {
120 Data(DataEvent),
121 Timer(TimeEvent),
122}