Skip to main content

nautilus_common/python/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
5//  You may not use this file except in compliance with the License.
6//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
7//  Unless required by applicable law or agreed to in writing, software
8//  distributed under the License is distributed on an "AS IS" BASIS,
9//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10//  See the License for the specific language governing permissions and
11//  limitations under the License.
12// -------------------------------------------------------------------------------------------------
13
14//! Python bindings for DataActor with complete command and event handler forwarding.
15
16use std::{
17    any::Any,
18    cell::{RefCell, UnsafeCell},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use indexmap::IndexMap;
27use nautilus_core::{
28    nanos::UnixNanos,
29    python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39    },
40    enums::BookType,
41    identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
42    instruments::InstrumentAny,
43    orderbook::OrderBook,
44    python::instruments::instrument_any_to_pyobject,
45};
46use pyo3::{prelude::*, types::PyDict};
47
48use crate::{
49    actor::{
50        Actor, DataActor,
51        data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
52        registry::{get_actor_registry, try_get_actor_unchecked},
53    },
54    cache::Cache,
55    clock::Clock,
56    component::{Component, get_component_registry},
57    enums::ComponentState,
58    python::{cache::PyCache, clock::PyClock, logging::PyLogger},
59    signal::Signal,
60    timer::{TimeEvent, TimeEventCallback},
61};
62
#[pyo3::pymethods]
impl DataActorConfig {
    /// Creates a new `DataActorConfig` from Python.
    ///
    /// When omitted by the Python caller, `actor_id` is `None` and both `log_events` and
    /// `log_commands` are `true` (see the `signature` attribute below).
    #[new]
    #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
    fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
        Self {
            actor_id,
            log_events,
            log_commands,
        }
    }
}
75
76#[pyo3::pymethods]
77impl ImportableActorConfig {
78    #[new]
79    fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
80        let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
81            let json_str: String = PyModule::import(py, "json")?
82                .call_method("dumps", (config.bind(py),), None)?
83                .extract()?;
84
85            let json_value: serde_json::Value =
86                serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
87
88            if let serde_json::Value::Object(map) = json_value {
89                Ok(map.into_iter().collect())
90            } else {
91                Err(to_pyvalue_err("Config must be a dictionary"))
92            }
93        })?;
94
95        Ok(Self {
96            actor_path,
97            config_path,
98            config: json_config,
99        })
100    }
101
102    #[getter]
103    fn actor_path(&self) -> &String {
104        &self.actor_path
105    }
106
107    #[getter]
108    fn config_path(&self) -> &String {
109        &self.config_path
110    }
111
112    #[getter]
113    fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
114        // Convert HashMap<String, serde_json::Value> back to Python dict
115        let py_dict = PyDict::new(py);
116        for (key, value) in &self.config {
117            // Convert serde_json::Value back to Python object via JSON
118            let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
119            let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
120            py_dict.set_item(key, py_value)?;
121        }
122        Ok(py_dict.unbind())
123    }
124}
125
/// Inner state of PyDataActor, shared between Python wrapper and Rust registries.
///
/// This type holds the actual actor state and implements all the actor traits.
/// It is wrapped in `Rc<UnsafeCell<>>` to allow shared ownership between Python
/// and the global registries without copying.
pub struct PyDataActorInner {
    /// The actor core; also the `Deref`/`DerefMut` target of this type.
    core: DataActorCore,
    /// Python object that handler calls are forwarded to; `None` until
    /// `PyDataActor::set_python_instance` stores one, in which case dispatch is a no-op.
    py_self: Option<Py<PyAny>>,
    /// Clock handed out to Python; created as a test clock and replaced on registration.
    clock: PyClock,
    /// Logger handed out to Python, created from the actor ID at construction time.
    logger: PyLogger,
}
137
138impl Debug for PyDataActorInner {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        f.debug_struct(stringify!(PyDataActorInner))
141            .field("core", &self.core)
142            .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
143            .field("clock", &self.clock)
144            .field("logger", &self.logger)
145            .finish()
146    }
147}
148
149impl Deref for PyDataActorInner {
150    type Target = DataActorCore;
151
152    fn deref(&self) -> &Self::Target {
153        &self.core
154    }
155}
156
157impl DerefMut for PyDataActorInner {
158    fn deref_mut(&mut self) -> &mut Self::Target {
159        &mut self.core
160    }
161}
162
163impl PyDataActorInner {
164    fn dispatch_on_start(&self) -> PyResult<()> {
165        if let Some(ref py_self) = self.py_self {
166            Python::attach(|py| py_self.call_method0(py, "on_start"))?;
167        }
168        Ok(())
169    }
170
171    fn dispatch_on_stop(&mut self) -> PyResult<()> {
172        if let Some(ref py_self) = self.py_self {
173            Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
174        }
175        Ok(())
176    }
177
178    fn dispatch_on_resume(&mut self) -> PyResult<()> {
179        if let Some(ref py_self) = self.py_self {
180            Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
181        }
182        Ok(())
183    }
184
185    fn dispatch_on_reset(&mut self) -> PyResult<()> {
186        if let Some(ref py_self) = self.py_self {
187            Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
188        }
189        Ok(())
190    }
191
192    fn dispatch_on_dispose(&mut self) -> PyResult<()> {
193        if let Some(ref py_self) = self.py_self {
194            Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
195        }
196        Ok(())
197    }
198
199    fn dispatch_on_degrade(&mut self) -> PyResult<()> {
200        if let Some(ref py_self) = self.py_self {
201            Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
202        }
203        Ok(())
204    }
205
206    fn dispatch_on_fault(&mut self) -> PyResult<()> {
207        if let Some(ref py_self) = self.py_self {
208            Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
209        }
210        Ok(())
211    }
212
213    fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
214        if let Some(ref py_self) = self.py_self {
215            Python::attach(|py| {
216                py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
217            })?;
218        }
219        Ok(())
220    }
221
222    fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
223        if let Some(ref py_self) = self.py_self {
224            Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
225        }
226        Ok(())
227    }
228
229    fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
230        if let Some(ref py_self) = self.py_self {
231            Python::attach(|py| {
232                py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
233            })?;
234        }
235        Ok(())
236    }
237
238    fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
239        if let Some(ref py_self) = self.py_self {
240            Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
241        }
242        Ok(())
243    }
244
245    fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
246        if let Some(ref py_self) = self.py_self {
247            Python::attach(|py| {
248                py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
249            })?;
250        }
251        Ok(())
252    }
253
254    fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
255        if let Some(ref py_self) = self.py_self {
256            Python::attach(|py| {
257                py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
258            })?;
259        }
260        Ok(())
261    }
262
263    fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
264        if let Some(ref py_self) = self.py_self {
265            Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
266        }
267        Ok(())
268    }
269
270    fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
271        if let Some(ref py_self) = self.py_self {
272            Python::attach(|py| {
273                py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
274            })?;
275        }
276        Ok(())
277    }
278
279    fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
280        if let Some(ref py_self) = self.py_self {
281            Python::attach(|py| {
282                py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
283            })?;
284        }
285        Ok(())
286    }
287
288    fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
289        if let Some(ref py_self) = self.py_self {
290            Python::attach(|py| {
291                py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
292            })?;
293        }
294        Ok(())
295    }
296
297    fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
298        if let Some(ref py_self) = self.py_self {
299            Python::attach(|py| {
300                py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
301            })?;
302        }
303        Ok(())
304    }
305
306    fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
307        if let Some(ref py_self) = self.py_self {
308            Python::attach(|py| {
309                py_self.call_method1(
310                    py,
311                    "on_funding_rate",
312                    (funding_rate.into_py_any_unwrap(py),),
313                )
314            })?;
315        }
316        Ok(())
317    }
318
319    fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
320        if let Some(ref py_self) = self.py_self {
321            Python::attach(|py| {
322                py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
323            })?;
324        }
325        Ok(())
326    }
327
328    fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
329        if let Some(ref py_self) = self.py_self {
330            Python::attach(|py| {
331                py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
332            })?;
333        }
334        Ok(())
335    }
336
337    fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
338        if let Some(ref py_self) = self.py_self {
339            Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
340        }
341        Ok(())
342    }
343
344    fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
345        if let Some(ref py_self) = self.py_self {
346            Python::attach(|py| {
347                let py_quotes: Vec<_> = quotes
348                    .into_iter()
349                    .map(|q| q.into_py_any_unwrap(py))
350                    .collect();
351                py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
352            })?;
353        }
354        Ok(())
355    }
356
357    fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
358        if let Some(ref py_self) = self.py_self {
359            Python::attach(|py| {
360                let py_trades: Vec<_> = trades
361                    .into_iter()
362                    .map(|t| t.into_py_any_unwrap(py))
363                    .collect();
364                py_self.call_method1(py, "on_historical_trades", (py_trades,))
365            })?;
366        }
367        Ok(())
368    }
369
370    fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
371        if let Some(ref py_self) = self.py_self {
372            Python::attach(|py| {
373                let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
374                py_self.call_method1(py, "on_historical_bars", (py_bars,))
375            })?;
376        }
377        Ok(())
378    }
379
380    fn dispatch_on_historical_mark_prices(
381        &mut self,
382        mark_prices: Vec<MarkPriceUpdate>,
383    ) -> PyResult<()> {
384        if let Some(ref py_self) = self.py_self {
385            Python::attach(|py| {
386                let py_prices: Vec<_> = mark_prices
387                    .into_iter()
388                    .map(|p| p.into_py_any_unwrap(py))
389                    .collect();
390                py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
391            })?;
392        }
393        Ok(())
394    }
395
396    fn dispatch_on_historical_index_prices(
397        &mut self,
398        index_prices: Vec<IndexPriceUpdate>,
399    ) -> PyResult<()> {
400        if let Some(ref py_self) = self.py_self {
401            Python::attach(|py| {
402                let py_prices: Vec<_> = index_prices
403                    .into_iter()
404                    .map(|p| p.into_py_any_unwrap(py))
405                    .collect();
406                py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
407            })?;
408        }
409        Ok(())
410    }
411
412    #[cfg(feature = "defi")]
413    fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
414        if let Some(ref py_self) = self.py_self {
415            Python::attach(|py| {
416                py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
417            })?;
418        }
419        Ok(())
420    }
421
422    #[cfg(feature = "defi")]
423    fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
424        if let Some(ref py_self) = self.py_self {
425            Python::attach(|py| {
426                py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
427            })?;
428        }
429        Ok(())
430    }
431
432    #[cfg(feature = "defi")]
433    fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
434        if let Some(ref py_self) = self.py_self {
435            Python::attach(|py| {
436                py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
437            })?;
438        }
439        Ok(())
440    }
441
442    #[cfg(feature = "defi")]
443    fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
444        if let Some(ref py_self) = self.py_self {
445            Python::attach(|py| {
446                py_self.call_method1(
447                    py,
448                    "on_pool_liquidity_update",
449                    (update.into_py_any_unwrap(py),),
450                )
451            })?;
452        }
453        Ok(())
454    }
455
456    #[cfg(feature = "defi")]
457    fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
458        if let Some(ref py_self) = self.py_self {
459            Python::attach(|py| {
460                py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
461            })?;
462        }
463        Ok(())
464    }
465
466    #[cfg(feature = "defi")]
467    fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
468        if let Some(ref py_self) = self.py_self {
469            Python::attach(|py| {
470                py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
471            })?;
472        }
473        Ok(())
474    }
475}
476
/// Python-facing wrapper for DataActor.
///
/// This wrapper holds shared ownership of `PyDataActorInner` via `Rc<UnsafeCell<>>`.
/// Both Python (through this wrapper) and the global registries share the same
/// underlying actor instance, ensuring mutations are visible from both sides.
///
/// The class is exposed to Python as `DataActor` in module `nautilus_trader.common` and can
/// be subclassed from Python. It is declared `unsendable`; the `Rc` field is not `Send`.
#[allow(non_camel_case_types)]
#[pyo3::pyclass(
    module = "nautilus_trader.common",
    name = "DataActor",
    unsendable,
    subclass
)]
pub struct PyDataActor {
    /// Shared handle to the actor state; clones of this `Rc` are placed in the registries.
    inner: Rc<UnsafeCell<PyDataActorInner>>,
}
492
493impl Debug for PyDataActor {
494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495        f.debug_struct(stringify!(PyDataActor))
496            .field("inner", &self.inner())
497            .finish()
498    }
499}
500
impl PyDataActor {
    /// Returns a reference to the inner actor state.
    ///
    /// # Safety
    ///
    /// This is safe for single-threaded use. The `UnsafeCell` allows interior
    /// mutability which is required for the registries to mutate the actor.
    ///
    /// NOTE(review): this is a safe `fn` around an `unsafe` dereference, so the requirement
    /// that no `&mut` to the same state is live is not enforced by the compiler.
    #[inline]
    #[allow(unsafe_code)]
    pub(crate) fn inner(&self) -> &PyDataActorInner {
        // SAFETY: `self.inner` is an `Rc<UnsafeCell<_>>` kept alive by `self`, so the pointer
        // from `get()` is valid for the returned borrow. That no mutable reference to the
        // same state is live at this point is assumed, not checked.
        unsafe { &*self.inner.get() }
    }

