nautilus_common/python/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
5//  You may not use this file except in compliance with the License.
6//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
7//  Unless required by applicable law or agreed to in writing, software
8//  distributed under the License is distributed on an "AS IS" BASIS,
9//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10//  See the License for the specific language governing permissions and
11//  limitations under the License.
12// -------------------------------------------------------------------------------------------------
13
14//! Python bindings for DataActor with complete command and event handler forwarding.
15
16use std::{
17    any::Any,
18    cell::{RefCell, UnsafeCell},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use indexmap::IndexMap;
27use nautilus_core::{
28    nanos::UnixNanos,
29    python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39    },
40    enums::BookType,
41    identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
42    instruments::InstrumentAny,
43    orderbook::OrderBook,
44    python::instruments::instrument_any_to_pyobject,
45};
46use pyo3::{exceptions::PyValueError, prelude::*, types::PyDict};
47
48use crate::{
49    actor::{
50        Actor, DataActor,
51        data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
52        registry::{get_actor_registry, try_get_actor_unchecked},
53    },
54    cache::Cache,
55    clock::Clock,
56    component::{Component, get_component_registry},
57    enums::ComponentState,
58    python::{cache::PyCache, clock::PyClock, logging::PyLogger},
59    signal::Signal,
60    timer::{TimeEvent, TimeEventCallback},
61};
62
63#[pyo3::pymethods]
64impl DataActorConfig {
65    #[new]
66    #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
67    fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
68        Self {
69            actor_id,
70            log_events,
71            log_commands,
72        }
73    }
74}
75
76#[pyo3::pymethods]
77impl ImportableActorConfig {
78    #[new]
79    fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
80        let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
81            let json_str: String = PyModule::import(py, "json")?
82                .call_method("dumps", (config.bind(py),), None)?
83                .extract()?;
84
85            let json_value: serde_json::Value = serde_json::from_str(&json_str)
86                .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
87
88            if let serde_json::Value::Object(map) = json_value {
89                Ok(map.into_iter().collect())
90            } else {
91                Err(PyErr::new::<PyValueError, _>("Config must be a dictionary"))
92            }
93        })?;
94
95        Ok(Self {
96            actor_path,
97            config_path,
98            config: json_config,
99        })
100    }
101
102    #[getter]
103    fn actor_path(&self) -> &String {
104        &self.actor_path
105    }
106
107    #[getter]
108    fn config_path(&self) -> &String {
109        &self.config_path
110    }
111
112    #[getter]
113    fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
114        // Convert HashMap<String, serde_json::Value> back to Python dict
115        let py_dict = PyDict::new(py);
116        for (key, value) in &self.config {
117            // Convert serde_json::Value back to Python object via JSON
118            let json_str = serde_json::to_string(value)
119                .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
120            let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
121            py_dict.set_item(key, py_value)?;
122        }
123        Ok(py_dict.unbind())
124    }
125}
126
127/// Inner state of PyDataActor, shared between Python wrapper and Rust registries.
128///
129/// This type holds the actual actor state and implements all the actor traits.
130/// It is wrapped in `Rc<UnsafeCell<>>` to allow shared ownership between Python
131/// and the global registries without copying.
132pub struct PyDataActorInner {
133    core: DataActorCore,
134    py_self: Option<Py<PyAny>>,
135    clock: PyClock,
136    logger: PyLogger,
137}
138
139impl Debug for PyDataActorInner {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("PyDataActorInner")
142            .field("core", &self.core)
143            .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
144            .field("clock", &self.clock)
145            .field("logger", &self.logger)
146            .finish()
147    }
148}
149
150impl Deref for PyDataActorInner {
151    type Target = DataActorCore;
152
153    fn deref(&self) -> &Self::Target {
154        &self.core
155    }
156}
157
158impl DerefMut for PyDataActorInner {
159    fn deref_mut(&mut self) -> &mut Self::Target {
160        &mut self.core
161    }
162}
163
164impl PyDataActorInner {
165    fn dispatch_on_start(&self) -> PyResult<()> {
166        if let Some(ref py_self) = self.py_self {
167            Python::attach(|py| py_self.call_method0(py, "on_start"))?;
168        }
169        Ok(())
170    }
171
172    fn dispatch_on_stop(&mut self) -> PyResult<()> {
173        if let Some(ref py_self) = self.py_self {
174            Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
175        }
176        Ok(())
177    }
178
179    fn dispatch_on_resume(&mut self) -> PyResult<()> {
180        if let Some(ref py_self) = self.py_self {
181            Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
182        }
183        Ok(())
184    }
185
186    fn dispatch_on_reset(&mut self) -> PyResult<()> {
187        if let Some(ref py_self) = self.py_self {
188            Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
189        }
190        Ok(())
191    }
192
193    fn dispatch_on_dispose(&mut self) -> PyResult<()> {
194        if let Some(ref py_self) = self.py_self {
195            Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
196        }
197        Ok(())
198    }
199
200    fn dispatch_on_degrade(&mut self) -> PyResult<()> {
201        if let Some(ref py_self) = self.py_self {
202            Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
203        }
204        Ok(())
205    }
206
207    fn dispatch_on_fault(&mut self) -> PyResult<()> {
208        if let Some(ref py_self) = self.py_self {
209            Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
210        }
211        Ok(())
212    }
213
214    fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
215        if let Some(ref py_self) = self.py_self {
216            Python::attach(|py| {
217                py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
218            })?;
219        }
220        Ok(())
221    }
222
223    fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
224        if let Some(ref py_self) = self.py_self {
225            Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
226        }
227        Ok(())
228    }
229
230    fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
231        if let Some(ref py_self) = self.py_self {
232            Python::attach(|py| {
233                py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
234            })?;
235        }
236        Ok(())
237    }
238
239    fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
240        if let Some(ref py_self) = self.py_self {
241            Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
242        }
243        Ok(())
244    }
245
246    fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
247        if let Some(ref py_self) = self.py_self {
248            Python::attach(|py| {
249                py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
250            })?;
251        }
252        Ok(())
253    }
254
255    fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
256        if let Some(ref py_self) = self.py_self {
257            Python::attach(|py| {
258                py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
259            })?;
260        }
261        Ok(())
262    }
263
264    fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
265        if let Some(ref py_self) = self.py_self {
266            Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
267        }
268        Ok(())
269    }
270
271    fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
272        if let Some(ref py_self) = self.py_self {
273            Python::attach(|py| {
274                py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
275            })?;
276        }
277        Ok(())
278    }
279
280    fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
281        if let Some(ref py_self) = self.py_self {
282            Python::attach(|py| {
283                py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
284            })?;
285        }
286        Ok(())
287    }
288
289    fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
290        if let Some(ref py_self) = self.py_self {
291            Python::attach(|py| {
292                py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
293            })?;
294        }
295        Ok(())
296    }
297
298    fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
299        if let Some(ref py_self) = self.py_self {
300            Python::attach(|py| {
301                py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
302            })?;
303        }
304        Ok(())
305    }
306
307    fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
308        if let Some(ref py_self) = self.py_self {
309            Python::attach(|py| {
310                py_self.call_method1(
311                    py,
312                    "on_funding_rate",
313                    (funding_rate.into_py_any_unwrap(py),),
314                )
315            })?;
316        }
317        Ok(())
318    }
319
320    fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
321        if let Some(ref py_self) = self.py_self {
322            Python::attach(|py| {
323                py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
324            })?;
325        }
326        Ok(())
327    }
328
329    fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
330        if let Some(ref py_self) = self.py_self {
331            Python::attach(|py| {
332                py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
333            })?;
334        }
335        Ok(())
336    }
337
338    fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
339        if let Some(ref py_self) = self.py_self {
340            Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
341        }
342        Ok(())
343    }
344
345    fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
346        if let Some(ref py_self) = self.py_self {
347            Python::attach(|py| {
348                let py_quotes: Vec<_> = quotes
349                    .into_iter()
350                    .map(|q| q.into_py_any_unwrap(py))
351                    .collect();
352                py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
353            })?;
354        }
355        Ok(())
356    }
357
358    fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
359        if let Some(ref py_self) = self.py_self {
360            Python::attach(|py| {
361                let py_trades: Vec<_> = trades
362                    .into_iter()
363                    .map(|t| t.into_py_any_unwrap(py))
364                    .collect();
365                py_self.call_method1(py, "on_historical_trades", (py_trades,))
366            })?;
367        }
368        Ok(())
369    }
370
371    fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
372        if let Some(ref py_self) = self.py_self {
373            Python::attach(|py| {
374                let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
375                py_self.call_method1(py, "on_historical_bars", (py_bars,))
376            })?;
377        }
378        Ok(())
379    }
380
381    fn dispatch_on_historical_mark_prices(
382        &mut self,
383        mark_prices: Vec<MarkPriceUpdate>,
384    ) -> PyResult<()> {
385        if let Some(ref py_self) = self.py_self {
386            Python::attach(|py| {
387                let py_prices: Vec<_> = mark_prices
388                    .into_iter()
389                    .map(|p| p.into_py_any_unwrap(py))
390                    .collect();
391                py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
392            })?;
393        }
394        Ok(())
395    }
396
397    fn dispatch_on_historical_index_prices(
398        &mut self,
399        index_prices: Vec<IndexPriceUpdate>,
400    ) -> PyResult<()> {
401        if let Some(ref py_self) = self.py_self {
402            Python::attach(|py| {
403                let py_prices: Vec<_> = index_prices
404                    .into_iter()
405                    .map(|p| p.into_py_any_unwrap(py))
406                    .collect();
407                py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
408            })?;
409        }
410        Ok(())
411    }
412
413    #[cfg(feature = "defi")]
414    fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
415        if let Some(ref py_self) = self.py_self {
416            Python::attach(|py| {
417                py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
418            })?;
419        }
420        Ok(())
421    }
422
423    #[cfg(feature = "defi")]
424    fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
425        if let Some(ref py_self) = self.py_self {
426            Python::attach(|py| {
427                py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
428            })?;
429        }
430        Ok(())
431    }
432
433    #[cfg(feature = "defi")]
434    fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
435        if let Some(ref py_self) = self.py_self {
436            Python::attach(|py| {
437                py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
438            })?;
439        }
440        Ok(())
441    }
442
443    #[cfg(feature = "defi")]
444    fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
445        if let Some(ref py_self) = self.py_self {
446            Python::attach(|py| {
447                py_self.call_method1(
448                    py,
449                    "on_pool_liquidity_update",
450                    (update.into_py_any_unwrap(py),),
451                )
452            })?;
453        }
454        Ok(())
455    }
456
457    #[cfg(feature = "defi")]
458    fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
459        if let Some(ref py_self) = self.py_self {
460            Python::attach(|py| {
461                py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
462            })?;
463        }
464        Ok(())
465    }
466
467    #[cfg(feature = "defi")]
468    fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
469        if let Some(ref py_self) = self.py_self {
470            Python::attach(|py| {
471                py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
472            })?;
473        }
474        Ok(())
475    }
476}
477
478/// Python-facing wrapper for DataActor.
479///
480/// This wrapper holds shared ownership of `PyDataActorInner` via `Rc<UnsafeCell<>>`.
481/// Both Python (through this wrapper) and the global registries share the same
482/// underlying actor instance, ensuring mutations are visible from both sides.
483#[allow(non_camel_case_types)]
484#[pyo3::pyclass(
485    module = "nautilus_trader.common",
486    name = "DataActor",
487    unsendable,
488    subclass
489)]
490pub struct PyDataActor {
491    inner: Rc<UnsafeCell<PyDataActorInner>>,
492}
493
494impl Debug for PyDataActor {
495    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496        f.debug_struct("PyDataActor")
497            .field("inner", &self.inner())
498            .finish()
499    }
500}
501
502impl PyDataActor {
503    /// Returns a reference to the inner actor state.
504    ///
505    /// # Safety
506    ///
507    /// This is safe for single-threaded use. The `UnsafeCell` allows interior
508    /// mutability which is required for the registries to mutate the actor.
509    #[inline]
510    #[allow(unsafe_code)]
511    pub(crate) fn inner(&self) -> &PyDataActorInner {
512        unsafe { &*self.inner.get() }
513    }
514
515    /// Returns a mutable reference to the inner actor state.
516    ///
517    /// # Safety
518    ///
519    /// This is safe for single-threaded use. Callers must ensure no aliasing
520    /// mutable references exist.
521    #[inline]
522    #[allow(unsafe_code, clippy::mut_from_ref)]
523    pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
524        unsafe { &mut *self.inner.get() }
525    }
526}
527
528impl Deref for PyDataActor {
529    type Target = DataActorCore;
530
531    fn deref(&self) -> &Self::Target {
532        &self.inner().core
533    }
534}
535
536impl DerefMut for PyDataActor {
537    fn deref_mut(&mut self) -> &mut Self::Target {
538        &mut self.inner_mut().core
539    }
540}
541
542impl PyDataActor {
543    // Rust constructor for tests and direct Rust usage
544    pub fn new(config: Option<DataActorConfig>) -> Self {
545        let config = config.unwrap_or_default();
546        let core = DataActorCore::new(config);
547        let clock = PyClock::new_test(); // Temporary clock, will be updated on registration
548        let logger = PyLogger::new(core.actor_id().as_str());
549
550        let inner = PyDataActorInner {
551            core,
552            py_self: None,
553            clock,
554            logger,
555        };
556
557        Self {
558            inner: Rc::new(UnsafeCell::new(inner)),
559        }
560    }
561
562    /// Sets the Python instance reference for method dispatch.
563    ///
564    /// This enables the PyDataActor to forward method calls (like `on_start`, `on_stop`)
565    /// to the original Python instance that contains this PyDataActor. This is essential
566    /// for Python inheritance to work correctly, allowing Python subclasses to override
567    /// DataActor methods and have them called by the Rust system.
568    pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
569        self.inner_mut().py_self = Some(py_obj);
570    }
571
572    /// Updates the actor_id in both the core config and the actor_id field.
573    ///
574    /// # Safety
575    ///
576    /// This method is only exposed for the Python actor to assist with configuration and should
577    /// **never** be called post registration. Calling this after registration will cause
578    /// inconsistent state where the actor is registered under one ID but its internal actor_id
579    /// field contains another, breaking message routing and lifecycle management.
580    pub fn set_actor_id(&mut self, actor_id: ActorId) {
581        let inner = self.inner_mut();
582        inner.core.config.actor_id = Some(actor_id);
583        inner.core.actor_id = actor_id;
584    }
585
586    /// Updates the log_events setting in the core config.
587    pub fn set_log_events(&mut self, log_events: bool) {
588        self.inner_mut().core.config.log_events = log_events;
589    }
590
591    /// Updates the log_commands setting in the core config.
592    pub fn set_log_commands(&mut self, log_commands: bool) {
593        self.inner_mut().core.config.log_commands = log_commands;
594    }
595
596    /// Returns the memory address of this instance as a hexadecimal string.
597    pub fn mem_address(&self) -> String {
598        self.inner().core.mem_address()
599    }
600
601    /// Returns a value indicating whether the actor has been registered with a trader.
602    pub fn is_registered(&self) -> bool {
603        self.inner().core.is_registered()
604    }
605
606    /// Register the actor with a trader.
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the actor is already registered or if the registration process fails.
611    pub fn register(
612        &mut self,
613        trader_id: TraderId,
614        clock: Rc<RefCell<dyn Clock>>,
615        cache: Rc<RefCell<Cache>>,
616    ) -> anyhow::Result<()> {
617        let inner = self.inner_mut();
618        inner.core.register(trader_id, clock, cache)?;
619
620        inner.clock = PyClock::from_rc(inner.core.clock_rc());
621
622        // Register default time event handler for this actor
623        let actor_id = inner.actor_id().inner();
624        let callback = TimeEventCallback::from(move |event: TimeEvent| {
625            if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
626                if let Err(e) = actor.on_time_event(&event) {
627                    log::error!("Python time event handler failed for actor {actor_id}: {e}");
628                }
629            } else {
630                log::error!("Actor {actor_id} not found for time event handling");
631            }
632        });
633
634        inner.clock.inner_mut().register_default_handler(callback);
635
636        inner.initialize()
637    }
638
639    /// Registers this actor in the global component and actor registries.
640    ///
641    /// Clones the internal `Rc` and inserts into both registries. This ensures
642    /// Python and the registries share the exact same actor instance.
643    pub fn register_in_global_registries(&self) {
644        let inner = self.inner();
645        let component_id = inner.component_id().inner();
646        let actor_id = Actor::id(inner);
647
648        let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
649
650        let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
651        get_component_registry().insert(component_id, component_trait_ref);
652
653        let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
654        get_actor_registry().insert(actor_id, actor_trait_ref);
655    }
656}
657
658impl DataActor for PyDataActorInner {
659    fn on_start(&mut self) -> anyhow::Result<()> {
660        self.dispatch_on_start()
661            .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
662    }
663
664    fn on_stop(&mut self) -> anyhow::Result<()> {
665        self.dispatch_on_stop()
666            .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
667    }
668
669    fn on_resume(&mut self) -> anyhow::Result<()> {
670        self.dispatch_on_resume()
671            .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
672    }
673
674    fn on_reset(&mut self) -> anyhow::Result<()> {
675        self.dispatch_on_reset()
676            .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
677    }
678
679    fn on_dispose(&mut self) -> anyhow::Result<()> {
680        self.dispatch_on_dispose()
681            .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
682    }
683
684    fn on_degrade(&mut self) -> anyhow::Result<()> {
685        self.dispatch_on_degrade()
686            .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
687    }
688
689    fn on_fault(&mut self) -> anyhow::Result<()> {
690        self.dispatch_on_fault()
691            .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
692    }
693
694    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
695        self.dispatch_on_time_event(event.clone())
696            .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
697    }
698
699    #[allow(unused_variables)]
700    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
701        Python::attach(|py| {
702            // TODO: Create a placeholder object since we can't easily convert &dyn Any to Py<PyAny>
703            // For now, we'll pass None and let Python subclasses handle specific data types
704            let py_data = py.None();
705
706            self.dispatch_on_data(py_data)
707                .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
708        })
709    }
710
711    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
712        self.dispatch_on_signal(signal)
713            .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
714    }
715
716    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
717        Python::attach(|py| {
718            let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
719                .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
720            self.dispatch_on_instrument(py_instrument)
721                .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
722        })
723    }
724
725    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
726        self.dispatch_on_quote(*quote)
727            .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
728    }
729
730    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
731        self.dispatch_on_trade(*tick)
732            .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
733    }
734
735    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
736        self.dispatch_on_bar(*bar)
737            .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
738    }
739
740    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
741        self.dispatch_on_book_deltas(deltas.clone())
742            .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
743    }
744
745    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
746        self.dispatch_on_book(order_book)
747            .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
748    }
749
750    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
751        self.dispatch_on_mark_price(*mark_price)
752            .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
753    }
754
755    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
756        self.dispatch_on_index_price(*index_price)
757            .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
758    }
759
760    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
761        self.dispatch_on_funding_rate(*funding_rate)
762            .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
763    }
764
765    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
766        self.dispatch_on_instrument_status(*data)
767            .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
768    }
769
770    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
771        self.dispatch_on_instrument_close(*update)
772            .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
773    }
774
775    #[cfg(feature = "defi")]
776    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
777        self.dispatch_on_block(block.clone())
778            .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
779    }
780
781    #[cfg(feature = "defi")]
782    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
783        self.dispatch_on_pool(pool.clone())
784            .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
785    }
786
787    #[cfg(feature = "defi")]
788    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
789        self.dispatch_on_pool_swap(swap.clone())
790            .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
791    }
792
793    #[cfg(feature = "defi")]
794    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
795        self.dispatch_on_pool_liquidity_update(update.clone())
796            .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
797    }
798
799    #[cfg(feature = "defi")]
800    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
801        self.dispatch_on_pool_fee_collect(collect.clone())
802            .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
803    }
804
805    #[cfg(feature = "defi")]
806    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
807        self.dispatch_on_pool_flash(flash.clone())
808            .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
809    }
810
811    fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
812        Python::attach(|py| {
813            let py_data = py.None();
814            self.dispatch_on_historical_data(py_data)
815                .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
816        })
817    }
818
819    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
820        self.dispatch_on_historical_quotes(quotes.to_vec())
821            .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
822    }
823
824    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
825        self.dispatch_on_historical_trades(trades.to_vec())
826            .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
827    }
828
829    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
830        self.dispatch_on_historical_bars(bars.to_vec())
831            .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
832    }
833
834    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
835        self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
836            .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
837    }
838
839    fn on_historical_index_prices(
840        &mut self,
841        index_prices: &[IndexPriceUpdate],
842    ) -> anyhow::Result<()> {
843        self.dispatch_on_historical_index_prices(index_prices.to_vec())
844            .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
845    }
846}
847
848#[pymethods]
849impl PyDataActor {
850    #[new]
851    #[pyo3(signature = (config=None))]
852    fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
853        Ok(Self::new(config))
854    }
855
856    #[getter]
857    #[pyo3(name = "clock")]
858    fn py_clock(&self) -> PyResult<PyClock> {
859        let inner = self.inner();
860        if !inner.core.is_registered() {
861            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
862                "Actor must be registered with a trader before accessing clock",
863            ))
864        } else {
865            Ok(inner.clock.clone())
866        }
867    }
868
869    #[getter]
870    #[pyo3(name = "cache")]
871    fn py_cache(&self) -> PyResult<PyCache> {
872        let inner = self.inner();
873        if !inner.core.is_registered() {
874            Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
875                "Actor must be registered with a trader before accessing cache",
876            ))
877        } else {
878            Ok(PyCache::from_rc(inner.core.cache_rc()))
879        }
880    }
881
882    #[getter]
883    #[pyo3(name = "log")]
884    fn py_log(&self) -> PyLogger {
885        self.inner().logger.clone()
886    }
887
888    #[getter]
889    #[pyo3(name = "actor_id")]
890    fn py_actor_id(&self) -> ActorId {
891        self.inner().core.actor_id
892    }
893
894    #[getter]
895    #[pyo3(name = "trader_id")]
896    fn py_trader_id(&self) -> Option<TraderId> {
897        self.inner().core.trader_id()
898    }
899
900    #[pyo3(name = "state")]
901    fn py_state(&self) -> ComponentState {
902        Component::state(self.inner())
903    }
904
905    #[pyo3(name = "is_ready")]
906    fn py_is_ready(&self) -> bool {
907        Component::is_ready(self.inner())
908    }
909
910    #[pyo3(name = "is_running")]
911    fn py_is_running(&self) -> bool {
912        Component::is_running(self.inner())
913    }
914
915    #[pyo3(name = "is_stopped")]
916    fn py_is_stopped(&self) -> bool {
917        Component::is_stopped(self.inner())
918    }
919
920    #[pyo3(name = "is_degraded")]
921    fn py_is_degraded(&self) -> bool {
922        Component::is_degraded(self.inner())
923    }
924
925    #[pyo3(name = "is_faulted")]
926    fn py_is_faulted(&self) -> bool {
927        Component::is_faulted(self.inner())
928    }
929
930    #[pyo3(name = "is_disposed")]
931    fn py_is_disposed(&self) -> bool {
932        Component::is_disposed(self.inner())
933    }
934
935    #[pyo3(name = "start")]
936    fn py_start(&mut self) -> PyResult<()> {
937        Component::start(self.inner_mut()).map_err(to_pyruntime_err)
938    }
939
940    #[pyo3(name = "stop")]
941    fn py_stop(&mut self) -> PyResult<()> {
942        Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
943    }
944
945    #[pyo3(name = "resume")]
946    fn py_resume(&mut self) -> PyResult<()> {
947        Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
948    }
949
950    #[pyo3(name = "reset")]
951    fn py_reset(&mut self) -> PyResult<()> {
952        Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
953    }
954
955    #[pyo3(name = "dispose")]
956    fn py_dispose(&mut self) -> PyResult<()> {
957        Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
958    }
959
960    #[pyo3(name = "degrade")]
961    fn py_degrade(&mut self) -> PyResult<()> {
962        Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
963    }
964
965    #[pyo3(name = "fault")]
966    fn py_fault(&mut self) -> PyResult<()> {
967        Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
968    }
969
970    #[pyo3(name = "shutdown_system")]
971    #[pyo3(signature = (reason=None))]
972    fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
973        self.inner().core.shutdown_system(reason);
974        Ok(())
975    }
976
977    #[pyo3(name = "on_start")]
978    fn py_on_start(&self) -> PyResult<()> {
979        self.inner().dispatch_on_start()
980    }
981
982    #[pyo3(name = "on_stop")]
983    fn py_on_stop(&mut self) -> PyResult<()> {
984        self.inner_mut().dispatch_on_stop()
985    }
986
987    #[pyo3(name = "on_resume")]
988    fn py_on_resume(&mut self) -> PyResult<()> {
989        self.inner_mut().dispatch_on_resume()
990    }
991
992    #[pyo3(name = "on_reset")]
993    fn py_on_reset(&mut self) -> PyResult<()> {
994        self.inner_mut().dispatch_on_reset()
995    }
996
997    #[pyo3(name = "on_dispose")]
998    fn py_on_dispose(&mut self) -> PyResult<()> {
999        self.inner_mut().dispatch_on_dispose()
1000    }
1001
1002    #[pyo3(name = "on_degrade")]
1003    fn py_on_degrade(&mut self) -> PyResult<()> {
1004        self.inner_mut().dispatch_on_degrade()
1005    }
1006
1007    #[pyo3(name = "on_fault")]
1008    fn py_on_fault(&mut self) -> PyResult<()> {
1009        self.inner_mut().dispatch_on_fault()
1010    }
1011
1012    #[pyo3(name = "on_time_event")]
1013    fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
1014        self.inner_mut().dispatch_on_time_event(event)
1015    }
1016
1017    #[pyo3(name = "on_data")]
1018    fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1019        self.inner_mut().dispatch_on_data(data)
1020    }
1021
1022    #[pyo3(name = "on_signal")]
1023    fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
1024        self.inner_mut().dispatch_on_signal(signal)
1025    }
1026
1027    #[pyo3(name = "on_instrument")]
1028    fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
1029        self.inner_mut().dispatch_on_instrument(instrument)
1030    }
1031
1032    #[pyo3(name = "on_quote")]
1033    fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
1034        self.inner_mut().dispatch_on_quote(quote)
1035    }
1036
1037    #[pyo3(name = "on_trade")]
1038    fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
1039        self.inner_mut().dispatch_on_trade(trade)
1040    }
1041
1042    #[pyo3(name = "on_bar")]
1043    fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
1044        self.inner_mut().dispatch_on_bar(bar)
1045    }
1046
1047    #[pyo3(name = "on_book_deltas")]
1048    fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
1049        self.inner_mut().dispatch_on_book_deltas(deltas)
1050    }
1051
1052    #[pyo3(name = "on_book")]
1053    fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
1054        self.inner_mut().dispatch_on_book(book)
1055    }
1056
1057    #[pyo3(name = "on_mark_price")]
1058    fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
1059        self.inner_mut().dispatch_on_mark_price(mark_price)
1060    }
1061
1062    #[pyo3(name = "on_index_price")]
1063    fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
1064        self.inner_mut().dispatch_on_index_price(index_price)
1065    }
1066
1067    #[pyo3(name = "on_funding_rate")]
1068    fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
1069        self.inner_mut().dispatch_on_funding_rate(funding_rate)
1070    }
1071
1072    #[pyo3(name = "on_instrument_status")]
1073    fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
1074        self.inner_mut().dispatch_on_instrument_status(status)
1075    }
1076
1077    #[pyo3(name = "on_instrument_close")]
1078    fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
1079        self.inner_mut().dispatch_on_instrument_close(close)
1080    }
1081
1082    #[cfg(feature = "defi")]
1083    #[pyo3(name = "on_block")]
1084    fn py_on_block(&mut self, block: Block) -> PyResult<()> {
1085        self.inner_mut().dispatch_on_block(block)
1086    }
1087
1088    #[cfg(feature = "defi")]
1089    #[pyo3(name = "on_pool")]
1090    fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
1091        self.inner_mut().dispatch_on_pool(pool)
1092    }
1093
1094    #[cfg(feature = "defi")]
1095    #[pyo3(name = "on_pool_swap")]
1096    fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
1097        self.inner_mut().dispatch_on_pool_swap(swap)
1098    }
1099
1100    #[cfg(feature = "defi")]
1101    #[pyo3(name = "on_pool_liquidity_update")]
1102    fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
1103        self.inner_mut().dispatch_on_pool_liquidity_update(update)
1104    }
1105
1106    #[cfg(feature = "defi")]
1107    #[pyo3(name = "on_pool_fee_collect")]
1108    fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
1109        self.inner_mut().dispatch_on_pool_fee_collect(update)
1110    }
1111
1112    #[cfg(feature = "defi")]
1113    #[pyo3(name = "on_pool_flash")]
1114    fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
1115        self.inner_mut().dispatch_on_pool_flash(flash)
1116    }
1117
1118    #[pyo3(name = "subscribe_data")]
1119    #[pyo3(signature = (data_type, client_id=None, params=None))]
1120    fn py_subscribe_data(
1121        &mut self,
1122        data_type: DataType,
1123        client_id: Option<ClientId>,
1124        params: Option<IndexMap<String, String>>,
1125    ) -> PyResult<()> {
1126        DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1127        Ok(())
1128    }
1129
1130    #[pyo3(name = "subscribe_instruments")]
1131    #[pyo3(signature = (venue, client_id=None, params=None))]
1132    fn py_subscribe_instruments(
1133        &mut self,
1134        venue: Venue,
1135        client_id: Option<ClientId>,
1136        params: Option<IndexMap<String, String>>,
1137    ) -> PyResult<()> {
1138        DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1139        Ok(())
1140    }
1141
1142    #[pyo3(name = "subscribe_instrument")]
1143    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1144    fn py_subscribe_instrument(
1145        &mut self,
1146        instrument_id: InstrumentId,
1147        client_id: Option<ClientId>,
1148        params: Option<IndexMap<String, String>>,
1149    ) -> PyResult<()> {
1150        DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1151        Ok(())
1152    }
1153
1154    #[pyo3(name = "subscribe_book_deltas")]
1155    #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1156    fn py_subscribe_book_deltas(
1157        &mut self,
1158        instrument_id: InstrumentId,
1159        book_type: BookType,
1160        depth: Option<usize>,
1161        client_id: Option<ClientId>,
1162        managed: bool,
1163        params: Option<IndexMap<String, String>>,
1164    ) -> PyResult<()> {
1165        let depth = depth.and_then(NonZeroUsize::new);
1166        DataActor::subscribe_book_deltas(
1167            self.inner_mut(),
1168            instrument_id,
1169            book_type,
1170            depth,
1171            client_id,
1172            managed,
1173            params,
1174        );
1175        Ok(())
1176    }
1177
1178    #[pyo3(name = "subscribe_book_at_interval")]
1179    #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1180    fn py_subscribe_book_at_interval(
1181        &mut self,
1182        instrument_id: InstrumentId,
1183        book_type: BookType,
1184        interval_ms: usize,
1185        depth: Option<usize>,
1186        client_id: Option<ClientId>,
1187        params: Option<IndexMap<String, String>>,
1188    ) -> PyResult<()> {
1189        let depth = depth.and_then(NonZeroUsize::new);
1190        let interval_ms = NonZeroUsize::new(interval_ms)
1191            .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
1192
1193        DataActor::subscribe_book_at_interval(
1194            self.inner_mut(),
1195            instrument_id,
1196            book_type,
1197            depth,
1198            interval_ms,
1199            client_id,
1200            params,
1201        );
1202        Ok(())
1203    }
1204
1205    #[pyo3(name = "subscribe_quotes")]
1206    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1207    fn py_subscribe_quotes(
1208        &mut self,
1209        instrument_id: InstrumentId,
1210        client_id: Option<ClientId>,
1211        params: Option<IndexMap<String, String>>,
1212    ) -> PyResult<()> {
1213        DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1214        Ok(())
1215    }
1216
1217    #[pyo3(name = "subscribe_trades")]
1218    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1219    fn py_subscribe_trades(
1220        &mut self,
1221        instrument_id: InstrumentId,
1222        client_id: Option<ClientId>,
1223        params: Option<IndexMap<String, String>>,
1224    ) -> PyResult<()> {
1225        DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1226        Ok(())
1227    }
1228
1229    #[pyo3(name = "subscribe_bars")]
1230    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1231    fn py_subscribe_bars(
1232        &mut self,
1233        bar_type: BarType,
1234        client_id: Option<ClientId>,
1235        params: Option<IndexMap<String, String>>,
1236    ) -> PyResult<()> {
1237        DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1238        Ok(())
1239    }
1240
1241    #[pyo3(name = "subscribe_mark_prices")]
1242    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1243    fn py_subscribe_mark_prices(
1244        &mut self,
1245        instrument_id: InstrumentId,
1246        client_id: Option<ClientId>,
1247        params: Option<IndexMap<String, String>>,
1248    ) -> PyResult<()> {
1249        DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1250        Ok(())
1251    }
1252
1253    #[pyo3(name = "subscribe_index_prices")]
1254    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1255    fn py_subscribe_index_prices(
1256        &mut self,
1257        instrument_id: InstrumentId,
1258        client_id: Option<ClientId>,
1259        params: Option<IndexMap<String, String>>,
1260    ) -> PyResult<()> {
1261        DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1262        Ok(())
1263    }
1264
1265    #[pyo3(name = "subscribe_instrument_status")]
1266    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1267    fn py_subscribe_instrument_status(
1268        &mut self,
1269        instrument_id: InstrumentId,
1270        client_id: Option<ClientId>,
1271        params: Option<IndexMap<String, String>>,
1272    ) -> PyResult<()> {
1273        DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1274        Ok(())
1275    }
1276
1277    #[pyo3(name = "subscribe_instrument_close")]
1278    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1279    fn py_subscribe_instrument_close(
1280        &mut self,
1281        instrument_id: InstrumentId,
1282        client_id: Option<ClientId>,
1283        params: Option<IndexMap<String, String>>,
1284    ) -> PyResult<()> {
1285        DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1286        Ok(())
1287    }
1288
1289    #[pyo3(name = "subscribe_order_fills")]
1290    #[pyo3(signature = (instrument_id))]
1291    fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1292        DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1293        Ok(())
1294    }
1295
1296    #[pyo3(name = "subscribe_order_cancels")]
1297    #[pyo3(signature = (instrument_id))]
1298    fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1299        DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1300        Ok(())
1301    }
1302
1303    #[cfg(feature = "defi")]
1304    #[pyo3(name = "subscribe_blocks")]
1305    #[pyo3(signature = (chain, client_id=None, params=None))]
1306    fn py_subscribe_blocks(
1307        &mut self,
1308        chain: Blockchain,
1309        client_id: Option<ClientId>,
1310        params: Option<IndexMap<String, String>>,
1311    ) -> PyResult<()> {
1312        DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1313        Ok(())
1314    }
1315
1316    #[cfg(feature = "defi")]
1317    #[pyo3(name = "subscribe_pool")]
1318    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1319    fn py_subscribe_pool(
1320        &mut self,
1321        instrument_id: InstrumentId,
1322        client_id: Option<ClientId>,
1323        params: Option<IndexMap<String, String>>,
1324    ) -> PyResult<()> {
1325        DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1326        Ok(())
1327    }
1328
1329    #[cfg(feature = "defi")]
1330    #[pyo3(name = "subscribe_pool_swaps")]
1331    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1332    fn py_subscribe_pool_swaps(
1333        &mut self,
1334        instrument_id: InstrumentId,
1335        client_id: Option<ClientId>,
1336        params: Option<IndexMap<String, String>>,
1337    ) -> PyResult<()> {
1338        DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1339        Ok(())
1340    }
1341
1342    #[cfg(feature = "defi")]
1343    #[pyo3(name = "subscribe_pool_liquidity_updates")]
1344    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1345    fn py_subscribe_pool_liquidity_updates(
1346        &mut self,
1347        instrument_id: InstrumentId,
1348        client_id: Option<ClientId>,
1349        params: Option<IndexMap<String, String>>,
1350    ) -> PyResult<()> {
1351        DataActor::subscribe_pool_liquidity_updates(
1352            self.inner_mut(),
1353            instrument_id,
1354            client_id,
1355            params,
1356        );
1357        Ok(())
1358    }
1359
1360    #[cfg(feature = "defi")]
1361    #[pyo3(name = "subscribe_pool_fee_collects")]
1362    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1363    fn py_subscribe_pool_fee_collects(
1364        &mut self,
1365        instrument_id: InstrumentId,
1366        client_id: Option<ClientId>,
1367        params: Option<IndexMap<String, String>>,
1368    ) -> PyResult<()> {
1369        DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
1370        Ok(())
1371    }
1372
1373    #[cfg(feature = "defi")]
1374    #[pyo3(name = "subscribe_pool_flash_events")]
1375    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1376    fn py_subscribe_pool_flash_events(
1377        &mut self,
1378        instrument_id: InstrumentId,
1379        client_id: Option<ClientId>,
1380        params: Option<IndexMap<String, String>>,
1381    ) -> PyResult<()> {
1382        DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
1383        Ok(())
1384    }
1385
1386    #[pyo3(name = "request_data")]
1387    #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1388    fn py_request_data(
1389        &mut self,
1390        data_type: DataType,
1391        client_id: ClientId,
1392        start: Option<u64>,
1393        end: Option<u64>,
1394        limit: Option<usize>,
1395        params: Option<IndexMap<String, String>>,
1396    ) -> PyResult<String> {
1397        let limit = limit.and_then(NonZeroUsize::new);
1398        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1399        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1400
1401        let request_id = DataActor::request_data(
1402            self.inner_mut(),
1403            data_type,
1404            client_id,
1405            start,
1406            end,
1407            limit,
1408            params,
1409        )
1410        .map_err(to_pyvalue_err)?;
1411        Ok(request_id.to_string())
1412    }
1413
1414    #[pyo3(name = "request_instrument")]
1415    #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1416    fn py_request_instrument(
1417        &mut self,
1418        instrument_id: InstrumentId,
1419        start: Option<u64>,
1420        end: Option<u64>,
1421        client_id: Option<ClientId>,
1422        params: Option<IndexMap<String, String>>,
1423    ) -> PyResult<String> {
1424        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1425        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1426
1427        let request_id = DataActor::request_instrument(
1428            self.inner_mut(),
1429            instrument_id,
1430            start,
1431            end,
1432            client_id,
1433            params,
1434        )
1435        .map_err(to_pyvalue_err)?;
1436        Ok(request_id.to_string())
1437    }
1438
1439    #[pyo3(name = "request_instruments")]
1440    #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1441    fn py_request_instruments(
1442        &mut self,
1443        venue: Option<Venue>,
1444        start: Option<u64>,
1445        end: Option<u64>,
1446        client_id: Option<ClientId>,
1447        params: Option<IndexMap<String, String>>,
1448    ) -> PyResult<String> {
1449        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1450        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1451
1452        let request_id =
1453            DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1454                .map_err(to_pyvalue_err)?;
1455        Ok(request_id.to_string())
1456    }
1457
1458    #[pyo3(name = "request_book_snapshot")]
1459    #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1460    fn py_request_book_snapshot(
1461        &mut self,
1462        instrument_id: InstrumentId,
1463        depth: Option<usize>,
1464        client_id: Option<ClientId>,
1465        params: Option<IndexMap<String, String>>,
1466    ) -> PyResult<String> {
1467        let depth = depth.and_then(NonZeroUsize::new);
1468
1469        let request_id = DataActor::request_book_snapshot(
1470            self.inner_mut(),
1471            instrument_id,
1472            depth,
1473            client_id,
1474            params,
1475        )
1476        .map_err(to_pyvalue_err)?;
1477        Ok(request_id.to_string())
1478    }
1479
1480    #[pyo3(name = "request_quotes")]
1481    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1482    fn py_request_quotes(
1483        &mut self,
1484        instrument_id: InstrumentId,
1485        start: Option<u64>,
1486        end: Option<u64>,
1487        limit: Option<usize>,
1488        client_id: Option<ClientId>,
1489        params: Option<IndexMap<String, String>>,
1490    ) -> PyResult<String> {
1491        let limit = limit.and_then(NonZeroUsize::new);
1492        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1493        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1494
1495        let request_id = DataActor::request_quotes(
1496            self.inner_mut(),
1497            instrument_id,
1498            start,
1499            end,
1500            limit,
1501            client_id,
1502            params,
1503        )
1504        .map_err(to_pyvalue_err)?;
1505        Ok(request_id.to_string())
1506    }
1507
1508    #[pyo3(name = "request_trades")]
1509    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1510    fn py_request_trades(
1511        &mut self,
1512        instrument_id: InstrumentId,
1513        start: Option<u64>,
1514        end: Option<u64>,
1515        limit: Option<usize>,
1516        client_id: Option<ClientId>,
1517        params: Option<IndexMap<String, String>>,
1518    ) -> PyResult<String> {
1519        let limit = limit.and_then(NonZeroUsize::new);
1520        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1521        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1522
1523        let request_id = DataActor::request_trades(
1524            self.inner_mut(),
1525            instrument_id,
1526            start,
1527            end,
1528            limit,
1529            client_id,
1530            params,
1531        )
1532        .map_err(to_pyvalue_err)?;
1533        Ok(request_id.to_string())
1534    }
1535
1536    #[pyo3(name = "request_bars")]
1537    #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1538    fn py_request_bars(
1539        &mut self,
1540        bar_type: BarType,
1541        start: Option<u64>,
1542        end: Option<u64>,
1543        limit: Option<usize>,
1544        client_id: Option<ClientId>,
1545        params: Option<IndexMap<String, String>>,
1546    ) -> PyResult<String> {
1547        let limit = limit.and_then(NonZeroUsize::new);
1548        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1549        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1550
1551        let request_id = DataActor::request_bars(
1552            self.inner_mut(),
1553            bar_type,
1554            start,
1555            end,
1556            limit,
1557            client_id,
1558            params,
1559        )
1560        .map_err(to_pyvalue_err)?;
1561        Ok(request_id.to_string())
1562    }
1563
1564    #[pyo3(name = "unsubscribe_data")]
1565    #[pyo3(signature = (data_type, client_id=None, params=None))]
1566    fn py_unsubscribe_data(
1567        &mut self,
1568        data_type: DataType,
1569        client_id: Option<ClientId>,
1570        params: Option<IndexMap<String, String>>,
1571    ) -> PyResult<()> {
1572        DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1573        Ok(())
1574    }
1575
1576    #[pyo3(name = "unsubscribe_instruments")]
1577    #[pyo3(signature = (venue, client_id=None, params=None))]
1578    fn py_unsubscribe_instruments(
1579        &mut self,
1580        venue: Venue,
1581        client_id: Option<ClientId>,
1582        params: Option<IndexMap<String, String>>,
1583    ) -> PyResult<()> {
1584        DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1585        Ok(())
1586    }
1587
1588    #[pyo3(name = "unsubscribe_instrument")]
1589    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1590    fn py_unsubscribe_instrument(
1591        &mut self,
1592        instrument_id: InstrumentId,
1593        client_id: Option<ClientId>,
1594        params: Option<IndexMap<String, String>>,
1595    ) -> PyResult<()> {
1596        DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1597        Ok(())
1598    }
1599
1600    #[pyo3(name = "unsubscribe_book_deltas")]
1601    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1602    fn py_unsubscribe_book_deltas(
1603        &mut self,
1604        instrument_id: InstrumentId,
1605        client_id: Option<ClientId>,
1606        params: Option<IndexMap<String, String>>,
1607    ) -> PyResult<()> {
1608        DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1609        Ok(())
1610    }
1611
1612    #[pyo3(name = "unsubscribe_book_at_interval")]
1613    #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1614    fn py_unsubscribe_book_at_interval(
1615        &mut self,
1616        instrument_id: InstrumentId,
1617        interval_ms: usize,
1618        client_id: Option<ClientId>,
1619        params: Option<IndexMap<String, String>>,
1620    ) -> PyResult<()> {
1621        let interval_ms = NonZeroUsize::new(interval_ms)
1622            .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
1623
1624        DataActor::unsubscribe_book_at_interval(
1625            self.inner_mut(),
1626            instrument_id,
1627            interval_ms,
1628            client_id,
1629            params,
1630        );
1631        Ok(())
1632    }
1633
1634    #[pyo3(name = "unsubscribe_quotes")]
1635    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1636    fn py_unsubscribe_quotes(
1637        &mut self,
1638        instrument_id: InstrumentId,
1639        client_id: Option<ClientId>,
1640        params: Option<IndexMap<String, String>>,
1641    ) -> PyResult<()> {
1642        DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1643        Ok(())
1644    }
1645
1646    #[pyo3(name = "unsubscribe_trades")]
1647    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1648    fn py_unsubscribe_trades(
1649        &mut self,
1650        instrument_id: InstrumentId,
1651        client_id: Option<ClientId>,
1652        params: Option<IndexMap<String, String>>,
1653    ) -> PyResult<()> {
1654        DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1655        Ok(())
1656    }
1657
1658    #[pyo3(name = "unsubscribe_bars")]
1659    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1660    fn py_unsubscribe_bars(
1661        &mut self,
1662        bar_type: BarType,
1663        client_id: Option<ClientId>,
1664        params: Option<IndexMap<String, String>>,
1665    ) -> PyResult<()> {
1666        DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1667        Ok(())
1668    }
1669
1670    #[pyo3(name = "unsubscribe_mark_prices")]
1671    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1672    fn py_unsubscribe_mark_prices(
1673        &mut self,
1674        instrument_id: InstrumentId,
1675        client_id: Option<ClientId>,
1676        params: Option<IndexMap<String, String>>,
1677    ) -> PyResult<()> {
1678        DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1679        Ok(())
1680    }
1681
1682    #[pyo3(name = "unsubscribe_index_prices")]
1683    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1684    fn py_unsubscribe_index_prices(
1685        &mut self,
1686        instrument_id: InstrumentId,
1687        client_id: Option<ClientId>,
1688        params: Option<IndexMap<String, String>>,
1689    ) -> PyResult<()> {
1690        DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1691        Ok(())
1692    }
1693
1694    #[pyo3(name = "unsubscribe_instrument_status")]
1695    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1696    fn py_unsubscribe_instrument_status(
1697        &mut self,
1698        instrument_id: InstrumentId,
1699        client_id: Option<ClientId>,
1700        params: Option<IndexMap<String, String>>,
1701    ) -> PyResult<()> {
1702        DataActor::unsubscribe_instrument_status(
1703            self.inner_mut(),
1704            instrument_id,
1705            client_id,
1706            params,
1707        );
1708        Ok(())
1709    }
1710
1711    #[pyo3(name = "unsubscribe_instrument_close")]
1712    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1713    fn py_unsubscribe_instrument_close(
1714        &mut self,
1715        instrument_id: InstrumentId,
1716        client_id: Option<ClientId>,
1717        params: Option<IndexMap<String, String>>,
1718    ) -> PyResult<()> {
1719        DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1720        Ok(())
1721    }
1722
1723    #[pyo3(name = "unsubscribe_order_fills")]
1724    #[pyo3(signature = (instrument_id))]
1725    fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1726        DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1727        Ok(())
1728    }
1729
1730    #[pyo3(name = "unsubscribe_order_cancels")]
1731    #[pyo3(signature = (instrument_id))]
1732    fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1733        DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1734        Ok(())
1735    }
1736
1737    #[cfg(feature = "defi")]
1738    #[pyo3(name = "unsubscribe_blocks")]
1739    #[pyo3(signature = (chain, client_id=None, params=None))]
1740    fn py_unsubscribe_blocks(
1741        &mut self,
1742        chain: Blockchain,
1743        client_id: Option<ClientId>,
1744        params: Option<IndexMap<String, String>>,
1745    ) -> PyResult<()> {
1746        DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
1747        Ok(())
1748    }
1749
1750    #[cfg(feature = "defi")]
1751    #[pyo3(name = "unsubscribe_pool")]
1752    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1753    fn py_unsubscribe_pool(
1754        &mut self,
1755        instrument_id: InstrumentId,
1756        client_id: Option<ClientId>,
1757        params: Option<IndexMap<String, String>>,
1758    ) -> PyResult<()> {
1759        DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
1760        Ok(())
1761    }
1762
1763    #[cfg(feature = "defi")]
1764    #[pyo3(name = "unsubscribe_pool_swaps")]
1765    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1766    fn py_unsubscribe_pool_swaps(
1767        &mut self,
1768        instrument_id: InstrumentId,
1769        client_id: Option<ClientId>,
1770        params: Option<IndexMap<String, String>>,
1771    ) -> PyResult<()> {
1772        DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
1773        Ok(())
1774    }
1775
1776    #[cfg(feature = "defi")]
1777    #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1778    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1779    fn py_unsubscribe_pool_liquidity_updates(
1780        &mut self,
1781        instrument_id: InstrumentId,
1782        client_id: Option<ClientId>,
1783        params: Option<IndexMap<String, String>>,
1784    ) -> PyResult<()> {
1785        DataActor::unsubscribe_pool_liquidity_updates(
1786            self.inner_mut(),
1787            instrument_id,
1788            client_id,
1789            params,
1790        );
1791        Ok(())
1792    }
1793
1794    #[cfg(feature = "defi")]
1795    #[pyo3(name = "unsubscribe_pool_fee_collects")]
1796    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1797    fn py_unsubscribe_pool_fee_collects(
1798        &mut self,
1799        instrument_id: InstrumentId,
1800        client_id: Option<ClientId>,
1801        params: Option<IndexMap<String, String>>,
1802    ) -> PyResult<()> {
1803        DataActor::unsubscribe_pool_fee_collects(
1804            self.inner_mut(),
1805            instrument_id,
1806            client_id,
1807            params,
1808        );
1809        Ok(())
1810    }
1811
1812    #[cfg(feature = "defi")]
1813    #[pyo3(name = "unsubscribe_pool_flash_events")]
1814    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1815    fn py_unsubscribe_pool_flash_events(
1816        &mut self,
1817        instrument_id: InstrumentId,
1818        client_id: Option<ClientId>,
1819        params: Option<IndexMap<String, String>>,
1820    ) -> PyResult<()> {
1821        DataActor::unsubscribe_pool_flash_events(
1822            self.inner_mut(),
1823            instrument_id,
1824            client_id,
1825            params,
1826        );
1827        Ok(())
1828    }
1829
1830    #[allow(unused_variables)]
1831    #[pyo3(name = "on_historical_data")]
1832    fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1833        // Default implementation - can be overridden in Python subclasses
1834        Ok(())
1835    }
1836
1837    #[allow(unused_variables)]
1838    #[pyo3(name = "on_historical_quotes")]
1839    fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1840        // Default implementation - can be overridden in Python subclasses
1841        Ok(())
1842    }
1843
1844    #[allow(unused_variables)]
1845    #[pyo3(name = "on_historical_trades")]
1846    fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1847        // Default implementation - can be overridden in Python subclasses
1848        Ok(())
1849    }
1850
1851    #[allow(unused_variables)]
1852    #[pyo3(name = "on_historical_bars")]
1853    fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1854        // Default implementation - can be overridden in Python subclasses
1855        Ok(())
1856    }
1857
1858    #[allow(unused_variables)]
1859    #[pyo3(name = "on_historical_mark_prices")]
1860    fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1861        // Default implementation - can be overridden in Python subclasses
1862        Ok(())
1863    }
1864
1865    #[allow(unused_variables)]
1866    #[pyo3(name = "on_historical_index_prices")]
1867    fn py_on_historical_index_prices(
1868        &mut self,
1869        index_prices: Vec<IndexPriceUpdate>,
1870    ) -> PyResult<()> {
1871        // Default implementation - can be overridden in Python subclasses
1872        Ok(())
1873    }
1874}
1875
1876#[cfg(test)]
1877mod tests {
1878    use std::{
1879        any::Any,
1880        cell::RefCell,
1881        collections::HashMap,
1882        ops::{Deref, DerefMut},
1883        rc::Rc,
1884        str::FromStr,
1885        sync::{Arc, Mutex},
1886    };
1887
1888    #[cfg(feature = "defi")]
1889    use alloy_primitives::{I256, U160};
1890    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
1891    #[cfg(feature = "defi")]
1892    use nautilus_model::defi::{
1893        AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate,
1894        PoolSwap, Token,
1895    };
1896    use nautilus_model::{
1897        data::{
1898            Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
1899            OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
1900        },
1901        enums::{AggressorSide, BookType, InstrumentCloseType, MarketStatusAction},
1902        identifiers::{ClientId, TradeId, TraderId, Venue},
1903        instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
1904        orderbook::OrderBook,
1905        types::{Price, Quantity},
1906    };
1907    use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
1908    use rstest::{fixture, rstest};
1909    use ustr::Ustr;
1910
1911    use super::PyDataActor;
1912    use crate::{
1913        actor::{DataActor, data_actor::DataActorCore},
1914        cache::Cache,
1915        clock::TestClock,
1916        component::Component,
1917        enums::ComponentState,
1918        runner::{SyncDataCommandSender, set_data_cmd_sender},
1919        signal::Signal,
1920        timer::TimeEvent,
1921    };
1922
1923    #[fixture]
1924    fn clock() -> Rc<RefCell<TestClock>> {
1925        Rc::new(RefCell::new(TestClock::new()))
1926    }
1927
1928    #[fixture]
1929    fn cache() -> Rc<RefCell<Cache>> {
1930        Rc::new(RefCell::new(Cache::new(None, None)))
1931    }
1932
1933    #[fixture]
1934    fn trader_id() -> TraderId {
1935        TraderId::from("TRADER-001")
1936    }
1937
1938    #[fixture]
1939    fn client_id() -> ClientId {
1940        ClientId::new("TestClient")
1941    }
1942
1943    #[fixture]
1944    fn venue() -> Venue {
1945        Venue::from("SIM")
1946    }
1947
1948    #[fixture]
1949    fn data_type() -> DataType {
1950        DataType::new("TestData", None)
1951    }
1952
1953    #[fixture]
1954    fn bar_type(audusd_sim: CurrencyPair) -> BarType {
1955        BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
1956    }
1957
1958    fn create_unregistered_actor() -> PyDataActor {
1959        PyDataActor::new(None)
1960    }
1961
1962    fn create_registered_actor(
1963        clock: Rc<RefCell<TestClock>>,
1964        cache: Rc<RefCell<Cache>>,
1965        trader_id: TraderId,
1966    ) -> PyDataActor {
1967        // Set up sync data command sender for tests
1968        let sender = SyncDataCommandSender;
1969        set_data_cmd_sender(Arc::new(sender));
1970
1971        let mut actor = PyDataActor::new(None);
1972        actor.register(trader_id, clock, cache).unwrap();
1973        actor
1974    }
1975
1976    #[rstest]
1977    fn test_new_actor_creation() {
1978        let actor = PyDataActor::new(None);
1979        assert!(actor.trader_id().is_none());
1980    }
1981
1982    #[rstest]
1983    fn test_clock_access_before_registration_raises_error() {
1984        let actor = PyDataActor::new(None);
1985
1986        // Accessing clock before registration should raise PyRuntimeError
1987        let result = actor.py_clock();
1988        assert!(result.is_err());
1989
1990        let error = result.unwrap_err();
1991        pyo3::Python::initialize();
1992        pyo3::Python::attach(|py| {
1993            assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
1994        });
1995
1996        let error_msg = error.to_string();
1997        assert!(
1998            error_msg.contains("Actor must be registered with a trader before accessing clock")
1999        );
2000    }
2001
2002    #[rstest]
2003    fn test_unregistered_actor_methods_work() {
2004        let actor = create_unregistered_actor();
2005
2006        assert!(!actor.py_is_ready());
2007        assert!(!actor.py_is_running());
2008        assert!(!actor.py_is_stopped());
2009        assert!(!actor.py_is_disposed());
2010        assert!(!actor.py_is_degraded());
2011        assert!(!actor.py_is_faulted());
2012
2013        // Verify unregistered state
2014        assert_eq!(actor.trader_id(), None);
2015    }
2016
2017    #[rstest]
2018    fn test_registration_success(
2019        clock: Rc<RefCell<TestClock>>,
2020        cache: Rc<RefCell<Cache>>,
2021        trader_id: TraderId,
2022    ) {
2023        let mut actor = create_unregistered_actor();
2024        actor.register(trader_id, clock, cache).unwrap();
2025        assert!(actor.trader_id().is_some());
2026        assert_eq!(actor.trader_id().unwrap(), trader_id);
2027    }
2028
2029    #[rstest]
2030    fn test_registered_actor_basic_properties(
2031        clock: Rc<RefCell<TestClock>>,
2032        cache: Rc<RefCell<Cache>>,
2033        trader_id: TraderId,
2034    ) {
2035        let actor = create_registered_actor(clock, cache, trader_id);
2036
2037        assert_eq!(actor.state(), ComponentState::Ready);
2038        assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2039        assert!(actor.py_is_ready());
2040        assert!(!actor.py_is_running());
2041        assert!(!actor.py_is_stopped());
2042        assert!(!actor.py_is_disposed());
2043        assert!(!actor.py_is_degraded());
2044        assert!(!actor.py_is_faulted());
2045    }
2046
2047    #[rstest]
2048    fn test_basic_subscription_methods_compile(
2049        clock: Rc<RefCell<TestClock>>,
2050        cache: Rc<RefCell<Cache>>,
2051        trader_id: TraderId,
2052        data_type: DataType,
2053        client_id: ClientId,
2054        audusd_sim: CurrencyPair,
2055    ) {
2056        let mut actor = create_registered_actor(clock, cache, trader_id);
2057
2058        assert!(
2059            actor
2060                .py_subscribe_data(data_type.clone(), Some(client_id), None)
2061                .is_ok()
2062        );
2063        assert!(
2064            actor
2065                .py_subscribe_quotes(audusd_sim.id, Some(client_id), None)
2066                .is_ok()
2067        );
2068        assert!(
2069            actor
2070                .py_unsubscribe_data(data_type, Some(client_id), None)
2071                .is_ok()
2072        );
2073        assert!(
2074            actor
2075                .py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None)
2076                .is_ok()
2077        );
2078    }
2079
2080    #[rstest]
2081    fn test_shutdown_system_passes_through(
2082        clock: Rc<RefCell<TestClock>>,
2083        cache: Rc<RefCell<Cache>>,
2084        trader_id: TraderId,
2085    ) {
2086        let actor = create_registered_actor(clock, cache, trader_id);
2087
2088        assert!(
2089            actor
2090                .py_shutdown_system(Some("Test shutdown".to_string()))
2091                .is_ok()
2092        );
2093        assert!(actor.py_shutdown_system(None).is_ok());
2094    }
2095
2096    #[rstest]
2097    fn test_book_at_interval_invalid_interval_ms(
2098        clock: Rc<RefCell<TestClock>>,
2099        cache: Rc<RefCell<Cache>>,
2100        trader_id: TraderId,
2101        audusd_sim: CurrencyPair,
2102    ) {
2103        pyo3::Python::initialize();
2104        let mut actor = create_registered_actor(clock, cache, trader_id);
2105
2106        let result = actor.py_subscribe_book_at_interval(
2107            audusd_sim.id,
2108            BookType::L2_MBP,
2109            0,
2110            None,
2111            None,
2112            None,
2113        );
2114        assert!(result.is_err());
2115        assert_eq!(
2116            result.unwrap_err().to_string(),
2117            "ValueError: interval_ms must be > 0"
2118        );
2119
2120        let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
2121        assert!(result.is_err());
2122        assert_eq!(
2123            result.unwrap_err().to_string(),
2124            "ValueError: interval_ms must be > 0"
2125        );
2126    }
2127
2128    #[rstest]
2129    fn test_request_methods_signatures_exist() {
2130        let actor = create_unregistered_actor();
2131        assert!(actor.trader_id().is_none());
2132    }
2133
2134    #[rstest]
2135    fn test_data_actor_trait_implementation(
2136        clock: Rc<RefCell<TestClock>>,
2137        cache: Rc<RefCell<Cache>>,
2138        trader_id: TraderId,
2139    ) {
2140        let actor = create_registered_actor(clock, cache, trader_id);
2141        let state = actor.state();
2142        assert_eq!(state, ComponentState::Ready);
2143    }
2144
2145    static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
2146        std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
2147
2148    #[derive(Debug)]
2149    struct TestDataActor {
2150        inner: PyDataActor,
2151    }
2152
2153    impl TestDataActor {
2154        fn new() -> Self {
2155            Self {
2156                inner: PyDataActor::new(None),
2157            }
2158        }
2159
2160        fn track_call(&self, handler_name: &str) {
2161            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2162            *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
2163        }
2164
2165        fn get_call_count(&self, handler_name: &str) -> i32 {
2166            let tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2167            tracker.get(handler_name).copied().unwrap_or(0)
2168        }
2169
2170        fn reset_tracker(&self) {
2171            let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2172            tracker.clear();
2173        }
2174    }
2175
2176    impl Deref for TestDataActor {
2177        type Target = DataActorCore;
2178        fn deref(&self) -> &Self::Target {
2179            &self.inner.inner().core
2180        }
2181    }
2182
2183    impl DerefMut for TestDataActor {
2184        fn deref_mut(&mut self) -> &mut Self::Target {
2185            &mut self.inner.inner_mut().core
2186        }
2187    }
2188
2189    impl DataActor for TestDataActor {
2190        fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
2191            self.track_call("on_time_event");
2192            self.inner.inner_mut().on_time_event(event)
2193        }
2194
2195        fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
2196            self.track_call("on_data");
2197            self.inner.inner_mut().on_data(data)
2198        }
2199
2200        fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
2201            self.track_call("on_signal");
2202            self.inner.inner_mut().on_signal(signal)
2203        }
2204
2205        fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
2206            self.track_call("on_instrument");
2207            self.inner.inner_mut().on_instrument(instrument)
2208        }
2209
2210        fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
2211            self.track_call("on_quote");
2212            self.inner.inner_mut().on_quote(quote)
2213        }
2214
2215        fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
2216            self.track_call("on_trade");
2217            self.inner.inner_mut().on_trade(trade)
2218        }
2219
2220        fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
2221            self.track_call("on_bar");
2222            self.inner.inner_mut().on_bar(bar)
2223        }
2224
2225        fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
2226            self.track_call("on_book");
2227            self.inner.inner_mut().on_book(book)
2228        }
2229
2230        fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
2231            self.track_call("on_book_deltas");
2232            self.inner.inner_mut().on_book_deltas(deltas)
2233        }
2234
2235        fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
2236            self.track_call("on_mark_price");
2237            self.inner.inner_mut().on_mark_price(update)
2238        }
2239
2240        fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
2241            self.track_call("on_index_price");
2242            self.inner.inner_mut().on_index_price(update)
2243        }
2244
2245        fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
2246            self.track_call("on_instrument_status");
2247            self.inner.inner_mut().on_instrument_status(update)
2248        }
2249
2250        fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
2251            self.track_call("on_instrument_close");
2252            self.inner.inner_mut().on_instrument_close(update)
2253        }
2254
2255        #[cfg(feature = "defi")]
2256        fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
2257            self.track_call("on_block");
2258            self.inner.inner_mut().on_block(block)
2259        }
2260
2261        #[cfg(feature = "defi")]
2262        fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
2263            self.track_call("on_pool");
2264            self.inner.inner_mut().on_pool(pool)
2265        }
2266
2267        #[cfg(feature = "defi")]
2268        fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
2269            self.track_call("on_pool_swap");
2270            self.inner.inner_mut().on_pool_swap(swap)
2271        }
2272
2273        #[cfg(feature = "defi")]
2274        fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
2275            self.track_call("on_pool_liquidity_update");
2276            self.inner.inner_mut().on_pool_liquidity_update(update)
2277        }
2278    }
2279
2280    #[rstest]
2281    fn test_python_on_signal_handler(
2282        clock: Rc<RefCell<TestClock>>,
2283        cache: Rc<RefCell<Cache>>,
2284        trader_id: TraderId,
2285    ) {
2286        pyo3::Python::initialize();
2287        let mut test_actor = TestDataActor::new();
2288        test_actor.reset_tracker();
2289        test_actor.register(trader_id, clock, cache).unwrap();
2290
2291        let signal = Signal::new(
2292            Ustr::from("test_signal"),
2293            "1.0".to_string(),
2294            UnixNanos::default(),
2295            UnixNanos::default(),
2296        );
2297
2298        assert!(test_actor.on_signal(&signal).is_ok());
2299        assert_eq!(test_actor.get_call_count("on_signal"), 1);
2300    }
2301
2302    #[rstest]
2303    fn test_python_on_data_handler(
2304        clock: Rc<RefCell<TestClock>>,
2305        cache: Rc<RefCell<Cache>>,
2306        trader_id: TraderId,
2307    ) {
2308        pyo3::Python::initialize();
2309        let mut test_actor = TestDataActor::new();
2310        test_actor.reset_tracker();
2311        test_actor.register(trader_id, clock, cache).unwrap();
2312
2313        assert!(test_actor.on_data(&()).is_ok());
2314        assert_eq!(test_actor.get_call_count("on_data"), 1);
2315    }
2316
2317    #[rstest]
2318    fn test_python_on_time_event_handler(
2319        clock: Rc<RefCell<TestClock>>,
2320        cache: Rc<RefCell<Cache>>,
2321        trader_id: TraderId,
2322    ) {
2323        pyo3::Python::initialize();
2324        let mut test_actor = TestDataActor::new();
2325        test_actor.reset_tracker();
2326        test_actor.register(trader_id, clock, cache).unwrap();
2327
2328        let time_event = TimeEvent::new(
2329            Ustr::from("test_timer"),
2330            UUID4::new(),
2331            UnixNanos::default(),
2332            UnixNanos::default(),
2333        );
2334
2335        assert!(test_actor.on_time_event(&time_event).is_ok());
2336        assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2337    }
2338
2339    #[rstest]
2340    fn test_python_on_instrument_handler(
2341        clock: Rc<RefCell<TestClock>>,
2342        cache: Rc<RefCell<Cache>>,
2343        trader_id: TraderId,
2344        audusd_sim: CurrencyPair,
2345    ) {
2346        pyo3::Python::initialize();
2347        let mut rust_actor = PyDataActor::new(None);
2348        rust_actor.register(trader_id, clock, cache).unwrap();
2349
2350        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2351
2352        assert!(rust_actor.inner_mut().on_instrument(&instrument).is_ok());
2353    }
2354
2355    #[rstest]
2356    fn test_python_on_quote_handler(
2357        clock: Rc<RefCell<TestClock>>,
2358        cache: Rc<RefCell<Cache>>,
2359        trader_id: TraderId,
2360        audusd_sim: CurrencyPair,
2361    ) {
2362        pyo3::Python::initialize();
2363        let mut rust_actor = PyDataActor::new(None);
2364        rust_actor.register(trader_id, clock, cache).unwrap();
2365
2366        let quote = QuoteTick::new(
2367            audusd_sim.id,
2368            Price::from("1.0000"),
2369            Price::from("1.0001"),
2370            Quantity::from("100000"),
2371            Quantity::from("100000"),
2372            UnixNanos::default(),
2373            UnixNanos::default(),
2374        );
2375
2376        assert!(rust_actor.inner_mut().on_quote(&quote).is_ok());
2377    }
2378
2379    #[rstest]
2380    fn test_python_on_trade_handler(
2381        clock: Rc<RefCell<TestClock>>,
2382        cache: Rc<RefCell<Cache>>,
2383        trader_id: TraderId,
2384        audusd_sim: CurrencyPair,
2385    ) {
2386        pyo3::Python::initialize();
2387        let mut rust_actor = PyDataActor::new(None);
2388        rust_actor.register(trader_id, clock, cache).unwrap();
2389
2390        let trade = TradeTick::new(
2391            audusd_sim.id,
2392            Price::from("1.0000"),
2393            Quantity::from("100000"),
2394            AggressorSide::Buyer,
2395            "T123".to_string().into(),
2396            UnixNanos::default(),
2397            UnixNanos::default(),
2398        );
2399
2400        assert!(rust_actor.inner_mut().on_trade(&trade).is_ok());
2401    }
2402
2403    #[rstest]
2404    fn test_python_on_bar_handler(
2405        clock: Rc<RefCell<TestClock>>,
2406        cache: Rc<RefCell<Cache>>,
2407        trader_id: TraderId,
2408        audusd_sim: CurrencyPair,
2409    ) {
2410        pyo3::Python::initialize();
2411        let mut rust_actor = PyDataActor::new(None);
2412        rust_actor.register(trader_id, clock, cache).unwrap();
2413
2414        let bar_type =
2415            BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2416        let bar = Bar::new(
2417            bar_type,
2418            Price::from("1.0000"),
2419            Price::from("1.0001"),
2420            Price::from("0.9999"),
2421            Price::from("1.0000"),
2422            Quantity::from("100000"),
2423            UnixNanos::default(),
2424            UnixNanos::default(),
2425        );
2426
2427        assert!(rust_actor.inner_mut().on_bar(&bar).is_ok());
2428    }
2429
2430    #[rstest]
2431    fn test_python_on_book_handler(
2432        clock: Rc<RefCell<TestClock>>,
2433        cache: Rc<RefCell<Cache>>,
2434        trader_id: TraderId,
2435        audusd_sim: CurrencyPair,
2436    ) {
2437        pyo3::Python::initialize();
2438        let mut rust_actor = PyDataActor::new(None);
2439        rust_actor.register(trader_id, clock, cache).unwrap();
2440
2441        let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2442        assert!(rust_actor.inner_mut().on_book(&book).is_ok());
2443    }
2444
2445    #[rstest]
2446    fn test_python_on_book_deltas_handler(
2447        clock: Rc<RefCell<TestClock>>,
2448        cache: Rc<RefCell<Cache>>,
2449        trader_id: TraderId,
2450        audusd_sim: CurrencyPair,
2451    ) {
2452        pyo3::Python::initialize();
2453        let mut rust_actor = PyDataActor::new(None);
2454        rust_actor.register(trader_id, clock, cache).unwrap();
2455
2456        let delta =
2457            OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2458        let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2459
2460        assert!(rust_actor.inner_mut().on_book_deltas(&deltas).is_ok());
2461    }
2462
2463    #[rstest]
2464    fn test_python_on_mark_price_handler(
2465        clock: Rc<RefCell<TestClock>>,
2466        cache: Rc<RefCell<Cache>>,
2467        trader_id: TraderId,
2468        audusd_sim: CurrencyPair,
2469    ) {
2470        pyo3::Python::initialize();
2471        let mut rust_actor = PyDataActor::new(None);
2472        rust_actor.register(trader_id, clock, cache).unwrap();
2473
2474        let mark_price = MarkPriceUpdate::new(
2475            audusd_sim.id,
2476            Price::from("1.0000"),
2477            UnixNanos::default(),
2478            UnixNanos::default(),
2479        );
2480
2481        assert!(rust_actor.inner_mut().on_mark_price(&mark_price).is_ok());
2482    }
2483
2484    #[rstest]
2485    fn test_python_on_index_price_handler(
2486        clock: Rc<RefCell<TestClock>>,
2487        cache: Rc<RefCell<Cache>>,
2488        trader_id: TraderId,
2489        audusd_sim: CurrencyPair,
2490    ) {
2491        pyo3::Python::initialize();
2492        let mut rust_actor = PyDataActor::new(None);
2493        rust_actor.register(trader_id, clock, cache).unwrap();
2494
2495        let index_price = IndexPriceUpdate::new(
2496            audusd_sim.id,
2497            Price::from("1.0000"),
2498            UnixNanos::default(),
2499            UnixNanos::default(),
2500        );
2501
2502        assert!(rust_actor.inner_mut().on_index_price(&index_price).is_ok());
2503    }
2504
2505    #[rstest]
2506    fn test_python_on_instrument_status_handler(
2507        clock: Rc<RefCell<TestClock>>,
2508        cache: Rc<RefCell<Cache>>,
2509        trader_id: TraderId,
2510        audusd_sim: CurrencyPair,
2511    ) {
2512        pyo3::Python::initialize();
2513        let mut rust_actor = PyDataActor::new(None);
2514        rust_actor.register(trader_id, clock, cache).unwrap();
2515
2516        let status = InstrumentStatus::new(
2517            audusd_sim.id,
2518            MarketStatusAction::Trading,
2519            UnixNanos::default(),
2520            UnixNanos::default(),
2521            None,
2522            None,
2523            None,
2524            None,
2525            None,
2526        );
2527
2528        assert!(rust_actor.inner_mut().on_instrument_status(&status).is_ok());
2529    }
2530
2531    #[rstest]
2532    fn test_python_on_instrument_close_handler(
2533        clock: Rc<RefCell<TestClock>>,
2534        cache: Rc<RefCell<Cache>>,
2535        trader_id: TraderId,
2536        audusd_sim: CurrencyPair,
2537    ) {
2538        pyo3::Python::initialize();
2539        let mut rust_actor = PyDataActor::new(None);
2540        rust_actor.register(trader_id, clock, cache).unwrap();
2541
2542        let close = InstrumentClose::new(
2543            audusd_sim.id,
2544            Price::from("1.0000"),
2545            InstrumentCloseType::EndOfSession,
2546            UnixNanos::default(),
2547            UnixNanos::default(),
2548        );
2549
2550        assert!(rust_actor.inner_mut().on_instrument_close(&close).is_ok());
2551    }
2552
2553    #[cfg(feature = "defi")]
2554    #[rstest]
2555    fn test_python_on_block_handler(
2556        clock: Rc<RefCell<TestClock>>,
2557        cache: Rc<RefCell<Cache>>,
2558        trader_id: TraderId,
2559    ) {
2560        pyo3::Python::initialize();
2561        let mut test_actor = TestDataActor::new();
2562        test_actor.reset_tracker();
2563        test_actor.register(trader_id, clock, cache).unwrap();
2564
2565        let block = Block::new(
2566            "0x1234567890abcdef".to_string(),
2567            "0xabcdef1234567890".to_string(),
2568            12345,
2569            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2570            21000,
2571            20000,
2572            UnixNanos::default(),
2573            Some(Blockchain::Ethereum),
2574        );
2575
2576        assert!(test_actor.on_block(&block).is_ok());
2577        assert_eq!(test_actor.get_call_count("on_block"), 1);
2578    }
2579
2580    #[cfg(feature = "defi")]
2581    #[rstest]
2582    fn test_python_on_pool_swap_handler(
2583        clock: Rc<RefCell<TestClock>>,
2584        cache: Rc<RefCell<Cache>>,
2585        trader_id: TraderId,
2586    ) {
2587        pyo3::Python::initialize();
2588        let mut rust_actor = PyDataActor::new(None);
2589        rust_actor.register(trader_id, clock, cache).unwrap();
2590
2591        let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2592        let dex = Arc::new(Dex::new(
2593            Chain::new(Blockchain::Ethereum, 1),
2594            DexType::UniswapV3,
2595            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2596            0,
2597            AmmType::CLAMM,
2598            "PoolCreated",
2599            "Swap",
2600            "Mint",
2601            "Burn",
2602            "Collect",
2603        ));
2604        let token0 = Token::new(
2605            chain.clone(),
2606            "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2607                .parse()
2608                .unwrap(),
2609            "USDC".into(),
2610            "USD Coin".into(),
2611            6,
2612        );
2613        let token1 = Token::new(
2614            chain.clone(),
2615            "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2616                .parse()
2617                .unwrap(),
2618            "WETH".into(),
2619            "Wrapped Ether".into(),
2620            18,
2621        );
2622        let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2623            .parse()
2624            .unwrap();
2625        let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2626            .parse()
2627            .unwrap();
2628        let pool = Arc::new(Pool::new(
2629            chain.clone(),
2630            dex.clone(),
2631            pool_address,
2632            pool_identifier,
2633            12345,
2634            token0,
2635            token1,
2636            Some(500),
2637            Some(10),
2638            UnixNanos::default(),
2639        ));
2640
2641        let swap = PoolSwap::new(
2642            chain,
2643            dex,
2644            pool.instrument_id,
2645            pool.pool_identifier,
2646            12345,
2647            "0xabc123".to_string(),
2648            0,
2649            0,
2650            None,
2651            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2652                .parse()
2653                .unwrap(),
2654            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2655                .parse()
2656                .unwrap(),
2657            I256::from_str("1000000000000000000").unwrap(),
2658            I256::from_str("400000000000000").unwrap(),
2659            U160::from(59000000000000u128),
2660            1000000,
2661            100,
2662        );
2663
2664        assert!(rust_actor.inner_mut().on_pool_swap(&swap).is_ok());
2665    }
2666
2667    #[cfg(feature = "defi")]
2668    #[rstest]
2669    fn test_python_on_pool_liquidity_update_handler(
2670        clock: Rc<RefCell<TestClock>>,
2671        cache: Rc<RefCell<Cache>>,
2672        trader_id: TraderId,
2673    ) {
2674        pyo3::Python::initialize();
2675        let mut rust_actor = PyDataActor::new(None);
2676        rust_actor.register(trader_id, clock, cache).unwrap();
2677
2678        let block = Block::new(
2679            "0x1234567890abcdef".to_string(),
2680            "0xabcdef1234567890".to_string(),
2681            12345,
2682            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2683            21000,
2684            20000,
2685            UnixNanos::default(),
2686            Some(Blockchain::Ethereum),
2687        );
2688
2689        // We test on_block since PoolLiquidityUpdate construction is complex
2690        assert!(rust_actor.inner_mut().on_block(&block).is_ok());
2691    }
2692
2693    const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
2694        r#"
2695class TrackingActor:
2696    """A mock Python actor that tracks all method calls."""
2697
2698    def __init__(self):
2699        self.calls = []
2700
2701    def _record(self, method_name, *args):
2702        self.calls.append((method_name, args))
2703
2704    def was_called(self, method_name):
2705        return any(call[0] == method_name for call in self.calls)
2706
2707    def call_count(self, method_name):
2708        return sum(1 for call in self.calls if call[0] == method_name)
2709
2710    def on_start(self):
2711        self._record("on_start")
2712
2713    def on_stop(self):
2714        self._record("on_stop")
2715
2716    def on_resume(self):
2717        self._record("on_resume")
2718
2719    def on_reset(self):
2720        self._record("on_reset")
2721
2722    def on_dispose(self):
2723        self._record("on_dispose")
2724
2725    def on_degrade(self):
2726        self._record("on_degrade")
2727
2728    def on_fault(self):
2729        self._record("on_fault")
2730
2731    def on_time_event(self, event):
2732        self._record("on_time_event", event)
2733
2734    def on_data(self, data):
2735        self._record("on_data", data)
2736
2737    def on_signal(self, signal):
2738        self._record("on_signal", signal)
2739
2740    def on_instrument(self, instrument):
2741        self._record("on_instrument", instrument)
2742
2743    def on_quote(self, quote):
2744        self._record("on_quote", quote)
2745
2746    def on_trade(self, trade):
2747        self._record("on_trade", trade)
2748
2749    def on_bar(self, bar):
2750        self._record("on_bar", bar)
2751
2752    def on_book(self, book):
2753        self._record("on_book", book)
2754
2755    def on_book_deltas(self, deltas):
2756        self._record("on_book_deltas", deltas)
2757
2758    def on_mark_price(self, update):
2759        self._record("on_mark_price", update)
2760
2761    def on_index_price(self, update):
2762        self._record("on_index_price", update)
2763
2764    def on_funding_rate(self, update):
2765        self._record("on_funding_rate", update)
2766
2767    def on_instrument_status(self, status):
2768        self._record("on_instrument_status", status)
2769
2770    def on_instrument_close(self, close):
2771        self._record("on_instrument_close", close)
2772
2773    def on_historical_data(self, data):
2774        self._record("on_historical_data", data)
2775
2776    def on_historical_quotes(self, quotes):
2777        self._record("on_historical_quotes", quotes)
2778
2779    def on_historical_trades(self, trades):
2780        self._record("on_historical_trades", trades)
2781
2782    def on_historical_bars(self, bars):
2783        self._record("on_historical_bars", bars)
2784
2785    def on_historical_mark_prices(self, prices):
2786        self._record("on_historical_mark_prices", prices)
2787
2788    def on_historical_index_prices(self, prices):
2789        self._record("on_historical_index_prices", prices)
2790"#
2791    );
2792
2793    fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
2794        py.run(TRACKING_ACTOR_CODE, None, None)?;
2795        let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
2796        let instance = tracking_actor_class.call0()?;
2797        Ok(instance.unbind())
2798    }
2799
2800    fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
2801        py_actor
2802            .call_method1(py, "was_called", (method_name,))
2803            .and_then(|r| r.extract::<bool>(py))
2804            .unwrap_or(false)
2805    }
2806
2807    fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
2808        py_actor
2809            .call_method1(py, "call_count", (method_name,))
2810            .and_then(|r| r.extract::<i32>(py))
2811            .unwrap_or(0)
2812    }
2813
2814    #[rstest]
2815    fn test_python_dispatch_on_start(
2816        clock: Rc<RefCell<TestClock>>,
2817        cache: Rc<RefCell<Cache>>,
2818        trader_id: TraderId,
2819    ) {
2820        pyo3::Python::initialize();
2821        Python::attach(|py| {
2822            let py_actor = create_tracking_python_actor(py).unwrap();
2823
2824            let mut rust_actor = PyDataActor::new(None);
2825            rust_actor.set_python_instance(py_actor.clone_ref(py));
2826            rust_actor.register(trader_id, clock, cache).unwrap();
2827
2828            let result = DataActor::on_start(rust_actor.inner_mut());
2829
2830            assert!(result.is_ok());
2831            assert!(python_method_was_called(&py_actor, py, "on_start"));
2832            assert_eq!(python_method_call_count(&py_actor, py, "on_start"), 1);
2833        });
2834    }
2835
2836    #[rstest]
2837    fn test_python_dispatch_on_stop(
2838        clock: Rc<RefCell<TestClock>>,
2839        cache: Rc<RefCell<Cache>>,
2840        trader_id: TraderId,
2841    ) {
2842        pyo3::Python::initialize();
2843        Python::attach(|py| {
2844            let py_actor = create_tracking_python_actor(py).unwrap();
2845
2846            let mut rust_actor = PyDataActor::new(None);
2847            rust_actor.set_python_instance(py_actor.clone_ref(py));
2848            rust_actor.register(trader_id, clock, cache).unwrap();
2849
2850            let result = DataActor::on_stop(rust_actor.inner_mut());
2851
2852            assert!(result.is_ok());
2853            assert!(python_method_was_called(&py_actor, py, "on_stop"));
2854        });
2855    }
2856
2857    #[rstest]
2858    fn test_python_dispatch_on_resume(
2859        clock: Rc<RefCell<TestClock>>,
2860        cache: Rc<RefCell<Cache>>,
2861        trader_id: TraderId,
2862    ) {
2863        pyo3::Python::initialize();
2864        Python::attach(|py| {
2865            let py_actor = create_tracking_python_actor(py).unwrap();
2866
2867            let mut rust_actor = PyDataActor::new(None);
2868            rust_actor.set_python_instance(py_actor.clone_ref(py));
2869            rust_actor.register(trader_id, clock, cache).unwrap();
2870
2871            let result = DataActor::on_resume(rust_actor.inner_mut());
2872
2873            assert!(result.is_ok());
2874            assert!(python_method_was_called(&py_actor, py, "on_resume"));
2875        });
2876    }
2877
2878    #[rstest]
2879    fn test_python_dispatch_on_reset(
2880        clock: Rc<RefCell<TestClock>>,
2881        cache: Rc<RefCell<Cache>>,
2882        trader_id: TraderId,
2883    ) {
2884        pyo3::Python::initialize();
2885        Python::attach(|py| {
2886            let py_actor = create_tracking_python_actor(py).unwrap();
2887
2888            let mut rust_actor = PyDataActor::new(None);
2889            rust_actor.set_python_instance(py_actor.clone_ref(py));
2890            rust_actor.register(trader_id, clock, cache).unwrap();
2891
2892            let result = DataActor::on_reset(rust_actor.inner_mut());
2893
2894            assert!(result.is_ok());
2895            assert!(python_method_was_called(&py_actor, py, "on_reset"));
2896        });
2897    }
2898
2899    #[rstest]
2900    fn test_python_dispatch_on_dispose(
2901        clock: Rc<RefCell<TestClock>>,
2902        cache: Rc<RefCell<Cache>>,
2903        trader_id: TraderId,
2904    ) {
2905        pyo3::Python::initialize();
2906        Python::attach(|py| {
2907            let py_actor = create_tracking_python_actor(py).unwrap();
2908
2909            let mut rust_actor = PyDataActor::new(None);
2910            rust_actor.set_python_instance(py_actor.clone_ref(py));
2911            rust_actor.register(trader_id, clock, cache).unwrap();
2912
2913            let result = DataActor::on_dispose(rust_actor.inner_mut());
2914
2915            assert!(result.is_ok());
2916            assert!(python_method_was_called(&py_actor, py, "on_dispose"));
2917        });
2918    }
2919
2920    #[rstest]
2921    fn test_python_dispatch_on_degrade(
2922        clock: Rc<RefCell<TestClock>>,
2923        cache: Rc<RefCell<Cache>>,
2924        trader_id: TraderId,
2925    ) {
2926        pyo3::Python::initialize();
2927        Python::attach(|py| {
2928            let py_actor = create_tracking_python_actor(py).unwrap();
2929
2930            let mut rust_actor = PyDataActor::new(None);
2931            rust_actor.set_python_instance(py_actor.clone_ref(py));
2932            rust_actor.register(trader_id, clock, cache).unwrap();
2933
2934            let result = DataActor::on_degrade(rust_actor.inner_mut());
2935
2936            assert!(result.is_ok());
2937            assert!(python_method_was_called(&py_actor, py, "on_degrade"));
2938        });
2939    }
2940
2941    #[rstest]
2942    fn test_python_dispatch_on_fault(
2943        clock: Rc<RefCell<TestClock>>,
2944        cache: Rc<RefCell<Cache>>,
2945        trader_id: TraderId,
2946    ) {
2947        pyo3::Python::initialize();
2948        Python::attach(|py| {
2949            let py_actor = create_tracking_python_actor(py).unwrap();
2950
2951            let mut rust_actor = PyDataActor::new(None);
2952            rust_actor.set_python_instance(py_actor.clone_ref(py));
2953            rust_actor.register(trader_id, clock, cache).unwrap();
2954
2955            let result = DataActor::on_fault(rust_actor.inner_mut());
2956
2957            assert!(result.is_ok());
2958            assert!(python_method_was_called(&py_actor, py, "on_fault"));
2959        });
2960    }
2961
2962    #[rstest]
2963    fn test_python_dispatch_on_signal(
2964        clock: Rc<RefCell<TestClock>>,
2965        cache: Rc<RefCell<Cache>>,
2966        trader_id: TraderId,
2967    ) {
2968        pyo3::Python::initialize();
2969        Python::attach(|py| {
2970            let py_actor = create_tracking_python_actor(py).unwrap();
2971
2972            let mut rust_actor = PyDataActor::new(None);
2973            rust_actor.set_python_instance(py_actor.clone_ref(py));
2974            rust_actor.register(trader_id, clock, cache).unwrap();
2975
2976            let signal = Signal::new(
2977                Ustr::from("test_signal"),
2978                "1.0".to_string(),
2979                UnixNanos::default(),
2980                UnixNanos::default(),
2981            );
2982
2983            let result = rust_actor.inner_mut().on_signal(&signal);
2984
2985            assert!(result.is_ok());
2986            assert!(python_method_was_called(&py_actor, py, "on_signal"));
2987        });
2988    }
2989
2990    #[rstest]
2991    fn test_python_dispatch_on_time_event(
2992        clock: Rc<RefCell<TestClock>>,
2993        cache: Rc<RefCell<Cache>>,
2994        trader_id: TraderId,
2995    ) {
2996        pyo3::Python::initialize();
2997        Python::attach(|py| {
2998            let py_actor = create_tracking_python_actor(py).unwrap();
2999
3000            let mut rust_actor = PyDataActor::new(None);
3001            rust_actor.set_python_instance(py_actor.clone_ref(py));
3002            rust_actor.register(trader_id, clock, cache).unwrap();
3003
3004            let time_event = TimeEvent::new(
3005                Ustr::from("test_timer"),
3006                UUID4::new(),
3007                UnixNanos::default(),
3008                UnixNanos::default(),
3009            );
3010
3011            let result = rust_actor.inner_mut().on_time_event(&time_event);
3012
3013            assert!(result.is_ok());
3014            assert!(python_method_was_called(&py_actor, py, "on_time_event"));
3015        });
3016    }
3017
3018    #[rstest]
3019    fn test_python_dispatch_on_instrument(
3020        clock: Rc<RefCell<TestClock>>,
3021        cache: Rc<RefCell<Cache>>,
3022        trader_id: TraderId,
3023        audusd_sim: CurrencyPair,
3024    ) {
3025        pyo3::Python::initialize();
3026        Python::attach(|py| {
3027            let py_actor = create_tracking_python_actor(py).unwrap();
3028
3029            let mut rust_actor = PyDataActor::new(None);
3030            rust_actor.set_python_instance(py_actor.clone_ref(py));
3031            rust_actor.register(trader_id, clock, cache).unwrap();
3032
3033            let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3034
3035            let result = rust_actor.inner_mut().on_instrument(&instrument);
3036
3037            assert!(result.is_ok());
3038            assert!(python_method_was_called(&py_actor, py, "on_instrument"));
3039        });
3040    }
3041
3042    #[rstest]
3043    fn test_python_dispatch_on_quote(
3044        clock: Rc<RefCell<TestClock>>,
3045        cache: Rc<RefCell<Cache>>,
3046        trader_id: TraderId,
3047        audusd_sim: CurrencyPair,
3048    ) {
3049        pyo3::Python::initialize();
3050        Python::attach(|py| {
3051            let py_actor = create_tracking_python_actor(py).unwrap();
3052
3053            let mut rust_actor = PyDataActor::new(None);
3054            rust_actor.set_python_instance(py_actor.clone_ref(py));
3055            rust_actor.register(trader_id, clock, cache).unwrap();
3056
3057            let quote = QuoteTick::new(
3058                audusd_sim.id,
3059                Price::from("1.00000"),
3060                Price::from("1.00001"),
3061                Quantity::from(100_000),
3062                Quantity::from(100_000),
3063                UnixNanos::default(),
3064                UnixNanos::default(),
3065            );
3066
3067            let result = rust_actor.inner_mut().on_quote(&quote);
3068
3069            assert!(result.is_ok());
3070            assert!(python_method_was_called(&py_actor, py, "on_quote"));
3071        });
3072    }
3073
3074    #[rstest]
3075    fn test_python_dispatch_on_trade(
3076        clock: Rc<RefCell<TestClock>>,
3077        cache: Rc<RefCell<Cache>>,
3078        trader_id: TraderId,
3079        audusd_sim: CurrencyPair,
3080    ) {
3081        pyo3::Python::initialize();
3082        Python::attach(|py| {
3083            let py_actor = create_tracking_python_actor(py).unwrap();
3084
3085            let mut rust_actor = PyDataActor::new(None);
3086            rust_actor.set_python_instance(py_actor.clone_ref(py));
3087            rust_actor.register(trader_id, clock, cache).unwrap();
3088
3089            let trade = TradeTick::new(
3090                audusd_sim.id,
3091                Price::from("1.00000"),
3092                Quantity::from(100_000),
3093                AggressorSide::Buyer,
3094                TradeId::new("123456"),
3095                UnixNanos::default(),
3096                UnixNanos::default(),
3097            );
3098
3099            let result = rust_actor.inner_mut().on_trade(&trade);
3100
3101            assert!(result.is_ok());
3102            assert!(python_method_was_called(&py_actor, py, "on_trade"));
3103        });
3104    }
3105
3106    #[rstest]
3107    fn test_python_dispatch_on_bar(
3108        clock: Rc<RefCell<TestClock>>,
3109        cache: Rc<RefCell<Cache>>,
3110        trader_id: TraderId,
3111        audusd_sim: CurrencyPair,
3112    ) {
3113        pyo3::Python::initialize();
3114        Python::attach(|py| {
3115            let py_actor = create_tracking_python_actor(py).unwrap();
3116
3117            let mut rust_actor = PyDataActor::new(None);
3118            rust_actor.set_python_instance(py_actor.clone_ref(py));
3119            rust_actor.register(trader_id, clock, cache).unwrap();
3120
3121            let bar_type =
3122                BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
3123            let bar = Bar::new(
3124                bar_type,
3125                Price::from("1.00000"),
3126                Price::from("1.00010"),
3127                Price::from("0.99990"),
3128                Price::from("1.00005"),
3129                Quantity::from(100_000),
3130                UnixNanos::default(),
3131                UnixNanos::default(),
3132            );
3133
3134            let result = rust_actor.inner_mut().on_bar(&bar);
3135
3136            assert!(result.is_ok());
3137            assert!(python_method_was_called(&py_actor, py, "on_bar"));
3138        });
3139    }
3140
3141    #[rstest]
3142    fn test_python_dispatch_on_book(
3143        clock: Rc<RefCell<TestClock>>,
3144        cache: Rc<RefCell<Cache>>,
3145        trader_id: TraderId,
3146        audusd_sim: CurrencyPair,
3147    ) {
3148        pyo3::Python::initialize();
3149        Python::attach(|py| {
3150            let py_actor = create_tracking_python_actor(py).unwrap();
3151
3152            let mut rust_actor = PyDataActor::new(None);
3153            rust_actor.set_python_instance(py_actor.clone_ref(py));
3154            rust_actor.register(trader_id, clock, cache).unwrap();
3155
3156            let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
3157
3158            let result = rust_actor.inner_mut().on_book(&book);
3159
3160            assert!(result.is_ok());
3161            assert!(python_method_was_called(&py_actor, py, "on_book"));
3162        });
3163    }
3164
3165    #[rstest]
3166    fn test_python_dispatch_on_book_deltas(
3167        clock: Rc<RefCell<TestClock>>,
3168        cache: Rc<RefCell<Cache>>,
3169        trader_id: TraderId,
3170        audusd_sim: CurrencyPair,
3171    ) {
3172        pyo3::Python::initialize();
3173        Python::attach(|py| {
3174            let py_actor = create_tracking_python_actor(py).unwrap();
3175
3176            let mut rust_actor = PyDataActor::new(None);
3177            rust_actor.set_python_instance(py_actor.clone_ref(py));
3178            rust_actor.register(trader_id, clock, cache).unwrap();
3179
3180            let delta =
3181                OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
3182            let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
3183
3184            let result = rust_actor.inner_mut().on_book_deltas(&deltas);
3185
3186            assert!(result.is_ok());
3187            assert!(python_method_was_called(&py_actor, py, "on_book_deltas"));
3188        });
3189    }
3190
3191    #[rstest]
3192    fn test_python_dispatch_on_mark_price(
3193        clock: Rc<RefCell<TestClock>>,
3194        cache: Rc<RefCell<Cache>>,
3195        trader_id: TraderId,
3196        audusd_sim: CurrencyPair,
3197    ) {
3198        pyo3::Python::initialize();
3199        Python::attach(|py| {
3200            let py_actor = create_tracking_python_actor(py).unwrap();
3201
3202            let mut rust_actor = PyDataActor::new(None);
3203            rust_actor.set_python_instance(py_actor.clone_ref(py));
3204            rust_actor.register(trader_id, clock, cache).unwrap();
3205
3206            let mark_price = MarkPriceUpdate::new(
3207                audusd_sim.id,
3208                Price::from("1.00000"),
3209                UnixNanos::default(),
3210                UnixNanos::default(),
3211            );
3212
3213            let result = rust_actor.inner_mut().on_mark_price(&mark_price);
3214
3215            assert!(result.is_ok());
3216            assert!(python_method_was_called(&py_actor, py, "on_mark_price"));
3217        });
3218    }
3219
3220    #[rstest]
3221    fn test_python_dispatch_on_index_price(
3222        clock: Rc<RefCell<TestClock>>,
3223        cache: Rc<RefCell<Cache>>,
3224        trader_id: TraderId,
3225        audusd_sim: CurrencyPair,
3226    ) {
3227        pyo3::Python::initialize();
3228        Python::attach(|py| {
3229            let py_actor = create_tracking_python_actor(py).unwrap();
3230
3231            let mut rust_actor = PyDataActor::new(None);
3232            rust_actor.set_python_instance(py_actor.clone_ref(py));
3233            rust_actor.register(trader_id, clock, cache).unwrap();
3234
3235            let index_price = IndexPriceUpdate::new(
3236                audusd_sim.id,
3237                Price::from("1.00000"),
3238                UnixNanos::default(),
3239                UnixNanos::default(),
3240            );
3241
3242            let result = rust_actor.inner_mut().on_index_price(&index_price);
3243
3244            assert!(result.is_ok());
3245            assert!(python_method_was_called(&py_actor, py, "on_index_price"));
3246        });
3247    }
3248
3249    #[rstest]
3250    fn test_python_dispatch_on_instrument_status(
3251        clock: Rc<RefCell<TestClock>>,
3252        cache: Rc<RefCell<Cache>>,
3253        trader_id: TraderId,
3254        audusd_sim: CurrencyPair,
3255    ) {
3256        pyo3::Python::initialize();
3257        Python::attach(|py| {
3258            let py_actor = create_tracking_python_actor(py).unwrap();
3259
3260            let mut rust_actor = PyDataActor::new(None);
3261            rust_actor.set_python_instance(py_actor.clone_ref(py));
3262            rust_actor.register(trader_id, clock, cache).unwrap();
3263
3264            let status = InstrumentStatus::new(
3265                audusd_sim.id,
3266                MarketStatusAction::Trading,
3267                UnixNanos::default(),
3268                UnixNanos::default(),
3269                None,
3270                None,
3271                None,
3272                None,
3273                None,
3274            );
3275
3276            let result = rust_actor.inner_mut().on_instrument_status(&status);
3277
3278            assert!(result.is_ok());
3279            assert!(python_method_was_called(
3280                &py_actor,
3281                py,
3282                "on_instrument_status"
3283            ));
3284        });
3285    }
3286
3287    #[rstest]
3288    fn test_python_dispatch_on_instrument_close(
3289        clock: Rc<RefCell<TestClock>>,
3290        cache: Rc<RefCell<Cache>>,
3291        trader_id: TraderId,
3292        audusd_sim: CurrencyPair,
3293    ) {
3294        pyo3::Python::initialize();
3295        Python::attach(|py| {
3296            let py_actor = create_tracking_python_actor(py).unwrap();
3297
3298            let mut rust_actor = PyDataActor::new(None);
3299            rust_actor.set_python_instance(py_actor.clone_ref(py));
3300            rust_actor.register(trader_id, clock, cache).unwrap();
3301
3302            let close = InstrumentClose::new(
3303                audusd_sim.id,
3304                Price::from("1.00000"),
3305                InstrumentCloseType::EndOfSession,
3306                UnixNanos::default(),
3307                UnixNanos::default(),
3308            );
3309
3310            let result = rust_actor.inner_mut().on_instrument_close(&close);
3311
3312            assert!(result.is_ok());
3313            assert!(python_method_was_called(
3314                &py_actor,
3315                py,
3316                "on_instrument_close"
3317            ));
3318        });
3319    }
3320
3321    #[rstest]
3322    fn test_python_dispatch_multiple_calls_tracked(
3323        clock: Rc<RefCell<TestClock>>,
3324        cache: Rc<RefCell<Cache>>,
3325        trader_id: TraderId,
3326        audusd_sim: CurrencyPair,
3327    ) {
3328        pyo3::Python::initialize();
3329        Python::attach(|py| {
3330            let py_actor = create_tracking_python_actor(py).unwrap();
3331
3332            let mut rust_actor = PyDataActor::new(None);
3333            rust_actor.set_python_instance(py_actor.clone_ref(py));
3334            rust_actor.register(trader_id, clock, cache).unwrap();
3335
3336            let quote = QuoteTick::new(
3337                audusd_sim.id,
3338                Price::from("1.00000"),
3339                Price::from("1.00001"),
3340                Quantity::from(100_000),
3341                Quantity::from(100_000),
3342                UnixNanos::default(),
3343                UnixNanos::default(),
3344            );
3345
3346            rust_actor.inner_mut().on_quote(&quote).unwrap();
3347            rust_actor.inner_mut().on_quote(&quote).unwrap();
3348            rust_actor.inner_mut().on_quote(&quote).unwrap();
3349
3350            assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3351        });
3352    }
3353
3354    #[rstest]
3355    fn test_python_dispatch_no_call_when_py_self_not_set(
3356        clock: Rc<RefCell<TestClock>>,
3357        cache: Rc<RefCell<Cache>>,
3358        trader_id: TraderId,
3359    ) {
3360        pyo3::Python::initialize();
3361        Python::attach(|_py| {
3362            let mut rust_actor = PyDataActor::new(None);
3363            rust_actor.register(trader_id, clock, cache).unwrap();
3364
3365            // When py_self is None, the dispatch returns Ok(()) without calling Python
3366            let result = DataActor::on_start(rust_actor.inner_mut());
3367            assert!(result.is_ok());
3368        });
3369    }
3370}