1use std::{
17 any::Any,
18 cell::{RefCell, UnsafeCell},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24};
25
26use indexmap::IndexMap;
27use nautilus_core::{
28 nanos::UnixNanos,
29 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39 },
40 enums::BookType,
41 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
42 instruments::InstrumentAny,
43 orderbook::OrderBook,
44 python::instruments::instrument_any_to_pyobject,
45};
46use pyo3::{prelude::*, types::PyDict};
47
48use crate::{
49 actor::{
50 Actor, DataActor,
51 data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
52 registry::{get_actor_registry, try_get_actor_unchecked},
53 },
54 cache::Cache,
55 clock::Clock,
56 component::{Component, get_component_registry},
57 enums::ComponentState,
58 python::{cache::PyCache, clock::PyClock, logging::PyLogger},
59 signal::Signal,
60 timer::{TimeEvent, TimeEventCallback},
61};
62
63#[pyo3::pymethods]
64impl DataActorConfig {
65 #[new]
66 #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
67 fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
68 Self {
69 actor_id,
70 log_events,
71 log_commands,
72 }
73 }
74}
75
76#[pyo3::pymethods]
77impl ImportableActorConfig {
78 #[new]
79 fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
80 let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
81 let json_str: String = PyModule::import(py, "json")?
82 .call_method("dumps", (config.bind(py),), None)?
83 .extract()?;
84
85 let json_value: serde_json::Value =
86 serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
87
88 if let serde_json::Value::Object(map) = json_value {
89 Ok(map.into_iter().collect())
90 } else {
91 Err(to_pyvalue_err("Config must be a dictionary"))
92 }
93 })?;
94
95 Ok(Self {
96 actor_path,
97 config_path,
98 config: json_config,
99 })
100 }
101
102 #[getter]
103 fn actor_path(&self) -> &String {
104 &self.actor_path
105 }
106
107 #[getter]
108 fn config_path(&self) -> &String {
109 &self.config_path
110 }
111
112 #[getter]
113 fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
114 let py_dict = PyDict::new(py);
116 for (key, value) in &self.config {
117 let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
119 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
120 py_dict.set_item(key, py_value)?;
121 }
122 Ok(py_dict.unbind())
123 }
124}
125
/// State backing a Python-facing data actor.
///
/// Dereferences to the wrapped `DataActorCore` and forwards actor callbacks to an optional
/// Python instance through the `dispatch_on_*` methods.
pub struct PyDataActorInner {
    /// The Rust actor core this type dereferences to.
    core: DataActorCore,
    /// Python object that receives dispatched hooks; `None` until one is attached, in which
    /// case every dispatch is a no-op.
    py_self: Option<Py<PyAny>>,
    /// Clock handed out to Python; created as a test clock and replaced during registration.
    clock: PyClock,
    /// Logger handed out to Python; created with the core's actor ID as its name.
    logger: PyLogger,
}
137
138impl Debug for PyDataActorInner {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 f.debug_struct(stringify!(PyDataActorInner))
141 .field("core", &self.core)
142 .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
143 .field("clock", &self.clock)
144 .field("logger", &self.logger)
145 .finish()
146 }
147}
148
149impl Deref for PyDataActorInner {
150 type Target = DataActorCore;
151
152 fn deref(&self) -> &Self::Target {
153 &self.core
154 }
155}
156
157impl DerefMut for PyDataActorInner {
158 fn deref_mut(&mut self) -> &mut Self::Target {
159 &mut self.core
160 }
161}
162
163impl PyDataActorInner {
164 fn dispatch_on_start(&self) -> PyResult<()> {
165 if let Some(ref py_self) = self.py_self {
166 Python::attach(|py| py_self.call_method0(py, "on_start"))?;
167 }
168 Ok(())
169 }
170
171 fn dispatch_on_stop(&mut self) -> PyResult<()> {
172 if let Some(ref py_self) = self.py_self {
173 Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
174 }
175 Ok(())
176 }
177
178 fn dispatch_on_resume(&mut self) -> PyResult<()> {
179 if let Some(ref py_self) = self.py_self {
180 Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
181 }
182 Ok(())
183 }
184
185 fn dispatch_on_reset(&mut self) -> PyResult<()> {
186 if let Some(ref py_self) = self.py_self {
187 Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
188 }
189 Ok(())
190 }
191
192 fn dispatch_on_dispose(&mut self) -> PyResult<()> {
193 if let Some(ref py_self) = self.py_self {
194 Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
195 }
196 Ok(())
197 }
198
199 fn dispatch_on_degrade(&mut self) -> PyResult<()> {
200 if let Some(ref py_self) = self.py_self {
201 Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
202 }
203 Ok(())
204 }
205
206 fn dispatch_on_fault(&mut self) -> PyResult<()> {
207 if let Some(ref py_self) = self.py_self {
208 Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
209 }
210 Ok(())
211 }
212
213 fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
214 if let Some(ref py_self) = self.py_self {
215 Python::attach(|py| {
216 py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
217 })?;
218 }
219 Ok(())
220 }
221
222 fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
223 if let Some(ref py_self) = self.py_self {
224 Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
225 }
226 Ok(())
227 }
228
229 fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
230 if let Some(ref py_self) = self.py_self {
231 Python::attach(|py| {
232 py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
233 })?;
234 }
235 Ok(())
236 }
237
238 fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
239 if let Some(ref py_self) = self.py_self {
240 Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
241 }
242 Ok(())
243 }
244
245 fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
246 if let Some(ref py_self) = self.py_self {
247 Python::attach(|py| {
248 py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
249 })?;
250 }
251 Ok(())
252 }
253
254 fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
255 if let Some(ref py_self) = self.py_self {
256 Python::attach(|py| {
257 py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
258 })?;
259 }
260 Ok(())
261 }
262
263 fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
264 if let Some(ref py_self) = self.py_self {
265 Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
266 }
267 Ok(())
268 }
269
270 fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
271 if let Some(ref py_self) = self.py_self {
272 Python::attach(|py| {
273 py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
274 })?;
275 }
276 Ok(())
277 }
278
279 fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
280 if let Some(ref py_self) = self.py_self {
281 Python::attach(|py| {
282 py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
283 })?;
284 }
285 Ok(())
286 }
287
288 fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
289 if let Some(ref py_self) = self.py_self {
290 Python::attach(|py| {
291 py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
292 })?;
293 }
294 Ok(())
295 }
296
297 fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
298 if let Some(ref py_self) = self.py_self {
299 Python::attach(|py| {
300 py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
301 })?;
302 }
303 Ok(())
304 }
305
306 fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
307 if let Some(ref py_self) = self.py_self {
308 Python::attach(|py| {
309 py_self.call_method1(
310 py,
311 "on_funding_rate",
312 (funding_rate.into_py_any_unwrap(py),),
313 )
314 })?;
315 }
316 Ok(())
317 }
318
319 fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
320 if let Some(ref py_self) = self.py_self {
321 Python::attach(|py| {
322 py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
323 })?;
324 }
325 Ok(())
326 }
327
328 fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
329 if let Some(ref py_self) = self.py_self {
330 Python::attach(|py| {
331 py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
332 })?;
333 }
334 Ok(())
335 }
336
337 fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
338 if let Some(ref py_self) = self.py_self {
339 Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
340 }
341 Ok(())
342 }
343
344 fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
345 if let Some(ref py_self) = self.py_self {
346 Python::attach(|py| {
347 let py_quotes: Vec<_> = quotes
348 .into_iter()
349 .map(|q| q.into_py_any_unwrap(py))
350 .collect();
351 py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
352 })?;
353 }
354 Ok(())
355 }
356
357 fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
358 if let Some(ref py_self) = self.py_self {
359 Python::attach(|py| {
360 let py_trades: Vec<_> = trades
361 .into_iter()
362 .map(|t| t.into_py_any_unwrap(py))
363 .collect();
364 py_self.call_method1(py, "on_historical_trades", (py_trades,))
365 })?;
366 }
367 Ok(())
368 }
369
370 fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
371 if let Some(ref py_self) = self.py_self {
372 Python::attach(|py| {
373 let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
374 py_self.call_method1(py, "on_historical_bars", (py_bars,))
375 })?;
376 }
377 Ok(())
378 }
379
380 fn dispatch_on_historical_mark_prices(
381 &mut self,
382 mark_prices: Vec<MarkPriceUpdate>,
383 ) -> PyResult<()> {
384 if let Some(ref py_self) = self.py_self {
385 Python::attach(|py| {
386 let py_prices: Vec<_> = mark_prices
387 .into_iter()
388 .map(|p| p.into_py_any_unwrap(py))
389 .collect();
390 py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
391 })?;
392 }
393 Ok(())
394 }
395
396 fn dispatch_on_historical_index_prices(
397 &mut self,
398 index_prices: Vec<IndexPriceUpdate>,
399 ) -> PyResult<()> {
400 if let Some(ref py_self) = self.py_self {
401 Python::attach(|py| {
402 let py_prices: Vec<_> = index_prices
403 .into_iter()
404 .map(|p| p.into_py_any_unwrap(py))
405 .collect();
406 py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
407 })?;
408 }
409 Ok(())
410 }
411
412 #[cfg(feature = "defi")]
413 fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
414 if let Some(ref py_self) = self.py_self {
415 Python::attach(|py| {
416 py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
417 })?;
418 }
419 Ok(())
420 }
421
422 #[cfg(feature = "defi")]
423 fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
424 if let Some(ref py_self) = self.py_self {
425 Python::attach(|py| {
426 py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
427 })?;
428 }
429 Ok(())
430 }
431
432 #[cfg(feature = "defi")]
433 fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
434 if let Some(ref py_self) = self.py_self {
435 Python::attach(|py| {
436 py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
437 })?;
438 }
439 Ok(())
440 }
441
442 #[cfg(feature = "defi")]
443 fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
444 if let Some(ref py_self) = self.py_self {
445 Python::attach(|py| {
446 py_self.call_method1(
447 py,
448 "on_pool_liquidity_update",
449 (update.into_py_any_unwrap(py),),
450 )
451 })?;
452 }
453 Ok(())
454 }
455
456 #[cfg(feature = "defi")]
457 fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
458 if let Some(ref py_self) = self.py_self {
459 Python::attach(|py| {
460 py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
461 })?;
462 }
463 Ok(())
464 }
465
466 #[cfg(feature = "defi")]
467 fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
468 if let Some(ref py_self) = self.py_self {
469 Python::attach(|py| {
470 py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
471 })?;
472 }
473 Ok(())
474 }
475}
476
/// Python-facing data actor, exposed as `DataActor` in the `nautilus_trader.common` module.
///
/// The class is declared `unsendable` and `subclass` in the `pyclass` attribute below.
// NOTE(review): `non_camel_case_types` does not look necessary for this name — confirm before
// removing the allow.
#[allow(non_camel_case_types)]
#[pyo3::pyclass(
    module = "nautilus_trader.common",
    name = "DataActor",
    unsendable,
    subclass
)]
pub struct PyDataActor {
    /// Shared handle to the actor state. Clones of this `Rc` are what
    /// `register_in_global_registries` inserts into the component and actor registries.
    inner: Rc<UnsafeCell<PyDataActorInner>>,
}
492
493impl Debug for PyDataActor {
494 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495 f.debug_struct(stringify!(PyDataActor))
496 .field("inner", &self.inner())
497 .finish()
498 }
499}
500
impl PyDataActor {
    /// Returns a shared reference to the inner actor state.
    ///
    /// The reference is produced by dereferencing the `UnsafeCell` pointer directly; there is
    /// no runtime borrow tracking.
    #[inline]
    #[allow(unsafe_code)]
    pub(crate) fn inner(&self) -> &PyDataActorInner {
        // SAFETY(review): sound only while no `&mut PyDataActorInner` obtained from
        // `inner_mut` (or via a registry clone of the same `Rc`) is alive at the same time.
        // The pyclass is `unsendable`, which presumably confines access to one thread —
        // TODO confirm that re-entrant Python callbacks cannot create overlapping borrows.
        unsafe { &*self.inner.get() }
    }