    /// Returns a mutable reference to the inner actor state.
    ///
    /// # Safety
    ///
    /// This is safe for single-threaded use. Callers must ensure no aliasing
    /// mutable references exist.
    ///
    /// NOTE(review): the same `Rc` is cloned into the global registries by
    /// `register_in_global_registries`, so other handles to this state can exist; the
    /// exclusivity of the returned `&mut` rests entirely on caller discipline.
    #[inline]
    #[allow(unsafe_code, clippy::mut_from_ref)]
    pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
        // SAFETY: the pointer is valid for the same reason as in `inner`. Uniqueness of the
        // returned `&mut` is the caller's responsibility (see the doc comment above).
        unsafe { &mut *self.inner.get() }
    }
}
526
527impl Deref for PyDataActor {
528    type Target = DataActorCore;
529
530    fn deref(&self) -> &Self::Target {
531        &self.inner().core
532    }
533}
534
535impl DerefMut for PyDataActor {
536    fn deref_mut(&mut self) -> &mut Self::Target {
537        &mut self.inner_mut().core
538    }
539}
540
541impl PyDataActor {
542    // Rust constructor for tests and direct Rust usage
543    pub fn new(config: Option<DataActorConfig>) -> Self {
544        let config = config.unwrap_or_default();
545        let core = DataActorCore::new(config);
546        let clock = PyClock::new_test(); // Temporary clock, will be updated on registration
547        let logger = PyLogger::new(core.actor_id().as_str());
548
549        let inner = PyDataActorInner {
550            core,
551            py_self: None,
552            clock,
553            logger,
554        };
555
556        Self {
557            inner: Rc::new(UnsafeCell::new(inner)),
558        }
559    }
560
561    /// Sets the Python instance reference for method dispatch.
562    ///
563    /// This enables the PyDataActor to forward method calls (like `on_start`, `on_stop`)
564    /// to the original Python instance that contains this PyDataActor. This is essential
565    /// for Python inheritance to work correctly, allowing Python subclasses to override
566    /// DataActor methods and have them called by the Rust system.
567    pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
568        self.inner_mut().py_self = Some(py_obj);
569    }
570
571    /// Updates the actor_id in both the core config and the actor_id field.
572    ///
573    /// # Safety
574    ///
575    /// This method is only exposed for the Python actor to assist with configuration and should
576    /// **never** be called post registration. Calling this after registration will cause
577    /// inconsistent state where the actor is registered under one ID but its internal actor_id
578    /// field contains another, breaking message routing and lifecycle management.
579    pub fn set_actor_id(&mut self, actor_id: ActorId) {
580        let inner = self.inner_mut();
581        inner.core.config.actor_id = Some(actor_id);
582        inner.core.actor_id = actor_id;
583    }
584
585    /// Updates the log_events setting in the core config.
586    pub fn set_log_events(&mut self, log_events: bool) {
587        self.inner_mut().core.config.log_events = log_events;
588    }
589
590    /// Updates the log_commands setting in the core config.
591    pub fn set_log_commands(&mut self, log_commands: bool) {
592        self.inner_mut().core.config.log_commands = log_commands;
593    }
594
595    /// Returns the memory address of this instance as a hexadecimal string.
596    pub fn mem_address(&self) -> String {
597        self.inner().core.mem_address()
598    }
599
600    /// Returns a value indicating whether the actor has been registered with a trader.
601    pub fn is_registered(&self) -> bool {
602        self.inner().core.is_registered()
603    }
604
605    /// Register the actor with a trader.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if the actor is already registered or if the registration process fails.
610    pub fn register(
611        &mut self,
612        trader_id: TraderId,
613        clock: Rc<RefCell<dyn Clock>>,
614        cache: Rc<RefCell<Cache>>,
615    ) -> anyhow::Result<()> {
616        let inner = self.inner_mut();
617        inner.core.register(trader_id, clock, cache)?;
618
619        inner.clock = PyClock::from_rc(inner.core.clock_rc());
620
621        // Register default time event handler for this actor
622        let actor_id = inner.actor_id().inner();
623        let callback = TimeEventCallback::from(move |event: TimeEvent| {
624            if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
625                if let Err(e) = actor.on_time_event(&event) {
626                    log::error!("Python time event handler failed for actor {actor_id}: {e}");
627                }
628            } else {
629                log::error!("Actor {actor_id} not found for time event handling");
630            }
631        });
632
633        inner.clock.inner_mut().register_default_handler(callback);
634
635        inner.initialize()
636    }
637
638    /// Registers this actor in the global component and actor registries.
639    ///
640    /// Clones the internal `Rc` and inserts into both registries. This ensures
641    /// Python and the registries share the exact same actor instance.
642    pub fn register_in_global_registries(&self) {
643        let inner = self.inner();
644        let component_id = inner.component_id().inner();
645        let actor_id = Actor::id(inner);
646
647        let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
648
649        let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
650        get_component_registry().insert(component_id, component_trait_ref);
651
652        let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
653        get_actor_registry().insert(actor_id, actor_trait_ref);
654    }
655}
656
657impl DataActor for PyDataActorInner {
658    fn on_start(&mut self) -> anyhow::Result<()> {
659        self.dispatch_on_start()
660            .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
661    }
662
663    fn on_stop(&mut self) -> anyhow::Result<()> {
664        self.dispatch_on_stop()
665            .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
666    }
667
668    fn on_resume(&mut self) -> anyhow::Result<()> {
669        self.dispatch_on_resume()
670            .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
671    }
672
673    fn on_reset(&mut self) -> anyhow::Result<()> {
674        self.dispatch_on_reset()
675            .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
676    }
677
678    fn on_dispose(&mut self) -> anyhow::Result<()> {
679        self.dispatch_on_dispose()
680            .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
681    }
682
683    fn on_degrade(&mut self) -> anyhow::Result<()> {
684        self.dispatch_on_degrade()
685            .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
686    }
687
688    fn on_fault(&mut self) -> anyhow::Result<()> {
689        self.dispatch_on_fault()
690            .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
691    }
692
693    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
694        self.dispatch_on_time_event(event.clone())
695            .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
696    }
697
698    #[allow(unused_variables)]
699    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
700        Python::attach(|py| {
701            // TODO: Create a placeholder object since we can't easily convert &dyn Any to Py<PyAny>
702            // For now, we'll pass None and let Python subclasses handle specific data types
703            let py_data = py.None();
704
705            self.dispatch_on_data(py_data)
706                .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
707        })
708    }
709
710    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
711        self.dispatch_on_signal(signal)
712            .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
713    }
714
715    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
716        Python::attach(|py| {
717            let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
718                .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
719            self.dispatch_on_instrument(py_instrument)
720                .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
721        })
722    }
723
724    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
725        self.dispatch_on_quote(*quote)
726            .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
727    }
728
729    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
730        self.dispatch_on_trade(*tick)
731            .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
732    }
733
734    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
735        self.dispatch_on_bar(*bar)
736            .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
737    }
738
739    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
740        self.dispatch_on_book_deltas(deltas.clone())
741            .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
742    }
743
744    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
745        self.dispatch_on_book(order_book)
746            .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
747    }
748
749    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
750        self.dispatch_on_mark_price(*mark_price)
751            .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
752    }
753
754    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
755        self.dispatch_on_index_price(*index_price)
756            .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
757    }
758
759    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
760        self.dispatch_on_funding_rate(*funding_rate)
761            .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
762    }
763
764    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
765        self.dispatch_on_instrument_status(*data)
766            .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
767    }
768
769    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
770        self.dispatch_on_instrument_close(*update)
771            .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
772    }
773
774    #[cfg(feature = "defi")]
775    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
776        self.dispatch_on_block(block.clone())
777            .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
778    }
779
780    #[cfg(feature = "defi")]
781    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
782        self.dispatch_on_pool(pool.clone())
783            .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
784    }
785
786    #[cfg(feature = "defi")]
787    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
788        self.dispatch_on_pool_swap(swap.clone())
789            .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
790    }
791
792    #[cfg(feature = "defi")]
793    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
794        self.dispatch_on_pool_liquidity_update(update.clone())
795            .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
796    }
797
798    #[cfg(feature = "defi")]
799    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
800        self.dispatch_on_pool_fee_collect(collect.clone())
801            .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
802    }
803
804    #[cfg(feature = "defi")]
805    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
806        self.dispatch_on_pool_flash(flash.clone())
807            .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
808    }
809
810    fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
811        Python::attach(|py| {
812            let py_data = py.None();
813            self.dispatch_on_historical_data(py_data)
814                .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
815        })
816    }
817
818    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
819        self.dispatch_on_historical_quotes(quotes.to_vec())
820            .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
821    }
822
823    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
824        self.dispatch_on_historical_trades(trades.to_vec())
825            .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
826    }
827
828    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
829        self.dispatch_on_historical_bars(bars.to_vec())
830            .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
831    }
832
833    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
834        self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
835            .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
836    }
837
838    fn on_historical_index_prices(
839        &mut self,
840        index_prices: &[IndexPriceUpdate],
841    ) -> anyhow::Result<()> {
842        self.dispatch_on_historical_index_prices(index_prices.to_vec())
843            .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
844    }
845}
846
847#[pymethods]
848impl PyDataActor {
849    #[new]
850    #[pyo3(signature = (config=None))]
851    fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
852        Ok(Self::new(config))
853    }
854
855    #[getter]
856    #[pyo3(name = "clock")]
857    fn py_clock(&self) -> PyResult<PyClock> {
858        let inner = self.inner();
859        if inner.core.is_registered() {
860            Ok(inner.clock.clone())
861        } else {
862            Err(to_pyruntime_err(
863                "Actor must be registered with a trader before accessing clock",
864            ))
865        }
866    }
867
868    #[getter]
869    #[pyo3(name = "cache")]
870    fn py_cache(&self) -> PyResult<PyCache> {
871        let inner = self.inner();
872        if inner.core.is_registered() {
873            Ok(PyCache::from_rc(inner.core.cache_rc()))
874        } else {
875            Err(to_pyruntime_err(
876                "Actor must be registered with a trader before accessing cache",
877            ))
878        }
879    }
880
881    #[getter]
882    #[pyo3(name = "log")]
883    fn py_log(&self) -> PyLogger {
884        self.inner().logger.clone()
885    }
886
887    #[getter]
888    #[pyo3(name = "actor_id")]
889    fn py_actor_id(&self) -> ActorId {
890        self.inner().core.actor_id
891    }
892
893    #[getter]
894    #[pyo3(name = "trader_id")]
895    fn py_trader_id(&self) -> Option<TraderId> {
896        self.inner().core.trader_id()
897    }
898
899    #[pyo3(name = "state")]
900    fn py_state(&self) -> ComponentState {
901        Component::state(self.inner())
902    }
903
904    #[pyo3(name = "is_ready")]
905    fn py_is_ready(&self) -> bool {
906        Component::is_ready(self.inner())
907    }
908
909    #[pyo3(name = "is_running")]
910    fn py_is_running(&self) -> bool {
911        Component::is_running(self.inner())
912    }
913
914    #[pyo3(name = "is_stopped")]
915    fn py_is_stopped(&self) -> bool {
916        Component::is_stopped(self.inner())
917    }
918
919    #[pyo3(name = "is_degraded")]
920    fn py_is_degraded(&self) -> bool {
921        Component::is_degraded(self.inner())
922    }
923
924    #[pyo3(name = "is_faulted")]
925    fn py_is_faulted(&self) -> bool {
926        Component::is_faulted(self.inner())
927    }
928
929    #[pyo3(name = "is_disposed")]
930    fn py_is_disposed(&self) -> bool {
931        Component::is_disposed(self.inner())
932    }
933
934    #[pyo3(name = "start")]
935    fn py_start(&mut self) -> PyResult<()> {
936        Component::start(self.inner_mut()).map_err(to_pyruntime_err)
937    }
938
939    #[pyo3(name = "stop")]
940    fn py_stop(&mut self) -> PyResult<()> {
941        Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
942    }
943
944    #[pyo3(name = "resume")]
945    fn py_resume(&mut self) -> PyResult<()> {
946        Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
947    }
948
949    #[pyo3(name = "reset")]
950    fn py_reset(&mut self) -> PyResult<()> {
951        Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
952    }
953
954    #[pyo3(name = "dispose")]
955    fn py_dispose(&mut self) -> PyResult<()> {
956        Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
957    }
958
959    #[pyo3(name = "degrade")]
960    fn py_degrade(&mut self) -> PyResult<()> {
961        Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
962    }
963
964    #[pyo3(name = "fault")]
965    fn py_fault(&mut self) -> PyResult<()> {
966        Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
967    }
968
969    #[pyo3(name = "shutdown_system")]
970    #[pyo3(signature = (reason=None))]
971    fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
972        self.inner().core.shutdown_system(reason);
973        Ok(())
974    }
975
976    #[pyo3(name = "on_start")]
977    fn py_on_start(&self) -> PyResult<()> {
978        self.inner().dispatch_on_start()
979    }
980
981    #[pyo3(name = "on_stop")]
982    fn py_on_stop(&mut self) -> PyResult<()> {
983        self.inner_mut().dispatch_on_stop()
984    }
985
986    #[pyo3(name = "on_resume")]
987    fn py_on_resume(&mut self) -> PyResult<()> {
988        self.inner_mut().dispatch_on_resume()
989    }
990
991    #[pyo3(name = "on_reset")]
992    fn py_on_reset(&mut self) -> PyResult<()> {
993        self.inner_mut().dispatch_on_reset()
994    }
995
996    #[pyo3(name = "on_dispose")]
997    fn py_on_dispose(&mut self) -> PyResult<()> {
998        self.inner_mut().dispatch_on_dispose()
999    }
1000
1001    #[pyo3(name = "on_degrade")]
1002    fn py_on_degrade(&mut self) -> PyResult<()> {
1003        self.inner_mut().dispatch_on_degrade()
1004    }
1005
1006    #[pyo3(name = "on_fault")]
1007    fn py_on_fault(&mut self) -> PyResult<()> {
1008        self.inner_mut().dispatch_on_fault()
1009    }
1010
1011    #[pyo3(name = "on_time_event")]
1012    fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
1013        self.inner_mut().dispatch_on_time_event(event)
1014    }
1015
1016    #[pyo3(name = "on_data")]
1017    fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1018        self.inner_mut().dispatch_on_data(data)
1019    }
1020
1021    #[pyo3(name = "on_signal")]
1022    fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
1023        self.inner_mut().dispatch_on_signal(signal)
1024    }
1025
1026    #[pyo3(name = "on_instrument")]
1027    fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
1028        self.inner_mut().dispatch_on_instrument(instrument)
1029    }
1030
1031    #[pyo3(name = "on_quote")]
1032    fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
1033        self.inner_mut().dispatch_on_quote(quote)
1034    }
1035
1036    #[pyo3(name = "on_trade")]
1037    fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
1038        self.inner_mut().dispatch_on_trade(trade)
1039    }
1040
1041    #[pyo3(name = "on_bar")]
1042    fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
1043        self.inner_mut().dispatch_on_bar(bar)
1044    }
1045
1046    #[pyo3(name = "on_book_deltas")]
1047    fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
1048        self.inner_mut().dispatch_on_book_deltas(deltas)
1049    }
1050
1051    #[pyo3(name = "on_book")]
1052    fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
1053        self.inner_mut().dispatch_on_book(book)
1054    }
1055
1056    #[pyo3(name = "on_mark_price")]
1057    fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
1058        self.inner_mut().dispatch_on_mark_price(mark_price)
1059    }
1060
1061    #[pyo3(name = "on_index_price")]
1062    fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
1063        self.inner_mut().dispatch_on_index_price(index_price)
1064    }
1065
1066    #[pyo3(name = "on_funding_rate")]
1067    fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
1068        self.inner_mut().dispatch_on_funding_rate(funding_rate)
1069    }
1070
1071    #[pyo3(name = "on_instrument_status")]
1072    fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
1073        self.inner_mut().dispatch_on_instrument_status(status)
1074    }
1075
1076    #[pyo3(name = "on_instrument_close")]
1077    fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
1078        self.inner_mut().dispatch_on_instrument_close(close)
1079    }
1080
1081    #[cfg(feature = "defi")]
1082    #[pyo3(name = "on_block")]
1083    fn py_on_block(&mut self, block: Block) -> PyResult<()> {
1084        self.inner_mut().dispatch_on_block(block)
1085    }
1086
1087    #[cfg(feature = "defi")]
1088    #[pyo3(name = "on_pool")]
1089    fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
1090        self.inner_mut().dispatch_on_pool(pool)
1091    }
1092
1093    #[cfg(feature = "defi")]
1094    #[pyo3(name = "on_pool_swap")]
1095    fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
1096        self.inner_mut().dispatch_on_pool_swap(swap)
1097    }
1098
1099    #[cfg(feature = "defi")]
1100    #[pyo3(name = "on_pool_liquidity_update")]
1101    fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
1102        self.inner_mut().dispatch_on_pool_liquidity_update(update)
1103    }
1104
1105    #[cfg(feature = "defi")]
1106    #[pyo3(name = "on_pool_fee_collect")]
1107    fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
1108        self.inner_mut().dispatch_on_pool_fee_collect(update)
1109    }
1110
1111    #[cfg(feature = "defi")]
1112    #[pyo3(name = "on_pool_flash")]
1113    fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
1114        self.inner_mut().dispatch_on_pool_flash(flash)
1115    }
1116
1117    #[pyo3(name = "subscribe_data")]
1118    #[pyo3(signature = (data_type, client_id=None, params=None))]
1119    fn py_subscribe_data(
1120        &mut self,
1121        data_type: DataType,
1122        client_id: Option<ClientId>,
1123        params: Option<IndexMap<String, String>>,
1124    ) -> PyResult<()> {
1125        DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1126        Ok(())
1127    }
1128
1129    #[pyo3(name = "subscribe_instruments")]
1130    #[pyo3(signature = (venue, client_id=None, params=None))]
1131    fn py_subscribe_instruments(
1132        &mut self,
1133        venue: Venue,
1134        client_id: Option<ClientId>,
1135        params: Option<IndexMap<String, String>>,
1136    ) -> PyResult<()> {
1137        DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1138        Ok(())
1139    }
1140
1141    #[pyo3(name = "subscribe_instrument")]
1142    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1143    fn py_subscribe_instrument(
1144        &mut self,
1145        instrument_id: InstrumentId,
1146        client_id: Option<ClientId>,
1147        params: Option<IndexMap<String, String>>,
1148    ) -> PyResult<()> {
1149        DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1150        Ok(())
1151    }
1152
1153    #[pyo3(name = "subscribe_book_deltas")]
1154    #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1155    fn py_subscribe_book_deltas(
1156        &mut self,
1157        instrument_id: InstrumentId,
1158        book_type: BookType,
1159        depth: Option<usize>,
1160        client_id: Option<ClientId>,
1161        managed: bool,
1162        params: Option<IndexMap<String, String>>,
1163    ) -> PyResult<()> {
1164        let depth = depth.and_then(NonZeroUsize::new);
1165        DataActor::subscribe_book_deltas(
1166            self.inner_mut(),
1167            instrument_id,
1168            book_type,
1169            depth,
1170            client_id,
1171            managed,
1172            params,
1173        );
1174        Ok(())
1175    }
1176
1177    #[pyo3(name = "subscribe_book_at_interval")]
1178    #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1179    fn py_subscribe_book_at_interval(
1180        &mut self,
1181        instrument_id: InstrumentId,
1182        book_type: BookType,
1183        interval_ms: usize,
1184        depth: Option<usize>,
1185        client_id: Option<ClientId>,
1186        params: Option<IndexMap<String, String>>,
1187    ) -> PyResult<()> {
1188        let depth = depth.and_then(NonZeroUsize::new);
1189        let interval_ms = NonZeroUsize::new(interval_ms)
1190            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1191
1192        DataActor::subscribe_book_at_interval(
1193            self.inner_mut(),
1194            instrument_id,
1195            book_type,
1196            depth,
1197            interval_ms,
1198            client_id,
1199            params,
1200        );
1201        Ok(())
1202    }
1203
1204    #[pyo3(name = "subscribe_quotes")]
1205    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1206    fn py_subscribe_quotes(
1207        &mut self,
1208        instrument_id: InstrumentId,
1209        client_id: Option<ClientId>,
1210        params: Option<IndexMap<String, String>>,
1211    ) -> PyResult<()> {
1212        DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1213        Ok(())
1214    }
1215
1216    #[pyo3(name = "subscribe_trades")]
1217    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1218    fn py_subscribe_trades(
1219        &mut self,
1220        instrument_id: InstrumentId,
1221        client_id: Option<ClientId>,
1222        params: Option<IndexMap<String, String>>,
1223    ) -> PyResult<()> {
1224        DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1225        Ok(())
1226    }
1227
1228    #[pyo3(name = "subscribe_bars")]
1229    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1230    fn py_subscribe_bars(
1231        &mut self,
1232        bar_type: BarType,
1233        client_id: Option<ClientId>,
1234        params: Option<IndexMap<String, String>>,
1235    ) -> PyResult<()> {
1236        DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1237        Ok(())
1238    }
1239
1240    #[pyo3(name = "subscribe_mark_prices")]
1241    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1242    fn py_subscribe_mark_prices(
1243        &mut self,
1244        instrument_id: InstrumentId,
1245        client_id: Option<ClientId>,
1246        params: Option<IndexMap<String, String>>,
1247    ) -> PyResult<()> {
1248        DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1249        Ok(())
1250    }
1251
1252    #[pyo3(name = "subscribe_index_prices")]
1253    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1254    fn py_subscribe_index_prices(
1255        &mut self,
1256        instrument_id: InstrumentId,
1257        client_id: Option<ClientId>,
1258        params: Option<IndexMap<String, String>>,
1259    ) -> PyResult<()> {
1260        DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1261        Ok(())
1262    }
1263
1264    #[pyo3(name = "subscribe_instrument_status")]
1265    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1266    fn py_subscribe_instrument_status(
1267        &mut self,
1268        instrument_id: InstrumentId,
1269        client_id: Option<ClientId>,
1270        params: Option<IndexMap<String, String>>,
1271    ) -> PyResult<()> {
1272        DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1273        Ok(())
1274    }
1275
1276    #[pyo3(name = "subscribe_instrument_close")]
1277    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1278    fn py_subscribe_instrument_close(
1279        &mut self,
1280        instrument_id: InstrumentId,
1281        client_id: Option<ClientId>,
1282        params: Option<IndexMap<String, String>>,
1283    ) -> PyResult<()> {
1284        DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1285        Ok(())
1286    }
1287
1288    #[pyo3(name = "subscribe_order_fills")]
1289    #[pyo3(signature = (instrument_id))]
1290    fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1291        DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1292        Ok(())
1293    }
1294
1295    #[pyo3(name = "subscribe_order_cancels")]
1296    #[pyo3(signature = (instrument_id))]
1297    fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1298        DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1299        Ok(())
1300    }
1301
1302    #[cfg(feature = "defi")]
1303    #[pyo3(name = "subscribe_blocks")]
1304    #[pyo3(signature = (chain, client_id=None, params=None))]
1305    fn py_subscribe_blocks(
1306        &mut self,
1307        chain: Blockchain,
1308        client_id: Option<ClientId>,
1309        params: Option<IndexMap<String, String>>,
1310    ) -> PyResult<()> {
1311        DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1312        Ok(())
1313    }
1314
1315    #[cfg(feature = "defi")]
1316    #[pyo3(name = "subscribe_pool")]
1317    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1318    fn py_subscribe_pool(
1319        &mut self,
1320        instrument_id: InstrumentId,
1321        client_id: Option<ClientId>,
1322        params: Option<IndexMap<String, String>>,
1323    ) -> PyResult<()> {
1324        DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1325        Ok(())
1326    }
1327
1328    #[cfg(feature = "defi")]
1329    #[pyo3(name = "subscribe_pool_swaps")]
1330    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1331    fn py_subscribe_pool_swaps(
1332        &mut self,
1333        instrument_id: InstrumentId,
1334        client_id: Option<ClientId>,
1335        params: Option<IndexMap<String, String>>,
1336    ) -> PyResult<()> {
1337        DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1338        Ok(())
1339    }
1340
1341    #[cfg(feature = "defi")]
1342    #[pyo3(name = "subscribe_pool_liquidity_updates")]
1343    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1344    fn py_subscribe_pool_liquidity_updates(
1345        &mut self,
1346        instrument_id: InstrumentId,
1347        client_id: Option<ClientId>,
1348        params: Option<IndexMap<String, String>>,
1349    ) -> PyResult<()> {
1350        DataActor::subscribe_pool_liquidity_updates(
1351            self.inner_mut(),
1352            instrument_id,
1353            client_id,
1354            params,
1355        );
1356        Ok(())
1357    }
1358
1359    #[cfg(feature = "defi")]
1360    #[pyo3(name = "subscribe_pool_fee_collects")]
1361    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1362    fn py_subscribe_pool_fee_collects(
1363        &mut self,
1364        instrument_id: InstrumentId,
1365        client_id: Option<ClientId>,
1366        params: Option<IndexMap<String, String>>,
1367    ) -> PyResult<()> {
1368        DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
1369        Ok(())
1370    }
1371
1372    #[cfg(feature = "defi")]
1373    #[pyo3(name = "subscribe_pool_flash_events")]
1374    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1375    fn py_subscribe_pool_flash_events(
1376        &mut self,
1377        instrument_id: InstrumentId,
1378        client_id: Option<ClientId>,
1379        params: Option<IndexMap<String, String>>,
1380    ) -> PyResult<()> {
1381        DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
1382        Ok(())
1383    }
1384
1385    #[pyo3(name = "request_data")]
1386    #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1387    fn py_request_data(
1388        &mut self,
1389        data_type: DataType,
1390        client_id: ClientId,
1391        start: Option<u64>,
1392        end: Option<u64>,
1393        limit: Option<usize>,
1394        params: Option<IndexMap<String, String>>,
1395    ) -> PyResult<String> {
1396        let limit = limit.and_then(NonZeroUsize::new);
1397        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1398        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1399
1400        let request_id = DataActor::request_data(
1401            self.inner_mut(),
1402            data_type,
1403            client_id,
1404            start,
1405            end,
1406            limit,
1407            params,
1408        )
1409        .map_err(to_pyvalue_err)?;
1410        Ok(request_id.to_string())
1411    }
1412
1413    #[pyo3(name = "request_instrument")]
1414    #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1415    fn py_request_instrument(
1416        &mut self,
1417        instrument_id: InstrumentId,
1418        start: Option<u64>,
1419        end: Option<u64>,
1420        client_id: Option<ClientId>,
1421        params: Option<IndexMap<String, String>>,
1422    ) -> PyResult<String> {
1423        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1424        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1425
1426        let request_id = DataActor::request_instrument(
1427            self.inner_mut(),
1428            instrument_id,
1429            start,
1430            end,
1431            client_id,
1432            params,
1433        )
1434        .map_err(to_pyvalue_err)?;
1435        Ok(request_id.to_string())
1436    }
1437
1438    #[pyo3(name = "request_instruments")]
1439    #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1440    fn py_request_instruments(
1441        &mut self,
1442        venue: Option<Venue>,
1443        start: Option<u64>,
1444        end: Option<u64>,
1445        client_id: Option<ClientId>,
1446        params: Option<IndexMap<String, String>>,
1447    ) -> PyResult<String> {
1448        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1449        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1450
1451        let request_id =
1452            DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1453                .map_err(to_pyvalue_err)?;
1454        Ok(request_id.to_string())
1455    }
1456
1457    #[pyo3(name = "request_book_snapshot")]
1458    #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1459    fn py_request_book_snapshot(
1460        &mut self,
1461        instrument_id: InstrumentId,
1462        depth: Option<usize>,
1463        client_id: Option<ClientId>,
1464        params: Option<IndexMap<String, String>>,
1465    ) -> PyResult<String> {
1466        let depth = depth.and_then(NonZeroUsize::new);
1467
1468        let request_id = DataActor::request_book_snapshot(
1469            self.inner_mut(),
1470            instrument_id,
1471            depth,
1472            client_id,
1473            params,
1474        )
1475        .map_err(to_pyvalue_err)?;
1476        Ok(request_id.to_string())
1477    }
1478
1479    #[pyo3(name = "request_quotes")]
1480    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1481    fn py_request_quotes(
1482        &mut self,
1483        instrument_id: InstrumentId,
1484        start: Option<u64>,
1485        end: Option<u64>,
1486        limit: Option<usize>,
1487        client_id: Option<ClientId>,
1488        params: Option<IndexMap<String, String>>,
1489    ) -> PyResult<String> {
1490        let limit = limit.and_then(NonZeroUsize::new);
1491        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1492        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1493
1494        let request_id = DataActor::request_quotes(
1495            self.inner_mut(),
1496            instrument_id,
1497            start,
1498            end,
1499            limit,
1500            client_id,
1501            params,
1502        )
1503        .map_err(to_pyvalue_err)?;
1504        Ok(request_id.to_string())
1505    }
1506
1507    #[pyo3(name = "request_trades")]
1508    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1509    fn py_request_trades(
1510        &mut self,
1511        instrument_id: InstrumentId,
1512        start: Option<u64>,
1513        end: Option<u64>,
1514        limit: Option<usize>,
1515        client_id: Option<ClientId>,
1516        params: Option<IndexMap<String, String>>,
1517    ) -> PyResult<String> {
1518        let limit = limit.and_then(NonZeroUsize::new);
1519        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1520        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1521
1522        let request_id = DataActor::request_trades(
1523            self.inner_mut(),
1524            instrument_id,
1525            start,
1526            end,
1527            limit,
1528            client_id,
1529            params,
1530        )
1531        .map_err(to_pyvalue_err)?;
1532        Ok(request_id.to_string())
1533    }
1534
1535    #[pyo3(name = "request_bars")]
1536    #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1537    fn py_request_bars(
1538        &mut self,
1539        bar_type: BarType,
1540        start: Option<u64>,
1541        end: Option<u64>,
1542        limit: Option<usize>,
1543        client_id: Option<ClientId>,
1544        params: Option<IndexMap<String, String>>,
1545    ) -> PyResult<String> {
1546        let limit = limit.and_then(NonZeroUsize::new);
1547        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1548        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1549
1550        let request_id = DataActor::request_bars(
1551            self.inner_mut(),
1552            bar_type,
1553            start,
1554            end,
1555            limit,
1556            client_id,
1557            params,
1558        )
1559        .map_err(to_pyvalue_err)?;
1560        Ok(request_id.to_string())
1561    }
1562
1563    #[pyo3(name = "unsubscribe_data")]
1564    #[pyo3(signature = (data_type, client_id=None, params=None))]
1565    fn py_unsubscribe_data(
1566        &mut self,
1567        data_type: DataType,
1568        client_id: Option<ClientId>,
1569        params: Option<IndexMap<String, String>>,
1570    ) -> PyResult<()> {
1571        DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1572        Ok(())
1573    }
1574
1575    #[pyo3(name = "unsubscribe_instruments")]
1576    #[pyo3(signature = (venue, client_id=None, params=None))]
1577    fn py_unsubscribe_instruments(
1578        &mut self,
1579        venue: Venue,
1580        client_id: Option<ClientId>,
1581        params: Option<IndexMap<String, String>>,
1582    ) -> PyResult<()> {
1583        DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1584        Ok(())
1585    }
1586
1587    #[pyo3(name = "unsubscribe_instrument")]
1588    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1589    fn py_unsubscribe_instrument(
1590        &mut self,
1591        instrument_id: InstrumentId,
1592        client_id: Option<ClientId>,
1593        params: Option<IndexMap<String, String>>,
1594    ) -> PyResult<()> {
1595        DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1596        Ok(())
1597    }
1598
1599    #[pyo3(name = "unsubscribe_book_deltas")]
1600    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1601    fn py_unsubscribe_book_deltas(
1602        &mut self,
1603        instrument_id: InstrumentId,
1604        client_id: Option<ClientId>,
1605        params: Option<IndexMap<String, String>>,
1606    ) -> PyResult<()> {
1607        DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1608        Ok(())
1609    }
1610
1611    #[pyo3(name = "unsubscribe_book_at_interval")]
1612    #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1613    fn py_unsubscribe_book_at_interval(
1614        &mut self,
1615        instrument_id: InstrumentId,
1616        interval_ms: usize,
1617        client_id: Option<ClientId>,
1618        params: Option<IndexMap<String, String>>,
1619    ) -> PyResult<()> {
1620        let interval_ms = NonZeroUsize::new(interval_ms)
1621            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1622
1623        DataActor::unsubscribe_book_at_interval(
1624            self.inner_mut(),
1625            instrument_id,
1626            interval_ms,
1627            client_id,
1628            params,
1629        );
1630        Ok(())
1631    }
1632
1633    #[pyo3(name = "unsubscribe_quotes")]
1634    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1635    fn py_unsubscribe_quotes(
1636        &mut self,
1637        instrument_id: InstrumentId,
1638        client_id: Option<ClientId>,
1639        params: Option<IndexMap<String, String>>,
1640    ) -> PyResult<()> {
1641        DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1642        Ok(())
1643    }
1644
1645    #[pyo3(name = "unsubscribe_trades")]
1646    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1647    fn py_unsubscribe_trades(
1648        &mut self,
1649        instrument_id: InstrumentId,
1650        client_id: Option<ClientId>,
1651        params: Option<IndexMap<String, String>>,
1652    ) -> PyResult<()> {
1653        DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1654        Ok(())
1655    }
1656
1657    #[pyo3(name = "unsubscribe_bars")]
1658    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1659    fn py_unsubscribe_bars(
1660        &mut self,
1661        bar_type: BarType,
1662        client_id: Option<ClientId>,
1663        params: Option<IndexMap<String, String>>,
1664    ) -> PyResult<()> {
1665        DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1666        Ok(())
1667    }
1668
1669    #[pyo3(name = "unsubscribe_mark_prices")]
1670    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1671    fn py_unsubscribe_mark_prices(
1672        &mut self,
1673        instrument_id: InstrumentId,
1674        client_id: Option<ClientId>,
1675        params: Option<IndexMap<String, String>>,
1676    ) -> PyResult<()> {
1677        DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1678        Ok(())
1679    }
1680
1681    #[pyo3(name = "unsubscribe_index_prices")]
1682    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1683    fn py_unsubscribe_index_prices(
1684        &mut self,
1685        instrument_id: InstrumentId,
1686        client_id: Option<ClientId>,
1687        params: Option<IndexMap<String, String>>,
1688    ) -> PyResult<()> {
1689        DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1690        Ok(())
1691    }
1692
1693    #[pyo3(name = "unsubscribe_instrument_status")]
1694    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1695    fn py_unsubscribe_instrument_status(
1696        &mut self,
1697        instrument_id: InstrumentId,
1698        client_id: Option<ClientId>,
1699        params: Option<IndexMap<String, String>>,
1700    ) -> PyResult<()> {
1701        DataActor::unsubscribe_instrument_status(
1702            self.inner_mut(),
1703            instrument_id,
1704            client_id,
1705            params,
1706        );
1707        Ok(())
1708    }
1709
1710    #[pyo3(name = "unsubscribe_instrument_close")]
1711    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1712    fn py_unsubscribe_instrument_close(
1713        &mut self,
1714        instrument_id: InstrumentId,
1715        client_id: Option<ClientId>,
1716        params: Option<IndexMap<String, String>>,
1717    ) -> PyResult<()> {
1718        DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1719        Ok(())
1720    }
1721
1722    #[pyo3(name = "unsubscribe_order_fills")]
1723    #[pyo3(signature = (instrument_id))]
1724    fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1725        DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1726        Ok(())
1727    }
1728
1729    #[pyo3(name = "unsubscribe_order_cancels")]
1730    #[pyo3(signature = (instrument_id))]
1731    fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1732        DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1733        Ok(())
1734    }
1735
1736    #[cfg(feature = "defi")]
1737    #[pyo3(name = "unsubscribe_blocks")]
1738    #[pyo3(signature = (chain, client_id=None, params=None))]
1739    fn py_unsubscribe_blocks(
1740        &mut self,
1741        chain: Blockchain,
1742        client_id: Option<ClientId>,
1743        params: Option<IndexMap<String, String>>,
1744    ) -> PyResult<()> {
1745        DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
1746        Ok(())
1747    }
1748
1749    #[cfg(feature = "defi")]
1750    #[pyo3(name = "unsubscribe_pool")]
1751    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1752    fn py_unsubscribe_pool(
1753        &mut self,
1754        instrument_id: InstrumentId,
1755        client_id: Option<ClientId>,
1756        params: Option<IndexMap<String, String>>,
1757    ) -> PyResult<()> {
1758        DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1759        Ok(())
1760    }
1761
1762    #[cfg(feature = "defi")]
1763    #[pyo3(name = "unsubscribe_pool_swaps")]
1764    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1765    fn py_unsubscribe_pool_swaps(
1766        &mut self,
1767        instrument_id: InstrumentId,
1768        client_id: Option<ClientId>,
1769        params: Option<IndexMap<String, String>>,
1770    ) -> PyResult<()> {
1771        DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1772        Ok(())
1773    }
1774
1775    #[cfg(feature = "defi")]
1776    #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1777    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1778    fn py_unsubscribe_pool_liquidity_updates(
1779        &mut self,
1780        instrument_id: InstrumentId,
1781        client_id: Option<ClientId>,
1782        params: Option<IndexMap<String, String>>,
1783    ) -> PyResult<()> {
1784        DataActor::unsubscribe_pool_liquidity_updates(
1785            self.inner_mut(),
1786            instrument_id,
1787            client_id,
1788            params,
1789        );
1790        Ok(())
1791    }
1792
1793    #[cfg(feature = "defi")]
1794    #[pyo3(name = "unsubscribe_pool_fee_collects")]
1795    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1796    fn py_unsubscribe_pool_fee_collects(
1797        &mut self,
1798        instrument_id: InstrumentId,
1799        client_id: Option<ClientId>,
1800        params: Option<IndexMap<String, String>>,
1801    ) -> PyResult<()> {
1802        DataActor::unsubscribe_pool_fee_collects(
1803            self.inner_mut(),
1804            instrument_id,
1805            client_id,
1806            params,
1807        );
1808        Ok(())
1809    }
1810
1811    #[cfg(feature = "defi")]
1812    #[pyo3(name = "unsubscribe_pool_flash_events")]
1813    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1814    fn py_unsubscribe_pool_flash_events(
1815        &mut self,
1816        instrument_id: InstrumentId,
1817        client_id: Option<ClientId>,
1818        params: Option<IndexMap<String, String>>,
1819    ) -> PyResult<()> {
1820        DataActor::unsubscribe_pool_flash_events(
1821            self.inner_mut(),
1822            instrument_id,
1823            client_id,
1824            params,
1825        );
1826        Ok(())
1827    }
1828
1829    #[allow(unused_variables)]
1830    #[pyo3(name = "on_historical_data")]
1831    fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1832        // Default implementation - can be overridden in Python subclasses
1833        Ok(())
1834    }
1835
1836    #[allow(unused_variables)]
1837    #[pyo3(name = "on_historical_quotes")]
1838    fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1839        // Default implementation - can be overridden in Python subclasses
1840        Ok(())
1841    }
1842
1843    #[allow(unused_variables)]
1844    #[pyo3(name = "on_historical_trades")]
1845    fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1846        // Default implementation - can be overridden in Python subclasses
1847        Ok(())
1848    }
1849
1850    #[allow(unused_variables)]
1851    #[pyo3(name = "on_historical_bars")]
1852    fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1853        // Default implementation - can be overridden in Python subclasses
1854        Ok(())
1855    }
1856
1857    #[allow(unused_variables)]
1858    #[pyo3(name = "on_historical_mark_prices")]
1859    fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1860        // Default implementation - can be overridden in Python subclasses
1861        Ok(())
1862    }
1863
1864    #[allow(unused_variables)]
1865    #[pyo3(name = "on_historical_index_prices")]
1866    fn py_on_historical_index_prices(
1867        &mut self,
1868        index_prices: Vec<IndexPriceUpdate>,
1869    ) -> PyResult<()> {
1870        // Default implementation - can be overridden in Python subclasses
1871        Ok(())
1872    }
1873}
1874
1875#[cfg(test)]
1876mod tests {
1877    use std::{
1878        any::Any,
1879        cell::RefCell,
1880        collections::HashMap,
1881        ops::{Deref, DerefMut},
1882        rc::Rc,
1883        str::FromStr,
1884        sync::{Arc, Mutex},
1885    };
1886
1887    #[cfg(feature = "defi")]
1888    use alloy_primitives::{I256, U160};
1889    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
1890    #[cfg(feature = "defi")]
1891    use nautilus_model::defi::{
1892        AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate,
1893        PoolSwap, Token,
1894    };
1895    use nautilus_model::{
1896        data::{
1897            Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
1898            OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
1899        },
1900        enums::{AggressorSide, BookType, InstrumentCloseType, MarketStatusAction},
1901        identifiers::{ClientId, TradeId, TraderId, Venue},
1902        instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
1903        orderbook::OrderBook,
1904        types::{Price, Quantity},
1905    };
1906    use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
1907    use rstest::{fixture, rstest};
1908    use ustr::Ustr;
1909
1910    use super::PyDataActor;
1911    use crate::{
1912        actor::{DataActor, data_actor::DataActorCore},
1913        cache::Cache,
1914        clock::TestClock,
1915        component::Component,
1916        enums::ComponentState,
1917        runner::{SyncDataCommandSender, set_data_cmd_sender},
1918        signal::Signal,
1919        timer::TimeEvent,
1920    };
1921
1922    #[fixture]
1923    fn clock() -> Rc<RefCell<TestClock>> {
1924        Rc::new(RefCell::new(TestClock::new()))
1925    }
1926
1927    #[fixture]
1928    fn cache() -> Rc<RefCell<Cache>> {
1929        Rc::new(RefCell::new(Cache::new(None, None)))
1930    }
1931
1932    #[fixture]
1933    fn trader_id() -> TraderId {
1934        TraderId::from("TRADER-001")
1935    }
1936
1937    #[fixture]
1938    fn client_id() -> ClientId {
1939        ClientId::new("TestClient")
1940    }
1941
1942    #[fixture]
1943    fn venue() -> Venue {
1944        Venue::from("SIM")
1945    }
1946
1947    #[fixture]
1948    fn data_type() -> DataType {
1949        DataType::new("TestData", None)
1950    }
1951
1952    #[fixture]
1953    fn bar_type(audusd_sim: CurrencyPair) -> BarType {
1954        BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
1955    }
1956
1957    fn create_unregistered_actor() -> PyDataActor {
1958        PyDataActor::new(None)
1959    }
1960
1961    fn create_registered_actor(
1962        clock: Rc<RefCell<TestClock>>,
1963        cache: Rc<RefCell<Cache>>,
1964        trader_id: TraderId,
1965    ) -> PyDataActor {
1966        // Set up sync data command sender for tests
1967        let sender = SyncDataCommandSender;
1968        set_data_cmd_sender(Arc::new(sender));
1969
1970        let mut actor = PyDataActor::new(None);
1971        actor.register(trader_id, clock, cache).unwrap();
1972        actor
1973    }
1974
1975    #[rstest]
1976    fn test_new_actor_creation() {
1977        let actor = PyDataActor::new(None);
1978        assert!(actor.trader_id().is_none());
1979    }
1980
1981    #[rstest]
1982    fn test_clock_access_before_registration_raises_error() {
1983        let actor = PyDataActor::new(None);
1984
1985        // Accessing clock before registration should raise PyRuntimeError
1986        let result = actor.py_clock();
1987        assert!(result.is_err());
1988
1989        let error = result.unwrap_err();
1990        pyo3::Python::initialize();
1991        pyo3::Python::attach(|py| {
1992            assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
1993        });
1994
1995        let error_msg = error.to_string();
1996        assert!(
1997            error_msg.contains("Actor must be registered with a trader before accessing clock")
1998        );
1999    }
2000
2001    #[rstest]
2002    fn test_unregistered_actor_methods_work() {
2003        let actor = create_unregistered_actor();
2004
2005        assert!(!actor.py_is_ready());
2006        assert!(!actor.py_is_running());
2007        assert!(!actor.py_is_stopped());
2008        assert!(!actor.py_is_disposed());
2009        assert!(!actor.py_is_degraded());
2010        assert!(!actor.py_is_faulted());
2011
2012        // Verify unregistered state
2013        assert_eq!(actor.trader_id(), None);
2014    }
2015
2016    #[rstest]
2017    fn test_registration_success(
2018        clock: Rc<RefCell<TestClock>>,
2019        cache: Rc<RefCell<Cache>>,
2020        trader_id: TraderId,
2021    ) {
2022        let mut actor = create_unregistered_actor();
2023        actor.register(trader_id, clock, cache).unwrap();
2024        assert!(actor.trader_id().is_some());
2025        assert_eq!(actor.trader_id().unwrap(), trader_id);
2026    }
2027
2028    #[rstest]
2029    fn test_registered_actor_basic_properties(
2030        clock: Rc<RefCell<TestClock>>,
2031        cache: Rc<RefCell<Cache>>,
2032        trader_id: TraderId,
2033    ) {
2034        let actor = create_registered_actor(clock, cache, trader_id);
2035
2036        assert_eq!(actor.state(), ComponentState::Ready);
2037        assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2038        assert!(actor.py_is_ready());
2039        assert!(!actor.py_is_running());
2040        assert!(!actor.py_is_stopped());
2041        assert!(!actor.py_is_disposed());
2042        assert!(!actor.py_is_degraded());
2043        assert!(!actor.py_is_faulted());
2044    }
2045
2046    #[rstest]
2047    fn test_basic_subscription_methods_compile(
2048        clock: Rc<RefCell<TestClock>>,
2049        cache: Rc<RefCell<Cache>>,
2050        trader_id: TraderId,
2051        data_type: DataType,
2052        client_id: ClientId,
2053        audusd_sim: CurrencyPair,
2054    ) {
2055        let mut actor = create_registered_actor(clock, cache, trader_id);
2056
2057        assert!(
2058            actor
2059                .py_subscribe_data(data_type.clone(), Some(client_id), None)
2060                .is_ok()
2061        );
2062        assert!(
2063            actor
2064                .py_subscribe_quotes(audusd_sim.id, Some(client_id), None)
2065                .is_ok()
2066        );
2067        assert!(
2068            actor
2069                .py_unsubscribe_data(data_type, Some(client_id), None)
2070                .is_ok()
2071        );
2072        assert!(
2073            actor
2074                .py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None)
2075                .is_ok()
2076        );
2077    }
2078
2079    #[rstest]
2080    fn test_shutdown_system_passes_through(
2081        clock: Rc<RefCell<TestClock>>,
2082        cache: Rc<RefCell<Cache>>,
2083        trader_id: TraderId,
2084    ) {
2085        let actor = create_registered_actor(clock, cache, trader_id);
2086
2087        assert!(
2088            actor
2089                .py_shutdown_system(Some("Test shutdown".to_string()))
2090                .is_ok()
2091        );
2092        assert!(actor.py_shutdown_system(None).is_ok());
2093    }
2094
2095    #[rstest]
2096    fn test_book_at_interval_invalid_interval_ms(
2097        clock: Rc<RefCell<TestClock>>,
2098        cache: Rc<RefCell<Cache>>,
2099        trader_id: TraderId,
2100        audusd_sim: CurrencyPair,
2101    ) {
2102        pyo3::Python::initialize();
2103        let mut actor = create_registered_actor(clock, cache, trader_id);
2104
2105        let result = actor.py_subscribe_book_at_interval(
2106            audusd_sim.id,
2107            BookType::L2_MBP,
2108            0,
2109            None,
2110            None,
2111            None,
2112        );
2113        assert!(result.is_err());
2114        assert_eq!(
2115            result.unwrap_err().to_string(),
2116            "ValueError: interval_ms must be > 0"
2117        );
2118
2119        let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
2120        assert!(result.is_err());
2121        assert_eq!(
2122            result.unwrap_err().to_string(),
2123            "ValueError: interval_ms must be > 0"
2124        );
2125    }
2126
2127    #[rstest]
2128    fn test_request_methods_signatures_exist() {
2129        let actor = create_unregistered_actor();
2130        assert!(actor.trader_id().is_none());
2131    }
2132
2133    #[rstest]
2134    fn test_data_actor_trait_implementation(
2135        clock: Rc<RefCell<TestClock>>,
2136        cache: Rc<RefCell<Cache>>,
2137        trader_id: TraderId,
2138    ) {
2139        let actor = create_registered_actor(clock, cache, trader_id);
2140        let state = actor.state();
2141        assert_eq!(state, ComponentState::Ready);
2142    }
2143
2144    static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
2145        std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
2146
    /// Test wrapper around a `PyDataActor`.
    #[derive(Debug)]
    struct TestDataActor {
        /// The wrapped actor under test.
        inner: PyDataActor,
    }
