1use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{python::to_pyvalue_err, UnixNanos, UUID4};
19use pyo3::{
20 basic::CompareOp,
21 prelude::*,
22 types::{PyLong, PyString, PyTuple},
23};
24use ustr::Ustr;
25
26use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
27
28#[pyo3::pyclass(
29 module = "nautilus_trader.core.nautilus_pyo3.common",
30 name = "TimeEventHandler"
31)]
32#[derive(Clone)]
33#[allow(non_camel_case_types)]
40pub struct TimeEventHandler_Py {
41 pub event: TimeEvent,
43 pub callback: Arc<PyObject>,
45}
46
47impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
48 fn from(value: TimeEventHandlerV2) -> Self {
49 Self {
50 event: value.event,
51 callback: match value.callback {
52 TimeEventCallback::Python(callback) => callback,
53 TimeEventCallback::Rust(_) => {
54 panic!("Python time event handler is not supported for Rust callback")
55 }
56 },
57 }
58 }
59}
60
61#[pymethods]
62impl TimeEvent {
63 #[new]
64 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
65 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
66 }
67
68 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
69 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
70
71 let ts_event = py_tuple
72 .get_item(2)?
73 .downcast::<PyLong>()?
74 .extract::<u64>()?;
75 let ts_init: u64 = py_tuple
76 .get_item(3)?
77 .downcast::<PyLong>()?
78 .extract::<u64>()?;
79
80 self.name = Ustr::from(
81 py_tuple
82 .get_item(0)?
83 .downcast::<PyString>()?
84 .extract::<&str>()?,
85 );
86 self.event_id = UUID4::from_str(
87 py_tuple
88 .get_item(1)?
89 .downcast::<PyString>()?
90 .extract::<&str>()?,
91 )
92 .map_err(to_pyvalue_err)?;
93 self.ts_event = ts_event.into();
94 self.ts_init = ts_init.into();
95
96 Ok(())
97 }
98
99 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
100 Ok((
101 self.name.to_string(),
102 self.event_id.to_string(),
103 self.ts_event.as_u64(),
104 self.ts_init.as_u64(),
105 )
106 .to_object(py))
107 }
108
109 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
110 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
111 let state = self.__getstate__(py)?;
112 Ok((safe_constructor, PyTuple::empty(py), state).to_object(py))
113 }
114
115 #[staticmethod]
116 fn _safe_constructor() -> Self {
117 Self::new(
118 Ustr::from("NULL"),
119 UUID4::new(),
120 UnixNanos::default(),
121 UnixNanos::default(),
122 )
123 }
124
125 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
126 match op {
127 CompareOp::Eq => self.eq(other).into_py(py),
128 CompareOp::Ne => self.ne(other).into_py(py),
129 _ => py.NotImplemented(),
130 }
131 }
132
133 fn __repr__(&self) -> String {
134 format!("{}('{}')", stringify!(TimeEvent), self)
135 }
136
137 fn __str__(&self) -> String {
138 self.to_string()
139 }
140
141 #[getter]
142 #[pyo3(name = "name")]
143 fn py_name(&self) -> String {
144 self.name.to_string()
145 }
146
147 #[getter]
148 #[pyo3(name = "event_id")]
149 const fn py_event_id(&self) -> UUID4 {
150 self.event_id
151 }
152
153 #[getter]
154 #[pyo3(name = "ts_event")]
155 const fn py_ts_event(&self) -> u64 {
156 self.ts_event.as_u64()
157 }
158
159 #[getter]
160 #[pyo3(name = "ts_init")]
161 const fn py_ts_init(&self) -> u64 {
162 self.ts_init.as_u64()
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 #[rustfmt::skip]
169 #[cfg(feature = "clock_v2")]
170 use std::collections::BinaryHeap;
171
172 use std::num::NonZeroU64;
173 #[rustfmt::skip]
174 #[cfg(feature = "clock_v2")]
175 use std::sync::Arc;
176
177 #[rustfmt::skip]
178 #[cfg(feature = "clock_v2")]
179 use tokio::sync::Mutex;
180
181 use nautilus_core::{
182 datetime::NANOSECONDS_IN_MILLISECOND, time::get_atomic_clock_realtime, UnixNanos,
183 };
184 use pyo3::prelude::*;
185 use tokio::time::Duration;
186
187 use crate::{
188 testing::wait_until,
189 timer::{LiveTimer, TimeEvent, TimeEventCallback},
190 };
191
192 #[pyfunction]
193 const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
194 Ok(())
196 }
197
198 #[tokio::test]
199 async fn test_live_timer_starts_and_stops() {
200 pyo3::prepare_freethreaded_python();
201
202 let callback = Python::with_gil(|py| {
203 let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
204 TimeEventCallback::from(callable.into_py(py))
205 });
206
207 let clock = get_atomic_clock_realtime();
209 let start_time = clock.get_time_ns();
210 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
211
212 #[cfg(not(feature = "clock_v2"))]
213 let mut timer = LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback);
214
215 #[cfg(feature = "clock_v2")]
216 let (_heap, mut timer) = {
217 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
218 (
219 heap.clone(),
220 LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback, heap),
221 )
222 };
223 let next_time_ns = timer.next_time_ns();
224 timer.start();
225
226 tokio::time::sleep(Duration::from_millis(300)).await;
228
229 timer.cancel();
230 wait_until(|| timer.is_expired(), Duration::from_secs(2));
231 assert!(timer.next_time_ns() > next_time_ns);
232 }
233
234 #[tokio::test]
235 async fn test_live_timer_with_stop_time() {
236 pyo3::prepare_freethreaded_python();
237
238 let callback = Python::with_gil(|py| {
239 let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
240 TimeEventCallback::from(callable.into_py(py))
241 });
242
243 let clock = get_atomic_clock_realtime();
245 let start_time = clock.get_time_ns();
246 let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
247 let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
248
249 #[cfg(not(feature = "clock_v2"))]
250 let mut timer = LiveTimer::new(
251 "TEST_TIMER",
252 interval_ns,
253 start_time,
254 Some(stop_time),
255 callback,
256 );
257
258 #[cfg(feature = "clock_v2")]
259 let (_heap, mut timer) = {
260 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
261 (
262 heap.clone(),
263 LiveTimer::new(
264 "TEST_TIMER",
265 interval_ns,
266 start_time,
267 Some(stop_time),
268 callback,
269 heap,
270 ),
271 )
272 };
273
274 let next_time_ns = timer.next_time_ns();
275 timer.start();
276
277 tokio::time::sleep(Duration::from_secs(1)).await;
279
280 wait_until(|| timer.is_expired(), Duration::from_secs(2));
281 assert!(timer.next_time_ns() > next_time_ns);
282 }
283
284 #[tokio::test]
285 async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
286 pyo3::prepare_freethreaded_python();
287
288 let callback = Python::with_gil(|py| {
289 let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
290 TimeEventCallback::from(callable.into_py(py))
291 });
292
293 let clock = get_atomic_clock_realtime();
295 let start_time = UnixNanos::default();
296 let interval_ns = NonZeroU64::new(1).unwrap();
297 let stop_time = clock.get_time_ns();
298
299 #[cfg(not(feature = "clock_v2"))]
300 let mut timer = LiveTimer::new(
301 "TEST_TIMER",
302 interval_ns,
303 start_time,
304 Some(stop_time),
305 callback,
306 );
307
308 #[cfg(feature = "clock_v2")]
309 let (_heap, mut timer) = {
310 let heap = Arc::new(Mutex::new(BinaryHeap::new()));
311 (
312 heap.clone(),
313 LiveTimer::new(
314 "TEST_TIMER",
315 interval_ns,
316 start_time,
317 Some(stop_time),
318 callback,
319 heap,
320 ),
321 )
322 };
323
324 timer.start();
325
326 wait_until(|| timer.is_expired(), Duration::from_secs(2));
327 }
328}