    /// Returns a mutable reference to the inner actor state from a shared `&self`.
    ///
    /// As with `inner`, the `UnsafeCell` pointer is dereferenced without any runtime check.
    #[inline]
    #[allow(unsafe_code, clippy::mut_from_ref)]
    pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
        // SAFETY(review): the caller must ensure no other reference to the inner state is
        // alive for the lifetime of the returned `&mut`; nothing in this function enforces
        // it — TODO confirm the invariant callers rely on.
        unsafe { &mut *self.inner.get() }
    }
}
526
527impl Deref for PyDataActor {
528 type Target = DataActorCore;
529
530 fn deref(&self) -> &Self::Target {
531 &self.inner().core
532 }
533}
534
535impl DerefMut for PyDataActor {
536 fn deref_mut(&mut self) -> &mut Self::Target {
537 &mut self.inner_mut().core
538 }
539}
540
541impl PyDataActor {
542 pub fn new(config: Option<DataActorConfig>) -> Self {
544 let config = config.unwrap_or_default();
545 let core = DataActorCore::new(config);
546 let clock = PyClock::new_test(); let logger = PyLogger::new(core.actor_id().as_str());
548
549 let inner = PyDataActorInner {
550 core,
551 py_self: None,
552 clock,
553 logger,
554 };
555
556 Self {
557 inner: Rc::new(UnsafeCell::new(inner)),
558 }
559 }
560
561 pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
568 self.inner_mut().py_self = Some(py_obj);
569 }
570
571 pub fn set_actor_id(&mut self, actor_id: ActorId) {
580 let inner = self.inner_mut();
581 inner.core.config.actor_id = Some(actor_id);
582 inner.core.actor_id = actor_id;
583 }
584
585 pub fn set_log_events(&mut self, log_events: bool) {
587 self.inner_mut().core.config.log_events = log_events;
588 }
589
590 pub fn set_log_commands(&mut self, log_commands: bool) {
592 self.inner_mut().core.config.log_commands = log_commands;
593 }
594
595 pub fn mem_address(&self) -> String {
597 self.inner().core.mem_address()
598 }
599
600 pub fn is_registered(&self) -> bool {
602 self.inner().core.is_registered()
603 }
604
605 pub fn register(
611 &mut self,
612 trader_id: TraderId,
613 clock: Rc<RefCell<dyn Clock>>,
614 cache: Rc<RefCell<Cache>>,
615 ) -> anyhow::Result<()> {
616 let inner = self.inner_mut();
617 inner.core.register(trader_id, clock, cache)?;
618
619 inner.clock = PyClock::from_rc(inner.core.clock_rc());
620
621 let actor_id = inner.actor_id().inner();
623 let callback = TimeEventCallback::from(move |event: TimeEvent| {
624 if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
625 if let Err(e) = actor.on_time_event(&event) {
626 log::error!("Python time event handler failed for actor {actor_id}: {e}");
627 }
628 } else {
629 log::error!("Actor {actor_id} not found for time event handling");
630 }
631 });
632
633 inner.clock.inner_mut().register_default_handler(callback);
634
635 inner.initialize()
636 }
637
638 pub fn register_in_global_registries(&self) {
643 let inner = self.inner();
644 let component_id = inner.component_id().inner();
645 let actor_id = Actor::id(inner);
646
647 let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
648
649 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
650 get_component_registry().insert(component_id, component_trait_ref);
651
652 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
653 get_actor_registry().insert(actor_id, actor_trait_ref);
654 }
655}
656
657impl DataActor for PyDataActorInner {
658 fn on_start(&mut self) -> anyhow::Result<()> {
659 self.dispatch_on_start()
660 .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
661 }
662
663 fn on_stop(&mut self) -> anyhow::Result<()> {
664 self.dispatch_on_stop()
665 .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
666 }
667
668 fn on_resume(&mut self) -> anyhow::Result<()> {
669 self.dispatch_on_resume()
670 .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
671 }
672
673 fn on_reset(&mut self) -> anyhow::Result<()> {
674 self.dispatch_on_reset()
675 .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
676 }
677
678 fn on_dispose(&mut self) -> anyhow::Result<()> {
679 self.dispatch_on_dispose()
680 .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
681 }
682
683 fn on_degrade(&mut self) -> anyhow::Result<()> {
684 self.dispatch_on_degrade()
685 .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
686 }
687
688 fn on_fault(&mut self) -> anyhow::Result<()> {
689 self.dispatch_on_fault()
690 .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
691 }
692
693 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
694 self.dispatch_on_time_event(event.clone())
695 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
696 }
697
698 #[allow(unused_variables)]
699 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
700 Python::attach(|py| {
701 let py_data = py.None();
704
705 self.dispatch_on_data(py_data)
706 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
707 })
708 }
709
710 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
711 self.dispatch_on_signal(signal)
712 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
713 }
714
715 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
716 Python::attach(|py| {
717 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
718 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
719 self.dispatch_on_instrument(py_instrument)
720 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
721 })
722 }
723
724 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
725 self.dispatch_on_quote(*quote)
726 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
727 }
728
729 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
730 self.dispatch_on_trade(*tick)
731 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
732 }
733
734 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
735 self.dispatch_on_bar(*bar)
736 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
737 }
738
739 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
740 self.dispatch_on_book_deltas(deltas.clone())
741 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
742 }
743
744 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
745 self.dispatch_on_book(order_book)
746 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
747 }
748
749 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
750 self.dispatch_on_mark_price(*mark_price)
751 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
752 }
753
754 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
755 self.dispatch_on_index_price(*index_price)
756 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
757 }
758
759 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
760 self.dispatch_on_funding_rate(*funding_rate)
761 .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
762 }
763
764 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
765 self.dispatch_on_instrument_status(*data)
766 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
767 }
768
769 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
770 self.dispatch_on_instrument_close(*update)
771 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
772 }
773
774 #[cfg(feature = "defi")]
775 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
776 self.dispatch_on_block(block.clone())
777 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
778 }
779
780 #[cfg(feature = "defi")]
781 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
782 self.dispatch_on_pool(pool.clone())
783 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
784 }
785
786 #[cfg(feature = "defi")]
787 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
788 self.dispatch_on_pool_swap(swap.clone())
789 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
790 }
791
792 #[cfg(feature = "defi")]
793 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
794 self.dispatch_on_pool_liquidity_update(update.clone())
795 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
796 }
797
798 #[cfg(feature = "defi")]
799 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
800 self.dispatch_on_pool_fee_collect(collect.clone())
801 .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
802 }
803
804 #[cfg(feature = "defi")]
805 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
806 self.dispatch_on_pool_flash(flash.clone())
807 .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
808 }
809
810 fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
811 Python::attach(|py| {
812 let py_data = py.None();
813 self.dispatch_on_historical_data(py_data)
814 .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
815 })
816 }
817
818 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
819 self.dispatch_on_historical_quotes(quotes.to_vec())
820 .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
821 }
822
823 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
824 self.dispatch_on_historical_trades(trades.to_vec())
825 .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
826 }
827
828 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
829 self.dispatch_on_historical_bars(bars.to_vec())
830 .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
831 }
832
833 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
834 self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
835 .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
836 }
837
838 fn on_historical_index_prices(
839 &mut self,
840 index_prices: &[IndexPriceUpdate],
841 ) -> anyhow::Result<()> {
842 self.dispatch_on_historical_index_prices(index_prices.to_vec())
843 .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
844 }
845}
846
847#[pymethods]
848impl PyDataActor {
849 #[new]
850 #[pyo3(signature = (config=None))]
851 fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
852 Ok(Self::new(config))
853 }
854
855 #[getter]
856 #[pyo3(name = "clock")]
857 fn py_clock(&self) -> PyResult<PyClock> {
858 let inner = self.inner();
859 if inner.core.is_registered() {
860 Ok(inner.clock.clone())
861 } else {
862 Err(to_pyruntime_err(
863 "Actor must be registered with a trader before accessing clock",
864 ))
865 }
866 }
867
868 #[getter]
869 #[pyo3(name = "cache")]
870 fn py_cache(&self) -> PyResult<PyCache> {
871 let inner = self.inner();
872 if inner.core.is_registered() {
873 Ok(PyCache::from_rc(inner.core.cache_rc()))
874 } else {
875 Err(to_pyruntime_err(
876 "Actor must be registered with a trader before accessing cache",
877 ))
878 }
879 }
880
881 #[getter]
882 #[pyo3(name = "log")]
883 fn py_log(&self) -> PyLogger {
884 self.inner().logger.clone()
885 }
886
887 #[getter]
888 #[pyo3(name = "actor_id")]
889 fn py_actor_id(&self) -> ActorId {
890 self.inner().core.actor_id
891 }
892
893 #[getter]
894 #[pyo3(name = "trader_id")]
895 fn py_trader_id(&self) -> Option<TraderId> {
896 self.inner().core.trader_id()
897 }
898
899 #[pyo3(name = "state")]
900 fn py_state(&self) -> ComponentState {
901 Component::state(self.inner())
902 }
903
904 #[pyo3(name = "is_ready")]
905 fn py_is_ready(&self) -> bool {
906 Component::is_ready(self.inner())
907 }
908
909 #[pyo3(name = "is_running")]
910 fn py_is_running(&self) -> bool {
911 Component::is_running(self.inner())
912 }
913
914 #[pyo3(name = "is_stopped")]
915 fn py_is_stopped(&self) -> bool {
916 Component::is_stopped(self.inner())
917 }
918
919 #[pyo3(name = "is_degraded")]
920 fn py_is_degraded(&self) -> bool {
921 Component::is_degraded(self.inner())
922 }
923
924 #[pyo3(name = "is_faulted")]
925 fn py_is_faulted(&self) -> bool {
926 Component::is_faulted(self.inner())
927 }
928
929 #[pyo3(name = "is_disposed")]
930 fn py_is_disposed(&self) -> bool {
931 Component::is_disposed(self.inner())
932 }
933
934 #[pyo3(name = "start")]
935 fn py_start(&mut self) -> PyResult<()> {
936 Component::start(self.inner_mut()).map_err(to_pyruntime_err)
937 }
938
939 #[pyo3(name = "stop")]
940 fn py_stop(&mut self) -> PyResult<()> {
941 Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
942 }
943
944 #[pyo3(name = "resume")]
945 fn py_resume(&mut self) -> PyResult<()> {
946 Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
947 }
948
949 #[pyo3(name = "reset")]
950 fn py_reset(&mut self) -> PyResult<()> {
951 Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
952 }
953
954 #[pyo3(name = "dispose")]
955 fn py_dispose(&mut self) -> PyResult<()> {
956 Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
957 }
958
959 #[pyo3(name = "degrade")]
960 fn py_degrade(&mut self) -> PyResult<()> {
961 Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
962 }
963
964 #[pyo3(name = "fault")]
965 fn py_fault(&mut self) -> PyResult<()> {
966 Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
967 }
968
969 #[pyo3(name = "shutdown_system")]
970 #[pyo3(signature = (reason=None))]
971 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
972 self.inner().core.shutdown_system(reason);
973 Ok(())
974 }
975
976 #[pyo3(name = "on_start")]
977 fn py_on_start(&self) -> PyResult<()> {
978 self.inner().dispatch_on_start()
979 }
980
981 #[pyo3(name = "on_stop")]
982 fn py_on_stop(&mut self) -> PyResult<()> {
983 self.inner_mut().dispatch_on_stop()
984 }
985
986 #[pyo3(name = "on_resume")]
987 fn py_on_resume(&mut self) -> PyResult<()> {
988 self.inner_mut().dispatch_on_resume()
989 }
990
991 #[pyo3(name = "on_reset")]
992 fn py_on_reset(&mut self) -> PyResult<()> {
993 self.inner_mut().dispatch_on_reset()
994 }
995
996 #[pyo3(name = "on_dispose")]
997 fn py_on_dispose(&mut self) -> PyResult<()> {
998 self.inner_mut().dispatch_on_dispose()
999 }
1000
1001 #[pyo3(name = "on_degrade")]
1002 fn py_on_degrade(&mut self) -> PyResult<()> {
1003 self.inner_mut().dispatch_on_degrade()
1004 }
1005
1006 #[pyo3(name = "on_fault")]
1007 fn py_on_fault(&mut self) -> PyResult<()> {
1008 self.inner_mut().dispatch_on_fault()
1009 }
1010
1011 #[pyo3(name = "on_time_event")]
1012 fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
1013 self.inner_mut().dispatch_on_time_event(event)
1014 }
1015
1016 #[pyo3(name = "on_data")]
1017 fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1018 self.inner_mut().dispatch_on_data(data)
1019 }
1020
1021 #[pyo3(name = "on_signal")]
1022 fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
1023 self.inner_mut().dispatch_on_signal(signal)
1024 }
1025
1026 #[pyo3(name = "on_instrument")]
1027 fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
1028 self.inner_mut().dispatch_on_instrument(instrument)
1029 }
1030
1031 #[pyo3(name = "on_quote")]
1032 fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
1033 self.inner_mut().dispatch_on_quote(quote)
1034 }
1035
1036 #[pyo3(name = "on_trade")]
1037 fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
1038 self.inner_mut().dispatch_on_trade(trade)
1039 }
1040
1041 #[pyo3(name = "on_bar")]
1042 fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
1043 self.inner_mut().dispatch_on_bar(bar)
1044 }
1045
1046 #[pyo3(name = "on_book_deltas")]
1047 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
1048 self.inner_mut().dispatch_on_book_deltas(deltas)
1049 }
1050
1051 #[pyo3(name = "on_book")]
1052 fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
1053 self.inner_mut().dispatch_on_book(book)
1054 }
1055
1056 #[pyo3(name = "on_mark_price")]
1057 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
1058 self.inner_mut().dispatch_on_mark_price(mark_price)
1059 }
1060
1061 #[pyo3(name = "on_index_price")]
1062 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
1063 self.inner_mut().dispatch_on_index_price(index_price)
1064 }
1065
1066 #[pyo3(name = "on_funding_rate")]
1067 fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
1068 self.inner_mut().dispatch_on_funding_rate(funding_rate)
1069 }
1070
1071 #[pyo3(name = "on_instrument_status")]
1072 fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
1073 self.inner_mut().dispatch_on_instrument_status(status)
1074 }
1075
1076 #[pyo3(name = "on_instrument_close")]
1077 fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
1078 self.inner_mut().dispatch_on_instrument_close(close)
1079 }
1080
1081 #[cfg(feature = "defi")]
1082 #[pyo3(name = "on_block")]
1083 fn py_on_block(&mut self, block: Block) -> PyResult<()> {
1084 self.inner_mut().dispatch_on_block(block)
1085 }
1086
1087 #[cfg(feature = "defi")]
1088 #[pyo3(name = "on_pool")]
1089 fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
1090 self.inner_mut().dispatch_on_pool(pool)
1091 }
1092
1093 #[cfg(feature = "defi")]
1094 #[pyo3(name = "on_pool_swap")]
1095 fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
1096 self.inner_mut().dispatch_on_pool_swap(swap)
1097 }
1098
1099 #[cfg(feature = "defi")]
1100 #[pyo3(name = "on_pool_liquidity_update")]
1101 fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
1102 self.inner_mut().dispatch_on_pool_liquidity_update(update)
1103 }
1104
1105 #[cfg(feature = "defi")]
1106 #[pyo3(name = "on_pool_fee_collect")]
1107 fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
1108 self.inner_mut().dispatch_on_pool_fee_collect(update)
1109 }
1110
1111 #[cfg(feature = "defi")]
1112 #[pyo3(name = "on_pool_flash")]
1113 fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
1114 self.inner_mut().dispatch_on_pool_flash(flash)
1115 }
1116
1117 #[pyo3(name = "subscribe_data")]
1118 #[pyo3(signature = (data_type, client_id=None, params=None))]
1119 fn py_subscribe_data(
1120 &mut self,
1121 data_type: DataType,
1122 client_id: Option<ClientId>,
1123 params: Option<IndexMap<String, String>>,
1124 ) -> PyResult<()> {
1125 DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1126 Ok(())
1127 }
1128
1129 #[pyo3(name = "subscribe_instruments")]
1130 #[pyo3(signature = (venue, client_id=None, params=None))]
1131 fn py_subscribe_instruments(
1132 &mut self,
1133 venue: Venue,
1134 client_id: Option<ClientId>,
1135 params: Option<IndexMap<String, String>>,
1136 ) -> PyResult<()> {
1137 DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1138 Ok(())
1139 }
1140
1141 #[pyo3(name = "subscribe_instrument")]
1142 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1143 fn py_subscribe_instrument(
1144 &mut self,
1145 instrument_id: InstrumentId,
1146 client_id: Option<ClientId>,
1147 params: Option<IndexMap<String, String>>,
1148 ) -> PyResult<()> {
1149 DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1150 Ok(())
1151 }
1152
1153 #[pyo3(name = "subscribe_book_deltas")]
1154 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1155 fn py_subscribe_book_deltas(
1156 &mut self,
1157 instrument_id: InstrumentId,
1158 book_type: BookType,
1159 depth: Option<usize>,
1160 client_id: Option<ClientId>,
1161 managed: bool,
1162 params: Option<IndexMap<String, String>>,
1163 ) -> PyResult<()> {
1164 let depth = depth.and_then(NonZeroUsize::new);
1165 DataActor::subscribe_book_deltas(
1166 self.inner_mut(),
1167 instrument_id,
1168 book_type,
1169 depth,
1170 client_id,
1171 managed,
1172 params,
1173 );
1174 Ok(())
1175 }
1176
1177 #[pyo3(name = "subscribe_book_at_interval")]
1178 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1179 fn py_subscribe_book_at_interval(
1180 &mut self,
1181 instrument_id: InstrumentId,
1182 book_type: BookType,
1183 interval_ms: usize,
1184 depth: Option<usize>,
1185 client_id: Option<ClientId>,
1186 params: Option<IndexMap<String, String>>,
1187 ) -> PyResult<()> {
1188 let depth = depth.and_then(NonZeroUsize::new);
1189 let interval_ms = NonZeroUsize::new(interval_ms)
1190 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1191
1192 DataActor::subscribe_book_at_interval(
1193 self.inner_mut(),
1194 instrument_id,
1195 book_type,
1196 depth,
1197 interval_ms,
1198 client_id,
1199 params,
1200 );
1201 Ok(())
1202 }
1203
1204 #[pyo3(name = "subscribe_quotes")]
1205 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1206 fn py_subscribe_quotes(
1207 &mut self,
1208 instrument_id: InstrumentId,
1209 client_id: Option<ClientId>,
1210 params: Option<IndexMap<String, String>>,
1211 ) -> PyResult<()> {
1212 DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1213 Ok(())
1214 }
1215
1216 #[pyo3(name = "subscribe_trades")]
1217 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1218 fn py_subscribe_trades(
1219 &mut self,
1220 instrument_id: InstrumentId,
1221 client_id: Option<ClientId>,
1222 params: Option<IndexMap<String, String>>,
1223 ) -> PyResult<()> {
1224 DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1225 Ok(())
1226 }
1227
1228 #[pyo3(name = "subscribe_bars")]
1229 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1230 fn py_subscribe_bars(
1231 &mut self,
1232 bar_type: BarType,
1233 client_id: Option<ClientId>,
1234 params: Option<IndexMap<String, String>>,
1235 ) -> PyResult<()> {
1236 DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1237 Ok(())
1238 }
1239
1240 #[pyo3(name = "subscribe_mark_prices")]
1241 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1242 fn py_subscribe_mark_prices(
1243 &mut self,
1244 instrument_id: InstrumentId,
1245 client_id: Option<ClientId>,
1246 params: Option<IndexMap<String, String>>,
1247 ) -> PyResult<()> {
1248 DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1249 Ok(())
1250 }
1251
1252 #[pyo3(name = "subscribe_index_prices")]
1253 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1254 fn py_subscribe_index_prices(
1255 &mut self,
1256 instrument_id: InstrumentId,
1257 client_id: Option<ClientId>,
1258 params: Option<IndexMap<String, String>>,
1259 ) -> PyResult<()> {
1260 DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1261 Ok(())
1262 }
1263
1264 #[pyo3(name = "subscribe_instrument_status")]
1265 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1266 fn py_subscribe_instrument_status(
1267 &mut self,
1268 instrument_id: InstrumentId,
1269 client_id: Option<ClientId>,
1270 params: Option<IndexMap<String, String>>,
1271 ) -> PyResult<()> {
1272 DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1273 Ok(())
1274 }
1275
1276 #[pyo3(name = "subscribe_instrument_close")]
1277 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1278 fn py_subscribe_instrument_close(
1279 &mut self,
1280 instrument_id: InstrumentId,
1281 client_id: Option<ClientId>,
1282 params: Option<IndexMap<String, String>>,
1283 ) -> PyResult<()> {
1284 DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1285 Ok(())
1286 }
1287
1288 #[pyo3(name = "subscribe_order_fills")]
1289 #[pyo3(signature = (instrument_id))]
1290 fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1291 DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1292 Ok(())
1293 }
1294
1295 #[pyo3(name = "subscribe_order_cancels")]
1296 #[pyo3(signature = (instrument_id))]
1297 fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1298 DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1299 Ok(())
1300 }
1301
/// Python binding `subscribe_blocks` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_blocks`] on the inner actor for
/// `chain`, then returns `Ok(())`. `client_id` and `params` default to `None`
/// on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_blocks")]
#[pyo3(signature = (chain, client_id=None, params=None))]
fn py_subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_blocks(actor, chain, client_id, params);
    Ok(())
}
1314
/// Python binding `subscribe_pool` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_pool`] on the inner actor for
/// `instrument_id`, then returns `Ok(())`. `client_id` and `params` default
/// to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_pool(actor, instrument_id, client_id, params);
    Ok(())
}
1327
/// Python binding `subscribe_pool_swaps` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_pool_swaps`] on the inner actor for
/// `instrument_id`, then returns `Ok(())`. `client_id` and `params` default
/// to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_swaps")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_pool_swaps(actor, instrument_id, client_id, params);
    Ok(())
}
1340
/// Python binding `subscribe_pool_liquidity_updates` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_pool_liquidity_updates`] on the inner
/// actor for `instrument_id`, then returns `Ok(())`. `client_id` and `params`
/// default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_liquidity_updates")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_pool_liquidity_updates(actor, instrument_id, client_id, params);
    Ok(())
}
1358
/// Python binding `subscribe_pool_fee_collects` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_pool_fee_collects`] on the inner actor
/// for `instrument_id`, then returns `Ok(())`. `client_id` and `params`
/// default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_fee_collects")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_pool_fee_collects(actor, instrument_id, client_id, params);
    Ok(())
}
1371
/// Python binding `subscribe_pool_flash_events` (`defi` feature only).
///
/// Delegates to [`DataActor::subscribe_pool_flash_events`] on the inner actor
/// for `instrument_id`, then returns `Ok(())`. `client_id` and `params`
/// default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_flash_events")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::subscribe_pool_flash_events(actor, instrument_id, client_id, params);
    Ok(())
}
1384
1385 #[pyo3(name = "request_data")]
1386 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1387 fn py_request_data(
1388 &mut self,
1389 data_type: DataType,
1390 client_id: ClientId,
1391 start: Option<u64>,
1392 end: Option<u64>,
1393 limit: Option<usize>,
1394 params: Option<IndexMap<String, String>>,
1395 ) -> PyResult<String> {
1396 let limit = limit.and_then(NonZeroUsize::new);
1397 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1398 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1399
1400 let request_id = DataActor::request_data(
1401 self.inner_mut(),
1402 data_type,
1403 client_id,
1404 start,
1405 end,
1406 limit,
1407 params,
1408 )
1409 .map_err(to_pyvalue_err)?;
1410 Ok(request_id.to_string())
1411 }
1412
1413 #[pyo3(name = "request_instrument")]
1414 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1415 fn py_request_instrument(
1416 &mut self,
1417 instrument_id: InstrumentId,
1418 start: Option<u64>,
1419 end: Option<u64>,
1420 client_id: Option<ClientId>,
1421 params: Option<IndexMap<String, String>>,
1422 ) -> PyResult<String> {
1423 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1424 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1425
1426 let request_id = DataActor::request_instrument(
1427 self.inner_mut(),
1428 instrument_id,
1429 start,
1430 end,
1431 client_id,
1432 params,
1433 )
1434 .map_err(to_pyvalue_err)?;
1435 Ok(request_id.to_string())
1436 }
1437
1438 #[pyo3(name = "request_instruments")]
1439 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1440 fn py_request_instruments(
1441 &mut self,
1442 venue: Option<Venue>,
1443 start: Option<u64>,
1444 end: Option<u64>,
1445 client_id: Option<ClientId>,
1446 params: Option<IndexMap<String, String>>,
1447 ) -> PyResult<String> {
1448 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1449 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1450
1451 let request_id =
1452 DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1453 .map_err(to_pyvalue_err)?;
1454 Ok(request_id.to_string())
1455 }
1456
1457 #[pyo3(name = "request_book_snapshot")]
1458 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1459 fn py_request_book_snapshot(
1460 &mut self,
1461 instrument_id: InstrumentId,
1462 depth: Option<usize>,
1463 client_id: Option<ClientId>,
1464 params: Option<IndexMap<String, String>>,
1465 ) -> PyResult<String> {
1466 let depth = depth.and_then(NonZeroUsize::new);
1467
1468 let request_id = DataActor::request_book_snapshot(
1469 self.inner_mut(),
1470 instrument_id,
1471 depth,
1472 client_id,
1473 params,
1474 )
1475 .map_err(to_pyvalue_err)?;
1476 Ok(request_id.to_string())
1477 }
1478
1479 #[pyo3(name = "request_quotes")]
1480 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1481 fn py_request_quotes(
1482 &mut self,
1483 instrument_id: InstrumentId,
1484 start: Option<u64>,
1485 end: Option<u64>,
1486 limit: Option<usize>,
1487 client_id: Option<ClientId>,
1488 params: Option<IndexMap<String, String>>,
1489 ) -> PyResult<String> {
1490 let limit = limit.and_then(NonZeroUsize::new);
1491 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1492 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1493
1494 let request_id = DataActor::request_quotes(
1495 self.inner_mut(),
1496 instrument_id,
1497 start,
1498 end,
1499 limit,
1500 client_id,
1501 params,
1502 )
1503 .map_err(to_pyvalue_err)?;
1504 Ok(request_id.to_string())
1505 }
1506
1507 #[pyo3(name = "request_trades")]
1508 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1509 fn py_request_trades(
1510 &mut self,
1511 instrument_id: InstrumentId,
1512 start: Option<u64>,
1513 end: Option<u64>,
1514 limit: Option<usize>,
1515 client_id: Option<ClientId>,
1516 params: Option<IndexMap<String, String>>,
1517 ) -> PyResult<String> {
1518 let limit = limit.and_then(NonZeroUsize::new);
1519 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1520 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1521
1522 let request_id = DataActor::request_trades(
1523 self.inner_mut(),
1524 instrument_id,
1525 start,
1526 end,
1527 limit,
1528 client_id,
1529 params,
1530 )
1531 .map_err(to_pyvalue_err)?;
1532 Ok(request_id.to_string())
1533 }
1534
1535 #[pyo3(name = "request_bars")]
1536 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1537 fn py_request_bars(
1538 &mut self,
1539 bar_type: BarType,
1540 start: Option<u64>,
1541 end: Option<u64>,
1542 limit: Option<usize>,
1543 client_id: Option<ClientId>,
1544 params: Option<IndexMap<String, String>>,
1545 ) -> PyResult<String> {
1546 let limit = limit.and_then(NonZeroUsize::new);
1547 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1548 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1549
1550 let request_id = DataActor::request_bars(
1551 self.inner_mut(),
1552 bar_type,
1553 start,
1554 end,
1555 limit,
1556 client_id,
1557 params,
1558 )
1559 .map_err(to_pyvalue_err)?;
1560 Ok(request_id.to_string())
1561 }
1562
1563 #[pyo3(name = "unsubscribe_data")]
1564 #[pyo3(signature = (data_type, client_id=None, params=None))]
1565 fn py_unsubscribe_data(
1566 &mut self,
1567 data_type: DataType,
1568 client_id: Option<ClientId>,
1569 params: Option<IndexMap<String, String>>,
1570 ) -> PyResult<()> {
1571 DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1572 Ok(())
1573 }
1574
1575 #[pyo3(name = "unsubscribe_instruments")]
1576 #[pyo3(signature = (venue, client_id=None, params=None))]
1577 fn py_unsubscribe_instruments(
1578 &mut self,
1579 venue: Venue,
1580 client_id: Option<ClientId>,
1581 params: Option<IndexMap<String, String>>,
1582 ) -> PyResult<()> {
1583 DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1584 Ok(())
1585 }
1586
1587 #[pyo3(name = "unsubscribe_instrument")]
1588 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1589 fn py_unsubscribe_instrument(
1590 &mut self,
1591 instrument_id: InstrumentId,
1592 client_id: Option<ClientId>,
1593 params: Option<IndexMap<String, String>>,
1594 ) -> PyResult<()> {
1595 DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1596 Ok(())
1597 }
1598
1599 #[pyo3(name = "unsubscribe_book_deltas")]
1600 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1601 fn py_unsubscribe_book_deltas(
1602 &mut self,
1603 instrument_id: InstrumentId,
1604 client_id: Option<ClientId>,
1605 params: Option<IndexMap<String, String>>,
1606 ) -> PyResult<()> {
1607 DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1608 Ok(())
1609 }
1610
1611 #[pyo3(name = "unsubscribe_book_at_interval")]
1612 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1613 fn py_unsubscribe_book_at_interval(
1614 &mut self,
1615 instrument_id: InstrumentId,
1616 interval_ms: usize,
1617 client_id: Option<ClientId>,
1618 params: Option<IndexMap<String, String>>,
1619 ) -> PyResult<()> {
1620 let interval_ms = NonZeroUsize::new(interval_ms)
1621 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1622
1623 DataActor::unsubscribe_book_at_interval(
1624 self.inner_mut(),
1625 instrument_id,
1626 interval_ms,
1627 client_id,
1628 params,
1629 );
1630 Ok(())
1631 }
1632
1633 #[pyo3(name = "unsubscribe_quotes")]
1634 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1635 fn py_unsubscribe_quotes(
1636 &mut self,
1637 instrument_id: InstrumentId,
1638 client_id: Option<ClientId>,
1639 params: Option<IndexMap<String, String>>,
1640 ) -> PyResult<()> {
1641 DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1642 Ok(())
1643 }
1644
1645 #[pyo3(name = "unsubscribe_trades")]
1646 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1647 fn py_unsubscribe_trades(
1648 &mut self,
1649 instrument_id: InstrumentId,
1650 client_id: Option<ClientId>,
1651 params: Option<IndexMap<String, String>>,
1652 ) -> PyResult<()> {
1653 DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1654 Ok(())
1655 }
1656
1657 #[pyo3(name = "unsubscribe_bars")]
1658 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1659 fn py_unsubscribe_bars(
1660 &mut self,
1661 bar_type: BarType,
1662 client_id: Option<ClientId>,
1663 params: Option<IndexMap<String, String>>,
1664 ) -> PyResult<()> {
1665 DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1666 Ok(())
1667 }
1668
1669 #[pyo3(name = "unsubscribe_mark_prices")]
1670 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1671 fn py_unsubscribe_mark_prices(
1672 &mut self,
1673 instrument_id: InstrumentId,
1674 client_id: Option<ClientId>,
1675 params: Option<IndexMap<String, String>>,
1676 ) -> PyResult<()> {
1677 DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1678 Ok(())
1679 }
1680
1681 #[pyo3(name = "unsubscribe_index_prices")]
1682 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1683 fn py_unsubscribe_index_prices(
1684 &mut self,
1685 instrument_id: InstrumentId,
1686 client_id: Option<ClientId>,
1687 params: Option<IndexMap<String, String>>,
1688 ) -> PyResult<()> {
1689 DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1690 Ok(())
1691 }
1692
1693 #[pyo3(name = "unsubscribe_instrument_status")]
1694 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1695 fn py_unsubscribe_instrument_status(
1696 &mut self,
1697 instrument_id: InstrumentId,
1698 client_id: Option<ClientId>,
1699 params: Option<IndexMap<String, String>>,
1700 ) -> PyResult<()> {
1701 DataActor::unsubscribe_instrument_status(
1702 self.inner_mut(),
1703 instrument_id,
1704 client_id,
1705 params,
1706 );
1707 Ok(())
1708 }
1709
1710 #[pyo3(name = "unsubscribe_instrument_close")]
1711 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1712 fn py_unsubscribe_instrument_close(
1713 &mut self,
1714 instrument_id: InstrumentId,
1715 client_id: Option<ClientId>,
1716 params: Option<IndexMap<String, String>>,
1717 ) -> PyResult<()> {
1718 DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1719 Ok(())
1720 }
1721
1722 #[pyo3(name = "unsubscribe_order_fills")]
1723 #[pyo3(signature = (instrument_id))]
1724 fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1725 DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1726 Ok(())
1727 }
1728
1729 #[pyo3(name = "unsubscribe_order_cancels")]
1730 #[pyo3(signature = (instrument_id))]
1731 fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1732 DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1733 Ok(())
1734 }
1735
/// Python binding `unsubscribe_blocks` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_blocks`] on the inner actor for
/// `chain`, then returns `Ok(())`. `client_id` and `params` default to `None`
/// on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_blocks")]
#[pyo3(signature = (chain, client_id=None, params=None))]
fn py_unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_blocks(actor, chain, client_id, params);
    Ok(())
}
1748
/// Python binding `unsubscribe_pool` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_pool`] on the inner actor for
/// `instrument_id`, then returns `Ok(())`. `client_id` and `params` default
/// to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_pool")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_pool(actor, instrument_id, client_id, params);
    Ok(())
}
1761
/// Python binding `unsubscribe_pool_swaps` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_pool_swaps`] on the inner actor for
/// `instrument_id`, then returns `Ok(())`. `client_id` and `params` default
/// to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_pool_swaps")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_pool_swaps(actor, instrument_id, client_id, params);
    Ok(())
}
1774
/// Python binding `unsubscribe_pool_liquidity_updates` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_pool_liquidity_updates`] on the
/// inner actor for `instrument_id`, then returns `Ok(())`. `client_id` and
/// `params` default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_pool_liquidity_updates")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_pool_liquidity_updates(actor, instrument_id, client_id, params);
    Ok(())
}
1792
/// Python binding `unsubscribe_pool_fee_collects` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_pool_fee_collects`] on the inner
/// actor for `instrument_id`, then returns `Ok(())`. `client_id` and `params`
/// default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_pool_fee_collects")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_pool_fee_collects(actor, instrument_id, client_id, params);
    Ok(())
}
1810
/// Python binding `unsubscribe_pool_flash_events` (`defi` feature only).
///
/// Delegates to [`DataActor::unsubscribe_pool_flash_events`] on the inner
/// actor for `instrument_id`, then returns `Ok(())`. `client_id` and `params`
/// default to `None` on the Python side.
#[cfg(feature = "defi")]
#[pyo3(name = "unsubscribe_pool_flash_events")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    let actor = self.inner_mut();
    DataActor::unsubscribe_pool_flash_events(actor, instrument_id, client_id, params);
    Ok(())
}
1828
1829 #[allow(unused_variables)]
1830 #[pyo3(name = "on_historical_data")]
1831 fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1832 Ok(())
1834 }
1835
1836 #[allow(unused_variables)]
1837 #[pyo3(name = "on_historical_quotes")]
1838 fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1839 Ok(())
1841 }
1842
1843 #[allow(unused_variables)]
1844 #[pyo3(name = "on_historical_trades")]
1845 fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1846 Ok(())
1848 }
1849
1850 #[allow(unused_variables)]
1851 #[pyo3(name = "on_historical_bars")]
1852 fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1853 Ok(())
1855 }
1856
1857 #[allow(unused_variables)]
1858 #[pyo3(name = "on_historical_mark_prices")]
1859 fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1860 Ok(())
1862 }
1863
1864 #[allow(unused_variables)]
1865 #[pyo3(name = "on_historical_index_prices")]
1866 fn py_on_historical_index_prices(
1867 &mut self,
1868 index_prices: Vec<IndexPriceUpdate>,
1869 ) -> PyResult<()> {
1870 Ok(())
1872 }
1873}
1874
1875#[cfg(test)]
1876mod tests {
1877 use std::{
1878 any::Any,
1879 cell::RefCell,
1880 collections::HashMap,
1881 ops::{Deref, DerefMut},
1882 rc::Rc,
1883 str::FromStr,
1884 sync::{Arc, Mutex},
1885 };
1886
1887 #[cfg(feature = "defi")]
1888 use alloy_primitives::{I256, U160};
1889 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
1890 #[cfg(feature = "defi")]
1891 use nautilus_model::defi::{
1892 AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate,
1893 PoolSwap, Token,
1894 };
1895 use nautilus_model::{
1896 data::{
1897 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
1898 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
1899 },
1900 enums::{AggressorSide, BookType, InstrumentCloseType, MarketStatusAction},
1901 identifiers::{ClientId, TradeId, TraderId, Venue},
1902 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
1903 orderbook::OrderBook,
1904 types::{Price, Quantity},
1905 };
1906 use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
1907 use rstest::{fixture, rstest};
1908 use ustr::Ustr;
1909
1910 use super::PyDataActor;
1911 use crate::{
1912 actor::{DataActor, data_actor::DataActorCore},
1913 cache::Cache,
1914 clock::TestClock,
1915 component::Component,
1916 enums::ComponentState,
1917 runner::{SyncDataCommandSender, set_data_cmd_sender},
1918 signal::Signal,
1919 timer::TimeEvent,
1920 };
1921
1922 #[fixture]
1923 fn clock() -> Rc<RefCell<TestClock>> {
1924 Rc::new(RefCell::new(TestClock::new()))
1925 }
1926
1927 #[fixture]
1928 fn cache() -> Rc<RefCell<Cache>> {
1929 Rc::new(RefCell::new(Cache::new(None, None)))
1930 }
1931
1932 #[fixture]
1933 fn trader_id() -> TraderId {
1934 TraderId::from("TRADER-001")
1935 }
1936
1937 #[fixture]
1938 fn client_id() -> ClientId {
1939 ClientId::new("TestClient")
1940 }
1941
1942 #[fixture]
1943 fn venue() -> Venue {
1944 Venue::from("SIM")
1945 }
1946
1947 #[fixture]
1948 fn data_type() -> DataType {
1949 DataType::new("TestData", None)
1950 }
1951
1952 #[fixture]
1953 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
1954 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
1955 }
1956
1957 fn create_unregistered_actor() -> PyDataActor {
1958 PyDataActor::new(None)
1959 }
1960
1961 fn create_registered_actor(
1962 clock: Rc<RefCell<TestClock>>,
1963 cache: Rc<RefCell<Cache>>,
1964 trader_id: TraderId,
1965 ) -> PyDataActor {
1966 let sender = SyncDataCommandSender;
1968 set_data_cmd_sender(Arc::new(sender));
1969
1970 let mut actor = PyDataActor::new(None);
1971 actor.register(trader_id, clock, cache).unwrap();
1972 actor
1973 }
1974
1975 #[rstest]
1976 fn test_new_actor_creation() {
1977 let actor = PyDataActor::new(None);
1978 assert!(actor.trader_id().is_none());
1979 }
1980
1981 #[rstest]
1982 fn test_clock_access_before_registration_raises_error() {
1983 let actor = PyDataActor::new(None);
1984
1985 let result = actor.py_clock();
1987 assert!(result.is_err());
1988
1989 let error = result.unwrap_err();
1990 pyo3::Python::initialize();
1991 pyo3::Python::attach(|py| {
1992 assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
1993 });
1994
1995 let error_msg = error.to_string();
1996 assert!(
1997 error_msg.contains("Actor must be registered with a trader before accessing clock")
1998 );
1999 }
2000
2001 #[rstest]
2002 fn test_unregistered_actor_methods_work() {
2003 let actor = create_unregistered_actor();
2004
2005 assert!(!actor.py_is_ready());
2006 assert!(!actor.py_is_running());
2007 assert!(!actor.py_is_stopped());
2008 assert!(!actor.py_is_disposed());
2009 assert!(!actor.py_is_degraded());
2010 assert!(!actor.py_is_faulted());
2011
2012 assert_eq!(actor.trader_id(), None);
2014 }
2015
2016 #[rstest]
2017 fn test_registration_success(
2018 clock: Rc<RefCell<TestClock>>,
2019 cache: Rc<RefCell<Cache>>,
2020 trader_id: TraderId,
2021 ) {
2022 let mut actor = create_unregistered_actor();
2023 actor.register(trader_id, clock, cache).unwrap();
2024 assert!(actor.trader_id().is_some());
2025 assert_eq!(actor.trader_id().unwrap(), trader_id);
2026 }
2027
2028 #[rstest]
2029 fn test_registered_actor_basic_properties(
2030 clock: Rc<RefCell<TestClock>>,
2031 cache: Rc<RefCell<Cache>>,
2032 trader_id: TraderId,
2033 ) {
2034 let actor = create_registered_actor(clock, cache, trader_id);
2035
2036 assert_eq!(actor.state(), ComponentState::Ready);
2037 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2038 assert!(actor.py_is_ready());
2039 assert!(!actor.py_is_running());
2040 assert!(!actor.py_is_stopped());
2041 assert!(!actor.py_is_disposed());
2042 assert!(!actor.py_is_degraded());
2043 assert!(!actor.py_is_faulted());
2044 }
2045
2046 #[rstest]
2047 fn test_basic_subscription_methods_compile(
2048 clock: Rc<RefCell<TestClock>>,
2049 cache: Rc<RefCell<Cache>>,
2050 trader_id: TraderId,
2051 data_type: DataType,
2052 client_id: ClientId,
2053 audusd_sim: CurrencyPair,
2054 ) {
2055 let mut actor = create_registered_actor(clock, cache, trader_id);
2056
2057 assert!(
2058 actor
2059 .py_subscribe_data(data_type.clone(), Some(client_id), None)
2060 .is_ok()
2061 );
2062 assert!(
2063 actor
2064 .py_subscribe_quotes(audusd_sim.id, Some(client_id), None)
2065 .is_ok()
2066 );
2067 assert!(
2068 actor
2069 .py_unsubscribe_data(data_type, Some(client_id), None)
2070 .is_ok()
2071 );
2072 assert!(
2073 actor
2074 .py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None)
2075 .is_ok()
2076 );
2077 }
2078
2079 #[rstest]
2080 fn test_shutdown_system_passes_through(
2081 clock: Rc<RefCell<TestClock>>,
2082 cache: Rc<RefCell<Cache>>,
2083 trader_id: TraderId,
2084 ) {
2085 let actor = create_registered_actor(clock, cache, trader_id);
2086
2087 assert!(
2088 actor
2089 .py_shutdown_system(Some("Test shutdown".to_string()))
2090 .is_ok()
2091 );
2092 assert!(actor.py_shutdown_system(None).is_ok());
2093 }
2094
2095 #[rstest]
2096 fn test_book_at_interval_invalid_interval_ms(
2097 clock: Rc<RefCell<TestClock>>,
2098 cache: Rc<RefCell<Cache>>,
2099 trader_id: TraderId,
2100 audusd_sim: CurrencyPair,
2101 ) {
2102 pyo3::Python::initialize();
2103 let mut actor = create_registered_actor(clock, cache, trader_id);
2104
2105 let result = actor.py_subscribe_book_at_interval(
2106 audusd_sim.id,
2107 BookType::L2_MBP,
2108 0,
2109 None,
2110 None,
2111 None,
2112 );
2113 assert!(result.is_err());
2114 assert_eq!(
2115 result.unwrap_err().to_string(),
2116 "ValueError: interval_ms must be > 0"
2117 );
2118
2119 let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
2120 assert!(result.is_err());
2121 assert_eq!(
2122 result.unwrap_err().to_string(),
2123 "ValueError: interval_ms must be > 0"
2124 );
2125 }
2126
2127 #[rstest]
2128 fn test_request_methods_signatures_exist() {
2129 let actor = create_unregistered_actor();
2130 assert!(actor.trader_id().is_none());
2131 }
2132
2133 #[rstest]
2134 fn test_data_actor_trait_implementation(
2135 clock: Rc<RefCell<TestClock>>,
2136 cache: Rc<RefCell<Cache>>,
2137 trader_id: TraderId,
2138 ) {
2139 let actor = create_registered_actor(clock, cache, trader_id);
2140 let state = actor.state();
2141 assert_eq!(state, ComponentState::Ready);
2142 }
2143
/// Lazily initialized, initially empty map from handler name to call count.
///
/// Being a `static`, the map is shared by everything in this module that
/// locks it.
static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
    std::sync::LazyLock::new(|| Arc::default());
