1use std::{
19 any::Any,
20 cell::RefCell,
21 collections::HashMap,
22 num::NonZeroUsize,
23 ops::{Deref, DerefMut},
24 rc::Rc,
25};
26
27use indexmap::IndexMap;
28use nautilus_core::{
29 nanos::UnixNanos,
30 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
31};
32#[cfg(feature = "defi")]
33use nautilus_model::defi::{Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap};
34use nautilus_model::{
35 data::{
36 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
37 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
38 },
39 enums::BookType,
40 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
41 instruments::InstrumentAny,
42 orderbook::OrderBook,
43 python::instruments::instrument_any_to_pyobject,
44};
45use pyo3::{exceptions::PyValueError, prelude::*, types::PyDict};
46
47use crate::{
48 actor::{
49 DataActor,
50 data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
51 registry::try_get_actor_unchecked,
52 },
53 cache::Cache,
54 clock::Clock,
55 component::Component,
56 enums::ComponentState,
57 python::{clock::PyClock, logging::PyLogger},
58 signal::Signal,
59 timer::{TimeEvent, TimeEventCallback},
60};
61
62#[pyo3::pymethods]
63impl DataActorConfig {
64 #[new]
65 #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
66 fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
67 Self {
68 actor_id,
69 log_events,
70 log_commands,
71 }
72 }
73}
74
75#[pyo3::pymethods]
76impl ImportableActorConfig {
77 #[new]
78 fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
79 let json_config = Python::with_gil(|py| -> PyResult<HashMap<String, serde_json::Value>> {
80 let json_str: String = PyModule::import(py, "json")?
81 .call_method("dumps", (config.bind(py),), None)?
82 .extract()?;
83
84 let json_value: serde_json::Value = serde_json::from_str(&json_str)
85 .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
86
87 if let serde_json::Value::Object(map) = json_value {
88 Ok(map.into_iter().collect())
89 } else {
90 Err(PyErr::new::<PyValueError, _>("Config must be a dictionary"))
91 }
92 })?;
93
94 Ok(Self {
95 actor_path,
96 config_path,
97 config: json_config,
98 })
99 }
100
101 #[getter]
102 fn actor_path(&self) -> &String {
103 &self.actor_path
104 }
105
106 #[getter]
107 fn config_path(&self) -> &String {
108 &self.config_path
109 }
110
111 #[getter]
112 fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
113 let py_dict = PyDict::new(py);
115 for (key, value) in &self.config {
116 let json_str = serde_json::to_string(value)
118 .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
119 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
120 py_dict.set_item(key, py_value)?;
121 }
122 Ok(py_dict.unbind())
123 }
124}
125
126#[allow(non_camel_case_types)]
127#[pyo3::pyclass(
128 module = "nautilus_trader.common",
129 name = "DataActor",
130 unsendable,
131 subclass
132)]
133#[derive(Debug)]
134pub struct PyDataActor {
135 core: DataActorCore,
136 py_self: Option<PyObject>,
137 clock: PyClock,
138 logger: PyLogger,
139}
140
141impl Deref for PyDataActor {
142 type Target = DataActorCore;
143
144 fn deref(&self) -> &Self::Target {
145 &self.core
146 }
147}
148
149impl DerefMut for PyDataActor {
150 fn deref_mut(&mut self) -> &mut Self::Target {
151 &mut self.core
152 }
153}
154
155impl PyDataActor {
156 pub fn new(config: Option<DataActorConfig>) -> Self {
158 let config = config.unwrap_or_default();
159 let core = DataActorCore::new(config);
160 let clock = PyClock::new_test(); let logger = PyLogger::new(core.actor_id().as_str());
162
163 Self {
164 core,
165 py_self: None,
166 clock,
167 logger,
168 }
169 }
170
171 pub fn set_python_instance(&mut self, py_obj: PyObject) {
178 self.py_self = Some(py_obj);
179 }
180
181 pub fn set_actor_id(&mut self, actor_id: ActorId) {
190 self.core.config.actor_id = Some(actor_id);
191 self.core.actor_id = actor_id;
192 }
193
194 pub fn set_log_events(&mut self, log_events: bool) {
196 self.core.config.log_events = log_events;
197 }
198
199 pub fn set_log_commands(&mut self, log_commands: bool) {
201 self.core.config.log_commands = log_commands;
202 }
203 pub fn mem_address(&self) -> String {
205 self.core.mem_address()
206 }
207
208 pub fn is_registered(&self) -> bool {
210 self.core.is_registered()
211 }
212
213 pub fn register(
220 &mut self,
221 trader_id: TraderId,
222 clock: Rc<RefCell<dyn Clock>>,
223 cache: Rc<RefCell<Cache>>,
224 ) -> anyhow::Result<()> {
225 self.core.register(trader_id, clock, cache)?;
226
227 self.clock = PyClock::from_rc(self.core.clock_rc());
228
229 let actor_id = self.actor_id().inner();
231 let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
232 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
233 if let Err(e) = actor.on_time_event(&event) {
234 log::error!("Python time event handler failed for actor {actor_id}: {e}");
235 }
236 } else {
237 log::error!("Actor {actor_id} not found for time event handling");
238 }
239 }));
240
241 self.clock.inner_mut().register_default_handler(callback);
242
243 self.initialize()
244 }
245}
246
247impl DataActor for PyDataActor {
248 fn on_start(&mut self) -> anyhow::Result<()> {
249 self.py_on_start()
250 .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
251 }
252
253 fn on_stop(&mut self) -> anyhow::Result<()> {
254 self.py_on_stop()
255 .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
256 }
257
258 fn on_resume(&mut self) -> anyhow::Result<()> {
259 self.py_on_resume()
260 .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
261 }
262
263 fn on_reset(&mut self) -> anyhow::Result<()> {
264 self.py_on_reset()
265 .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
266 }
267
268 fn on_dispose(&mut self) -> anyhow::Result<()> {
269 self.py_on_dispose()
270 .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
271 }
272
273 fn on_degrade(&mut self) -> anyhow::Result<()> {
274 self.py_on_degrade()
275 .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
276 }
277
278 fn on_fault(&mut self) -> anyhow::Result<()> {
279 self.py_on_fault()
280 .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
281 }
282
283 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
284 self.py_on_time_event(event.clone())
285 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
286 }
287
288 #[allow(unused_variables)]
289 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
290 Python::with_gil(|py| {
291 let py_data = py.None();
294
295 self.py_on_data(py_data)
296 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
297 })
298 }
299
300 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
301 self.py_on_signal(signal)
302 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
303 }
304
305 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
306 Python::with_gil(|py| {
307 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
308 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
309 self.py_on_instrument(py_instrument)
310 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
311 })
312 }
313
314 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
315 self.py_on_quote(*quote)
316 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
317 }
318
319 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
320 self.py_on_trade(*tick)
321 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
322 }
323
324 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
325 self.py_on_bar(*bar)
326 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
327 }
328
329 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
330 self.py_on_book_deltas(deltas.clone())
331 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
332 }
333
334 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
335 self.py_on_book(order_book)
336 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
337 }
338
339 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
340 self.py_on_mark_price(*mark_price)
341 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
342 }
343
344 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
345 self.py_on_index_price(*index_price)
346 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
347 }
348
349 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
350 self.py_on_funding_rate(*funding_rate)
351 .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
352 }
353
354 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
355 self.py_on_instrument_status(*data)
356 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
357 }
358
359 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
360 self.py_on_instrument_close(*update)
361 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
362 }
363
364 #[cfg(feature = "defi")]
365 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
366 self.py_on_block(block.clone())
367 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
368 }
369
370 #[cfg(feature = "defi")]
371 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
372 self.py_on_pool(pool.clone())
373 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
374 }
375
376 #[cfg(feature = "defi")]
377 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
378 self.py_on_pool_swap(swap.clone())
379 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
380 }
381
382 #[cfg(feature = "defi")]
383 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
384 self.py_on_pool_liquidity_update(update.clone())
385 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
386 }
387
388 fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
389 Python::with_gil(|py| {
390 let py_data = py.None();
391 self.py_on_historical_data(py_data)
392 .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
393 })
394 }
395
396 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
397 self.py_on_historical_quotes(quotes.to_vec())
398 .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
399 }
400
401 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
402 self.py_on_historical_trades(trades.to_vec())
403 .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
404 }
405
406 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
407 self.py_on_historical_bars(bars.to_vec())
408 .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
409 }
410
411 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
412 self.py_on_historical_mark_prices(mark_prices.to_vec())
413 .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
414 }
415
416 fn on_historical_index_prices(
417 &mut self,
418 index_prices: &[IndexPriceUpdate],
419 ) -> anyhow::Result<()> {
420 self.py_on_historical_index_prices(index_prices.to_vec())
421 .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
422 }
423}
424
425#[pymethods]
426impl PyDataActor {
427 #[new]
428 #[pyo3(signature = (config=None))]
429 fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
430 Ok(Self::new(config))
431 }
432
433 #[getter]
434 #[pyo3(name = "clock")]
435 fn py_clock(&self) -> PyResult<PyClock> {
436 if !self.core.is_registered() {
437 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
438 "Actor must be registered with a trader before accessing clock",
439 ))
440 } else {
441 Ok(self.clock.clone())
442 }
443 }
444
445 #[getter]
446 #[pyo3(name = "log")]
447 fn py_log(&self) -> PyLogger {
448 self.logger.clone()
449 }
450
451 #[getter]
452 #[pyo3(name = "actor_id")]
453 fn py_actor_id(&self) -> ActorId {
454 self.actor_id
455 }
456
457 #[getter]
458 #[pyo3(name = "trader_id")]
459 fn py_trader_id(&self) -> Option<TraderId> {
460 self.trader_id()
461 }
462
463 #[pyo3(name = "state")]
464 fn py_state(&self) -> ComponentState {
465 self.state()
466 }
467
468 #[pyo3(name = "is_ready")]
469 fn py_is_ready(&self) -> bool {
470 self.is_ready()
471 }
472
473 #[pyo3(name = "is_running")]
474 fn py_is_running(&self) -> bool {
475 self.is_running()
476 }
477
478 #[pyo3(name = "is_stopped")]
479 fn py_is_stopped(&self) -> bool {
480 self.is_stopped()
481 }
482
483 #[pyo3(name = "is_degraded")]
484 fn py_is_degraded(&self) -> bool {
485 self.is_degraded()
486 }
487
488 #[pyo3(name = "is_faulted")]
489 fn py_is_faulted(&self) -> bool {
490 self.is_faulted()
491 }
492
493 #[pyo3(name = "is_disposed")]
494 fn py_is_disposed(&self) -> bool {
495 self.is_disposed()
496 }
497
498 #[pyo3(name = "start")]
499 fn py_start(&mut self) -> PyResult<()> {
500 self.start().map_err(to_pyruntime_err)
501 }
502
503 #[pyo3(name = "stop")]
504 fn py_stop(&mut self) -> PyResult<()> {
505 self.stop().map_err(to_pyruntime_err)
506 }
507
508 #[pyo3(name = "resume")]
509 fn py_resume(&mut self) -> PyResult<()> {
510 self.resume().map_err(to_pyruntime_err)
511 }
512
513 #[pyo3(name = "reset")]
514 fn py_reset(&mut self) -> PyResult<()> {
515 self.reset().map_err(to_pyruntime_err)
516 }
517
518 #[pyo3(name = "dispose")]
519 fn py_dispose(&mut self) -> PyResult<()> {
520 self.dispose().map_err(to_pyruntime_err)
521 }
522
523 #[pyo3(name = "degrade")]
524 fn py_degrade(&mut self) -> PyResult<()> {
525 self.degrade().map_err(to_pyruntime_err)
526 }
527
528 #[pyo3(name = "fault")]
529 fn py_fault(&mut self) -> PyResult<()> {
530 self.fault().map_err(to_pyruntime_err)
531 }
532
533 #[pyo3(name = "shutdown_system")]
534 #[pyo3(signature = (reason=None))]
535 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
536 self.shutdown_system(reason);
537 Ok(())
538 }
539
540 #[pyo3(name = "on_start")]
541 fn py_on_start(&self) -> PyResult<()> {
542 if let Some(ref py_self) = self.py_self {
544 Python::with_gil(|py| py_self.call_method0(py, "on_start"))?;
545 }
546 Ok(())
547 }
548
549 #[pyo3(name = "on_stop")]
550 fn py_on_stop(&mut self) -> PyResult<()> {
551 if let Some(ref py_self) = self.py_self {
553 Python::with_gil(|py| py_self.call_method0(py, "on_stop"))?;
554 }
555 Ok(())
556 }
557
558 #[pyo3(name = "on_resume")]
559 fn py_on_resume(&mut self) -> PyResult<()> {
560 if let Some(ref py_self) = self.py_self {
562 Python::with_gil(|py| py_self.call_method0(py, "on_resume"))?;
563 }
564 Ok(())
565 }
566
567 #[pyo3(name = "on_reset")]
568 fn py_on_reset(&mut self) -> PyResult<()> {
569 if let Some(ref py_self) = self.py_self {
571 Python::with_gil(|py| py_self.call_method0(py, "on_reset"))?;
572 }
573 Ok(())
574 }
575
576 #[pyo3(name = "on_dispose")]
577 fn py_on_dispose(&mut self) -> PyResult<()> {
578 if let Some(ref py_self) = self.py_self {
580 Python::with_gil(|py| py_self.call_method0(py, "on_dispose"))?;
581 }
582 Ok(())
583 }
584
585 #[pyo3(name = "on_degrade")]
586 fn py_on_degrade(&mut self) -> PyResult<()> {
587 if let Some(ref py_self) = self.py_self {
589 Python::with_gil(|py| py_self.call_method0(py, "on_degrade"))?;
590 }
591 Ok(())
592 }
593
594 #[pyo3(name = "on_fault")]
595 fn py_on_fault(&mut self) -> PyResult<()> {
596 if let Some(ref py_self) = self.py_self {
598 Python::with_gil(|py| py_self.call_method0(py, "on_fault"))?;
599 }
600 Ok(())
601 }
602
603 #[allow(unused_variables)]
604 #[pyo3(name = "on_time_event")]
605 fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
606 if let Some(ref py_self) = self.py_self {
608 Python::with_gil(|py| {
609 py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
610 })?;
611 }
612 Ok(())
613 }
614
615 #[pyo3(name = "on_data")]
616 fn py_on_data(&mut self, data: PyObject) -> PyResult<()> {
617 if let Some(ref py_self) = self.py_self {
619 Python::with_gil(|py| py_self.call_method1(py, "on_data", (data,)))?;
620 }
621 Ok(())
622 }
623
624 #[allow(unused_variables)]
625 #[pyo3(name = "on_signal")]
626 fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
627 if let Some(ref py_self) = self.py_self {
629 Python::with_gil(|py| {
630 py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
631 })?;
632 }
633 Ok(())
634 }
635
636 #[pyo3(name = "on_instrument")]
637 fn py_on_instrument(&mut self, instrument: PyObject) -> PyResult<()> {
638 if let Some(ref py_self) = self.py_self {
640 Python::with_gil(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
641 }
642 Ok(())
643 }
644
645 #[allow(unused_variables)]
646 #[pyo3(name = "on_quote")]
647 fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
648 if let Some(ref py_self) = self.py_self {
650 Python::with_gil(|py| {
651 py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
652 })?;
653 }
654 Ok(())
655 }
656
657 #[allow(unused_variables)]
658 #[pyo3(name = "on_trade")]
659 fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
660 if let Some(ref py_self) = self.py_self {
662 Python::with_gil(|py| {
663 py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
664 })?;
665 }
666 Ok(())
667 }
668
669 #[allow(unused_variables)]
670 #[pyo3(name = "on_bar")]
671 fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
672 if let Some(ref py_self) = self.py_self {
674 Python::with_gil(|py| {
675 py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),))
676 })?;
677 }
678 Ok(())
679 }
680
681 #[allow(unused_variables)]
682 #[pyo3(name = "on_book_deltas")]
683 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
684 if let Some(ref py_self) = self.py_self {
686 Python::with_gil(|py| {
687 py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
688 })?;
689 }
690 Ok(())
691 }
692
693 #[allow(unused_variables)]
694 #[pyo3(name = "on_book")]
695 fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
696 if let Some(ref py_self) = self.py_self {
698 Python::with_gil(|py| {
699 py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
700 })?;
701 }
702 Ok(())
703 }
704
705 #[allow(unused_variables)]
706 #[pyo3(name = "on_mark_price")]
707 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
708 if let Some(ref py_self) = self.py_self {
710 Python::with_gil(|py| {
711 py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
712 })?;
713 }
714 Ok(())
715 }
716
717 #[allow(unused_variables)]
718 #[pyo3(name = "on_index_price")]
719 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
720 if let Some(ref py_self) = self.py_self {
722 Python::with_gil(|py| {
723 py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
724 })?;
725 }
726 Ok(())
727 }
728
729 #[allow(unused_variables)]
730 #[pyo3(name = "on_funding_rate")]
731 fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
732 if let Some(ref py_self) = self.py_self {
734 Python::with_gil(|py| {
735 py_self.call_method1(
736 py,
737 "on_funding_rate",
738 (funding_rate.into_py_any_unwrap(py),),
739 )
740 })?;
741 }
742 Ok(())
743 }
744
745 #[allow(unused_variables)]
746 #[pyo3(name = "on_instrument_status")]
747 fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
748 if let Some(ref py_self) = self.py_self {
750 Python::with_gil(|py| {
751 py_self.call_method1(py, "on_instrument_status", (status.into_py_any_unwrap(py),))
752 })?;
753 }
754 Ok(())
755 }
756
757 #[allow(unused_variables)]
758 #[pyo3(name = "on_instrument_close")]
759 fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
760 if let Some(ref py_self) = self.py_self {
762 Python::with_gil(|py| {
763 py_self.call_method1(py, "on_instrument_close", (close.into_py_any_unwrap(py),))
764 })?;
765 }
766 Ok(())
767 }
768
/// Converts `block` to a Python object and passes it to `on_block` on the attached Python
/// instance; does nothing when none is attached.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
#[pyo3(name = "on_block")]
fn py_on_block(&mut self, block: Block) -> PyResult<()> {
    match self.py_self.as_ref() {
        None => Ok(()),
        Some(instance) => Python::with_gil(|py| {
            let args = (block.into_py_any_unwrap(py),);
            instance.call_method1(py, "on_block", args).map(|_| ())
        }),
    }
}
781
/// Converts `pool` to a Python object and passes it to `on_pool` on the attached Python
/// instance; does nothing when none is attached.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
#[pyo3(name = "on_pool")]
fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
    match self.py_self.as_ref() {
        None => Ok(()),
        Some(instance) => Python::with_gil(|py| {
            let args = (pool.into_py_any_unwrap(py),);
            instance.call_method1(py, "on_pool", args).map(|_| ())
        }),
    }
}
794
/// Converts `swap` to a Python object and passes it to `on_pool_swap` on the attached Python
/// instance; does nothing when none is attached.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
#[pyo3(name = "on_pool_swap")]
fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
    match self.py_self.as_ref() {
        None => Ok(()),
        Some(instance) => Python::with_gil(|py| {
            let args = (swap.into_py_any_unwrap(py),);
            instance.call_method1(py, "on_pool_swap", args).map(|_| ())
        }),
    }
}
807
/// Converts `update` to a Python object and passes it to `on_pool_liquidity_update` on the
/// attached Python instance; does nothing when none is attached.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
#[pyo3(name = "on_pool_liquidity_update")]
fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
    match self.py_self.as_ref() {
        None => Ok(()),
        Some(instance) => Python::with_gil(|py| {
            let args = (update.into_py_any_unwrap(py),);
            instance
                .call_method1(py, "on_pool_liquidity_update", args)
                .map(|_| ())
        }),
    }
}
824
825 #[pyo3(name = "subscribe_data")]
826 #[pyo3(signature = (data_type, client_id=None, params=None))]
827 fn py_subscribe_data(
828 &mut self,
829 data_type: DataType,
830 client_id: Option<ClientId>,
831 params: Option<IndexMap<String, String>>,
832 ) -> PyResult<()> {
833 self.subscribe_data(data_type, client_id, params);
834 Ok(())
835 }
836
837 #[pyo3(name = "subscribe_instruments")]
838 #[pyo3(signature = (venue, client_id=None, params=None))]
839 fn py_subscribe_instruments(
840 &mut self,
841 venue: Venue,
842 client_id: Option<ClientId>,
843 params: Option<IndexMap<String, String>>,
844 ) -> PyResult<()> {
845 self.subscribe_instruments(venue, client_id, params);
846 Ok(())
847 }
848
849 #[pyo3(name = "subscribe_instrument")]
850 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
851 fn py_subscribe_instrument(
852 &mut self,
853 instrument_id: InstrumentId,
854 client_id: Option<ClientId>,
855 params: Option<IndexMap<String, String>>,
856 ) -> PyResult<()> {
857 self.subscribe_instrument(instrument_id, client_id, params);
858 Ok(())
859 }
860
861 #[pyo3(name = "subscribe_book_deltas")]
862 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
863 fn py_subscribe_book_deltas(
864 &mut self,
865 instrument_id: InstrumentId,
866 book_type: BookType,
867 depth: Option<usize>,
868 client_id: Option<ClientId>,
869 managed: bool,
870 params: Option<IndexMap<String, String>>,
871 ) -> PyResult<()> {
872 let depth = depth.and_then(NonZeroUsize::new);
873 self.subscribe_book_deltas(instrument_id, book_type, depth, client_id, managed, params);
874 Ok(())
875 }
876
877 #[pyo3(name = "subscribe_book_at_interval")]
878 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
879 fn py_subscribe_book_at_interval(
880 &mut self,
881 instrument_id: InstrumentId,
882 book_type: BookType,
883 interval_ms: usize,
884 depth: Option<usize>,
885 client_id: Option<ClientId>,
886 params: Option<IndexMap<String, String>>,
887 ) -> PyResult<()> {
888 let depth = depth.and_then(NonZeroUsize::new);
889 let interval_ms = NonZeroUsize::new(interval_ms)
890 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
891
892 self.subscribe_book_at_interval(
893 instrument_id,
894 book_type,
895 depth,
896 interval_ms,
897 client_id,
898 params,
899 );
900 Ok(())
901 }
902
903 #[pyo3(name = "subscribe_quotes")]
904 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
905 fn py_subscribe_quotes(
906 &mut self,
907 instrument_id: InstrumentId,
908 client_id: Option<ClientId>,
909 params: Option<IndexMap<String, String>>,
910 ) -> PyResult<()> {
911 self.subscribe_quotes(instrument_id, client_id, params);
912 Ok(())
913 }
914
915 #[pyo3(name = "subscribe_trades")]
916 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
917 fn py_subscribe_trades(
918 &mut self,
919 instrument_id: InstrumentId,
920 client_id: Option<ClientId>,
921 params: Option<IndexMap<String, String>>,
922 ) -> PyResult<()> {
923 self.subscribe_trades(instrument_id, client_id, params);
924 Ok(())
925 }
926
927 #[pyo3(name = "subscribe_bars")]
928 #[pyo3(signature = (bar_type, client_id=None, await_partial=false, params=None))]
929 fn py_subscribe_bars(
930 &mut self,
931 bar_type: BarType,
932 client_id: Option<ClientId>,
933 await_partial: bool,
934 params: Option<IndexMap<String, String>>,
935 ) -> PyResult<()> {
936 self.subscribe_bars(bar_type, client_id, await_partial, params);
937 Ok(())
938 }
939
940 #[pyo3(name = "subscribe_mark_prices")]
941 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
942 fn py_subscribe_mark_prices(
943 &mut self,
944 instrument_id: InstrumentId,
945 client_id: Option<ClientId>,
946 params: Option<IndexMap<String, String>>,
947 ) -> PyResult<()> {
948 self.subscribe_mark_prices(instrument_id, client_id, params);
949 Ok(())
950 }
951
952 #[pyo3(name = "subscribe_index_prices")]
953 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
954 fn py_subscribe_index_prices(
955 &mut self,
956 instrument_id: InstrumentId,
957 client_id: Option<ClientId>,
958 params: Option<IndexMap<String, String>>,
959 ) -> PyResult<()> {
960 self.subscribe_index_prices(instrument_id, client_id, params);
961 Ok(())
962 }
963
964 #[pyo3(name = "subscribe_instrument_status")]
965 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
966 fn py_subscribe_instrument_status(
967 &mut self,
968 instrument_id: InstrumentId,
969 client_id: Option<ClientId>,
970 params: Option<IndexMap<String, String>>,
971 ) -> PyResult<()> {
972 self.subscribe_instrument_status(instrument_id, client_id, params);
973 Ok(())
974 }
975
976 #[pyo3(name = "subscribe_instrument_close")]
977 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
978 fn py_subscribe_instrument_close(
979 &mut self,
980 instrument_id: InstrumentId,
981 client_id: Option<ClientId>,
982 params: Option<IndexMap<String, String>>,
983 ) -> PyResult<()> {
984 self.subscribe_instrument_close(instrument_id, client_id, params);
985 Ok(())
986 }
987
/// Exposes `subscribe_blocks` to Python; `client_id` and `params` are optional.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_blocks")]
#[pyo3(signature = (chain, client_id=None, params=None))]
fn py_subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    self.subscribe_blocks(chain, client_id, params);
    Ok(())
}
1000
/// Exposes `subscribe_pool` to Python; `client_id` and `params` are optional.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    self.subscribe_pool(instrument_id, client_id, params);
    Ok(())
}
1013
/// Exposes `subscribe_pool_swaps` to Python; `client_id` and `params` are optional.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_swaps")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    self.subscribe_pool_swaps(instrument_id, client_id, params);
    Ok(())
}
1026
/// Exposes `subscribe_pool_liquidity_updates` to Python; `client_id` and `params` are optional.
#[cfg(feature = "defi")]
#[pyo3(name = "subscribe_pool_liquidity_updates")]
#[pyo3(signature = (instrument_id, client_id=None, params=None))]
fn py_subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<IndexMap<String, String>>,
) -> PyResult<()> {
    self.subscribe_pool_liquidity_updates(instrument_id, client_id, params);
    Ok(())
}
1039
1040 #[pyo3(name = "request_data")]
1041 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1042 fn py_request_data(
1043 &mut self,
1044 data_type: DataType,
1045 client_id: ClientId,
1046 start: Option<u64>,
1047 end: Option<u64>,
1048 limit: Option<usize>,
1049 params: Option<IndexMap<String, String>>,
1050 ) -> PyResult<String> {
1051 let limit = limit.and_then(NonZeroUsize::new);
1052 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1053 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1054
1055 let request_id = self
1056 .request_data(data_type, client_id, start, end, limit, params)
1057 .map_err(to_pyvalue_err)?;
1058 Ok(request_id.to_string())
1059 }
1060
1061 #[pyo3(name = "request_instrument")]
1062 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1063 fn py_request_instrument(
1064 &mut self,
1065 instrument_id: InstrumentId,
1066 start: Option<u64>,
1067 end: Option<u64>,
1068 client_id: Option<ClientId>,
1069 params: Option<IndexMap<String, String>>,
1070 ) -> PyResult<String> {
1071 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1072 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1073
1074 let request_id = self
1075 .request_instrument(instrument_id, start, end, client_id, params)
1076 .map_err(to_pyvalue_err)?;
1077 Ok(request_id.to_string())
1078 }
1079
1080 #[pyo3(name = "request_instruments")]
1081 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1082 fn py_request_instruments(
1083 &mut self,
1084 venue: Option<Venue>,
1085 start: Option<u64>,
1086 end: Option<u64>,
1087 client_id: Option<ClientId>,
1088 params: Option<IndexMap<String, String>>,
1089 ) -> PyResult<String> {
1090 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1091 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1092
1093 let request_id = self
1094 .request_instruments(venue, start, end, client_id, params)
1095 .map_err(to_pyvalue_err)?;
1096 Ok(request_id.to_string())
1097 }
1098
1099 #[pyo3(name = "request_book_snapshot")]
1100 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1101 fn py_request_book_snapshot(
1102 &mut self,
1103 instrument_id: InstrumentId,
1104 depth: Option<usize>,
1105 client_id: Option<ClientId>,
1106 params: Option<IndexMap<String, String>>,
1107 ) -> PyResult<String> {
1108 let depth = depth.and_then(NonZeroUsize::new);
1109
1110 let request_id = self
1111 .request_book_snapshot(instrument_id, depth, client_id, params)
1112 .map_err(to_pyvalue_err)?;
1113 Ok(request_id.to_string())
1114 }
1115
1116 #[pyo3(name = "request_quotes")]
1117 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1118 fn py_request_quotes(
1119 &mut self,
1120 instrument_id: InstrumentId,
1121 start: Option<u64>,
1122 end: Option<u64>,
1123 limit: Option<usize>,
1124 client_id: Option<ClientId>,
1125 params: Option<IndexMap<String, String>>,
1126 ) -> PyResult<String> {
1127 let limit = limit.and_then(NonZeroUsize::new);
1128 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1129 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1130
1131 let request_id = self
1132 .request_quotes(instrument_id, start, end, limit, client_id, params)
1133 .map_err(to_pyvalue_err)?;
1134 Ok(request_id.to_string())
1135 }
1136
1137 #[pyo3(name = "request_trades")]
1138 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1139 fn py_request_trades(
1140 &mut self,
1141 instrument_id: InstrumentId,
1142 start: Option<u64>,
1143 end: Option<u64>,
1144 limit: Option<usize>,
1145 client_id: Option<ClientId>,
1146 params: Option<IndexMap<String, String>>,
1147 ) -> PyResult<String> {
1148 let limit = limit.and_then(NonZeroUsize::new);
1149 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1150 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1151
1152 let request_id = self
1153 .request_trades(instrument_id, start, end, limit, client_id, params)
1154 .map_err(to_pyvalue_err)?;
1155 Ok(request_id.to_string())
1156 }
1157
1158 #[pyo3(name = "request_bars")]
1159 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1160 fn py_request_bars(
1161 &mut self,
1162 bar_type: BarType,
1163 start: Option<u64>,
1164 end: Option<u64>,
1165 limit: Option<usize>,
1166 client_id: Option<ClientId>,
1167 params: Option<IndexMap<String, String>>,
1168 ) -> PyResult<String> {
1169 let limit = limit.and_then(NonZeroUsize::new);
1170 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1171 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1172
1173 let request_id = self
1174 .request_bars(bar_type, start, end, limit, client_id, params)
1175 .map_err(to_pyvalue_err)?;
1176 Ok(request_id.to_string())
1177 }
1178
1179 #[pyo3(name = "unsubscribe_data")]
1180 #[pyo3(signature = (data_type, client_id=None, params=None))]
1181 fn py_unsubscribe_data(
1182 &mut self,
1183 data_type: DataType,
1184 client_id: Option<ClientId>,
1185 params: Option<IndexMap<String, String>>,
1186 ) -> PyResult<()> {
1187 self.unsubscribe_data(data_type, client_id, params);
1188 Ok(())
1189 }
1190
1191 #[pyo3(name = "unsubscribe_instruments")]
1192 #[pyo3(signature = (venue, client_id=None, params=None))]
1193 fn py_unsubscribe_instruments(
1194 &mut self,
1195 venue: Venue,
1196 client_id: Option<ClientId>,
1197 params: Option<IndexMap<String, String>>,
1198 ) -> PyResult<()> {
1199 self.unsubscribe_instruments(venue, client_id, params);
1200 Ok(())
1201 }
1202
1203 #[pyo3(name = "unsubscribe_instrument")]
1204 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1205 fn py_unsubscribe_instrument(
1206 &mut self,
1207 instrument_id: InstrumentId,
1208 client_id: Option<ClientId>,
1209 params: Option<IndexMap<String, String>>,
1210 ) -> PyResult<()> {
1211 self.unsubscribe_instrument(instrument_id, client_id, params);
1212 Ok(())
1213 }
1214
1215 #[pyo3(name = "unsubscribe_book_deltas")]
1216 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1217 fn py_unsubscribe_book_deltas(
1218 &mut self,
1219 instrument_id: InstrumentId,
1220 client_id: Option<ClientId>,
1221 params: Option<IndexMap<String, String>>,
1222 ) -> PyResult<()> {
1223 self.unsubscribe_book_deltas(instrument_id, client_id, params);
1224 Ok(())
1225 }
1226
1227 #[pyo3(name = "unsubscribe_book_at_interval")]
1228 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1229 fn py_unsubscribe_book_at_interval(
1230 &mut self,
1231 instrument_id: InstrumentId,
1232 interval_ms: usize,
1233 client_id: Option<ClientId>,
1234 params: Option<IndexMap<String, String>>,
1235 ) -> PyResult<()> {
1236 let interval_ms = NonZeroUsize::new(interval_ms)
1237 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
1238
1239 self.unsubscribe_book_at_interval(instrument_id, interval_ms, client_id, params);
1240 Ok(())
1241 }
1242
1243 #[pyo3(name = "unsubscribe_quotes")]
1244 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1245 fn py_unsubscribe_quotes(
1246 &mut self,
1247 instrument_id: InstrumentId,
1248 client_id: Option<ClientId>,
1249 params: Option<IndexMap<String, String>>,
1250 ) -> PyResult<()> {
1251 self.unsubscribe_quotes(instrument_id, client_id, params);
1252 Ok(())
1253 }
1254
1255 #[pyo3(name = "unsubscribe_trades")]
1256 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1257 fn py_unsubscribe_trades(
1258 &mut self,
1259 instrument_id: InstrumentId,
1260 client_id: Option<ClientId>,
1261 params: Option<IndexMap<String, String>>,
1262 ) -> PyResult<()> {
1263 self.unsubscribe_trades(instrument_id, client_id, params);
1264 Ok(())
1265 }
1266
1267 #[pyo3(name = "unsubscribe_bars")]
1268 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1269 fn py_unsubscribe_bars(
1270 &mut self,
1271 bar_type: BarType,
1272 client_id: Option<ClientId>,
1273 params: Option<IndexMap<String, String>>,
1274 ) -> PyResult<()> {
1275 self.unsubscribe_bars(bar_type, client_id, params);
1276 Ok(())
1277 }
1278
1279 #[pyo3(name = "unsubscribe_mark_prices")]
1280 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1281 fn py_unsubscribe_mark_prices(
1282 &mut self,
1283 instrument_id: InstrumentId,
1284 client_id: Option<ClientId>,
1285 params: Option<IndexMap<String, String>>,
1286 ) -> PyResult<()> {
1287 self.unsubscribe_mark_prices(instrument_id, client_id, params);
1288 Ok(())
1289 }
1290
1291 #[pyo3(name = "unsubscribe_index_prices")]
1292 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1293 fn py_unsubscribe_index_prices(
1294 &mut self,
1295 instrument_id: InstrumentId,
1296 client_id: Option<ClientId>,
1297 params: Option<IndexMap<String, String>>,
1298 ) -> PyResult<()> {
1299 self.unsubscribe_index_prices(instrument_id, client_id, params);
1300 Ok(())
1301 }
1302
1303 #[pyo3(name = "unsubscribe_instrument_status")]
1304 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1305 fn py_unsubscribe_instrument_status(
1306 &mut self,
1307 instrument_id: InstrumentId,
1308 client_id: Option<ClientId>,
1309 params: Option<IndexMap<String, String>>,
1310 ) -> PyResult<()> {
1311 self.unsubscribe_instrument_status(instrument_id, client_id, params);
1312 Ok(())
1313 }
1314
1315 #[pyo3(name = "unsubscribe_instrument_close")]
1316 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1317 fn py_unsubscribe_instrument_close(
1318 &mut self,
1319 instrument_id: InstrumentId,
1320 client_id: Option<ClientId>,
1321 params: Option<IndexMap<String, String>>,
1322 ) -> PyResult<()> {
1323 self.unsubscribe_instrument_close(instrument_id, client_id, params);
1324 Ok(())
1325 }
1326
1327 #[cfg(feature = "defi")]
1328 #[pyo3(name = "unsubscribe_blocks")]
1329 #[pyo3(signature = (chain, client_id=None, params=None))]
1330 fn py_unsubscribe_blocks(
1331 &mut self,
1332 chain: Blockchain,
1333 client_id: Option<ClientId>,
1334 params: Option<IndexMap<String, String>>,
1335 ) -> PyResult<()> {
1336 self.unsubscribe_blocks(chain, client_id, params);
1337 Ok(())
1338 }
1339
1340 #[cfg(feature = "defi")]
1341 #[pyo3(name = "unsubscribe_pool")]
1342 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1343 fn py_unsubscribe_pool(
1344 &mut self,
1345 instrument_id: InstrumentId,
1346 client_id: Option<ClientId>,
1347 params: Option<IndexMap<String, String>>,
1348 ) -> PyResult<()> {
1349 self.unsubscribe_pool(instrument_id, client_id, params);
1350 Ok(())
1351 }
1352
1353 #[cfg(feature = "defi")]
1354 #[pyo3(name = "unsubscribe_pool_swaps")]
1355 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1356 fn py_unsubscribe_pool_swaps(
1357 &mut self,
1358 instrument_id: InstrumentId,
1359 client_id: Option<ClientId>,
1360 params: Option<IndexMap<String, String>>,
1361 ) -> PyResult<()> {
1362 self.unsubscribe_pool_swaps(instrument_id, client_id, params);
1363 Ok(())
1364 }
1365
1366 #[cfg(feature = "defi")]
1367 #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1368 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1369 fn py_unsubscribe_pool_liquidity_updates(
1370 &mut self,
1371 instrument_id: InstrumentId,
1372 client_id: Option<ClientId>,
1373 params: Option<IndexMap<String, String>>,
1374 ) -> PyResult<()> {
1375 self.unsubscribe_pool_liquidity_updates(instrument_id, client_id, params);
1376 Ok(())
1377 }
1378
1379 #[allow(unused_variables)]
1380 #[pyo3(name = "on_historical_data")]
1381 fn py_on_historical_data(&mut self, data: PyObject) -> PyResult<()> {
1382 Ok(())
1384 }
1385
1386 #[allow(unused_variables)]
1387 #[pyo3(name = "on_historical_quotes")]
1388 fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1389 Ok(())
1391 }
1392
1393 #[allow(unused_variables)]
1394 #[pyo3(name = "on_historical_trades")]
1395 fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1396 Ok(())
1398 }
1399
1400 #[allow(unused_variables)]
1401 #[pyo3(name = "on_historical_bars")]
1402 fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1403 Ok(())
1405 }
1406
1407 #[allow(unused_variables)]
1408 #[pyo3(name = "on_historical_mark_prices")]
1409 fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1410 Ok(())
1412 }
1413
1414 #[allow(unused_variables)]
1415 #[pyo3(name = "on_historical_index_prices")]
1416 fn py_on_historical_index_prices(
1417 &mut self,
1418 index_prices: Vec<IndexPriceUpdate>,
1419 ) -> PyResult<()> {
1420 Ok(())
1422 }
1423}
1424
1425#[cfg(test)]
1429mod tests {
1430 use std::{
1431 any::Any,
1432 cell::RefCell,
1433 collections::HashMap,
1434 ops::{Deref, DerefMut},
1435 rc::Rc,
1436 str::FromStr,
1437 sync::{Arc, Mutex},
1438 };
1439
1440 use nautilus_core::{UUID4, UnixNanos};
1441 #[cfg(feature = "defi")]
1442 use nautilus_model::defi::{
1443 AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolLiquidityUpdate, PoolSwap, Token,
1444 };
1445 use nautilus_model::{
1446 data::{
1447 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
1448 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
1449 },
1450 enums::BookType,
1451 identifiers::{ClientId, TraderId, Venue},
1452 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
1453 orderbook::OrderBook,
1454 types::{Price, Quantity},
1455 };
1456 use rstest::{fixture, rstest};
1457 use ustr::Ustr;
1458
1459 use super::PyDataActor;
1460 use crate::{
1461 actor::{DataActor, data_actor::DataActorCore},
1462 cache::Cache,
1463 clock::TestClock,
1464 component::Component,
1465 enums::ComponentState,
1466 runner::{SyncDataCommandSender, set_data_cmd_sender},
1467 signal::Signal,
1468 timer::TimeEvent,
1469 };
1470
1471 #[fixture]
1472 fn clock() -> Rc<RefCell<TestClock>> {
1473 Rc::new(RefCell::new(TestClock::new()))
1474 }
1475
1476 #[fixture]
1477 fn cache() -> Rc<RefCell<Cache>> {
1478 Rc::new(RefCell::new(Cache::new(None, None)))
1479 }
1480
1481 #[fixture]
1482 fn trader_id() -> TraderId {
1483 TraderId::from("TRADER-001")
1484 }
1485
1486 #[fixture]
1487 fn client_id() -> ClientId {
1488 ClientId::new("TestClient")
1489 }
1490
1491 #[fixture]
1492 fn venue() -> Venue {
1493 Venue::from("SIM")
1494 }
1495
1496 #[fixture]
1497 fn data_type() -> DataType {
1498 DataType::new("TestData", None)
1499 }
1500
1501 #[fixture]
1502 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
1503 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
1504 }
1505
1506 fn create_unregistered_actor() -> PyDataActor {
1507 PyDataActor::new(None)
1508 }
1509
1510 fn create_registered_actor(
1511 clock: Rc<RefCell<TestClock>>,
1512 cache: Rc<RefCell<Cache>>,
1513 trader_id: TraderId,
1514 ) -> PyDataActor {
1515 let sender = SyncDataCommandSender;
1517 set_data_cmd_sender(Arc::new(sender));
1518
1519 let mut actor = PyDataActor::new(None);
1520 actor.register(trader_id, clock, cache).unwrap();
1521 actor
1522 }
1523
1524 #[rstest]
1525 fn test_new_actor_creation() {
1526 pyo3::prepare_freethreaded_python();
1527
1528 let actor = PyDataActor::new(None);
1529 assert!(actor.trader_id().is_none());
1530 }
1531
1532 #[rstest]
1533 fn test_clock_access_before_registration_raises_error() {
1534 pyo3::prepare_freethreaded_python();
1535
1536 let actor = PyDataActor::new(None);
1537
1538 let result = actor.py_clock();
1540 assert!(result.is_err());
1541
1542 let error = result.unwrap_err();
1543 pyo3::Python::with_gil(|py| {
1544 assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
1545 });
1546
1547 let error_msg = error.to_string();
1548 assert!(
1549 error_msg.contains("Actor must be registered with a trader before accessing clock")
1550 );
1551 }
1552
1553 #[rstest]
1554 fn test_unregistered_actor_methods_work() {
1555 pyo3::prepare_freethreaded_python();
1556
1557 let actor = create_unregistered_actor();
1558
1559 assert!(!actor.py_is_ready());
1560 assert!(!actor.py_is_running());
1561 assert!(!actor.py_is_stopped());
1562 assert!(!actor.py_is_disposed());
1563 assert!(!actor.py_is_degraded());
1564 assert!(!actor.py_is_faulted());
1565
1566 assert_eq!(actor.trader_id(), None);
1568 }
1569
1570 #[rstest]
1571 fn test_registration_success(
1572 clock: Rc<RefCell<TestClock>>,
1573 cache: Rc<RefCell<Cache>>,
1574 trader_id: TraderId,
1575 ) {
1576 pyo3::prepare_freethreaded_python();
1577
1578 let mut actor = create_unregistered_actor();
1579 actor.register(trader_id, clock, cache).unwrap();
1580 assert!(actor.trader_id().is_some());
1581 assert_eq!(actor.trader_id().unwrap(), trader_id);
1582 }
1583
1584 #[rstest]
1585 fn test_registered_actor_basic_properties(
1586 clock: Rc<RefCell<TestClock>>,
1587 cache: Rc<RefCell<Cache>>,
1588 trader_id: TraderId,
1589 ) {
1590 pyo3::prepare_freethreaded_python();
1591
1592 let actor = create_registered_actor(clock, cache, trader_id);
1593
1594 assert_eq!(actor.state(), ComponentState::Ready);
1595 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
1596 assert!(actor.py_is_ready());
1597 assert!(!actor.py_is_running());
1598 assert!(!actor.py_is_stopped());
1599 assert!(!actor.py_is_disposed());
1600 assert!(!actor.py_is_degraded());
1601 assert!(!actor.py_is_faulted());
1602 }
1603
1604 #[rstest]
1605 fn test_basic_subscription_methods_compile(
1606 clock: Rc<RefCell<TestClock>>,
1607 cache: Rc<RefCell<Cache>>,
1608 trader_id: TraderId,
1609 data_type: DataType,
1610 client_id: ClientId,
1611 audusd_sim: CurrencyPair,
1612 ) {
1613 pyo3::prepare_freethreaded_python();
1614
1615 let mut actor = create_registered_actor(clock, cache, trader_id);
1616
1617 let _ = actor.py_subscribe_data(data_type.clone(), Some(client_id), None);
1618 let _ = actor.py_subscribe_quotes(audusd_sim.id, Some(client_id), None);
1619 let _ = actor.py_unsubscribe_data(data_type, Some(client_id), None);
1620 let _ = actor.py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None);
1621 }
1622
1623 #[ignore] #[rstest]
1625 fn test_lifecycle_methods_pass_through(
1626 clock: Rc<RefCell<TestClock>>,
1627 cache: Rc<RefCell<Cache>>,
1628 trader_id: TraderId,
1629 ) {
1630 let mut actor = create_registered_actor(clock, cache, trader_id);
1631
1632 assert!(actor.py_start().is_ok());
1633 assert!(actor.py_stop().is_ok());
1634 assert!(actor.py_dispose().is_ok());
1635 }
1636
1637 #[rstest]
1638 fn test_shutdown_system_passes_through(
1639 clock: Rc<RefCell<TestClock>>,
1640 cache: Rc<RefCell<Cache>>,
1641 trader_id: TraderId,
1642 ) {
1643 pyo3::prepare_freethreaded_python();
1644
1645 let actor = create_registered_actor(clock, cache, trader_id);
1646
1647 assert!(
1648 actor
1649 .py_shutdown_system(Some("Test shutdown".to_string()))
1650 .is_ok()
1651 );
1652 assert!(actor.py_shutdown_system(None).is_ok());
1653 }
1654
1655 #[rstest]
1656 fn test_book_at_interval_invalid_interval_ms(
1657 clock: Rc<RefCell<TestClock>>,
1658 cache: Rc<RefCell<Cache>>,
1659 trader_id: TraderId,
1660 audusd_sim: CurrencyPair,
1661 ) {
1662 pyo3::prepare_freethreaded_python();
1663
1664 let mut actor = create_registered_actor(clock, cache, trader_id);
1665
1666 let result = actor.py_subscribe_book_at_interval(
1667 audusd_sim.id,
1668 BookType::L2_MBP,
1669 0,
1670 None,
1671 None,
1672 None,
1673 );
1674 assert!(result.is_err());
1675 assert_eq!(
1676 result.unwrap_err().to_string(),
1677 "ValueError: interval_ms must be > 0"
1678 );
1679
1680 let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
1681 assert!(result.is_err());
1682 assert_eq!(
1683 result.unwrap_err().to_string(),
1684 "ValueError: interval_ms must be > 0"
1685 );
1686 }
1687
1688 #[rstest]
1689 fn test_request_methods_signatures_exist() {
1690 pyo3::prepare_freethreaded_python();
1691
1692 let actor = create_unregistered_actor();
1693 assert!(actor.trader_id().is_none());
1694 }
1695
1696 #[rstest]
1697 fn test_data_actor_trait_implementation(
1698 clock: Rc<RefCell<TestClock>>,
1699 cache: Rc<RefCell<Cache>>,
1700 trader_id: TraderId,
1701 ) {
1702 pyo3::prepare_freethreaded_python();
1703
1704 let actor = create_registered_actor(clock, cache, trader_id);
1705 let state = actor.state();
1706 assert_eq!(state, ComponentState::Ready);
1707 }
1708
1709 static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
1713 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
1714
1715 #[derive(Debug)]
1717 struct TestDataActor {
1718 inner: PyDataActor,
1719 }
1720
1721 impl TestDataActor {
1722 fn new() -> Self {
1723 Self {
1724 inner: PyDataActor::new(None),
1725 }
1726 }
1727
1728 fn track_call(&self, handler_name: &str) {
1729 let mut tracker = CALL_TRACKER.lock().unwrap();
1730 *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
1731 }
1732
1733 fn get_call_count(&self, handler_name: &str) -> i32 {
1734 let tracker = CALL_TRACKER.lock().unwrap();
1735 tracker.get(handler_name).copied().unwrap_or(0)
1736 }
1737
1738 fn reset_tracker(&self) {
1739 let mut tracker = CALL_TRACKER.lock().unwrap();
1740 tracker.clear();
1741 }
1742 }
1743
1744 impl Deref for TestDataActor {
1745 type Target = DataActorCore;
1746 fn deref(&self) -> &Self::Target {
1747 &self.inner.core
1748 }
1749 }
1750
1751 impl DerefMut for TestDataActor {
1752 fn deref_mut(&mut self) -> &mut Self::Target {
1753 &mut self.inner.core
1754 }
1755 }
1756
1757 impl DataActor for TestDataActor {
1758 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
1759 self.track_call("on_time_event");
1760 self.inner.on_time_event(event)
1761 }
1762
1763 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
1764 self.track_call("on_data");
1765 self.inner.on_data(data)
1766 }
1767
1768 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
1769 self.track_call("on_signal");
1770 self.inner.on_signal(signal)
1771 }
1772
1773 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1774 self.track_call("on_instrument");
1775 self.inner.on_instrument(instrument)
1776 }
1777
1778 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
1779 self.track_call("on_quote");
1780 self.inner.on_quote(quote)
1781 }
1782
1783 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
1784 self.track_call("on_trade");
1785 self.inner.on_trade(trade)
1786 }
1787
1788 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
1789 self.track_call("on_bar");
1790 self.inner.on_bar(bar)
1791 }
1792
1793 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
1794 self.track_call("on_book");
1795 self.inner.on_book(book)
1796 }
1797
1798 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1799 self.track_call("on_book_deltas");
1800 self.inner.on_book_deltas(deltas)
1801 }
1802
1803 fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
1804 self.track_call("on_mark_price");
1805 self.inner.on_mark_price(update)
1806 }
1807
1808 fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
1809 self.track_call("on_index_price");
1810 self.inner.on_index_price(update)
1811 }
1812
1813 fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
1814 self.track_call("on_instrument_status");
1815 self.inner.on_instrument_status(update)
1816 }
1817
1818 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
1819 self.track_call("on_instrument_close");
1820 self.inner.on_instrument_close(update)
1821 }
1822
1823 #[cfg(feature = "defi")]
1824 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
1825 self.track_call("on_block");
1826 self.inner.on_block(block)
1827 }
1828
1829 #[cfg(feature = "defi")]
1830 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
1831 self.track_call("on_pool");
1832 self.inner.on_pool(pool)
1833 }
1834
1835 #[cfg(feature = "defi")]
1836 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
1837 self.track_call("on_pool_swap");
1838 self.inner.on_pool_swap(swap)
1839 }
1840
1841 #[cfg(feature = "defi")]
1842 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
1843 self.track_call("on_pool_liquidity_update");
1844 self.inner.on_pool_liquidity_update(update)
1845 }
1846 }
1847
1848 #[rstest]
1849 fn test_python_on_signal_handler(
1850 clock: Rc<RefCell<TestClock>>,
1851 cache: Rc<RefCell<Cache>>,
1852 trader_id: TraderId,
1853 ) {
1854 pyo3::prepare_freethreaded_python();
1855
1856 let mut test_actor = TestDataActor::new();
1857 test_actor.reset_tracker();
1858 test_actor
1859 .register(trader_id, clock.clone(), cache.clone())
1860 .unwrap();
1861
1862 let signal = Signal::new(
1863 Ustr::from("test_signal"),
1864 "1.0".to_string(),
1865 UnixNanos::default(),
1866 UnixNanos::default(),
1867 );
1868
1869 assert!(test_actor.on_signal(&signal).is_ok());
1870 assert_eq!(test_actor.get_call_count("on_signal"), 1);
1871 }
1872
1873 #[rstest]
1874 fn test_python_on_data_handler(
1875 clock: Rc<RefCell<TestClock>>,
1876 cache: Rc<RefCell<Cache>>,
1877 trader_id: TraderId,
1878 ) {
1879 pyo3::prepare_freethreaded_python();
1880
1881 let mut test_actor = TestDataActor::new();
1882 test_actor.reset_tracker();
1883 test_actor
1884 .register(trader_id, clock.clone(), cache.clone())
1885 .unwrap();
1886
1887 assert!(test_actor.on_data(&()).is_ok());
1888 assert_eq!(test_actor.get_call_count("on_data"), 1);
1889 }
1890
1891 #[rstest]
1892 fn test_python_on_time_event_handler(
1893 clock: Rc<RefCell<TestClock>>,
1894 cache: Rc<RefCell<Cache>>,
1895 trader_id: TraderId,
1896 ) {
1897 pyo3::prepare_freethreaded_python();
1898
1899 let mut test_actor = TestDataActor::new();
1900 test_actor.reset_tracker();
1901 test_actor
1902 .register(trader_id, clock.clone(), cache.clone())
1903 .unwrap();
1904
1905 let time_event = TimeEvent::new(
1906 Ustr::from("test_timer"),
1907 UUID4::new(),
1908 UnixNanos::default(),
1909 UnixNanos::default(),
1910 );
1911
1912 assert!(test_actor.on_time_event(&time_event).is_ok());
1913 assert_eq!(test_actor.get_call_count("on_time_event"), 1);
1914 }
1915
1916 #[rstest]
1917 fn test_python_on_instrument_handler(
1918 clock: Rc<RefCell<TestClock>>,
1919 cache: Rc<RefCell<Cache>>,
1920 trader_id: TraderId,
1921 audusd_sim: CurrencyPair,
1922 ) {
1923 pyo3::prepare_freethreaded_python();
1924
1925 let mut rust_actor = PyDataActor::new(None);
1926 rust_actor
1927 .register(trader_id, clock.clone(), cache.clone())
1928 .unwrap();
1929
1930 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1931
1932 assert!(rust_actor.on_instrument(&instrument).is_ok());
1933 }
1934
1935 #[rstest]
1936 fn test_python_on_quote_handler(
1937 clock: Rc<RefCell<TestClock>>,
1938 cache: Rc<RefCell<Cache>>,
1939 trader_id: TraderId,
1940 audusd_sim: CurrencyPair,
1941 ) {
1942 pyo3::prepare_freethreaded_python();
1943
1944 let mut rust_actor = PyDataActor::new(None);
1945 rust_actor
1946 .register(trader_id, clock.clone(), cache.clone())
1947 .unwrap();
1948
1949 let quote = QuoteTick::new(
1950 audusd_sim.id,
1951 Price::from("1.0000"),
1952 Price::from("1.0001"),
1953 Quantity::from("100000"),
1954 Quantity::from("100000"),
1955 UnixNanos::default(),
1956 UnixNanos::default(),
1957 );
1958
1959 assert!(rust_actor.on_quote("e).is_ok());
1960 }
1961
1962 #[rstest]
1963 fn test_python_on_trade_handler(
1964 clock: Rc<RefCell<TestClock>>,
1965 cache: Rc<RefCell<Cache>>,
1966 trader_id: TraderId,
1967 audusd_sim: CurrencyPair,
1968 ) {
1969 pyo3::prepare_freethreaded_python();
1970
1971 let mut rust_actor = PyDataActor::new(None);
1972 rust_actor
1973 .register(trader_id, clock.clone(), cache.clone())
1974 .unwrap();
1975
1976 let trade = TradeTick::new(
1977 audusd_sim.id,
1978 Price::from("1.0000"),
1979 Quantity::from("100000"),
1980 nautilus_model::enums::AggressorSide::Buyer,
1981 "T123".to_string().into(),
1982 UnixNanos::default(),
1983 UnixNanos::default(),
1984 );
1985
1986 assert!(rust_actor.on_trade(&trade).is_ok());
1987 }
1988
1989 #[rstest]
1990 fn test_python_on_bar_handler(
1991 clock: Rc<RefCell<TestClock>>,
1992 cache: Rc<RefCell<Cache>>,
1993 trader_id: TraderId,
1994 audusd_sim: CurrencyPair,
1995 ) {
1996 pyo3::prepare_freethreaded_python();
1997
1998 let mut rust_actor = PyDataActor::new(None);
1999 rust_actor
2000 .register(trader_id, clock.clone(), cache.clone())
2001 .unwrap();
2002
2003 let bar_type =
2004 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2005 let bar = Bar::new(
2006 bar_type,
2007 Price::from("1.0000"),
2008 Price::from("1.0001"),
2009 Price::from("0.9999"),
2010 Price::from("1.0000"),
2011 Quantity::from("100000"),
2012 UnixNanos::default(),
2013 UnixNanos::default(),
2014 );
2015
2016 assert!(rust_actor.on_bar(&bar).is_ok());
2017 }
2018
2019 #[rstest]
2020 fn test_python_on_book_handler(
2021 clock: Rc<RefCell<TestClock>>,
2022 cache: Rc<RefCell<Cache>>,
2023 trader_id: TraderId,
2024 audusd_sim: CurrencyPair,
2025 ) {
2026 pyo3::prepare_freethreaded_python();
2027
2028 let mut rust_actor = PyDataActor::new(None);
2029 rust_actor
2030 .register(trader_id, clock.clone(), cache.clone())
2031 .unwrap();
2032
2033 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2034 assert!(rust_actor.on_book(&book).is_ok());
2035 }
2036
2037 #[rstest]
2038 fn test_python_on_book_deltas_handler(
2039 clock: Rc<RefCell<TestClock>>,
2040 cache: Rc<RefCell<Cache>>,
2041 trader_id: TraderId,
2042 audusd_sim: CurrencyPair,
2043 ) {
2044 pyo3::prepare_freethreaded_python();
2045
2046 let mut rust_actor = PyDataActor::new(None);
2047 rust_actor
2048 .register(trader_id, clock.clone(), cache.clone())
2049 .unwrap();
2050
2051 let delta =
2052 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2053 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2054
2055 assert!(rust_actor.on_book_deltas(&deltas).is_ok());
2056 }
2057
2058 #[rstest]
2059 fn test_python_on_mark_price_handler(
2060 clock: Rc<RefCell<TestClock>>,
2061 cache: Rc<RefCell<Cache>>,
2062 trader_id: TraderId,
2063 audusd_sim: CurrencyPair,
2064 ) {
2065 pyo3::prepare_freethreaded_python();
2066
2067 let mut rust_actor = PyDataActor::new(None);
2068 rust_actor
2069 .register(trader_id, clock.clone(), cache.clone())
2070 .unwrap();
2071
2072 let mark_price = MarkPriceUpdate::new(
2073 audusd_sim.id,
2074 Price::from("1.0000"),
2075 UnixNanos::default(),
2076 UnixNanos::default(),
2077 );
2078
2079 assert!(rust_actor.on_mark_price(&mark_price).is_ok());
2080 }
2081
2082 #[rstest]
2083 fn test_python_on_index_price_handler(
2084 clock: Rc<RefCell<TestClock>>,
2085 cache: Rc<RefCell<Cache>>,
2086 trader_id: TraderId,
2087 audusd_sim: CurrencyPair,
2088 ) {
2089 pyo3::prepare_freethreaded_python();
2090
2091 let mut rust_actor = PyDataActor::new(None);
2092 rust_actor
2093 .register(trader_id, clock.clone(), cache.clone())
2094 .unwrap();
2095
2096 let index_price = IndexPriceUpdate::new(
2097 audusd_sim.id,
2098 Price::from("1.0000"),
2099 UnixNanos::default(),
2100 UnixNanos::default(),
2101 );
2102
2103 assert!(rust_actor.on_index_price(&index_price).is_ok());
2104 }
2105
2106 #[rstest]
2107 fn test_python_on_instrument_status_handler(
2108 clock: Rc<RefCell<TestClock>>,
2109 cache: Rc<RefCell<Cache>>,
2110 trader_id: TraderId,
2111 audusd_sim: CurrencyPair,
2112 ) {
2113 pyo3::prepare_freethreaded_python();
2114
2115 let mut rust_actor = PyDataActor::new(None);
2116 rust_actor
2117 .register(trader_id, clock.clone(), cache.clone())
2118 .unwrap();
2119
2120 let status = InstrumentStatus::new(
2121 audusd_sim.id,
2122 nautilus_model::enums::MarketStatusAction::Trading,
2123 UnixNanos::default(),
2124 UnixNanos::default(),
2125 None,
2126 None,
2127 None,
2128 None,
2129 None,
2130 );
2131
2132 assert!(rust_actor.on_instrument_status(&status).is_ok());
2133 }
2134
2135 #[rstest]
2136 fn test_python_on_instrument_close_handler(
2137 clock: Rc<RefCell<TestClock>>,
2138 cache: Rc<RefCell<Cache>>,
2139 trader_id: TraderId,
2140 audusd_sim: CurrencyPair,
2141 ) {
2142 pyo3::prepare_freethreaded_python();
2143
2144 let mut rust_actor = PyDataActor::new(None);
2145 rust_actor
2146 .register(trader_id, clock.clone(), cache.clone())
2147 .unwrap();
2148
2149 let close = InstrumentClose::new(
2150 audusd_sim.id,
2151 Price::from("1.0000"),
2152 nautilus_model::enums::InstrumentCloseType::EndOfSession,
2153 UnixNanos::default(),
2154 UnixNanos::default(),
2155 );
2156
2157 assert!(rust_actor.on_instrument_close(&close).is_ok());
2158 }
2159
2160 #[cfg(feature = "defi")]
2161 #[rstest]
2162 fn test_python_on_block_handler(
2163 clock: Rc<RefCell<TestClock>>,
2164 cache: Rc<RefCell<Cache>>,
2165 trader_id: TraderId,
2166 ) {
2167 pyo3::prepare_freethreaded_python();
2168
2169 let mut test_actor = TestDataActor::new();
2170 test_actor.reset_tracker();
2171 test_actor
2172 .register(trader_id, clock.clone(), cache.clone())
2173 .unwrap();
2174
2175 let block = Block::new(
2176 "0x1234567890abcdef".to_string(),
2177 "0xabcdef1234567890".to_string(),
2178 12345,
2179 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2180 21000,
2181 20000,
2182 UnixNanos::default(),
2183 Some(Blockchain::Ethereum),
2184 );
2185
2186 assert!(test_actor.on_block(&block).is_ok());
2187 assert_eq!(test_actor.get_call_count("on_block"), 1);
2188 }
2189
2190 #[cfg(feature = "defi")]
2191 #[rstest]
2192 fn test_python_on_pool_swap_handler(
2193 clock: Rc<RefCell<TestClock>>,
2194 cache: Rc<RefCell<Cache>>,
2195 trader_id: TraderId,
2196 ) {
2197 pyo3::prepare_freethreaded_python();
2198
2199 let mut rust_actor = PyDataActor::new(None);
2200 rust_actor
2201 .register(trader_id, clock.clone(), cache.clone())
2202 .unwrap();
2203
2204 let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2205 let dex = Arc::new(Dex::new(
2206 Chain::new(Blockchain::Ethereum, 1),
2207 DexType::UniswapV3,
2208 "0x1f98431c8ad98523631ae4a59f267346ea31f984",
2209 0,
2210 AmmType::CLAMM,
2211 "PoolCreated",
2212 "Swap",
2213 "Mint",
2214 "Burn",
2215 ));
2216 let token0 = Token::new(
2217 chain.clone(),
2218 "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2219 .parse()
2220 .unwrap(),
2221 "USDC".into(),
2222 "USD Coin".into(),
2223 6,
2224 );
2225 let token1 = Token::new(
2226 chain.clone(),
2227 "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2228 .parse()
2229 .unwrap(),
2230 "WETH".into(),
2231 "Wrapped Ether".into(),
2232 18,
2233 );
2234 let pool = Arc::new(Pool::new(
2235 chain.clone(),
2236 dex.clone(),
2237 "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2238 .parse()
2239 .unwrap(),
2240 12345,
2241 token0,
2242 token1,
2243 Some(500),
2244 Some(10),
2245 UnixNanos::default(),
2246 ));
2247
2248 let swap = PoolSwap::new(
2249 chain.clone(),
2250 dex.clone(),
2251 pool.instrument_id,
2252 pool.address,
2253 12345,
2254 "0xabc123".to_string(),
2255 0,
2256 0,
2257 None,
2258 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2259 .parse()
2260 .unwrap(),
2261 nautilus_model::enums::OrderSide::Buy,
2262 Quantity::from("1000"),
2263 Price::from("1.0"),
2264 );
2265
2266 assert!(rust_actor.on_pool_swap(&swap).is_ok());
2267 }
2268
2269 #[cfg(feature = "defi")]
2270 #[rstest]
2271 fn test_python_on_pool_liquidity_update_handler(
2272 clock: Rc<RefCell<TestClock>>,
2273 cache: Rc<RefCell<Cache>>,
2274 trader_id: TraderId,
2275 ) {
2276 pyo3::prepare_freethreaded_python();
2277
2278 let mut rust_actor = PyDataActor::new(None);
2279 rust_actor
2280 .register(trader_id, clock.clone(), cache.clone())
2281 .unwrap();
2282
2283 let block = Block::new(
2284 "0x1234567890abcdef".to_string(),
2285 "0xabcdef1234567890".to_string(),
2286 12345,
2287 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2288 21000,
2289 20000,
2290 UnixNanos::default(),
2291 Some(Blockchain::Ethereum),
2292 );
2293
2294 assert!(rust_actor.on_block(&block).is_ok());
2298 }
2299}