2151
2152    impl TestDataActor {
2153        fn new() -> Self {
2154            Self {
2155                inner: PyDataActor::new(None),
2156            }
2157        }
2158
2159        fn track_call(&self, handler_name: &str) {
2160            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2161            *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
2162        }
2163
2164        fn get_call_count(&self, handler_name: &str) -> i32 {
2165            let tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2166            tracker.get(handler_name).copied().unwrap_or(0)
2167        }
2168
2169        fn reset_tracker(&self) {
2170            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2171            tracker.clear();
2172        }
2173    }
2174
2175    impl Deref for TestDataActor {
2176        type Target = DataActorCore;
2177        fn deref(&self) -> &Self::Target {
2178            &self.inner.inner().core
2179        }
2180    }
2181
2182    impl DerefMut for TestDataActor {
2183        fn deref_mut(&mut self) -> &mut Self::Target {
2184            &mut self.inner.inner_mut().core
2185        }
2186    }
2187
2188    impl DataActor for TestDataActor {
2189        fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
2190            self.track_call("on_time_event");
2191            self.inner.inner_mut().on_time_event(event)
2192        }
2193
2194        fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
2195            self.track_call("on_data");
2196            self.inner.inner_mut().on_data(data)
2197        }
2198
2199        fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
2200            self.track_call("on_signal");
2201            self.inner.inner_mut().on_signal(signal)
2202        }
2203
2204        fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
2205            self.track_call("on_instrument");
2206            self.inner.inner_mut().on_instrument(instrument)
2207        }
2208
2209        fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
2210            self.track_call("on_quote");
2211            self.inner.inner_mut().on_quote(quote)
2212        }
2213
2214        fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
2215            self.track_call("on_trade");
2216            self.inner.inner_mut().on_trade(trade)
2217        }
2218
2219        fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
2220            self.track_call("on_bar");
2221            self.inner.inner_mut().on_bar(bar)
2222        }
2223
2224        fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
2225            self.track_call("on_book");
2226            self.inner.inner_mut().on_book(book)
2227        }
2228
2229        fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
2230            self.track_call("on_book_deltas");
2231            self.inner.inner_mut().on_book_deltas(deltas)
2232        }
2233
2234        fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
2235            self.track_call("on_mark_price");
2236            self.inner.inner_mut().on_mark_price(update)
2237        }
2238
2239        fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
2240            self.track_call("on_index_price");
2241            self.inner.inner_mut().on_index_price(update)
2242        }
2243
2244        fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
2245            self.track_call("on_instrument_status");
2246            self.inner.inner_mut().on_instrument_status(update)
2247        }
2248
2249        fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
2250            self.track_call("on_instrument_close");
2251            self.inner.inner_mut().on_instrument_close(update)
2252        }
2253
2254        #[cfg(feature = "defi")]
2255        fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
2256            self.track_call("on_block");
2257            self.inner.inner_mut().on_block(block)
2258        }
2259
2260        #[cfg(feature = "defi")]
2261        fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
2262            self.track_call("on_pool");
2263            self.inner.inner_mut().on_pool(pool)
2264        }
2265
2266        #[cfg(feature = "defi")]
2267        fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
2268            self.track_call("on_pool_swap");
2269            self.inner.inner_mut().on_pool_swap(swap)
2270        }
2271
2272        #[cfg(feature = "defi")]
2273        fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
2274            self.track_call("on_pool_liquidity_update");
2275            self.inner.inner_mut().on_pool_liquidity_update(update)
2276        }
2277    }
2278
2279    #[rstest]
2280    fn test_python_on_signal_handler(
2281        clock: Rc<RefCell<TestClock>>,
2282        cache: Rc<RefCell<Cache>>,
2283        trader_id: TraderId,
2284    ) {
2285        pyo3::Python::initialize();
2286        let mut test_actor = TestDataActor::new();
2287        test_actor.reset_tracker();
2288        test_actor.register(trader_id, clock, cache).unwrap();
2289
2290        let signal = Signal::new(
2291            Ustr::from("test_signal"),
2292            "1.0".to_string(),
2293            UnixNanos::default(),
2294            UnixNanos::default(),
2295        );
2296
2297        assert!(test_actor.on_signal(&signal).is_ok());
2298        assert_eq!(test_actor.get_call_count("on_signal"), 1);
2299    }
2300
2301    #[rstest]
2302    fn test_python_on_data_handler(
2303        clock: Rc<RefCell<TestClock>>,
2304        cache: Rc<RefCell<Cache>>,
2305        trader_id: TraderId,
2306    ) {
2307        pyo3::Python::initialize();
2308        let mut test_actor = TestDataActor::new();
2309        test_actor.reset_tracker();
2310        test_actor.register(trader_id, clock, cache).unwrap();
2311
2312        assert!(test_actor.on_data(&()).is_ok());
2313        assert_eq!(test_actor.get_call_count("on_data"), 1);
2314    }
2315
2316    #[rstest]
2317    fn test_python_on_time_event_handler(
2318        clock: Rc<RefCell<TestClock>>,
2319        cache: Rc<RefCell<Cache>>,
2320        trader_id: TraderId,
2321    ) {
2322        pyo3::Python::initialize();
2323        let mut test_actor = TestDataActor::new();
2324        test_actor.reset_tracker();
2325        test_actor.register(trader_id, clock, cache).unwrap();
2326
2327        let time_event = TimeEvent::new(
2328            Ustr::from("test_timer"),
2329            UUID4::new(),
2330            UnixNanos::default(),
2331            UnixNanos::default(),
2332        );
2333
2334        assert!(test_actor.on_time_event(&time_event).is_ok());
2335        assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2336    }
2337
2338    #[rstest]
2339    fn test_python_on_instrument_handler(
2340        clock: Rc<RefCell<TestClock>>,
2341        cache: Rc<RefCell<Cache>>,
2342        trader_id: TraderId,
2343        audusd_sim: CurrencyPair,
2344    ) {
2345        pyo3::Python::initialize();
2346        let mut rust_actor = PyDataActor::new(None);
2347        rust_actor.register(trader_id, clock, cache).unwrap();
2348
2349        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2350
2351        assert!(rust_actor.inner_mut().on_instrument(&instrument).is_ok());
2352    }
2353
2354    #[rstest]
2355    fn test_python_on_quote_handler(
2356        clock: Rc<RefCell<TestClock>>,
2357        cache: Rc<RefCell<Cache>>,
2358        trader_id: TraderId,
2359        audusd_sim: CurrencyPair,
2360    ) {
2361        pyo3::Python::initialize();
2362        let mut rust_actor = PyDataActor::new(None);
2363        rust_actor.register(trader_id, clock, cache).unwrap();
2364
2365        let quote = QuoteTick::new(
2366            audusd_sim.id,
2367            Price::from("1.0000"),
2368            Price::from("1.0001"),
2369            Quantity::from("100000"),
2370            Quantity::from("100000"),
2371            UnixNanos::default(),
2372            UnixNanos::default(),
2373        );
2374
2375        assert!(rust_actor.inner_mut().on_quote(&quote).is_ok());
2376    }
2377
2378    #[rstest]
2379    fn test_python_on_trade_handler(
2380        clock: Rc<RefCell<TestClock>>,
2381        cache: Rc<RefCell<Cache>>,
2382        trader_id: TraderId,
2383        audusd_sim: CurrencyPair,
2384    ) {
2385        pyo3::Python::initialize();
2386        let mut rust_actor = PyDataActor::new(None);
2387        rust_actor.register(trader_id, clock, cache).unwrap();
2388
2389        let trade = TradeTick::new(
2390            audusd_sim.id,
2391            Price::from("1.0000"),
2392            Quantity::from("100000"),
2393            AggressorSide::Buyer,
2394            "T123".to_string().into(),
2395            UnixNanos::default(),
2396            UnixNanos::default(),
2397        );
2398
2399        assert!(rust_actor.inner_mut().on_trade(&trade).is_ok());
2400    }
2401
2402    #[rstest]
2403    fn test_python_on_bar_handler(
2404        clock: Rc<RefCell<TestClock>>,
2405        cache: Rc<RefCell<Cache>>,
2406        trader_id: TraderId,
2407        audusd_sim: CurrencyPair,
2408    ) {
2409        pyo3::Python::initialize();
2410        let mut rust_actor = PyDataActor::new(None);
2411        rust_actor.register(trader_id, clock, cache).unwrap();
2412
2413        let bar_type =
2414            BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2415        let bar = Bar::new(
2416            bar_type,
2417            Price::from("1.0000"),
2418            Price::from("1.0001"),
2419            Price::from("0.9999"),
2420            Price::from("1.0000"),
2421            Quantity::from("100000"),
2422            UnixNanos::default(),
2423            UnixNanos::default(),
2424        );
2425
2426        assert!(rust_actor.inner_mut().on_bar(&bar).is_ok());
2427    }
2428
2429    #[rstest]
2430    fn test_python_on_book_handler(
2431        clock: Rc<RefCell<TestClock>>,
2432        cache: Rc<RefCell<Cache>>,
2433        trader_id: TraderId,
2434        audusd_sim: CurrencyPair,
2435    ) {
2436        pyo3::Python::initialize();
2437        let mut rust_actor = PyDataActor::new(None);
2438        rust_actor.register(trader_id, clock, cache).unwrap();
2439
2440        let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2441        assert!(rust_actor.inner_mut().on_book(&book).is_ok());
2442    }
2443
2444    #[rstest]
2445    fn test_python_on_book_deltas_handler(
2446        clock: Rc<RefCell<TestClock>>,
2447        cache: Rc<RefCell<Cache>>,
2448        trader_id: TraderId,
2449        audusd_sim: CurrencyPair,
2450    ) {
2451        pyo3::Python::initialize();
2452        let mut rust_actor = PyDataActor::new(None);
2453        rust_actor.register(trader_id, clock, cache).unwrap();
2454
2455        let delta =
2456            OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2457        let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2458
2459        assert!(rust_actor.inner_mut().on_book_deltas(&deltas).is_ok());
2460    }
2461
2462    #[rstest]
2463    fn test_python_on_mark_price_handler(
2464        clock: Rc<RefCell<TestClock>>,
2465        cache: Rc<RefCell<Cache>>,
2466        trader_id: TraderId,
2467        audusd_sim: CurrencyPair,
2468    ) {
2469        pyo3::Python::initialize();
2470        let mut rust_actor = PyDataActor::new(None);
2471        rust_actor.register(trader_id, clock, cache).unwrap();
2472
2473        let mark_price = MarkPriceUpdate::new(
2474            audusd_sim.id,
2475            Price::from("1.0000"),
2476            UnixNanos::default(),
2477            UnixNanos::default(),
2478        );
2479
2480        assert!(rust_actor.inner_mut().on_mark_price(&mark_price).is_ok());
2481    }
2482
2483    #[rstest]
2484    fn test_python_on_index_price_handler(
2485        clock: Rc<RefCell<TestClock>>,
2486        cache: Rc<RefCell<Cache>>,
2487        trader_id: TraderId,
2488        audusd_sim: CurrencyPair,
2489    ) {
2490        pyo3::Python::initialize();
2491        let mut rust_actor = PyDataActor::new(None);
2492        rust_actor.register(trader_id, clock, cache).unwrap();
2493
2494        let index_price = IndexPriceUpdate::new(
2495            audusd_sim.id,
2496            Price::from("1.0000"),
2497            UnixNanos::default(),
2498            UnixNanos::default(),
2499        );
2500
2501        assert!(rust_actor.inner_mut().on_index_price(&index_price).is_ok());
2502    }
2503
2504    #[rstest]
2505    fn test_python_on_instrument_status_handler(
2506        clock: Rc<RefCell<TestClock>>,
2507        cache: Rc<RefCell<Cache>>,
2508        trader_id: TraderId,
2509        audusd_sim: CurrencyPair,
2510    ) {
2511        pyo3::Python::initialize();
2512        let mut rust_actor = PyDataActor::new(None);
2513        rust_actor.register(trader_id, clock, cache).unwrap();
2514
2515        let status = InstrumentStatus::new(
2516            audusd_sim.id,
2517            MarketStatusAction::Trading,
2518            UnixNanos::default(),
2519            UnixNanos::default(),
2520            None,
2521            None,
2522            None,
2523            None,
2524            None,
2525        );
2526
2527        assert!(rust_actor.inner_mut().on_instrument_status(&status).is_ok());
2528    }
2529
2530    #[rstest]
2531    fn test_python_on_instrument_close_handler(
2532        clock: Rc<RefCell<TestClock>>,
2533        cache: Rc<RefCell<Cache>>,
2534        trader_id: TraderId,
2535        audusd_sim: CurrencyPair,
2536    ) {
2537        pyo3::Python::initialize();
2538        let mut rust_actor = PyDataActor::new(None);
2539        rust_actor.register(trader_id, clock, cache).unwrap();
2540
2541        let close = InstrumentClose::new(
2542            audusd_sim.id,
2543            Price::from("1.0000"),
2544            InstrumentCloseType::EndOfSession,
2545            UnixNanos::default(),
2546            UnixNanos::default(),
2547        );
2548
2549        assert!(rust_actor.inner_mut().on_instrument_close(&close).is_ok());
2550    }
2551
2552    #[cfg(feature = "defi")]
2553    #[rstest]
2554    fn test_python_on_block_handler(
2555        clock: Rc<RefCell<TestClock>>,
2556        cache: Rc<RefCell<Cache>>,
2557        trader_id: TraderId,
2558    ) {
2559        pyo3::Python::initialize();
2560        let mut test_actor = TestDataActor::new();
2561        test_actor.reset_tracker();
2562        test_actor.register(trader_id, clock, cache).unwrap();
2563
2564        let block = Block::new(
2565            "0x1234567890abcdef".to_string(),
2566            "0xabcdef1234567890".to_string(),
2567            12345,
2568            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2569            21000,
2570            20000,
2571            UnixNanos::default(),
2572            Some(Blockchain::Ethereum),
2573        );
2574
2575        assert!(test_actor.on_block(&block).is_ok());
2576        assert_eq!(test_actor.get_call_count("on_block"), 1);
2577    }
2578
2579    #[cfg(feature = "defi")]
2580    #[rstest]
2581    fn test_python_on_pool_swap_handler(
2582        clock: Rc<RefCell<TestClock>>,
2583        cache: Rc<RefCell<Cache>>,
2584        trader_id: TraderId,
2585    ) {
2586        pyo3::Python::initialize();
2587        let mut rust_actor = PyDataActor::new(None);
2588        rust_actor.register(trader_id, clock, cache).unwrap();
2589
2590        let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2591        let dex = Arc::new(Dex::new(
2592            Chain::new(Blockchain::Ethereum, 1),
2593            DexType::UniswapV3,
2594            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2595            0,
2596            AmmType::CLAMM,
2597            "PoolCreated",
2598            "Swap",
2599            "Mint",
2600            "Burn",
2601            "Collect",
2602        ));
2603        let token0 = Token::new(
2604            chain.clone(),
2605            "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2606                .parse()
2607                .unwrap(),
2608            "USDC".into(),
2609            "USD Coin".into(),
2610            6,
2611        );
2612        let token1 = Token::new(
2613            chain.clone(),
2614            "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2615                .parse()
2616                .unwrap(),
2617            "WETH".into(),
2618            "Wrapped Ether".into(),
2619            18,
2620        );
2621        let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2622            .parse()
2623            .unwrap();
2624        let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2625            .parse()
2626            .unwrap();
2627        let pool = Arc::new(Pool::new(
2628            chain.clone(),
2629            dex.clone(),
2630            pool_address,
2631            pool_identifier,
2632            12345,
2633            token0,
2634            token1,
2635            Some(500),
2636            Some(10),
2637            UnixNanos::default(),
2638        ));
2639
2640        let swap = PoolSwap::new(
2641            chain,
2642            dex,
2643            pool.instrument_id,
2644            pool.pool_identifier,
2645            12345,
2646            "0xabc123".to_string(),
2647            0,
2648            0,
2649            None,
2650            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2651                .parse()
2652                .unwrap(),
2653            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2654                .parse()
2655                .unwrap(),
2656            I256::from_str("1000000000000000000").unwrap(),
2657            I256::from_str("400000000000000").unwrap(),
2658            U160::from(59000000000000u128),
2659            1000000,
2660            100,
2661        );
2662
2663        assert!(rust_actor.inner_mut().on_pool_swap(&swap).is_ok());
2664    }
2665
2666    #[cfg(feature = "defi")]
2667    #[rstest]
2668    fn test_python_on_pool_liquidity_update_handler(
2669        clock: Rc<RefCell<TestClock>>,
2670        cache: Rc<RefCell<Cache>>,
2671        trader_id: TraderId,
2672    ) {
2673        pyo3::Python::initialize();
2674        let mut rust_actor = PyDataActor::new(None);
2675        rust_actor.register(trader_id, clock, cache).unwrap();
2676
2677        let block = Block::new(
2678            "0x1234567890abcdef".to_string(),
2679            "0xabcdef1234567890".to_string(),
2680            12345,
2681            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2682            21000,
2683            20000,
2684            UnixNanos::default(),
2685            Some(Blockchain::Ethereum),
2686        );
2687
2688        // We test on_block since PoolLiquidityUpdate construction is complex
2689        assert!(rust_actor.inner_mut().on_block(&block).is_ok());
2690    }
2691
    /// Python source for `TrackingActor`, a mock actor used by the dispatch tests below.
    ///
    /// Every `on_*` handler appends `(method_name, args)` to `self.calls`; `was_called` and
    /// `call_count` query that list by method name.
    const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
        r#"