2146
2147 #[derive(Debug)]
2148 struct TestDataActor {
2149 inner: PyDataActor,
2150 }
2151
2152 impl TestDataActor {
2153 fn new() -> Self {
2154 Self {
2155 inner: PyDataActor::new(None),
2156 }
2157 }
2158
2159 fn track_call(&self, handler_name: &str) {
2160 let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2161 *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
2162 }
2163
2164 fn get_call_count(&self, handler_name: &str) -> i32 {
2165 let tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2166 tracker.get(handler_name).copied().unwrap_or(0)
2167 }
2168
2169 fn reset_tracker(&self) {
2170 let mut tracker = CALL_TRACKER.lock().expect(MUTEX_POISONED);
2171 tracker.clear();
2172 }
2173 }
2174
2175 impl Deref for TestDataActor {
2176 type Target = DataActorCore;
2177 fn deref(&self) -> &Self::Target {
2178 &self.inner.inner().core
2179 }
2180 }
2181
2182 impl DerefMut for TestDataActor {
2183 fn deref_mut(&mut self) -> &mut Self::Target {
2184 &mut self.inner.inner_mut().core
2185 }
2186 }
2187
2188 impl DataActor for TestDataActor {
2189 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
2190 self.track_call("on_time_event");
2191 self.inner.inner_mut().on_time_event(event)
2192 }
2193
2194 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
2195 self.track_call("on_data");
2196 self.inner.inner_mut().on_data(data)
2197 }
2198
2199 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
2200 self.track_call("on_signal");
2201 self.inner.inner_mut().on_signal(signal)
2202 }
2203
2204 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
2205 self.track_call("on_instrument");
2206 self.inner.inner_mut().on_instrument(instrument)
2207 }
2208
2209 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
2210 self.track_call("on_quote");
2211 self.inner.inner_mut().on_quote(quote)
2212 }
2213
2214 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
2215 self.track_call("on_trade");
2216 self.inner.inner_mut().on_trade(trade)
2217 }
2218
2219 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
2220 self.track_call("on_bar");
2221 self.inner.inner_mut().on_bar(bar)
2222 }
2223
2224 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
2225 self.track_call("on_book");
2226 self.inner.inner_mut().on_book(book)
2227 }
2228
2229 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
2230 self.track_call("on_book_deltas");
2231 self.inner.inner_mut().on_book_deltas(deltas)
2232 }
2233
2234 fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
2235 self.track_call("on_mark_price");
2236 self.inner.inner_mut().on_mark_price(update)
2237 }
2238
2239 fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
2240 self.track_call("on_index_price");
2241 self.inner.inner_mut().on_index_price(update)
2242 }
2243
2244 fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
2245 self.track_call("on_instrument_status");
2246 self.inner.inner_mut().on_instrument_status(update)
2247 }
2248
2249 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
2250 self.track_call("on_instrument_close");
2251 self.inner.inner_mut().on_instrument_close(update)
2252 }
2253
2254 #[cfg(feature = "defi")]
2255 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
2256 self.track_call("on_block");
2257 self.inner.inner_mut().on_block(block)
2258 }
2259
2260 #[cfg(feature = "defi")]
2261 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
2262 self.track_call("on_pool");
2263 self.inner.inner_mut().on_pool(pool)
2264 }
2265
2266 #[cfg(feature = "defi")]
2267 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
2268 self.track_call("on_pool_swap");
2269 self.inner.inner_mut().on_pool_swap(swap)
2270 }
2271
2272 #[cfg(feature = "defi")]
2273 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
2274 self.track_call("on_pool_liquidity_update");
2275 self.inner.inner_mut().on_pool_liquidity_update(update)
2276 }
2277 }
2278
2279 #[rstest]
2280 fn test_python_on_signal_handler(
2281 clock: Rc<RefCell<TestClock>>,
2282 cache: Rc<RefCell<Cache>>,
2283 trader_id: TraderId,
2284 ) {
2285 pyo3::Python::initialize();
2286 let mut test_actor = TestDataActor::new();
2287 test_actor.reset_tracker();
2288 test_actor.register(trader_id, clock, cache).unwrap();
2289
2290 let signal = Signal::new(
2291 Ustr::from("test_signal"),
2292 "1.0".to_string(),
2293 UnixNanos::default(),
2294 UnixNanos::default(),
2295 );
2296
2297 assert!(test_actor.on_signal(&signal).is_ok());
2298 assert_eq!(test_actor.get_call_count("on_signal"), 1);
2299 }
2300
2301 #[rstest]
2302 fn test_python_on_data_handler(
2303 clock: Rc<RefCell<TestClock>>,
2304 cache: Rc<RefCell<Cache>>,
2305 trader_id: TraderId,
2306 ) {
2307 pyo3::Python::initialize();
2308 let mut test_actor = TestDataActor::new();
2309 test_actor.reset_tracker();
2310 test_actor.register(trader_id, clock, cache).unwrap();
2311
2312 assert!(test_actor.on_data(&()).is_ok());
2313 assert_eq!(test_actor.get_call_count("on_data"), 1);
2314 }
2315
2316 #[rstest]
2317 fn test_python_on_time_event_handler(
2318 clock: Rc<RefCell<TestClock>>,
2319 cache: Rc<RefCell<Cache>>,
2320 trader_id: TraderId,
2321 ) {
2322 pyo3::Python::initialize();
2323 let mut test_actor = TestDataActor::new();
2324 test_actor.reset_tracker();
2325 test_actor.register(trader_id, clock, cache).unwrap();
2326
2327 let time_event = TimeEvent::new(
2328 Ustr::from("test_timer"),
2329 UUID4::new(),
2330 UnixNanos::default(),
2331 UnixNanos::default(),
2332 );
2333
2334 assert!(test_actor.on_time_event(&time_event).is_ok());
2335 assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2336 }
2337
2338 #[rstest]
2339 fn test_python_on_instrument_handler(
2340 clock: Rc<RefCell<TestClock>>,
2341 cache: Rc<RefCell<Cache>>,
2342 trader_id: TraderId,
2343 audusd_sim: CurrencyPair,
2344 ) {
2345 pyo3::Python::initialize();
2346 let mut rust_actor = PyDataActor::new(None);
2347 rust_actor.register(trader_id, clock, cache).unwrap();
2348
2349 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2350
2351 assert!(rust_actor.inner_mut().on_instrument(&instrument).is_ok());
2352 }
2353
2354 #[rstest]
2355 fn test_python_on_quote_handler(
2356 clock: Rc<RefCell<TestClock>>,
2357 cache: Rc<RefCell<Cache>>,
2358 trader_id: TraderId,
2359 audusd_sim: CurrencyPair,
2360 ) {
2361 pyo3::Python::initialize();
2362 let mut rust_actor = PyDataActor::new(None);
2363 rust_actor.register(trader_id, clock, cache).unwrap();
2364
2365 let quote = QuoteTick::new(
2366 audusd_sim.id,
2367 Price::from("1.0000"),
2368 Price::from("1.0001"),
2369 Quantity::from("100000"),
2370 Quantity::from("100000"),
2371 UnixNanos::default(),
2372 UnixNanos::default(),
2373 );
2374
2375 assert!(rust_actor.inner_mut().on_quote("e).is_ok());
2376 }
2377
2378 #[rstest]
2379 fn test_python_on_trade_handler(
2380 clock: Rc<RefCell<TestClock>>,
2381 cache: Rc<RefCell<Cache>>,
2382 trader_id: TraderId,
2383 audusd_sim: CurrencyPair,
2384 ) {
2385 pyo3::Python::initialize();
2386 let mut rust_actor = PyDataActor::new(None);
2387 rust_actor.register(trader_id, clock, cache).unwrap();
2388
2389 let trade = TradeTick::new(
2390 audusd_sim.id,
2391 Price::from("1.0000"),
2392 Quantity::from("100000"),
2393 AggressorSide::Buyer,
2394 "T123".to_string().into(),
2395 UnixNanos::default(),
2396 UnixNanos::default(),
2397 );
2398
2399 assert!(rust_actor.inner_mut().on_trade(&trade).is_ok());
2400 }
2401
2402 #[rstest]
2403 fn test_python_on_bar_handler(
2404 clock: Rc<RefCell<TestClock>>,
2405 cache: Rc<RefCell<Cache>>,
2406 trader_id: TraderId,
2407 audusd_sim: CurrencyPair,
2408 ) {
2409 pyo3::Python::initialize();
2410 let mut rust_actor = PyDataActor::new(None);
2411 rust_actor.register(trader_id, clock, cache).unwrap();
2412
2413 let bar_type =
2414 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2415 let bar = Bar::new(
2416 bar_type,
2417 Price::from("1.0000"),
2418 Price::from("1.0001"),
2419 Price::from("0.9999"),
2420 Price::from("1.0000"),
2421 Quantity::from("100000"),
2422 UnixNanos::default(),
2423 UnixNanos::default(),
2424 );
2425
2426 assert!(rust_actor.inner_mut().on_bar(&bar).is_ok());
2427 }
2428
2429 #[rstest]
2430 fn test_python_on_book_handler(
2431 clock: Rc<RefCell<TestClock>>,
2432 cache: Rc<RefCell<Cache>>,
2433 trader_id: TraderId,
2434 audusd_sim: CurrencyPair,
2435 ) {
2436 pyo3::Python::initialize();
2437 let mut rust_actor = PyDataActor::new(None);
2438 rust_actor.register(trader_id, clock, cache).unwrap();
2439
2440 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2441 assert!(rust_actor.inner_mut().on_book(&book).is_ok());
2442 }
2443
2444 #[rstest]
2445 fn test_python_on_book_deltas_handler(
2446 clock: Rc<RefCell<TestClock>>,
2447 cache: Rc<RefCell<Cache>>,
2448 trader_id: TraderId,
2449 audusd_sim: CurrencyPair,
2450 ) {
2451 pyo3::Python::initialize();
2452 let mut rust_actor = PyDataActor::new(None);
2453 rust_actor.register(trader_id, clock, cache).unwrap();
2454
2455 let delta =
2456 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2457 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2458
2459 assert!(rust_actor.inner_mut().on_book_deltas(&deltas).is_ok());
2460 }
2461
2462 #[rstest]
2463 fn test_python_on_mark_price_handler(
2464 clock: Rc<RefCell<TestClock>>,
2465 cache: Rc<RefCell<Cache>>,
2466 trader_id: TraderId,
2467 audusd_sim: CurrencyPair,
2468 ) {
2469 pyo3::Python::initialize();
2470 let mut rust_actor = PyDataActor::new(None);
2471 rust_actor.register(trader_id, clock, cache).unwrap();
2472
2473 let mark_price = MarkPriceUpdate::new(
2474 audusd_sim.id,
2475 Price::from("1.0000"),
2476 UnixNanos::default(),
2477 UnixNanos::default(),
2478 );
2479
2480 assert!(rust_actor.inner_mut().on_mark_price(&mark_price).is_ok());
2481 }
2482
2483 #[rstest]
2484 fn test_python_on_index_price_handler(
2485 clock: Rc<RefCell<TestClock>>,
2486 cache: Rc<RefCell<Cache>>,
2487 trader_id: TraderId,
2488 audusd_sim: CurrencyPair,
2489 ) {
2490 pyo3::Python::initialize();
2491 let mut rust_actor = PyDataActor::new(None);
2492 rust_actor.register(trader_id, clock, cache).unwrap();
2493
2494 let index_price = IndexPriceUpdate::new(
2495 audusd_sim.id,
2496 Price::from("1.0000"),
2497 UnixNanos::default(),
2498 UnixNanos::default(),
2499 );
2500
2501 assert!(rust_actor.inner_mut().on_index_price(&index_price).is_ok());
2502 }
2503
2504 #[rstest]
2505 fn test_python_on_instrument_status_handler(
2506 clock: Rc<RefCell<TestClock>>,
2507 cache: Rc<RefCell<Cache>>,
2508 trader_id: TraderId,
2509 audusd_sim: CurrencyPair,
2510 ) {
2511 pyo3::Python::initialize();
2512 let mut rust_actor = PyDataActor::new(None);
2513 rust_actor.register(trader_id, clock, cache).unwrap();
2514
2515 let status = InstrumentStatus::new(
2516 audusd_sim.id,
2517 MarketStatusAction::Trading,
2518 UnixNanos::default(),
2519 UnixNanos::default(),
2520 None,
2521 None,
2522 None,
2523 None,
2524 None,
2525 );
2526
2527 assert!(rust_actor.inner_mut().on_instrument_status(&status).is_ok());
2528 }
2529
2530 #[rstest]
2531 fn test_python_on_instrument_close_handler(
2532 clock: Rc<RefCell<TestClock>>,
2533 cache: Rc<RefCell<Cache>>,
2534 trader_id: TraderId,
2535 audusd_sim: CurrencyPair,
2536 ) {
2537 pyo3::Python::initialize();
2538 let mut rust_actor = PyDataActor::new(None);
2539 rust_actor.register(trader_id, clock, cache).unwrap();
2540
2541 let close = InstrumentClose::new(
2542 audusd_sim.id,
2543 Price::from("1.0000"),
2544 InstrumentCloseType::EndOfSession,
2545 UnixNanos::default(),
2546 UnixNanos::default(),
2547 );
2548
2549 assert!(rust_actor.inner_mut().on_instrument_close(&close).is_ok());
2550 }
2551
2552 #[cfg(feature = "defi")]
2553 #[rstest]
2554 fn test_python_on_block_handler(
2555 clock: Rc<RefCell<TestClock>>,
2556 cache: Rc<RefCell<Cache>>,
2557 trader_id: TraderId,
2558 ) {
2559 pyo3::Python::initialize();
2560 let mut test_actor = TestDataActor::new();
2561 test_actor.reset_tracker();
2562 test_actor.register(trader_id, clock, cache).unwrap();
2563
2564 let block = Block::new(
2565 "0x1234567890abcdef".to_string(),
2566 "0xabcdef1234567890".to_string(),
2567 12345,
2568 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2569 21000,
2570 20000,
2571 UnixNanos::default(),
2572 Some(Blockchain::Ethereum),
2573 );
2574
2575 assert!(test_actor.on_block(&block).is_ok());
2576 assert_eq!(test_actor.get_call_count("on_block"), 1);
2577 }
2578
2579 #[cfg(feature = "defi")]
2580 #[rstest]
2581 fn test_python_on_pool_swap_handler(
2582 clock: Rc<RefCell<TestClock>>,
2583 cache: Rc<RefCell<Cache>>,
2584 trader_id: TraderId,
2585 ) {
2586 pyo3::Python::initialize();
2587 let mut rust_actor = PyDataActor::new(None);
2588 rust_actor.register(trader_id, clock, cache).unwrap();
2589
2590 let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2591 let dex = Arc::new(Dex::new(
2592 Chain::new(Blockchain::Ethereum, 1),
2593 DexType::UniswapV3,
2594 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2595 0,
2596 AmmType::CLAMM,
2597 "PoolCreated",
2598 "Swap",
2599 "Mint",
2600 "Burn",
2601 "Collect",
2602 ));
2603 let token0 = Token::new(
2604 chain.clone(),
2605 "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2606 .parse()
2607 .unwrap(),
2608 "USDC".into(),
2609 "USD Coin".into(),
2610 6,
2611 );
2612 let token1 = Token::new(
2613 chain.clone(),
2614 "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2615 .parse()
2616 .unwrap(),
2617 "WETH".into(),
2618 "Wrapped Ether".into(),
2619 18,
2620 );
2621 let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2622 .parse()
2623 .unwrap();
2624 let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2625 .parse()
2626 .unwrap();
2627 let pool = Arc::new(Pool::new(
2628 chain.clone(),
2629 dex.clone(),
2630 pool_address,
2631 pool_identifier,
2632 12345,
2633 token0,
2634 token1,
2635 Some(500),
2636 Some(10),
2637 UnixNanos::default(),
2638 ));
2639
2640 let swap = PoolSwap::new(
2641 chain,
2642 dex,
2643 pool.instrument_id,
2644 pool.pool_identifier,
2645 12345,
2646 "0xabc123".to_string(),
2647 0,
2648 0,
2649 None,
2650 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2651 .parse()
2652 .unwrap(),
2653 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2654 .parse()
2655 .unwrap(),
2656 I256::from_str("1000000000000000000").unwrap(),
2657 I256::from_str("400000000000000").unwrap(),
2658 U160::from(59000000000000u128),
2659 1000000,
2660 100,
2661 );
2662
2663 assert!(rust_actor.inner_mut().on_pool_swap(&swap).is_ok());
2664 }
2665
2666 #[cfg(feature = "defi")]
2667 #[rstest]
2668 fn test_python_on_pool_liquidity_update_handler(
2669 clock: Rc<RefCell<TestClock>>,
2670 cache: Rc<RefCell<Cache>>,
2671 trader_id: TraderId,
2672 ) {
2673 pyo3::Python::initialize();
2674 let mut rust_actor = PyDataActor::new(None);
2675 rust_actor.register(trader_id, clock, cache).unwrap();
2676
2677 let block = Block::new(
2678 "0x1234567890abcdef".to_string(),
2679 "0xabcdef1234567890".to_string(),
2680 12345,
2681 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2682 21000,
2683 20000,
2684 UnixNanos::default(),
2685 Some(Blockchain::Ethereum),
2686 );
2687
2688 assert!(rust_actor.inner_mut().on_block(&block).is_ok());
2690 }
2691
    // Python source defining `TrackingActor`, a stand-in actor used by the
    // dispatch tests. Each lifecycle and data handler appends a
    // `(method_name, args)` tuple to `self.calls`; `was_called` and `call_count`
    // query that log by method name.
    const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
        r#"
class TrackingActor:
    """A mock Python actor that tracks all method calls."""

    def __init__(self):
        self.calls = []

    def _record(self, method_name, *args):
        self.calls.append((method_name, args))

    def was_called(self, method_name):
        return any(call[0] == method_name for call in self.calls)

    def call_count(self, method_name):
        return sum(1 for call in self.calls if call[0] == method_name)

    def on_start(self):
        self._record("on_start")

    def on_stop(self):
        self._record("on_stop")

    def on_resume(self):
        self._record("on_resume")

    def on_reset(self):
        self._record("on_reset")

    def on_dispose(self):
        self._record("on_dispose")

    def on_degrade(self):
        self._record("on_degrade")

    def on_fault(self):
        self._record("on_fault")

    def on_time_event(self, event):
        self._record("on_time_event", event)

    def on_data(self, data):
        self._record("on_data", data)

    def on_signal(self, signal):
        self._record("on_signal", signal)

    def on_instrument(self, instrument):
        self._record("on_instrument", instrument)

    def on_quote(self, quote):
        self._record("on_quote", quote)

    def on_trade(self, trade):
        self._record("on_trade", trade)

    def on_bar(self, bar):
        self._record("on_bar", bar)

    def on_book(self, book):
        self._record("on_book", book)

    def on_book_deltas(self, deltas):
        self._record("on_book_deltas", deltas)

    def on_mark_price(self, update):
        self._record("on_mark_price", update)

    def on_index_price(self, update):
        self._record("on_index_price", update)

    def on_funding_rate(self, update):
        self._record("on_funding_rate", update)

    def on_instrument_status(self, status):
        self._record("on_instrument_status", status)

    def on_instrument_close(self, close):
        self._record("on_instrument_close", close)

    def on_historical_data(self, data):
        self._record("on_historical_data", data)

    def on_historical_quotes(self, quotes):
        self._record("on_historical_quotes", quotes)

    def on_historical_trades(self, trades):
        self._record("on_historical_trades", trades)

    def on_historical_bars(self, bars):
        self._record("on_historical_bars", bars)

    def on_historical_mark_prices(self, prices):
        self._record("on_historical_mark_prices", prices)

    def on_historical_index_prices(self, prices):
        self._record("on_historical_index_prices", prices)
"#
    );
2791
2792 fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
2793 py.run(TRACKING_ACTOR_CODE, None, None)?;
2794 let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
2795 let instance = tracking_actor_class.call0()?;
2796 Ok(instance.unbind())
2797 }
2798
2799 fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
2800 py_actor
2801 .call_method1(py, "was_called", (method_name,))
2802 .and_then(|r| r.extract::<bool>(py))
2803 .unwrap_or(false)
2804 }
2805
2806 fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
2807 py_actor
2808 .call_method1(py, "call_count", (method_name,))
2809 .and_then(|r| r.extract::<i32>(py))
2810 .unwrap_or(0)
2811 }
2812
2813 #[rstest]
2814 fn test_python_dispatch_on_start(
2815 clock: Rc<RefCell<TestClock>>,
2816 cache: Rc<RefCell<Cache>>,
2817 trader_id: TraderId,
2818 ) {
2819 pyo3::Python::initialize();
2820 Python::attach(|py| {
2821 let py_actor = create_tracking_python_actor(py).unwrap();
2822
2823 let mut rust_actor = PyDataActor::new(None);
2824 rust_actor.set_python_instance(py_actor.clone_ref(py));
2825 rust_actor.register(trader_id, clock, cache).unwrap();
2826
2827 let result = DataActor::on_start(rust_actor.inner_mut());
2828
2829 assert!(result.is_ok());
2830 assert!(python_method_was_called(&py_actor, py, "on_start"));
2831 assert_eq!(python_method_call_count(&py_actor, py, "on_start"), 1);
2832 });
2833 }
2834
2835 #[rstest]
2836 fn test_python_dispatch_on_stop(
2837 clock: Rc<RefCell<TestClock>>,
2838 cache: Rc<RefCell<Cache>>,
2839 trader_id: TraderId,
2840 ) {
2841 pyo3::Python::initialize();
2842 Python::attach(|py| {
2843 let py_actor = create_tracking_python_actor(py).unwrap();
2844
2845 let mut rust_actor = PyDataActor::new(None);
2846 rust_actor.set_python_instance(py_actor.clone_ref(py));
2847 rust_actor.register(trader_id, clock, cache).unwrap();
2848
2849 let result = DataActor::on_stop(rust_actor.inner_mut());
2850
2851 assert!(result.is_ok());
2852 assert!(python_method_was_called(&py_actor, py, "on_stop"));
2853 });
2854 }
2855
2856 #[rstest]
2857 fn test_python_dispatch_on_resume(
2858 clock: Rc<RefCell<TestClock>>,
2859 cache: Rc<RefCell<Cache>>,
2860 trader_id: TraderId,
2861 ) {
2862 pyo3::Python::initialize();
2863 Python::attach(|py| {
2864 let py_actor = create_tracking_python_actor(py).unwrap();
2865
2866 let mut rust_actor = PyDataActor::new(None);
2867 rust_actor.set_python_instance(py_actor.clone_ref(py));
2868 rust_actor.register(trader_id, clock, cache).unwrap();
2869
2870 let result = DataActor::on_resume(rust_actor.inner_mut());
2871
2872 assert!(result.is_ok());
2873 assert!(python_method_was_called(&py_actor, py, "on_resume"));
2874 });
2875 }
2876
2877 #[rstest]
2878 fn test_python_dispatch_on_reset(
2879 clock: Rc<RefCell<TestClock>>,
2880 cache: Rc<RefCell<Cache>>,
2881 trader_id: TraderId,
2882 ) {
2883 pyo3::Python::initialize();
2884 Python::attach(|py| {
2885 let py_actor = create_tracking_python_actor(py).unwrap();
2886
2887 let mut rust_actor = PyDataActor::new(None);
2888 rust_actor.set_python_instance(py_actor.clone_ref(py));
2889 rust_actor.register(trader_id, clock, cache).unwrap();
2890
2891 let result = DataActor::on_reset(rust_actor.inner_mut());
2892
2893 assert!(result.is_ok());
2894 assert!(python_method_was_called(&py_actor, py, "on_reset"));
2895 });
2896 }
2897
2898 #[rstest]
2899 fn test_python_dispatch_on_dispose(
2900 clock: Rc<RefCell<TestClock>>,
2901 cache: Rc<RefCell<Cache>>,
2902 trader_id: TraderId,
2903 ) {
2904 pyo3::Python::initialize();
2905 Python::attach(|py| {
2906 let py_actor = create_tracking_python_actor(py).unwrap();
2907
2908 let mut rust_actor = PyDataActor::new(None);
2909 rust_actor.set_python_instance(py_actor.clone_ref(py));
2910 rust_actor.register(trader_id, clock, cache).unwrap();
2911
2912 let result = DataActor::on_dispose(rust_actor.inner_mut());
2913
2914 assert!(result.is_ok());
2915 assert!(python_method_was_called(&py_actor, py, "on_dispose"));
2916 });
2917 }
2918
2919 #[rstest]
2920 fn test_python_dispatch_on_degrade(
2921 clock: Rc<RefCell<TestClock>>,
2922 cache: Rc<RefCell<Cache>>,
2923 trader_id: TraderId,
2924 ) {
2925 pyo3::Python::initialize();
2926 Python::attach(|py| {
2927 let py_actor = create_tracking_python_actor(py).unwrap();
2928
2929 let mut rust_actor = PyDataActor::new(None);
2930 rust_actor.set_python_instance(py_actor.clone_ref(py));
2931 rust_actor.register(trader_id, clock, cache).unwrap();
2932
2933 let result = DataActor::on_degrade(rust_actor.inner_mut());
2934
2935 assert!(result.is_ok());
2936 assert!(python_method_was_called(&py_actor, py, "on_degrade"));
2937 });
2938 }
2939
2940 #[rstest]
2941 fn test_python_dispatch_on_fault(
2942 clock: Rc<RefCell<TestClock>>,
2943 cache: Rc<RefCell<Cache>>,
2944 trader_id: TraderId,
2945 ) {
2946 pyo3::Python::initialize();
2947 Python::attach(|py| {
2948 let py_actor = create_tracking_python_actor(py).unwrap();
2949
2950 let mut rust_actor = PyDataActor::new(None);
2951 rust_actor.set_python_instance(py_actor.clone_ref(py));
2952 rust_actor.register(trader_id, clock, cache).unwrap();
2953
2954 let result = DataActor::on_fault(rust_actor.inner_mut());
2955
2956 assert!(result.is_ok());
2957 assert!(python_method_was_called(&py_actor, py, "on_fault"));
2958 });
2959 }
2960
2961 #[rstest]
2962 fn test_python_dispatch_on_signal(
2963 clock: Rc<RefCell<TestClock>>,
2964 cache: Rc<RefCell<Cache>>,
2965 trader_id: TraderId,
2966 ) {
2967 pyo3::Python::initialize();
2968 Python::attach(|py| {
2969 let py_actor = create_tracking_python_actor(py).unwrap();
2970
2971 let mut rust_actor = PyDataActor::new(None);
2972 rust_actor.set_python_instance(py_actor.clone_ref(py));
2973 rust_actor.register(trader_id, clock, cache).unwrap();
2974
2975 let signal = Signal::new(
2976 Ustr::from("test_signal"),
2977 "1.0".to_string(),
2978 UnixNanos::default(),
2979 UnixNanos::default(),
2980 );
2981
2982 let result = rust_actor.inner_mut().on_signal(&signal);
2983
2984 assert!(result.is_ok());
2985 assert!(python_method_was_called(&py_actor, py, "on_signal"));
2986 });
2987 }
2988
2989 #[rstest]
2990 fn test_python_dispatch_on_time_event(
2991 clock: Rc<RefCell<TestClock>>,
2992 cache: Rc<RefCell<Cache>>,
2993 trader_id: TraderId,
2994 ) {
2995 pyo3::Python::initialize();
2996 Python::attach(|py| {
2997 let py_actor = create_tracking_python_actor(py).unwrap();
2998
2999 let mut rust_actor = PyDataActor::new(None);
3000 rust_actor.set_python_instance(py_actor.clone_ref(py));
3001 rust_actor.register(trader_id, clock, cache).unwrap();
3002
3003 let time_event = TimeEvent::new(
3004 Ustr::from("test_timer"),
3005 UUID4::new(),
3006 UnixNanos::default(),
3007 UnixNanos::default(),
3008 );
3009
3010 let result = rust_actor.inner_mut().on_time_event(&time_event);
3011
3012 assert!(result.is_ok());
3013 assert!(python_method_was_called(&py_actor, py, "on_time_event"));
3014 });
3015 }
3016
3017 #[rstest]
3018 fn test_python_dispatch_on_instrument(
3019 clock: Rc<RefCell<TestClock>>,
3020 cache: Rc<RefCell<Cache>>,
3021 trader_id: TraderId,
3022 audusd_sim: CurrencyPair,
3023 ) {
3024 pyo3::Python::initialize();
3025 Python::attach(|py| {
3026 let py_actor = create_tracking_python_actor(py).unwrap();
3027
3028 let mut rust_actor = PyDataActor::new(None);
3029 rust_actor.set_python_instance(py_actor.clone_ref(py));
3030 rust_actor.register(trader_id, clock, cache).unwrap();
3031
3032 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3033
3034 let result = rust_actor.inner_mut().on_instrument(&instrument);
3035
3036 assert!(result.is_ok());
3037 assert!(python_method_was_called(&py_actor, py, "on_instrument"));
3038 });
3039 }
3040
3041 #[rstest]
3042 fn test_python_dispatch_on_quote(
3043 clock: Rc<RefCell<TestClock>>,
3044 cache: Rc<RefCell<Cache>>,
3045 trader_id: TraderId,
3046 audusd_sim: CurrencyPair,
3047 ) {
3048 pyo3::Python::initialize();
3049 Python::attach(|py| {
3050 let py_actor = create_tracking_python_actor(py).unwrap();
3051
3052 let mut rust_actor = PyDataActor::new(None);
3053 rust_actor.set_python_instance(py_actor.clone_ref(py));
3054 rust_actor.register(trader_id, clock, cache).unwrap();
3055
3056 let quote = QuoteTick::new(
3057 audusd_sim.id,
3058 Price::from("1.00000"),
3059 Price::from("1.00001"),
3060 Quantity::from(100_000),
3061 Quantity::from(100_000),
3062 UnixNanos::default(),
3063 UnixNanos::default(),
3064 );
3065
3066 let result = rust_actor.inner_mut().on_quote("e);
3067
3068 assert!(result.is_ok());
3069 assert!(python_method_was_called(&py_actor, py, "on_quote"));
3070 });
3071 }
3072
3073 #[rstest]
3074 fn test_python_dispatch_on_trade(
3075 clock: Rc<RefCell<TestClock>>,
3076 cache: Rc<RefCell<Cache>>,
3077 trader_id: TraderId,
3078 audusd_sim: CurrencyPair,
3079 ) {
3080 pyo3::Python::initialize();
3081 Python::attach(|py| {
3082 let py_actor = create_tracking_python_actor(py).unwrap();
3083
3084 let mut rust_actor = PyDataActor::new(None);
3085 rust_actor.set_python_instance(py_actor.clone_ref(py));
3086 rust_actor.register(trader_id, clock, cache).unwrap();
3087
3088 let trade = TradeTick::new(
3089 audusd_sim.id,
3090 Price::from("1.00000"),
3091 Quantity::from(100_000),
3092 AggressorSide::Buyer,
3093 TradeId::new("123456"),
3094 UnixNanos::default(),
3095 UnixNanos::default(),
3096 );
3097
3098 let result = rust_actor.inner_mut().on_trade(&trade);
3099
3100 assert!(result.is_ok());
3101 assert!(python_method_was_called(&py_actor, py, "on_trade"));
3102 });
3103 }
3104
3105 #[rstest]
3106 fn test_python_dispatch_on_bar(
3107 clock: Rc<RefCell<TestClock>>,
3108 cache: Rc<RefCell<Cache>>,
3109 trader_id: TraderId,
3110 audusd_sim: CurrencyPair,
3111 ) {
3112 pyo3::Python::initialize();
3113 Python::attach(|py| {
3114 let py_actor = create_tracking_python_actor(py).unwrap();
3115
3116 let mut rust_actor = PyDataActor::new(None);
3117 rust_actor.set_python_instance(py_actor.clone_ref(py));
3118 rust_actor.register(trader_id, clock, cache).unwrap();
3119
3120 let bar_type =
3121 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
3122 let bar = Bar::new(
3123 bar_type,
3124 Price::from("1.00000"),
3125 Price::from("1.00010"),
3126 Price::from("0.99990"),
3127 Price::from("1.00005"),
3128 Quantity::from(100_000),
3129 UnixNanos::default(),
3130 UnixNanos::default(),
3131 );
3132
3133 let result = rust_actor.inner_mut().on_bar(&bar);
3134
3135 assert!(result.is_ok());
3136 assert!(python_method_was_called(&py_actor, py, "on_bar"));
3137 });
3138 }
3139
3140 #[rstest]
3141 fn test_python_dispatch_on_book(
3142 clock: Rc<RefCell<TestClock>>,
3143 cache: Rc<RefCell<Cache>>,
3144 trader_id: TraderId,
3145 audusd_sim: CurrencyPair,
3146 ) {
3147 pyo3::Python::initialize();
3148 Python::attach(|py| {
3149 let py_actor = create_tracking_python_actor(py).unwrap();
3150
3151 let mut rust_actor = PyDataActor::new(None);
3152 rust_actor.set_python_instance(py_actor.clone_ref(py));
3153 rust_actor.register(trader_id, clock, cache).unwrap();
3154
3155 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
3156
3157 let result = rust_actor.inner_mut().on_book(&book);
3158
3159 assert!(result.is_ok());
3160 assert!(python_method_was_called(&py_actor, py, "on_book"));
3161 });
3162 }
3163
3164 #[rstest]
3165 fn test_python_dispatch_on_book_deltas(
3166 clock: Rc<RefCell<TestClock>>,
3167 cache: Rc<RefCell<Cache>>,
3168 trader_id: TraderId,
3169 audusd_sim: CurrencyPair,
3170 ) {
3171 pyo3::Python::initialize();
3172 Python::attach(|py| {
3173 let py_actor = create_tracking_python_actor(py).unwrap();
3174
3175 let mut rust_actor = PyDataActor::new(None);
3176 rust_actor.set_python_instance(py_actor.clone_ref(py));
3177 rust_actor.register(trader_id, clock, cache).unwrap();
3178
3179 let delta =
3180 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
3181 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
3182
3183 let result = rust_actor.inner_mut().on_book_deltas(&deltas);
3184
3185 assert!(result.is_ok());
3186 assert!(python_method_was_called(&py_actor, py, "on_book_deltas"));
3187 });
3188 }
3189
3190 #[rstest]
3191 fn test_python_dispatch_on_mark_price(
3192 clock: Rc<RefCell<TestClock>>,
3193 cache: Rc<RefCell<Cache>>,
3194 trader_id: TraderId,
3195 audusd_sim: CurrencyPair,
3196 ) {
3197 pyo3::Python::initialize();
3198 Python::attach(|py| {
3199 let py_actor = create_tracking_python_actor(py).unwrap();
3200
3201 let mut rust_actor = PyDataActor::new(None);
3202 rust_actor.set_python_instance(py_actor.clone_ref(py));
3203 rust_actor.register(trader_id, clock, cache).unwrap();
3204
3205 let mark_price = MarkPriceUpdate::new(
3206 audusd_sim.id,
3207 Price::from("1.00000"),
3208 UnixNanos::default(),
3209 UnixNanos::default(),
3210 );
3211
3212 let result = rust_actor.inner_mut().on_mark_price(&mark_price);
3213
3214 assert!(result.is_ok());
3215 assert!(python_method_was_called(&py_actor, py, "on_mark_price"));
3216 });
3217 }
3218
3219 #[rstest]
3220 fn test_python_dispatch_on_index_price(
3221 clock: Rc<RefCell<TestClock>>,
3222 cache: Rc<RefCell<Cache>>,
3223 trader_id: TraderId,
3224 audusd_sim: CurrencyPair,
3225 ) {
3226 pyo3::Python::initialize();
3227 Python::attach(|py| {
3228 let py_actor = create_tracking_python_actor(py).unwrap();
3229
3230 let mut rust_actor = PyDataActor::new(None);
3231 rust_actor.set_python_instance(py_actor.clone_ref(py));
3232 rust_actor.register(trader_id, clock, cache).unwrap();
3233
3234 let index_price = IndexPriceUpdate::new(
3235 audusd_sim.id,
3236 Price::from("1.00000"),
3237 UnixNanos::default(),
3238 UnixNanos::default(),
3239 );
3240
3241 let result = rust_actor.inner_mut().on_index_price(&index_price);
3242
3243 assert!(result.is_ok());
3244 assert!(python_method_was_called(&py_actor, py, "on_index_price"));
3245 });
3246 }
3247
3248 #[rstest]
3249 fn test_python_dispatch_on_instrument_status(
3250 clock: Rc<RefCell<TestClock>>,
3251 cache: Rc<RefCell<Cache>>,
3252 trader_id: TraderId,
3253 audusd_sim: CurrencyPair,
3254 ) {
3255 pyo3::Python::initialize();
3256 Python::attach(|py| {
3257 let py_actor = create_tracking_python_actor(py).unwrap();
3258
3259 let mut rust_actor = PyDataActor::new(None);
3260 rust_actor.set_python_instance(py_actor.clone_ref(py));
3261 rust_actor.register(trader_id, clock, cache).unwrap();
3262
3263 let status = InstrumentStatus::new(
3264 audusd_sim.id,
3265 MarketStatusAction::Trading,
3266 UnixNanos::default(),
3267 UnixNanos::default(),
3268 None,
3269 None,
3270 None,
3271 None,
3272 None,
3273 );
3274
3275 let result = rust_actor.inner_mut().on_instrument_status(&status);
3276
3277 assert!(result.is_ok());
3278 assert!(python_method_was_called(
3279 &py_actor,
3280 py,
3281 "on_instrument_status"
3282 ));
3283 });
3284 }
3285
3286 #[rstest]
3287 fn test_python_dispatch_on_instrument_close(
3288 clock: Rc<RefCell<TestClock>>,
3289 cache: Rc<RefCell<Cache>>,
3290 trader_id: TraderId,
3291 audusd_sim: CurrencyPair,
3292 ) {
3293 pyo3::Python::initialize();
3294 Python::attach(|py| {
3295 let py_actor = create_tracking_python_actor(py).unwrap();
3296
3297 let mut rust_actor = PyDataActor::new(None);
3298 rust_actor.set_python_instance(py_actor.clone_ref(py));
3299 rust_actor.register(trader_id, clock, cache).unwrap();
3300
3301 let close = InstrumentClose::new(
3302 audusd_sim.id,
3303 Price::from("1.00000"),
3304 InstrumentCloseType::EndOfSession,
3305 UnixNanos::default(),
3306 UnixNanos::default(),
3307 );
3308
3309 let result = rust_actor.inner_mut().on_instrument_close(&close);
3310
3311 assert!(result.is_ok());
3312 assert!(python_method_was_called(
3313 &py_actor,
3314 py,
3315 "on_instrument_close"
3316 ));
3317 });
3318 }
3319
3320 #[rstest]
3321 fn test_python_dispatch_multiple_calls_tracked(
3322 clock: Rc<RefCell<TestClock>>,
3323 cache: Rc<RefCell<Cache>>,
3324 trader_id: TraderId,
3325 audusd_sim: CurrencyPair,
3326 ) {
3327 pyo3::Python::initialize();
3328 Python::attach(|py| {
3329 let py_actor = create_tracking_python_actor(py).unwrap();
3330
3331 let mut rust_actor = PyDataActor::new(None);
3332 rust_actor.set_python_instance(py_actor.clone_ref(py));
3333 rust_actor.register(trader_id, clock, cache).unwrap();
3334
3335 let quote = QuoteTick::new(
3336 audusd_sim.id,
3337 Price::from("1.00000"),
3338 Price::from("1.00001"),
3339 Quantity::from(100_000),
3340 Quantity::from(100_000),
3341 UnixNanos::default(),
3342 UnixNanos::default(),
3343 );
3344
3345 rust_actor.inner_mut().on_quote("e).unwrap();
3346 rust_actor.inner_mut().on_quote("e).unwrap();
3347 rust_actor.inner_mut().on_quote("e).unwrap();
3348
3349 assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3350 });
3351 }
3352
3353 #[rstest]
3354 fn test_python_dispatch_no_call_when_py_self_not_set(
3355 clock: Rc<RefCell<TestClock>>,
3356 cache: Rc<RefCell<Cache>>,
3357 trader_id: TraderId,
3358 ) {
3359 pyo3::Python::initialize();
3360 Python::attach(|_py| {
3361 let mut rust_actor = PyDataActor::new(None);
3362 rust_actor.register(trader_id, clock, cache).unwrap();
3363
3364 let result = DataActor::on_start(rust_actor.inner_mut());
3366 assert!(result.is_ok());
3367 });
3368 }
3369}