1use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "nautilus_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37#[allow(non_camel_case_types)]
44pub struct TimeEventHandler_Py {
45 pub event: TimeEvent,
47 pub callback: Arc<PyObject>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandlerV2) -> Self {
53 Self {
54 event: value.event,
55 callback: match value.callback {
56 TimeEventCallback::Python(callback) => callback,
57 TimeEventCallback::Rust(_) => {
58 panic!("Python time event handler is not supported for Rust callback")
59 }
60 },
61 }
62 }
63}
64
65#[pymethods]
66impl TimeEvent {
67 #[new]
68 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
69 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
70 }
71
72 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
73 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
74
75 let ts_event = py_tuple
76 .get_item(2)?
77 .downcast::<PyInt>()?
78 .extract::<u64>()?;
79 let ts_init: u64 = py_tuple
80 .get_item(3)?
81 .downcast::<PyInt>()?
82 .extract::<u64>()?;
83
84 self.name = Ustr::from(
85 py_tuple
86 .get_item(0)?
87 .downcast::<PyString>()?
88 .extract::<&str>()?,
89 );
90 self.event_id = UUID4::from_str(
91 py_tuple
92 .get_item(1)?
93 .downcast::<PyString>()?
94 .extract::<&str>()?,
95 )
96 .map_err(to_pyvalue_err)?;
97 self.ts_event = ts_event.into();
98 self.ts_init = ts_init.into();
99
100 Ok(())
101 }
102
103 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
104 (
105 self.name.to_string(),
106 self.event_id.to_string(),
107 self.ts_event.as_u64(),
108 self.ts_init.as_u64(),
109 )
110 .into_py_any(py)
111 }
112
113 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
114 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
115 let state = self.__getstate__(py)?;
116 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
117 }
118
119 #[staticmethod]
120 fn _safe_constructor() -> Self {
121 Self::new(
122 Ustr::from("NULL"),
123 UUID4::new(),
124 UnixNanos::default(),
125 UnixNanos::default(),
126 )
127 }
128
129 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
130 match op {
131 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
132 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
133 _ => py.NotImplemented(),
134 }
135 }
136
137 fn __repr__(&self) -> String {
138 format!("{}('{}')", stringify!(TimeEvent), self)
139 }
140
141 fn __str__(&self) -> String {
142 self.to_string()
143 }
144
145 #[getter]
146 #[pyo3(name = "name")]
147 fn py_name(&self) -> String {
148 self.name.to_string()
149 }
150
151 #[getter]
152 #[pyo3(name = "event_id")]
153 const fn py_event_id(&self) -> UUID4 {
154 self.event_id
155 }
156
157 #[getter]
158 #[pyo3(name = "ts_event")]
159 const fn py_ts_event(&self) -> u64 {
160 self.ts_event.as_u64()
161 }
162
163 #[getter]
164 #[pyo3(name = "ts_init")]
165 const fn py_ts_init(&self) -> u64 {
166 self.ts_init.as_u64()
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 #[rustfmt::skip]
173 #[cfg(feature = "clock_v2")]
174 use std::collections::BinaryHeap;
175
176 use std::num::NonZeroU64;
177 #[rustfmt::skip]
178 #[cfg(feature = "clock_v2")]
179 use std::sync::Arc;
180
181 #[rustfmt::skip]
182 #[cfg(feature = "clock_v2")]
183 use tokio::sync::Mutex;
184
185 use nautilus_core::{
186 UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
187 time::get_atomic_clock_realtime,
188 };
189 use pyo3::prelude::*;
190 use tokio::time::Duration;
191 use ustr::Ustr;
192
193 use crate::{
194 testing::wait_until,
195 timer::{LiveTimer, TimeEvent, TimeEventCallback},
196 };
197
198 #[pyfunction]
199 const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
200 Ok(())
202 }
203
204 #[tokio::test]
205 async fn test_live_timer_starts_and_stops() {
206 pyo3::prepare_freethreaded_python();
207
208 let callback = Python::with_gil(|py| {
209 let callable = wrap_pyfunction!(receive_event, py).unwrap();
210 let callable = callable.into_py_any_unwrap(py);
211 TimeEventCallback::from(callable)
212 });
213
214 let clock = get_atomic_clock_realtime();
216 let start_time = clock.get_time_ns();
217 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
218
219 #[cfg(not(feature = "clock_v2"))]
220 let mut timer = LiveTimer::new(
221 Ustr::from("TEST_TIMER"),
222 interval_ns,
223 start_time,
224 None,
225 callback,
226 );
227
228 #[cfg(feature = "clock_v2")]
229 let (_heap, mut timer) = {
230 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
231 (
232 heap.clone(),
233 LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback, heap),
234 )
235 };
236 let next_time_ns = timer.next_time_ns();
237 timer.start();
238
239 tokio::time::sleep(Duration::from_millis(300)).await;
241
242 timer.cancel();
243 wait_until(|| timer.is_expired(), Duration::from_secs(2));
244 assert!(timer.next_time_ns() > next_time_ns);
245 }
246
247 #[tokio::test]
248 async fn test_live_timer_with_stop_time() {
249 pyo3::prepare_freethreaded_python();
250
251 let callback = Python::with_gil(|py| {
252 let callable = wrap_pyfunction!(receive_event, py).unwrap();
253 let callable = callable.into_py_any_unwrap(py);
254 TimeEventCallback::from(callable)
255 });
256
257 let clock = get_atomic_clock_realtime();
259 let start_time = clock.get_time_ns();
260 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
261 let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
262
263 #[cfg(not(feature = "clock_v2"))]
264 let mut timer = LiveTimer::new(
265 Ustr::from("TEST_TIMER"),
266 interval_ns,
267 start_time,
268 Some(stop_time),
269 callback,
270 );
271
272 #[cfg(feature = "clock_v2")]
273 let (_heap, mut timer) = {
274 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
275 (
276 heap.clone(),
277 LiveTimer::new(
278 "TEST_TIMER",
279 interval_ns,
280 start_time,
281 Some(stop_time),
282 callback,
283 heap,
284 ),
285 )
286 };
287
288 let next_time_ns = timer.next_time_ns();
289 timer.start();
290
291 tokio::time::sleep(Duration::from_secs(1)).await;
293
294 wait_until(|| timer.is_expired(), Duration::from_secs(2));
295 assert!(timer.next_time_ns() > next_time_ns);
296 }
297
298 #[tokio::test]
299 async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
300 pyo3::prepare_freethreaded_python();
301
302 let callback = Python::with_gil(|py| {
303 let callable = wrap_pyfunction!(receive_event, py).unwrap();
304 let callable = callable.into_py_any_unwrap(py);
305 TimeEventCallback::from(callable)
306 });
307
308 let clock = get_atomic_clock_realtime();
310 let start_time = UnixNanos::default();
311 let interval_ns = NonZeroU64::new(1).unwrap();
312 let stop_time = clock.get_time_ns();
313
314 #[cfg(not(feature = "clock_v2"))]
315 let mut timer = LiveTimer::new(
316 Ustr::from("TEST_TIMER"),
317 interval_ns,
318 start_time,
319 Some(stop_time),
320 callback,
321 );
322
323 #[cfg(feature = "clock_v2")]
324 let (_heap, mut timer) = {
325 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
326 (
327 heap.clone(),
328 LiveTimer::new(
329 "TEST_TIMER",
330 interval_ns,
331 start_time,
332 Some(stop_time),
333 callback,
334 heap,
335 ),
336 )
337 };
338
339 timer.start();
340
341 wait_until(|| timer.is_expired(), Duration::from_secs(2));
342 }
343}