nautilus_common/python/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19    UUID4, UnixNanos,
20    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23    IntoPyObjectExt,
24    basic::CompareOp,
25    prelude::*,
26    types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33    module = "nautilus_trader.core.nautilus_pyo3.common",
34    name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37/// Temporary time event handler for Python inter-operatbility
38///
39/// TODO: Remove once control flow moves into Rust
40///
41/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
42/// when the event's timestamp is reached.
43#[allow(non_camel_case_types)]
44pub struct TimeEventHandler_Py {
45    /// The time event.
46    pub event: TimeEvent,
47    /// The callable python object.
48    pub callback: Arc<PyObject>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52    fn from(value: TimeEventHandlerV2) -> Self {
53        Self {
54            event: value.event,
55            callback: match value.callback {
56                TimeEventCallback::Python(callback) => callback,
57                TimeEventCallback::Rust(_) => {
58                    panic!("Python time event handler is not supported for Rust callback")
59                }
60            },
61        }
62    }
63}
64
65#[pymethods]
66impl TimeEvent {
67    #[new]
68    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
69        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
70    }
71
72    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
73        let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
74
75        let ts_event = py_tuple
76            .get_item(2)?
77            .downcast::<PyInt>()?
78            .extract::<u64>()?;
79        let ts_init: u64 = py_tuple
80            .get_item(3)?
81            .downcast::<PyInt>()?
82            .extract::<u64>()?;
83
84        self.name = Ustr::from(
85            py_tuple
86                .get_item(0)?
87                .downcast::<PyString>()?
88                .extract::<&str>()?,
89        );
90        self.event_id = UUID4::from_str(
91            py_tuple
92                .get_item(1)?
93                .downcast::<PyString>()?
94                .extract::<&str>()?,
95        )
96        .map_err(to_pyvalue_err)?;
97        self.ts_event = ts_event.into();
98        self.ts_init = ts_init.into();
99
100        Ok(())
101    }
102
103    fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
104        (
105            self.name.to_string(),
106            self.event_id.to_string(),
107            self.ts_event.as_u64(),
108            self.ts_init.as_u64(),
109        )
110            .into_py_any(py)
111    }
112
113    fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
114        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
115        let state = self.__getstate__(py)?;
116        (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
117    }
118
119    #[staticmethod]
120    fn _safe_constructor() -> Self {
121        Self::new(
122            Ustr::from("NULL"),
123            UUID4::new(),
124            UnixNanos::default(),
125            UnixNanos::default(),
126        )
127    }
128
129    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
130        match op {
131            CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
132            CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
133            _ => py.NotImplemented(),
134        }
135    }
136
137    fn __repr__(&self) -> String {
138        format!("{}('{}')", stringify!(TimeEvent), self)
139    }
140
141    fn __str__(&self) -> String {
142        self.to_string()
143    }
144
145    #[getter]
146    #[pyo3(name = "name")]
147    fn py_name(&self) -> String {
148        self.name.to_string()
149    }
150
151    #[getter]
152    #[pyo3(name = "event_id")]
153    const fn py_event_id(&self) -> UUID4 {
154        self.event_id
155    }
156
157    #[getter]
158    #[pyo3(name = "ts_event")]
159    const fn py_ts_event(&self) -> u64 {
160        self.ts_event.as_u64()
161    }
162
163    #[getter]
164    #[pyo3(name = "ts_init")]
165    const fn py_ts_init(&self) -> u64 {
166        self.ts_init.as_u64()
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    #[rustfmt::skip]
173    #[cfg(feature = "clock_v2")]
174    use std::collections::BinaryHeap;
175
176    use std::num::NonZeroU64;
177    #[rustfmt::skip]
178    #[cfg(feature = "clock_v2")]
179    use std::sync::Arc;
180
181    #[rustfmt::skip]
182    #[cfg(feature = "clock_v2")]
183    use tokio::sync::Mutex;
184
185    use nautilus_core::{
186        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
187        time::get_atomic_clock_realtime,
188    };
189    use pyo3::prelude::*;
190    use tokio::time::Duration;
191    use ustr::Ustr;
192
193    use crate::{
194        testing::wait_until,
195        timer::{LiveTimer, TimeEvent, TimeEventCallback},
196    };
197
198    #[pyfunction]
199    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
200        // TODO: Assert the length of a handler vec
201        Ok(())
202    }
203
204    #[tokio::test]
205    async fn test_live_timer_starts_and_stops() {
206        pyo3::prepare_freethreaded_python();
207
208        let callback = Python::with_gil(|py| {
209            let callable = wrap_pyfunction!(receive_event, py).unwrap();
210            let callable = callable.into_py_any_unwrap(py);
211            TimeEventCallback::from(callable)
212        });
213
214        // Create a new LiveTimer with no stop time
215        let clock = get_atomic_clock_realtime();
216        let start_time = clock.get_time_ns();
217        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
218
219        #[cfg(not(feature = "clock_v2"))]
220        let mut timer = LiveTimer::new(
221            Ustr::from("TEST_TIMER"),
222            interval_ns,
223            start_time,
224            None,
225            callback,
226        );
227
228        #[cfg(feature = "clock_v2")]
229        let (_heap, mut timer) = {
230            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
231            (
232                heap.clone(),
233                LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback, heap),
234            )
235        };
236        let next_time_ns = timer.next_time_ns();
237        timer.start();
238
239        // Wait for timer to run
240        tokio::time::sleep(Duration::from_millis(300)).await;
241
242        timer.cancel();
243        wait_until(|| timer.is_expired(), Duration::from_secs(2));
244        assert!(timer.next_time_ns() > next_time_ns);
245    }
246
247    #[tokio::test]
248    async fn test_live_timer_with_stop_time() {
249        pyo3::prepare_freethreaded_python();
250
251        let callback = Python::with_gil(|py| {
252            let callable = wrap_pyfunction!(receive_event, py).unwrap();
253            let callable = callable.into_py_any_unwrap(py);
254            TimeEventCallback::from(callable)
255        });
256
257        // Create a new LiveTimer with a stop time
258        let clock = get_atomic_clock_realtime();
259        let start_time = clock.get_time_ns();
260        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
261        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
262
263        #[cfg(not(feature = "clock_v2"))]
264        let mut timer = LiveTimer::new(
265            Ustr::from("TEST_TIMER"),
266            interval_ns,
267            start_time,
268            Some(stop_time),
269            callback,
270        );
271
272        #[cfg(feature = "clock_v2")]
273        let (_heap, mut timer) = {
274            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
275            (
276                heap.clone(),
277                LiveTimer::new(
278                    "TEST_TIMER",
279                    interval_ns,
280                    start_time,
281                    Some(stop_time),
282                    callback,
283                    heap,
284                ),
285            )
286        };
287
288        let next_time_ns = timer.next_time_ns();
289        timer.start();
290
291        // Wait for a longer time than the stop time
292        tokio::time::sleep(Duration::from_secs(1)).await;
293
294        wait_until(|| timer.is_expired(), Duration::from_secs(2));
295        assert!(timer.next_time_ns() > next_time_ns);
296    }
297
298    #[tokio::test]
299    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
300        pyo3::prepare_freethreaded_python();
301
302        let callback = Python::with_gil(|py| {
303            let callable = wrap_pyfunction!(receive_event, py).unwrap();
304            let callable = callable.into_py_any_unwrap(py);
305            TimeEventCallback::from(callable)
306        });
307
308        // Create a new LiveTimer with a stop time
309        let clock = get_atomic_clock_realtime();
310        let start_time = UnixNanos::default();
311        let interval_ns = NonZeroU64::new(1).unwrap();
312        let stop_time = clock.get_time_ns();
313
314        #[cfg(not(feature = "clock_v2"))]
315        let mut timer = LiveTimer::new(
316            Ustr::from("TEST_TIMER"),
317            interval_ns,
318            start_time,
319            Some(stop_time),
320            callback,
321        );
322
323        #[cfg(feature = "clock_v2")]
324        let (_heap, mut timer) = {
325            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
326            (
327                heap.clone(),
328                LiveTimer::new(
329                    "TEST_TIMER",
330                    interval_ns,
331                    start_time,
332                    Some(stop_time),
333                    callback,
334                    heap,
335                ),
336            )
337        };
338
339        timer.start();
340
341        wait_until(|| timer.is_expired(), Duration::from_secs(2));
342    }
343}