class TrackingActor:
    """A mock Python actor that tracks all method calls."""

    def __init__(self):
        self.calls = []

    def _record(self, method_name, *args):
        self.calls.append((method_name, args))

    def was_called(self, method_name):
        return any(call[0] == method_name for call in self.calls)

    def call_count(self, method_name):
        return sum(1 for call in self.calls if call[0] == method_name)

    def on_start(self):
        self._record("on_start")

    def on_stop(self):
        self._record("on_stop")

    def on_resume(self):
        self._record("on_resume")

    def on_reset(self):
        self._record("on_reset")

    def on_dispose(self):
        self._record("on_dispose")

    def on_degrade(self):
        self._record("on_degrade")

    def on_fault(self):
        self._record("on_fault")

    def on_time_event(self, event):
        self._record("on_time_event", event)

    def on_data(self, data):
        self._record("on_data", data)

    def on_signal(self, signal):
        self._record("on_signal", signal)

    def on_instrument(self, instrument):
        self._record("on_instrument", instrument)

    def on_quote(self, quote):
        self._record("on_quote", quote)

    def on_trade(self, trade):
        self._record("on_trade", trade)

    def on_bar(self, bar):
        self._record("on_bar", bar)

    def on_book(self, book):
        self._record("on_book", book)

    def on_book_deltas(self, deltas):
        self._record("on_book_deltas", deltas)

    def on_mark_price(self, update):
        self._record("on_mark_price", update)

    def on_index_price(self, update):
        self._record("on_index_price", update)

    def on_funding_rate(self, update):
        self._record("on_funding_rate", update)

    def on_instrument_status(self, status):
        self._record("on_instrument_status", status)

    def on_instrument_close(self, close):
        self._record("on_instrument_close", close)

    def on_historical_data(self, data):
        self._record("on_historical_data", data)

    def on_historical_quotes(self, quotes):
        self._record("on_historical_quotes", quotes)

    def on_historical_trades(self, trades):
        self._record("on_historical_trades", trades)

    def on_historical_bars(self, bars):
        self._record("on_historical_bars", bars)

    def on_historical_mark_prices(self, prices):
        self._record("on_historical_mark_prices", prices)

    def on_historical_index_prices(self, prices):
        self._record("on_historical_index_prices", prices)
