nautilus_common/python/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{
19    UUID4, UnixNanos,
20    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23    IntoPyObjectExt,
24    basic::CompareOp,
25    prelude::*,
26    types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
31
32#[pyo3::pyclass(
33    module = "nautilus_trader.core.nautilus_pyo3.common",
34    name = "TimeEventHandler"
35)]
36#[derive(Clone)]
37/// Temporary time event handler for Python inter-operatbility
38///
39/// TODO: Remove once control flow moves into Rust
40///
41/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
42/// when the event's timestamp is reached.
43#[allow(non_camel_case_types)]
44pub struct TimeEventHandler_Py {
45    /// The time event.
46    pub event: TimeEvent,
47    /// The callable python object.
48    pub callback: Arc<PyObject>,
49}
50
51impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
52    fn from(value: TimeEventHandlerV2) -> Self {
53        Self {
54            event: value.event,
55            callback: match value.callback {
56                TimeEventCallback::Python(callback) => callback,
57                TimeEventCallback::Rust(_) => {
58                    panic!("Python time event handler is not supported for Rust callback")
59                }
60            },
61        }
62    }
63}
64
65#[pymethods]
66impl TimeEvent {
67    #[new]
68    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
69        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
70    }
71
72    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
73        let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
74
75        let ts_event = py_tuple
76            .get_item(2)?
77            .downcast::<PyInt>()?
78            .extract::<u64>()?;
79        let ts_init: u64 = py_tuple
80            .get_item(3)?
81            .downcast::<PyInt>()?
82            .extract::<u64>()?;
83
84        self.name = Ustr::from(
85            py_tuple
86                .get_item(0)?
87                .downcast::<PyString>()?
88                .extract::<&str>()?,
89        );
90        self.event_id = UUID4::from_str(
91            py_tuple
92                .get_item(1)?
93                .downcast::<PyString>()?
94                .extract::<&str>()?,
95        )
96        .map_err(to_pyvalue_err)?;
97        self.ts_event = ts_event.into();
98        self.ts_init = ts_init.into();
99
100        Ok(())
101    }
102
103    fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
104        (
105            self.name.to_string(),
106            self.event_id.to_string(),
107            self.ts_event.as_u64(),
108            self.ts_init.as_u64(),
109        )
110            .into_py_any(py)
111    }
112
113    fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
114        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
115        let state = self.__getstate__(py)?;
116        (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
117    }
118
119    #[staticmethod]
120    fn _safe_constructor() -> Self {
121        Self::new(
122            Ustr::from("NULL"),
123            UUID4::new(),
124            UnixNanos::default(),
125            UnixNanos::default(),
126        )
127    }
128
129    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
130        match op {
131            CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
132            CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
133            _ => py.NotImplemented(),
134        }
135    }
136
137    fn __repr__(&self) -> String {
138        format!("{}('{}')", stringify!(TimeEvent), self)
139    }
140
141    fn __str__(&self) -> String {
142        self.to_string()
143    }
144
145    #[getter]
146    #[pyo3(name = "name")]
147    fn py_name(&self) -> String {
148        self.name.to_string()
149    }
150
151    #[getter]
152    #[pyo3(name = "event_id")]
153    const fn py_event_id(&self) -> UUID4 {
154        self.event_id
155    }
156
157    #[getter]
158    #[pyo3(name = "ts_event")]
159    const fn py_ts_event(&self) -> u64 {
160        self.ts_event.as_u64()
161    }
162
163    #[getter]
164    #[pyo3(name = "ts_init")]
165    const fn py_ts_init(&self) -> u64 {
166        self.ts_init.as_u64()
167    }
168}
169
#[cfg(test)]
mod tests {
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::collections::BinaryHeap;

    use std::num::NonZeroU64;
    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use std::sync::Arc;

    #[rustfmt::skip]
    #[cfg(feature = "clock_v2")]
    use tokio::sync::Mutex;

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;
    use tokio::time::Duration;

    use crate::{
        testing::wait_until,
        timer::{LiveTimer, TimeEvent, TimeEventCallback},
    };

    /// No-op Python-callable sink used as the timer callback in these tests.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
        // TODO: Assert the length of a handler vec
        Ok(())
    }

    /// Initializes the Python interpreter and wraps `receive_event` as a
    /// `TimeEventCallback`.
    fn python_callback() -> TimeEventCallback {
        pyo3::prepare_freethreaded_python();

        Python::with_gil(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        let callback = python_callback();

        // Timer without a stop time: it only ends when cancelled below
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback);

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            let timer = LiveTimer::new(
                "TEST_TIMER",
                interval_ns,
                start_time,
                None,
                callback,
                Arc::clone(&heap),
            );
            (heap, timer)
        };

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Give the timer a few intervals to fire
        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        let callback = python_callback();

        // Timer that stops 500 ms after its start time
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            let timer = LiveTimer::new(
                "TEST_TIMER",
                interval_ns,
                start_time,
                Some(stop_time),
                callback,
                Arc::clone(&heap),
            );
            (heap, timer)
        };

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        // Sleep past the stop time so the timer can run to completion
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        let callback = python_callback();

        // `NonZeroU64` rules out a literal zero, so the interval used here is 1.
        // The stop time is "now" while the start time is the default `UnixNanos`.
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = get_atomic_clock_realtime().get_time_ns();

        #[cfg(not(feature = "clock_v2"))]
        let mut timer = LiveTimer::new(
            "TEST_TIMER",
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
        );

        #[cfg(feature = "clock_v2")]
        let (_heap, mut timer) = {
            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
            let timer = LiveTimer::new(
                "TEST_TIMER",
                interval_ns,
                start_time,
                Some(stop_time),
                callback,
                Arc::clone(&heap),
            );
            (heap, timer)
        };

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}