nautilus_common/python/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{str::FromStr, sync::Arc};
17
18use nautilus_core::{python::to_pyvalue_err, UnixNanos, UUID4};
19use pyo3::{
20    basic::CompareOp,
21    prelude::*,
22    types::{PyLong, PyString, PyTuple},
23};
24use ustr::Ustr;
25
26use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2};
27
28#[pyo3::pyclass(
29    module = "nautilus_trader.core.nautilus_pyo3.common",
30    name = "TimeEventHandler"
31)]
32#[derive(Clone)]
33/// Temporary time event handler for Python inter-operatbility
34///
35/// TODO: Remove once control flow moves into Rust
36///
37/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
38/// when the event's timestamp is reached.
39#[allow(non_camel_case_types)]
40pub struct TimeEventHandler_Py {
41    /// The time event.
42    pub event: TimeEvent,
43    /// The callable python object.
44    pub callback: Arc<PyObject>,
45}
46
47impl From<TimeEventHandlerV2> for TimeEventHandler_Py {
48    fn from(value: TimeEventHandlerV2) -> Self {
49        Self {
50            event: value.event,
51            callback: match value.callback {
52                TimeEventCallback::Python(callback) => callback,
53                TimeEventCallback::Rust(_) => {
54                    panic!("Python time event handler is not supported for Rust callback")
55                }
56            },
57        }
58    }
59}
60
61#[pymethods]
62impl TimeEvent {
63    #[new]
64    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
65        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
66    }
67
68    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
69        let py_tuple: &Bound<'_, PyTuple> = state.downcast::<PyTuple>()?;
70
71        let ts_event = py_tuple
72            .get_item(2)?
73            .downcast::<PyLong>()?
74            .extract::<u64>()?;
75        let ts_init: u64 = py_tuple
76            .get_item(3)?
77            .downcast::<PyLong>()?
78            .extract::<u64>()?;
79
80        self.name = Ustr::from(
81            py_tuple
82                .get_item(0)?
83                .downcast::<PyString>()?
84                .extract::<&str>()?,
85        );
86        self.event_id = UUID4::from_str(
87            py_tuple
88                .get_item(1)?
89                .downcast::<PyString>()?
90                .extract::<&str>()?,
91        )
92        .map_err(to_pyvalue_err)?;
93        self.ts_event = ts_event.into();
94        self.ts_init = ts_init.into();
95
96        Ok(())
97    }
98
99    fn __getstate__(&self, py: Python) -> PyResult<PyObject> {
100        Ok((
101            self.name.to_string(),
102            self.event_id.to_string(),
103            self.ts_event.as_u64(),
104            self.ts_init.as_u64(),
105        )
106            .to_object(py))
107    }
108
109    fn __reduce__(&self, py: Python) -> PyResult<PyObject> {
110        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
111        let state = self.__getstate__(py)?;
112        Ok((safe_constructor, PyTuple::empty(py), state).to_object(py))
113    }
114
115    #[staticmethod]
116    fn _safe_constructor() -> Self {
117        Self::new(
118            Ustr::from("NULL"),
119            UUID4::new(),
120            UnixNanos::default(),
121            UnixNanos::default(),
122        )
123    }
124
125    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
126        match op {
127            CompareOp::Eq => self.eq(other).into_py(py),
128            CompareOp::Ne => self.ne(other).into_py(py),
129            _ => py.NotImplemented(),
130        }
131    }
132
133    fn __repr__(&self) -> String {
134        format!("{}('{}')", stringify!(TimeEvent), self)
135    }
136
137    fn __str__(&self) -> String {
138        self.to_string()
139    }
140
141    #[getter]
142    #[pyo3(name = "name")]
143    fn py_name(&self) -> String {
144        self.name.to_string()
145    }
146
147    #[getter]
148    #[pyo3(name = "event_id")]
149    const fn py_event_id(&self) -> UUID4 {
150        self.event_id
151    }
152
153    #[getter]
154    #[pyo3(name = "ts_event")]
155    const fn py_ts_event(&self) -> u64 {
156        self.ts_event.as_u64()
157    }
158
159    #[getter]
160    #[pyo3(name = "ts_init")]
161    const fn py_ts_init(&self) -> u64 {
162        self.ts_init.as_u64()
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    #[rustfmt::skip]
169    #[cfg(feature = "clock_v2")]
170    use std::collections::BinaryHeap;
171
172    use std::num::NonZeroU64;
173    #[rustfmt::skip]
174    #[cfg(feature = "clock_v2")]
175    use std::sync::Arc;
176
177    #[rustfmt::skip]
178    #[cfg(feature = "clock_v2")]
179    use tokio::sync::Mutex;
180
181    use nautilus_core::{
182        datetime::NANOSECONDS_IN_MILLISECOND, time::get_atomic_clock_realtime, UnixNanos,
183    };
184    use pyo3::prelude::*;
185    use tokio::time::Duration;
186
187    use crate::{
188        testing::wait_until,
189        timer::{LiveTimer, TimeEvent, TimeEventCallback},
190    };
191
192    #[pyfunction]
193    const fn receive_event(_py: Python, _event: TimeEvent) -> PyResult<()> {
194        // TODO: Assert the length of a handler vec
195        Ok(())
196    }
197
198    #[tokio::test]
199    async fn test_live_timer_starts_and_stops() {
200        pyo3::prepare_freethreaded_python();
201
202        let callback = Python::with_gil(|py| {
203            let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
204            TimeEventCallback::from(callable.into_py(py))
205        });
206
207        // Create a new LiveTimer with no stop time
208        let clock = get_atomic_clock_realtime();
209        let start_time = clock.get_time_ns();
210        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
211
212        #[cfg(not(feature = "clock_v2"))]
213        let mut timer = LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback);
214
215        #[cfg(feature = "clock_v2")]
216        let (_heap, mut timer) = {
217            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
218            (
219                heap.clone(),
220                LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, callback, heap),
221            )
222        };
223        let next_time_ns = timer.next_time_ns();
224        timer.start();
225
226        // Wait for timer to run
227        tokio::time::sleep(Duration::from_millis(300)).await;
228
229        timer.cancel();
230        wait_until(|| timer.is_expired(), Duration::from_secs(2));
231        assert!(timer.next_time_ns() > next_time_ns);
232    }
233
234    #[tokio::test]
235    async fn test_live_timer_with_stop_time() {
236        pyo3::prepare_freethreaded_python();
237
238        let callback = Python::with_gil(|py| {
239            let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
240            TimeEventCallback::from(callable.into_py(py))
241        });
242
243        // Create a new LiveTimer with a stop time
244        let clock = get_atomic_clock_realtime();
245        let start_time = clock.get_time_ns();
246        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
247        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
248
249        #[cfg(not(feature = "clock_v2"))]
250        let mut timer = LiveTimer::new(
251            "TEST_TIMER",
252            interval_ns,
253            start_time,
254            Some(stop_time),
255            callback,
256        );
257
258        #[cfg(feature = "clock_v2")]
259        let (_heap, mut timer) = {
260            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
261            (
262                heap.clone(),
263                LiveTimer::new(
264                    "TEST_TIMER",
265                    interval_ns,
266                    start_time,
267                    Some(stop_time),
268                    callback,
269                    heap,
270                ),
271            )
272        };
273
274        let next_time_ns = timer.next_time_ns();
275        timer.start();
276
277        // Wait for a longer time than the stop time
278        tokio::time::sleep(Duration::from_secs(1)).await;
279
280        wait_until(|| timer.is_expired(), Duration::from_secs(2));
281        assert!(timer.next_time_ns() > next_time_ns);
282    }
283
284    #[tokio::test]
285    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
286        pyo3::prepare_freethreaded_python();
287
288        let callback = Python::with_gil(|py| {
289            let callable = wrap_pyfunction_bound!(receive_event, py).unwrap();
290            TimeEventCallback::from(callable.into_py(py))
291        });
292
293        // Create a new LiveTimer with a stop time
294        let clock = get_atomic_clock_realtime();
295        let start_time = UnixNanos::default();
296        let interval_ns = NonZeroU64::new(1).unwrap();
297        let stop_time = clock.get_time_ns();
298
299        #[cfg(not(feature = "clock_v2"))]
300        let mut timer = LiveTimer::new(
301            "TEST_TIMER",
302            interval_ns,
303            start_time,
304            Some(stop_time),
305            callback,
306        );
307
308        #[cfg(feature = "clock_v2")]
309        let (_heap, mut timer) = {
310            let heap = Arc::new(Mutex::new(BinaryHeap::new()));
311            (
312                heap.clone(),
313                LiveTimer::new(
314                    "TEST_TIMER",
315                    interval_ns,
316                    start_time,
317                    Some(stop_time),
318                    callback,
319                    heap,
320                ),
321            )
322        };
323
324        timer.start();
325
326        wait_until(|| timer.is_expired(), Duration::from_secs(2));
327    }
328}