1use std::str::FromStr;
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "nautilus_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[allow(non_camel_case_types)]
43#[derive(Debug)]
44pub struct TimeEventHandler_Py {
45 pub event: TimeEvent,
47 pub callback: Py<PyAny>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandlerV2) -> Self {
57 Self {
58 event: value.event,
59 callback: match value.callback {
60 #[cfg(feature = "python")]
61 TimeEventCallback::Python(callback) => callback,
62 TimeEventCallback::Rust(_) => {
63 panic!("Python time event handler is not supported for Rust callbacks")
64 }
65 },
66 }
67 }
68}
69
70#[pymethods]
71impl TimeEvent {
72 #[new]
73 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
74 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
75 }
76
77 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
78 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
79
80 let ts_event = py_tuple
81 .get_item(2)?
82 .downcast::<PyInt>()?
83 .extract::<u64>()?;
84 let ts_init: u64 = py_tuple
85 .get_item(3)?
86 .downcast::<PyInt>()?
87 .extract::<u64>()?;
88
89 self.name = Ustr::from(
90 py_tuple
91 .get_item(0)?
92 .downcast::<PyString>()?
93 .extract::<&str>()?,
94 );
95 self.event_id = UUID4::from_str(
96 py_tuple
97 .get_item(1)?
98 .downcast::<PyString>()?
99 .extract::<&str>()?,
100 )
101 .map_err(to_pyvalue_err)?;
102 self.ts_event = ts_event.into();
103 self.ts_init = ts_init.into();
104
105 Ok(())
106 }
107
108 fn __getstate__(&self, py: Python) -> PyResult<Py<PyAny>> {
109 (
110 self.name.to_string(),
111 self.event_id.to_string(),
112 self.ts_event.as_u64(),
113 self.ts_init.as_u64(),
114 )
115 .into_py_any(py)
116 }
117
118 fn __reduce__(&self, py: Python) -> PyResult<Py<PyAny>> {
119 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
120 let state = self.__getstate__(py)?;
121 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
122 }
123
124 #[staticmethod]
125 fn _safe_constructor() -> Self {
126 Self::new(
127 Ustr::from("NULL"),
128 UUID4::new(),
129 UnixNanos::default(),
130 UnixNanos::default(),
131 )
132 }
133
134 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
135 match op {
136 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
137 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
138 _ => py.NotImplemented(),
139 }
140 }
141
142 fn __repr__(&self) -> String {
143 self.to_string()
144 }
145
146 #[getter]
147 #[pyo3(name = "name")]
148 fn py_name(&self) -> String {
149 self.name.to_string()
150 }
151
152 #[getter]
153 #[pyo3(name = "event_id")]
154 const fn py_event_id(&self) -> UUID4 {
155 self.event_id
156 }
157
158 #[getter]
159 #[pyo3(name = "ts_event")]
160 const fn py_ts_event(&self) -> u64 {
161 self.ts_event.as_u64()
162 }
163
164 #[getter]
165 #[pyo3(name = "ts_init")]
166 const fn py_ts_init(&self) -> u64 {
167 self.ts_init.as_u64()
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use std::{num::NonZeroU64, sync::Arc};
174
175 use nautilus_core::{
176 UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
177 time::get_atomic_clock_realtime,
178 };
179 use pyo3::prelude::*;
180 use tokio::time::Duration;
181
182 use crate::{
183 runner::{TimeEventSender, set_time_event_sender},
184 testing::wait_until,
185 timer::{LiveTimer, TimeEvent, TimeEventCallback},
186 };
187
188 #[pyfunction]
189 const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
190 Ok(())
192 }
193
194 #[derive(Debug)]
195 struct TestTimeEventSender;
196
197 impl TimeEventSender for TestTimeEventSender {
198 fn send(&self, _handler: crate::timer::TimeEventHandlerV2) {
199 }
201 }
202
203 #[tokio::test]
204 async fn test_live_timer_starts_and_stops() {
205 set_time_event_sender(Arc::new(TestTimeEventSender));
206
207 Python::initialize();
208 let callback = Python::attach(|py| {
209 let callable = wrap_pyfunction!(receive_event, py).unwrap();
210 let callable = callable.into_py_any_unwrap(py);
211 TimeEventCallback::from(callable)
212 });
213
214 let clock = get_atomic_clock_realtime();
216 let start_time = clock.get_time_ns();
217 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
218
219 let test_sender = Arc::new(TestTimeEventSender);
220 let mut timer = LiveTimer::new(
221 "TEST_TIMER".into(),
222 interval_ns,
223 start_time,
224 None,
225 callback,
226 false,
227 Some(test_sender),
228 );
229
230 let next_time_ns = timer.next_time_ns();
231 timer.start();
232
233 tokio::time::sleep(Duration::from_millis(300)).await;
235
236 timer.cancel();
237 wait_until(|| timer.is_expired(), Duration::from_secs(2));
238 assert!(timer.next_time_ns() > next_time_ns);
239 }
240
241 #[tokio::test]
242 async fn test_live_timer_with_stop_time() {
243 set_time_event_sender(Arc::new(TestTimeEventSender));
244
245 Python::initialize();
246 let callback = Python::attach(|py| {
247 let callable = wrap_pyfunction!(receive_event, py).unwrap();
248 let callable = callable.into_py_any_unwrap(py);
249 TimeEventCallback::from(callable)
250 });
251
252 let clock = get_atomic_clock_realtime();
254 let start_time = clock.get_time_ns();
255 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
256 let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
257
258 let test_sender = Arc::new(TestTimeEventSender);
259 let mut timer = LiveTimer::new(
260 "TEST_TIMER".into(),
261 interval_ns,
262 start_time,
263 Some(stop_time),
264 callback,
265 false,
266 Some(test_sender),
267 );
268
269 let next_time_ns = timer.next_time_ns();
270 timer.start();
271
272 tokio::time::sleep(Duration::from_secs(1)).await;
274
275 wait_until(|| timer.is_expired(), Duration::from_secs(2));
276 assert!(timer.next_time_ns() > next_time_ns);
277 }
278
279 #[tokio::test]
280 async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
281 set_time_event_sender(Arc::new(TestTimeEventSender));
282
283 Python::initialize();
284 let callback = Python::attach(|py| {
285 let callable = wrap_pyfunction!(receive_event, py).unwrap();
286 let callable = callable.into_py_any_unwrap(py);
287 TimeEventCallback::from(callable)
288 });
289
290 let clock = get_atomic_clock_realtime();
292 let start_time = UnixNanos::default();
293 let interval_ns = NonZeroU64::new(1).unwrap();
294 let stop_time = clock.get_time_ns();
295
296 let test_sender = Arc::new(TestTimeEventSender);
297 let mut timer = LiveTimer::new(
298 "TEST_TIMER".into(),
299 interval_ns,
300 start_time,
301 Some(stop_time),
302 callback,
303 false,
304 Some(test_sender),
305 );
306
307 timer.start();
308
309 wait_until(|| timer.is_expired(), Duration::from_secs(2));
310 }
311}