1use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "nautilus_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37#[allow(non_camel_case_types)]
44pub struct TimeEventHandler_Py {
45 pub event: TimeEvent,
47 pub callback: Arc<PyObject>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandlerV2) -> Self {
53 Self {
54 event: value.event,
55 callback: match value.callback {
56 TimeEventCallback::Python(callback) => callback,
57 TimeEventCallback::Rust(_) => {
58 panic!("Python time event handler is not supported for Rust callback")
59 }
60 },
61 }
62 }
63}
64
65#[pymethods]
66impl TimeEvent {
67 #[new]
68 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
69 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
70 }
71
72 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
73 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
74
75 let ts_event = py_tuple
76 .get_item(2)?
77 .downcast::<PyInt>()?
78 .extract::<u64>()?;
79 let ts_init: u64 = py_tuple
80 .get_item(3)?
81 .downcast::<PyInt>()?
82 .extract::<u64>()?;
83
84 self.name = Ustr::from(
85 py_tuple
86 .get_item(0)?
87 .downcast::<PyString>()?
88 .extract::<&str>()?,
89 );
90 self.event_id = UUID4::from_str(
91 py_tuple
92 .get_item(1)?
93 .downcast::<PyString>()?
94 .extract::<&str>()?,
95 )
96 .map_err(to_pyvalue_err)?;
97 self.ts_event = ts_event.into();
98 self.ts_init = ts_init.into();
99
100 Ok(())
101 }
102
103 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
104 (
105 self.name.to_string(),
106 self.event_id.to_string(),
107 self.ts_event.as_u64(),
108 self.ts_init.as_u64(),
109 )
110 .into_py_any(py)
111 }
112
113 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
114 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
115 let state = self.__getstate__(py)?;
116 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
117 }
118
119 #[staticmethod]
120 fn _safe_constructor() -> Self {
121 Self::new(
122 Ustr::from("NULL"),
123 UUID4::new(),
124 UnixNanos::default(),
125 UnixNanos::default(),
126 )
127 }
128
129 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
130 match op {
131 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
132 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
133 _ => py.NotImplemented(),
134 }
135 }
136
137 fn __repr__(&self) -> String {
138 format!("{}('{}')", stringify!(TimeEvent), self)
139 }
140
141 fn __str__(&self) -> String {
142 self.to_string()
143 }
144
145 #[getter]
146 #[pyo3(name = "name")]
147 fn py_name(&self) -> String {
148 self.name.to_string()
149 }
150
151 #[getter]
152 #[pyo3(name = "event_id")]
153 const fn py_event_id(&self) -> UUID4 {
154 self.event_id
155 }
156
157 #[getter]
158 #[pyo3(name = "ts_event")]
159 const fn py_ts_event(&self) -> u64 {
160 self.ts_event.as_u64()
161 }
162
163 #[getter]
164 #[pyo3(name = "ts_init")]
165 const fn py_ts_init(&self) -> u64 {
166 self.ts_init.as_u64()
167 }
168}
169
#[cfg(test)]
mod tests {
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::collections::BinaryHeap;

    use std::num::NonZeroU64;
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::sync::Arc;

    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use tokio::sync::Mutex;

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;

    use crate::{
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// No-op Python-callable sink used as the timer callback in these tests.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        Ok(())
    }

    /// Prepares the Python interpreter and wraps `receive_event` as a `TimeEventCallback`.
    fn python_callback() -> TimeEventCallback {
        pyo3::prepare_freethreaded_python();

        Python::with_gil(|py| {
            let handler = wrap_pyfunction!(receive_event, py)
                .unwrap()
                .into_py_any_unwrap(py);
            TimeEventCallback::from(handler)
        })
    }

    /// A timer without a stop time advances while running and reports expired once
    /// cancelled.
    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        let callback = python_callback();

        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback);

        // Under `clock_v2` the timer takes a shared heap; `heap` stays alive for the test.
        #[cfg(feature = "clock_v2")]
        let heap = Arc::new(Mutex::new(BinaryHeap::new()));
        #[cfg(feature = "clock_v2")]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            None,
            callback,
            heap.clone(),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    /// A timer with a stop time 500 ms after its start expires without being cancelled.
    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        let callback = python_callback();

        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        // Under `clock_v2` the timer takes a shared heap; `heap` stays alive for the test.
        #[cfg(feature = "clock_v2")]
        let heap = Arc::new(Mutex::new(BinaryHeap::new()));
        #[cfg(feature = "clock_v2")]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            heap.clone(),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    /// A timer with a 1 ns interval, a default start time and a stop time taken from the
    /// current clock expires after being started.
    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        let callback = python_callback();

        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = get_atomic_clock_realtime().get_time_ns();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        // Under `clock_v2` the timer takes a shared heap; `heap` stays alive for the test.
        #[cfg(feature = "clock_v2")]
        let heap = Arc::new(Mutex::new(BinaryHeap::new()));
        #[cfg(feature = "clock_v2")]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            heap.clone(),
        );

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}