1use std::str::FromStr;
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33 module = "nautilus_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[allow(non_camel_case_types)]
43#[derive(Debug)]
44pub struct TimeEventHandler_Py {
45 pub event: TimeEvent,
47 pub callback: Py<PyAny>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandlerV2) -> Self {
57 Self {
58 event: value.event,
59 callback: match value.callback {
60 #[cfg(feature = "python")]
61 TimeEventCallback::Python(callback) => callback,
62 TimeEventCallback::Rust(_) => {
63 panic!("Python time event handler is not supported for Rust callbacks")
64 }
65 },
66 }
67 }
68}
69
70#[pymethods]
71impl TimeEvent {
72 #[new]
73 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
74 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
75 }
76
77 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
78 let py_tuple: &Bound<'_, PyTuple> = state.cast::<PyTuple>()?;
79
80 let ts_event = py_tuple.get_item(2)?.cast::<PyInt>()?.extract::<u64>()?;
81 let ts_init: u64 = py_tuple.get_item(3)?.cast::<PyInt>()?.extract::<u64>()?;
82
83 self.name = Ustr::from(
84 py_tuple
85 .get_item(0)?
86 .cast::<PyString>()?
87 .extract::<&str>()?,
88 );
89 self.event_id = UUID4::from_str(
90 py_tuple
91 .get_item(1)?
92 .cast::<PyString>()?
93 .extract::<&str>()?,
94 )
95 .map_err(to_pyvalue_err)?;
96 self.ts_event = ts_event.into();
97 self.ts_init = ts_init.into();
98
99 Ok(())
100 }
101
102 fn __getstate__(&self, py: Python) -> PyResult<Py<PyAny>> {
103 (
104 self.name.to_string(),
105 self.event_id.to_string(),
106 self.ts_event.as_u64(),
107 self.ts_init.as_u64(),
108 )
109 .into_py_any(py)
110 }
111
112 fn __reduce__(&self, py: Python) -> PyResult<Py<PyAny>> {
113 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
114 let state = self.__getstate__(py)?;
115 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
116 }
117
118 #[staticmethod]
119 fn _safe_constructor() -> Self {
120 Self::new(
121 Ustr::from("NULL"),
122 UUID4::new(),
123 UnixNanos::default(),
124 UnixNanos::default(),
125 )
126 }
127
128 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
129 match op {
130 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
131 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
132 _ => py.NotImplemented(),
133 }
134 }
135
136 fn __repr__(&self) -> String {
137 self.to_string()
138 }
139
140 #[getter]
141 #[pyo3(name = "name")]
142 fn py_name(&self) -> String {
143 self.name.to_string()
144 }
145
146 #[getter]
147 #[pyo3(name = "event_id")]
148 const fn py_event_id(&self) -> UUID4 {
149 self.event_id
150 }
151
152 #[getter]
153 #[pyo3(name = "ts_event")]
154 const fn py_ts_event(&self) -> u64 {
155 self.ts_event.as_u64()
156 }
157
158 #[getter]
159 #[pyo3(name = "ts_init")]
160 const fn py_ts_init(&self) -> u64 {
161 self.ts_init.as_u64()
162 }
163}
164
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;

    use crate::{
        runner::{TimeEventSender, set_time_event_sender},
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// Python-callable stub that accepts a time event and does nothing with it.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        Ok(())
    }

    /// Sender stub whose `send` discards every handler it is given.
    #[derive(Debug)]
    struct TestTimeEventSender;

    impl TimeEventSender for TestTimeEventSender {
        fn send(&self, _handler: crate::timer::TimeEventHandlerV2) {}
    }

    /// Initializes the Python interpreter and wraps `receive_event` as a `TimeEventCallback`.
    fn python_callback() -> TimeEventCallback {
        Python::initialize();
        Python::attach(|py| {
            let function = wrap_pyfunction!(receive_event, py).unwrap();
            let callable = function.into_py_any_unwrap(py);
            TimeEventCallback::from(callable)
        })
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        // 100 ms interval starting now, with no stop time.
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            None,
            callback,
            false,
            Some(sender),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Let the timer run for a few intervals before cancelling it.
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        // 100 ms interval starting now, stopping 500 ms after the start.
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the stop time; the timer is never cancelled explicitly here.
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        // `NonZeroU64` cannot hold zero, so the smallest interval (1 ns) is used. The start
        // time is the default `UnixNanos` and the stop time is the current clock reading.
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = get_atomic_clock_realtime().get_time_ns();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}