"#
    );
2791
2792    fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
2793        py.run(TRACKING_ACTOR_CODE, None, None)?;
2794        let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
2795        let instance = tracking_actor_class.call0()?;
2796        Ok(instance.unbind())
2797    }
2798
2799    fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
2800        py_actor
2801            .call_method1(py, "was_called", (method_name,))
2802            .and_then(|r| r.extract::<bool>(py))
2803            .unwrap_or(false)
2804    }
2805
2806    fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
2807        py_actor
2808            .call_method1(py, "call_count", (method_name,))
2809            .and_then(|r| r.extract::<i32>(py))
2810            .unwrap_or(0)
2811    }
2812
2813    #[rstest]
2814    fn test_python_dispatch_on_start(
2815        clock: Rc<RefCell<TestClock>>,
2816        cache: Rc<RefCell<Cache>>,
2817        trader_id: TraderId,
2818    ) {
2819        pyo3::Python::initialize();
2820        Python::attach(|py| {
2821            let py_actor = create_tracking_python_actor(py).unwrap();
2822
2823            let mut rust_actor = PyDataActor::new(None);
2824            rust_actor.set_python_instance(py_actor.clone_ref(py));
2825            rust_actor.register(trader_id, clock, cache).unwrap();
2826
2827            let result = DataActor::on_start(rust_actor.inner_mut());
2828
2829            assert!(result.is_ok());
2830            assert!(python_method_was_called(&py_actor, py, "on_start"));
2831            assert_eq!(python_method_call_count(&py_actor, py, "on_start"), 1);
2832        });
2833    }
2834
2835    #[rstest]
2836    fn test_python_dispatch_on_stop(
2837        clock: Rc<RefCell<TestClock>>,
2838        cache: Rc<RefCell<Cache>>,
2839        trader_id: TraderId,
2840    ) {
2841        pyo3::Python::initialize();
2842        Python::attach(|py| {
2843            let py_actor = create_tracking_python_actor(py).unwrap();
2844
2845            let mut rust_actor = PyDataActor::new(None);
2846            rust_actor.set_python_instance(py_actor.clone_ref(py));
2847            rust_actor.register(trader_id, clock, cache).unwrap();
2848
2849            let result = DataActor::on_stop(rust_actor.inner_mut());
2850
2851            assert!(result.is_ok());
2852            assert!(python_method_was_called(&py_actor, py, "on_stop"));
2853        });
2854    }
2855
2856    #[rstest]
2857    fn test_python_dispatch_on_resume(
2858        clock: Rc<RefCell<TestClock>>,
2859        cache: Rc<RefCell<Cache>>,
2860        trader_id: TraderId,
2861    ) {
2862        pyo3::Python::initialize();
2863        Python::attach(|py| {
2864            let py_actor = create_tracking_python_actor(py).unwrap();
2865
2866            let mut rust_actor = PyDataActor::new(None);
2867            rust_actor.set_python_instance(py_actor.clone_ref(py));
2868            rust_actor.register(trader_id, clock, cache).unwrap();
2869
2870            let result = DataActor::on_resume(rust_actor.inner_mut());
2871
2872            assert!(result.is_ok());
2873            assert!(python_method_was_called(&py_actor, py, "on_resume"));
2874        });
2875    }
2876
2877    #[rstest]
2878    fn test_python_dispatch_on_reset(
2879        clock: Rc<RefCell<TestClock>>,
2880        cache: Rc<RefCell<Cache>>,
2881        trader_id: TraderId,
2882    ) {
2883        pyo3::Python::initialize();
2884        Python::attach(|py| {
2885            let py_actor = create_tracking_python_actor(py).unwrap();
2886
2887            let mut rust_actor = PyDataActor::new(None);
2888            rust_actor.set_python_instance(py_actor.clone_ref(py));
2889            rust_actor.register(trader_id, clock, cache).unwrap();
2890
2891            let result = DataActor::on_reset(rust_actor.inner_mut());
2892
2893            assert!(result.is_ok());
2894            assert!(python_method_was_called(&py_actor, py, "on_reset"));
2895        });
2896    }
2897
2898    #[rstest]
2899    fn test_python_dispatch_on_dispose(
2900        clock: Rc<RefCell<TestClock>>,
2901        cache: Rc<RefCell<Cache>>,
2902        trader_id: TraderId,
2903    ) {
2904        pyo3::Python::initialize();
2905        Python::attach(|py| {
2906            let py_actor = create_tracking_python_actor(py).unwrap();
2907
2908            let mut rust_actor = PyDataActor::new(None);
2909            rust_actor.set_python_instance(py_actor.clone_ref(py));
2910            rust_actor.register(trader_id, clock, cache).unwrap();
2911
2912            let result = DataActor::on_dispose(rust_actor.inner_mut());
2913
2914            assert!(result.is_ok());
2915            assert!(python_method_was_called(&py_actor, py, "on_dispose"));
2916        });
2917    }
2918
2919    #[rstest]
2920    fn test_python_dispatch_on_degrade(
2921        clock: Rc<RefCell<TestClock>>,
2922        cache: Rc<RefCell<Cache>>,
2923        trader_id: TraderId,
2924    ) {
2925        pyo3::Python::initialize();
2926        Python::attach(|py| {
2927            let py_actor = create_tracking_python_actor(py).unwrap();
2928
2929            let mut rust_actor = PyDataActor::new(None);
2930            rust_actor.set_python_instance(py_actor.clone_ref(py));
2931            rust_actor.register(trader_id, clock, cache).unwrap();
2932
2933            let result = DataActor::on_degrade(rust_actor.inner_mut());
2934
2935            assert!(result.is_ok());
2936            assert!(python_method_was_called(&py_actor, py, "on_degrade"));
2937        });
2938    }
2939
2940    #[rstest]
2941    fn test_python_dispatch_on_fault(
2942        clock: Rc<RefCell<TestClock>>,
2943        cache: Rc<RefCell<Cache>>,
2944        trader_id: TraderId,
2945    ) {
2946        pyo3::Python::initialize();
2947        Python::attach(|py| {
2948            let py_actor = create_tracking_python_actor(py).unwrap();
2949
2950            let mut rust_actor = PyDataActor::new(None);
2951            rust_actor.set_python_instance(py_actor.clone_ref(py));
2952            rust_actor.register(trader_id, clock, cache).unwrap();
2953
2954            let result = DataActor::on_fault(rust_actor.inner_mut());
2955
2956            assert!(result.is_ok());
2957            assert!(python_method_was_called(&py_actor, py, "on_fault"));
2958        });
2959    }
2960
2961    #[rstest]
2962    fn test_python_dispatch_on_signal(
2963        clock: Rc<RefCell<TestClock>>,
2964        cache: Rc<RefCell<Cache>>,
2965        trader_id: TraderId,
2966    ) {
2967        pyo3::Python::initialize();
2968        Python::attach(|py| {
2969            let py_actor = create_tracking_python_actor(py).unwrap();
2970
2971            let mut rust_actor = PyDataActor::new(None);
2972            rust_actor.set_python_instance(py_actor.clone_ref(py));
2973            rust_actor.register(trader_id, clock, cache).unwrap();
2974
2975            let signal = Signal::new(
2976                Ustr::from("test_signal"),
2977                "1.0".to_string(),
2978                UnixNanos::default(),
2979                UnixNanos::default(),
2980            );
2981
2982            let result = rust_actor.inner_mut().on_signal(&signal);
2983
2984            assert!(result.is_ok());
2985            assert!(python_method_was_called(&py_actor, py, "on_signal"));
2986        });
2987    }
2988
2989    #[rstest]
2990    fn test_python_dispatch_on_time_event(
2991        clock: Rc<RefCell<TestClock>>,
2992        cache: Rc<RefCell<Cache>>,
2993        trader_id: TraderId,
2994    ) {
2995        pyo3::Python::initialize();
2996        Python::attach(|py| {
2997            let py_actor = create_tracking_python_actor(py).unwrap();
2998
2999            let mut rust_actor = PyDataActor::new(None);
3000            rust_actor.set_python_instance(py_actor.clone_ref(py));
3001            rust_actor.register(trader_id, clock, cache).unwrap();
3002
3003            let time_event = TimeEvent::new(
3004                Ustr::from("test_timer"),
3005                UUID4::new(),
3006                UnixNanos::default(),
3007                UnixNanos::default(),
3008            );
3009
3010            let result = rust_actor.inner_mut().on_time_event(&time_event);
3011
3012            assert!(result.is_ok());
3013            assert!(python_method_was_called(&py_actor, py, "on_time_event"));
3014        });
3015    }
3016
3017    #[rstest]
3018    fn test_python_dispatch_on_instrument(
3019        clock: Rc<RefCell<TestClock>>,
3020        cache: Rc<RefCell<Cache>>,
3021        trader_id: TraderId,
3022        audusd_sim: CurrencyPair,
3023    ) {
3024        pyo3::Python::initialize();
3025        Python::attach(|py| {
3026            let py_actor = create_tracking_python_actor(py).unwrap();
3027
3028            let mut rust_actor = PyDataActor::new(None);
3029            rust_actor.set_python_instance(py_actor.clone_ref(py));
3030            rust_actor.register(trader_id, clock, cache).unwrap();
3031
3032            let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3033
3034            let result = rust_actor.inner_mut().on_instrument(&instrument);
3035
3036            assert!(result.is_ok());
3037            assert!(python_method_was_called(&py_actor, py, "on_instrument"));
3038        });
3039    }
3040
3041    #[rstest]
3042    fn test_python_dispatch_on_quote(
3043        clock: Rc<RefCell<TestClock>>,
3044        cache: Rc<RefCell<Cache>>,
3045        trader_id: TraderId,
3046        audusd_sim: CurrencyPair,
3047    ) {
3048        pyo3::Python::initialize();
3049        Python::attach(|py| {
3050            let py_actor = create_tracking_python_actor(py).unwrap();
3051
3052            let mut rust_actor = PyDataActor::new(None);
3053            rust_actor.set_python_instance(py_actor.clone_ref(py));
3054            rust_actor.register(trader_id, clock, cache).unwrap();
3055
3056            let quote = QuoteTick::new(
3057                audusd_sim.id,
3058                Price::from("1.00000"),
3059                Price::from("1.00001"),
3060                Quantity::from(100_000),
3061                Quantity::from(100_000),
3062                UnixNanos::default(),
3063                UnixNanos::default(),
3064            );
3065
3066            let result = rust_actor.inner_mut().on_quote(&quote);
3067
3068            assert!(result.is_ok());
3069            assert!(python_method_was_called(&py_actor, py, "on_quote"));
3070        });
3071    }
3072
3073    #[rstest]
3074    fn test_python_dispatch_on_trade(
3075        clock: Rc<RefCell<TestClock>>,
3076        cache: Rc<RefCell<Cache>>,
3077        trader_id: TraderId,
3078        audusd_sim: CurrencyPair,
3079    ) {
3080        pyo3::Python::initialize();
3081        Python::attach(|py| {
3082            let py_actor = create_tracking_python_actor(py).unwrap();
3083
3084            let mut rust_actor = PyDataActor::new(None);
3085            rust_actor.set_python_instance(py_actor.clone_ref(py));
3086            rust_actor.register(trader_id, clock, cache).unwrap();
3087
3088            let trade = TradeTick::new(
3089                audusd_sim.id,
3090                Price::from("1.00000"),
3091                Quantity::from(100_000),
3092                AggressorSide::Buyer,
3093                TradeId::new("123456"),
3094                UnixNanos::default(),
3095                UnixNanos::default(),
3096            );
3097
3098            let result = rust_actor.inner_mut().on_trade(&trade);
3099
3100            assert!(result.is_ok());
3101            assert!(python_method_was_called(&py_actor, py, "on_trade"));
3102        });
3103    }
3104
3105    #[rstest]
3106    fn test_python_dispatch_on_bar(
3107        clock: Rc<RefCell<TestClock>>,
3108        cache: Rc<RefCell<Cache>>,
3109        trader_id: TraderId,
3110        audusd_sim: CurrencyPair,
3111    ) {
3112        pyo3::Python::initialize();
3113        Python::attach(|py| {
3114            let py_actor = create_tracking_python_actor(py).unwrap();
3115
3116            let mut rust_actor = PyDataActor::new(None);
3117            rust_actor.set_python_instance(py_actor.clone_ref(py));
3118            rust_actor.register(trader_id, clock, cache).unwrap();
3119
3120            let bar_type =
3121                BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
3122            let bar = Bar::new(
3123                bar_type,
3124                Price::from("1.00000"),
3125                Price::from("1.00010"),
3126                Price::from("0.99990"),
3127                Price::from("1.00005"),
3128                Quantity::from(100_000),
3129                UnixNanos::default(),
3130                UnixNanos::default(),
3131            );
3132
3133            let result = rust_actor.inner_mut().on_bar(&bar);
3134
3135            assert!(result.is_ok());
3136            assert!(python_method_was_called(&py_actor, py, "on_bar"));
3137        });
3138    }
3139
3140    #[rstest]
3141    fn test_python_dispatch_on_book(
3142        clock: Rc<RefCell<TestClock>>,
3143        cache: Rc<RefCell<Cache>>,
3144        trader_id: TraderId,
3145        audusd_sim: CurrencyPair,
3146    ) {
3147        pyo3::Python::initialize();
3148        Python::attach(|py| {
3149            let py_actor = create_tracking_python_actor(py).unwrap();
3150
3151            let mut rust_actor = PyDataActor::new(None);
3152            rust_actor.set_python_instance(py_actor.clone_ref(py));
3153            rust_actor.register(trader_id, clock, cache).unwrap();
3154
3155            let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
3156
3157            let result = rust_actor.inner_mut().on_book(&book);
3158
3159            assert!(result.is_ok());
3160            assert!(python_method_was_called(&py_actor, py, "on_book"));
3161        });
3162    }
3163
3164    #[rstest]
3165    fn test_python_dispatch_on_book_deltas(
3166        clock: Rc<RefCell<TestClock>>,
3167        cache: Rc<RefCell<Cache>>,
3168        trader_id: TraderId,
3169        audusd_sim: CurrencyPair,
3170    ) {
3171        pyo3::Python::initialize();
3172        Python::attach(|py| {
3173            let py_actor = create_tracking_python_actor(py).unwrap();
3174
3175            let mut rust_actor = PyDataActor::new(None);
3176            rust_actor.set_python_instance(py_actor.clone_ref(py));
3177            rust_actor.register(trader_id, clock, cache).unwrap();
3178
3179            let delta =
3180                OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
3181            let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
3182
3183            let result = rust_actor.inner_mut().on_book_deltas(&deltas);
3184
3185            assert!(result.is_ok());
3186            assert!(python_method_was_called(&py_actor, py, "on_book_deltas"));
3187        });
3188    }
3189
3190    #[rstest]
3191    fn test_python_dispatch_on_mark_price(
3192        clock: Rc<RefCell<TestClock>>,
3193        cache: Rc<RefCell<Cache>>,
3194        trader_id: TraderId,
3195        audusd_sim: CurrencyPair,
3196    ) {
3197        pyo3::Python::initialize();
3198        Python::attach(|py| {
3199            let py_actor = create_tracking_python_actor(py).unwrap();
3200
3201            let mut rust_actor = PyDataActor::new(None);
3202            rust_actor.set_python_instance(py_actor.clone_ref(py));
3203            rust_actor.register(trader_id, clock, cache).unwrap();
3204
3205            let mark_price = MarkPriceUpdate::new(
3206                audusd_sim.id,
3207                Price::from("1.00000"),
3208                UnixNanos::default(),
3209                UnixNanos::default(),
3210            );
3211
3212            let result = rust_actor.inner_mut().on_mark_price(&mark_price);
3213
3214            assert!(result.is_ok());
3215            assert!(python_method_was_called(&py_actor, py, "on_mark_price"));
3216        });
3217    }
3218
3219    #[rstest]
3220    fn test_python_dispatch_on_index_price(
3221        clock: Rc<RefCell<TestClock>>,
3222        cache: Rc<RefCell<Cache>>,
3223        trader_id: TraderId,
3224        audusd_sim: CurrencyPair,
3225    ) {
3226        pyo3::Python::initialize();
3227        Python::attach(|py| {
3228            let py_actor = create_tracking_python_actor(py).unwrap();
3229
3230            let mut rust_actor = PyDataActor::new(None);
3231            rust_actor.set_python_instance(py_actor.clone_ref(py));
3232            rust_actor.register(trader_id, clock, cache).unwrap();
3233
3234            let index_price = IndexPriceUpdate::new(
3235                audusd_sim.id,
3236                Price::from("1.00000"),
3237                UnixNanos::default(),
3238                UnixNanos::default(),
3239            );
3240
3241            let result = rust_actor.inner_mut().on_index_price(&index_price);
3242
3243            assert!(result.is_ok());
3244            assert!(python_method_was_called(&py_actor, py, "on_index_price"));
3245        });
3246    }
3247
3248    #[rstest]
3249    fn test_python_dispatch_on_instrument_status(
3250        clock: Rc<RefCell<TestClock>>,
3251        cache: Rc<RefCell<Cache>>,
3252        trader_id: TraderId,
3253        audusd_sim: CurrencyPair,
3254    ) {
3255        pyo3::Python::initialize();
3256        Python::attach(|py| {
3257            let py_actor = create_tracking_python_actor(py).unwrap();
3258
3259            let mut rust_actor = PyDataActor::new(None);
3260            rust_actor.set_python_instance(py_actor.clone_ref(py));
3261            rust_actor.register(trader_id, clock, cache).unwrap();
3262
3263            let status = InstrumentStatus::new(
3264                audusd_sim.id,
3265                MarketStatusAction::Trading,
3266                UnixNanos::default(),
3267                UnixNanos::default(),
3268                None,
3269                None,
3270                None,
3271                None,
3272                None,
3273            );
3274
3275            let result = rust_actor.inner_mut().on_instrument_status(&status);
3276
3277            assert!(result.is_ok());
3278            assert!(python_method_was_called(
3279                &py_actor,
3280                py,
3281                "on_instrument_status"
3282            ));
3283        });
3284    }
3285
3286    #[rstest]
3287    fn test_python_dispatch_on_instrument_close(
3288        clock: Rc<RefCell<TestClock>>,
3289        cache: Rc<RefCell<Cache>>,
3290        trader_id: TraderId,
3291        audusd_sim: CurrencyPair,
3292    ) {
3293        pyo3::Python::initialize();
3294        Python::attach(|py| {
3295            let py_actor = create_tracking_python_actor(py).unwrap();
3296
3297            let mut rust_actor = PyDataActor::new(None);
3298            rust_actor.set_python_instance(py_actor.clone_ref(py));
3299            rust_actor.register(trader_id, clock, cache).unwrap();
3300
3301            let close = InstrumentClose::new(
3302                audusd_sim.id,
3303                Price::from("1.00000"),
3304                InstrumentCloseType::EndOfSession,
3305                UnixNanos::default(),
3306                UnixNanos::default(),
3307            );
3308
3309            let result = rust_actor.inner_mut().on_instrument_close(&close);
3310
3311            assert!(result.is_ok());
3312            assert!(python_method_was_called(
3313                &py_actor,
3314                py,
3315                "on_instrument_close"
3316            ));
3317        });
3318    }
3319
3320    #[rstest]
3321    fn test_python_dispatch_multiple_calls_tracked(
3322        clock: Rc<RefCell<TestClock>>,
3323        cache: Rc<RefCell<Cache>>,
3324        trader_id: TraderId,
3325        audusd_sim: CurrencyPair,
3326    ) {
3327        pyo3::Python::initialize();
3328        Python::attach(|py| {
3329            let py_actor = create_tracking_python_actor(py).unwrap();
3330
3331            let mut rust_actor = PyDataActor::new(None);
3332            rust_actor.set_python_instance(py_actor.clone_ref(py));
3333            rust_actor.register(trader_id, clock, cache).unwrap();
3334
3335            let quote = QuoteTick::new(
3336                audusd_sim.id,
3337                Price::from("1.00000"),
3338                Price::from("1.00001"),
3339                Quantity::from(100_000),
3340                Quantity::from(100_000),
3341                UnixNanos::default(),
3342                UnixNanos::default(),
3343            );
3344
3345            rust_actor.inner_mut().on_quote(&quote).unwrap();
3346            rust_actor.inner_mut().on_quote(&quote).unwrap();
3347            rust_actor.inner_mut().on_quote(&quote).unwrap();
3348
3349            assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3350        });
3351    }
3352
3353    #[rstest]
3354    fn test_python_dispatch_no_call_when_py_self_not_set(
3355        clock: Rc<RefCell<TestClock>>,
3356        cache: Rc<RefCell<Cache>>,
3357        trader_id: TraderId,
3358    ) {
3359        pyo3::Python::initialize();
3360        Python::attach(|_py| {
3361            let mut rust_actor = PyDataActor::new(None);
3362            rust_actor.register(trader_id, clock, cache).unwrap();
3363
3364            // When py_self is None, the dispatch returns Ok(()) without calling Python
3365            let result = DataActor::on_start(rust_actor.inner_mut());
3366            assert!(result.is_ok());
3367        });
3368    }
3369}