1use std::str::FromStr;
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
#[pyo3::pyclass(
    module = "nautilus_trader.core.nautilus_pyo3.common",
    name = "TimeEventHandler"
)]
// The `_Py` suffix breaks the camel-case lint; Python sees the class under the
// `name` given in the `pyclass` attribute above instead.
#[allow(non_camel_case_types)]
/// Pairs a [`TimeEvent`] with a Python object used as its callback.
///
/// Built from a `TimeEventHandlerV2` through the `From` implementation in this
/// file, which only accepts handlers carrying a Python callback.
#[derive(Debug)]
pub struct TimeEventHandler_Py {
    /// The time event carried by this handler.
    pub event: TimeEvent,
    /// The Python object taken from the source handler's `TimeEventCallback::Python` variant.
    pub callback: PyObject,
}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandlerV2) -> Self {
57 Self {
58 event: value.event,
59 callback: match value.callback {
60 #[cfg(feature = "python")]
61 TimeEventCallback::Python(callback) => callback,
62 TimeEventCallback::Rust(_) => {
63 panic!("Python time event handler is not supported for Rust callback")
64 }
65 },
66 }
67 }
68}
69
70#[pymethods]
71impl TimeEvent {
72 #[new]
73 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
74 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
75 }
76
77 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
78 let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
79
80 let ts_event = py_tuple
81 .get_item(2)?
82 .downcast::<PyInt>()?
83 .extract::<u64>()?;
84 let ts_init: u64 = py_tuple
85 .get_item(3)?
86 .downcast::<PyInt>()?
87 .extract::<u64>()?;
88
89 self.name = Ustr::from(
90 py_tuple
91 .get_item(0)?
92 .downcast::<PyString>()?
93 .extract::<&str>()?,
94 );
95 self.event_id = UUID4::from_str(
96 py_tuple
97 .get_item(1)?
98 .downcast::<PyString>()?
99 .extract::<&str>()?,
100 )
101 .map_err(to_pyvalue_err)?;
102 self.ts_event = ts_event.into();
103 self.ts_init = ts_init.into();
104
105 Ok(())
106 }
107
108 fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
109 (
110 self.name.to_string(),
111 self.event_id.to_string(),
112 self.ts_event.as_u64(),
113 self.ts_init.as_u64(),
114 )
115 .into_py_any(py)
116 }
117
118 fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
119 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
120 let state = self.__getstate__(py)?;
121 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
122 }
123
124 #[staticmethod]
125 fn _safe_constructor() -> Self {
126 Self::new(
127 Ustr::from("NULL"),
128 UUID4::new(),
129 UnixNanos::default(),
130 UnixNanos::default(),
131 )
132 }
133
134 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
135 match op {
136 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
137 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
138 _ => py.NotImplemented(),
139 }
140 }
141
142 fn __repr__(&self) -> String {
143 self.to_string()
144 }
145
146 #[getter]
147 #[pyo3(name = "name")]
148 fn py_name(&self) -> String {
149 self.name.to_string()
150 }
151
152 #[getter]
153 #[pyo3(name = "event_id")]
154 const fn py_event_id(&self) -> UUID4 {
155 self.event_id
156 }
157
158 #[getter]
159 #[pyo3(name = "ts_event")]
160 const fn py_ts_event(&self) -> u64 {
161 self.ts_event.as_u64()
162 }
163
164 #[getter]
165 #[pyo3(name = "ts_init")]
166 const fn py_ts_init(&self) -> u64 {
167 self.ts_init.as_u64()
168 }
169}
170
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;

    use crate::{
        runner::{TimeEventSender, set_time_event_sender},
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// Python-callable stub that accepts a `TimeEvent` and does nothing.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        Ok(())
    }

    /// Sender stub whose `send` discards every handler it is given.
    #[derive(Debug)]
    struct TestTimeEventSender;

    impl TimeEventSender for TestTimeEventSender {
        fn send(&self, _handler: crate::timer::TimeEventHandlerV2) {
            // Intentionally empty: the handler is dropped.
        }
    }

    /// Shared test prelude: initializes the interpreter, installs the stub
    /// sender, and wraps `receive_event` as a `TimeEventCallback`.
    fn setup_python_callback() -> TimeEventCallback {
        pyo3::prepare_freethreaded_python();

        set_time_event_sender(Arc::new(TestTimeEventSender));

        Python::with_gil(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    /// Builds the `"TEST_TIMER"` live timer used by every test, wired to its own
    /// stub sender and with the boolean constructor flag set to `false`.
    fn build_test_timer(
        interval_ns: NonZeroU64,
        start_time: UnixNanos,
        stop_time: Option<UnixNanos>,
        callback: TimeEventCallback,
    ) -> LiveTimer {
        let test_sender = Arc::new(TestTimeEventSender);
        LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            stop_time,
            callback,
            false,
            Some(test_sender),
        )
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        let callback = setup_python_callback();

        let now_ns = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        let mut timer = build_test_timer(interval_ns, now_ns, None, callback);

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Let the timer run for a few intervals before cancelling it.
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        let callback = setup_python_callback();

        let now_ns = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = now_ns + 500 * NANOSECONDS_IN_MILLISECOND;

        let mut timer = build_test_timer(interval_ns, now_ns, Some(stop_time), callback);

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the configured stop time.
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        let callback = setup_python_callback();

        let clock = get_atomic_clock_realtime();
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        // The stop time is "now", i.e. not later than the moment the timer starts.
        let stop_time = clock.get_time_ns();

        let mut timer = build_test_timer(interval_ns, start_time, Some(stop_time), callback);

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}