1use std::{
19 any::Any,
20 cell::RefCell,
21 collections::HashMap,
22 num::NonZeroUsize,
23 ops::{Deref, DerefMut},
24 rc::Rc,
25};
26
27use indexmap::IndexMap;
28use nautilus_core::{
29 nanos::UnixNanos,
30 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
31};
32#[cfg(feature = "defi")]
33use nautilus_model::defi::{
34 Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
35};
36use nautilus_model::{
37 data::{
38 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
39 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
40 },
41 enums::BookType,
42 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
43 instruments::InstrumentAny,
44 orderbook::OrderBook,
45 python::instruments::instrument_any_to_pyobject,
46};
47use pyo3::{exceptions::PyValueError, prelude::*, types::PyDict};
48
49use crate::{
50 actor::{
51 DataActor,
52 data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
53 registry::try_get_actor_unchecked,
54 },
55 cache::Cache,
56 clock::Clock,
57 component::Component,
58 enums::ComponentState,
59 python::{cache::PyCache, clock::PyClock, logging::PyLogger},
60 signal::Signal,
61 timer::{TimeEvent, TimeEventCallback},
62};
63
64#[pyo3::pymethods]
65impl DataActorConfig {
66 #[new]
67 #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
68 fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
69 Self {
70 actor_id,
71 log_events,
72 log_commands,
73 }
74 }
75}
76
77#[pyo3::pymethods]
78impl ImportableActorConfig {
79 #[new]
80 fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
81 let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
82 let json_str: String = PyModule::import(py, "json")?
83 .call_method("dumps", (config.bind(py),), None)?
84 .extract()?;
85
86 let json_value: serde_json::Value = serde_json::from_str(&json_str)
87 .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
88
89 if let serde_json::Value::Object(map) = json_value {
90 Ok(map.into_iter().collect())
91 } else {
92 Err(PyErr::new::<PyValueError, _>("Config must be a dictionary"))
93 }
94 })?;
95
96 Ok(Self {
97 actor_path,
98 config_path,
99 config: json_config,
100 })
101 }
102
103 #[getter]
104 fn actor_path(&self) -> &String {
105 &self.actor_path
106 }
107
108 #[getter]
109 fn config_path(&self) -> &String {
110 &self.config_path
111 }
112
113 #[getter]
114 fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
115 let py_dict = PyDict::new(py);
117 for (key, value) in &self.config {
118 let json_str = serde_json::to_string(value)
120 .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;
121 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
122 py_dict.set_item(key, py_value)?;
123 }
124 Ok(py_dict.unbind())
125 }
126}
127
128#[allow(non_camel_case_types)]
129#[pyo3::pyclass(
130 module = "nautilus_trader.common",
131 name = "DataActor",
132 unsendable,
133 subclass
134)]
135#[derive(Debug)]
136pub struct PyDataActor {
137 core: DataActorCore,
138 py_self: Option<Py<PyAny>>,
139 clock: PyClock,
140 logger: PyLogger,
141}
142
143impl Deref for PyDataActor {
144 type Target = DataActorCore;
145
146 fn deref(&self) -> &Self::Target {
147 &self.core
148 }
149}
150
151impl DerefMut for PyDataActor {
152 fn deref_mut(&mut self) -> &mut Self::Target {
153 &mut self.core
154 }
155}
156
157impl PyDataActor {
158 pub fn new(config: Option<DataActorConfig>) -> Self {
160 let config = config.unwrap_or_default();
161 let core = DataActorCore::new(config);
162 let clock = PyClock::new_test(); let logger = PyLogger::new(core.actor_id().as_str());
164
165 Self {
166 core,
167 py_self: None,
168 clock,
169 logger,
170 }
171 }
172
173 pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
180 self.py_self = Some(py_obj);
181 }
182
183 pub fn set_actor_id(&mut self, actor_id: ActorId) {
192 self.core.config.actor_id = Some(actor_id);
193 self.core.actor_id = actor_id;
194 }
195
196 pub fn set_log_events(&mut self, log_events: bool) {
198 self.core.config.log_events = log_events;
199 }
200
201 pub fn set_log_commands(&mut self, log_commands: bool) {
203 self.core.config.log_commands = log_commands;
204 }
205 pub fn mem_address(&self) -> String {
207 self.core.mem_address()
208 }
209
210 pub fn is_registered(&self) -> bool {
212 self.core.is_registered()
213 }
214
215 pub fn register(
221 &mut self,
222 trader_id: TraderId,
223 clock: Rc<RefCell<dyn Clock>>,
224 cache: Rc<RefCell<Cache>>,
225 ) -> anyhow::Result<()> {
226 self.core.register(trader_id, clock, cache)?;
227
228 self.clock = PyClock::from_rc(self.core.clock_rc());
229
230 let actor_id = self.actor_id().inner();
232 let callback = TimeEventCallback::from(move |event: TimeEvent| {
233 if let Some(actor) = try_get_actor_unchecked::<Self>(&actor_id) {
234 if let Err(e) = actor.on_time_event(&event) {
235 log::error!("Python time event handler failed for actor {actor_id}: {e}");
236 }
237 } else {
238 log::error!("Actor {actor_id} not found for time event handling");
239 }
240 });
241
242 self.clock.inner_mut().register_default_handler(callback);
243
244 self.initialize()
245 }
246}
247
248impl DataActor for PyDataActor {
249 fn on_start(&mut self) -> anyhow::Result<()> {
250 self.py_on_start()
251 .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
252 }
253
254 fn on_stop(&mut self) -> anyhow::Result<()> {
255 self.py_on_stop()
256 .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
257 }
258
259 fn on_resume(&mut self) -> anyhow::Result<()> {
260 self.py_on_resume()
261 .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
262 }
263
264 fn on_reset(&mut self) -> anyhow::Result<()> {
265 self.py_on_reset()
266 .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
267 }
268
269 fn on_dispose(&mut self) -> anyhow::Result<()> {
270 self.py_on_dispose()
271 .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
272 }
273
274 fn on_degrade(&mut self) -> anyhow::Result<()> {
275 self.py_on_degrade()
276 .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
277 }
278
279 fn on_fault(&mut self) -> anyhow::Result<()> {
280 self.py_on_fault()
281 .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
282 }
283
284 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
285 self.py_on_time_event(event.clone())
286 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
287 }
288
289 #[allow(unused_variables)]
290 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
291 Python::attach(|py| {
292 let py_data = py.None();
295
296 self.py_on_data(py_data)
297 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
298 })
299 }
300
301 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
302 self.py_on_signal(signal)
303 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
304 }
305
306 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
307 Python::attach(|py| {
308 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
309 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
310 self.py_on_instrument(py_instrument)
311 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
312 })
313 }
314
315 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
316 self.py_on_quote(*quote)
317 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
318 }
319
320 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
321 self.py_on_trade(*tick)
322 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
323 }
324
325 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
326 self.py_on_bar(*bar)
327 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
328 }
329
330 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
331 self.py_on_book_deltas(deltas.clone())
332 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
333 }
334
335 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
336 self.py_on_book(order_book)
337 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
338 }
339
340 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
341 self.py_on_mark_price(*mark_price)
342 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
343 }
344
345 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
346 self.py_on_index_price(*index_price)
347 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
348 }
349
350 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
351 self.py_on_funding_rate(*funding_rate)
352 .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
353 }
354
355 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
356 self.py_on_instrument_status(*data)
357 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
358 }
359
360 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
361 self.py_on_instrument_close(*update)
362 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
363 }
364
365 #[cfg(feature = "defi")]
366 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
367 self.py_on_block(block.clone())
368 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
369 }
370
371 #[cfg(feature = "defi")]
372 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
373 self.py_on_pool(pool.clone())
374 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
375 }
376
377 #[cfg(feature = "defi")]
378 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
379 self.py_on_pool_swap(swap.clone())
380 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
381 }
382
383 #[cfg(feature = "defi")]
384 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
385 self.py_on_pool_liquidity_update(update.clone())
386 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
387 }
388
389 #[cfg(feature = "defi")]
390 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
391 self.py_on_pool_fee_collect(collect.clone())
392 .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
393 }
394
395 #[cfg(feature = "defi")]
396 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
397 self.py_on_pool_flash(flash.clone())
398 .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
399 }
400
401 fn on_historical_data(&mut self, _data: &dyn Any) -> anyhow::Result<()> {
402 Python::attach(|py| {
403 let py_data = py.None();
404 self.py_on_historical_data(py_data)
405 .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
406 })
407 }
408
409 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
410 self.py_on_historical_quotes(quotes.to_vec())
411 .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
412 }
413
414 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
415 self.py_on_historical_trades(trades.to_vec())
416 .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
417 }
418
419 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
420 self.py_on_historical_bars(bars.to_vec())
421 .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
422 }
423
424 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
425 self.py_on_historical_mark_prices(mark_prices.to_vec())
426 .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
427 }
428
429 fn on_historical_index_prices(
430 &mut self,
431 index_prices: &[IndexPriceUpdate],
432 ) -> anyhow::Result<()> {
433 self.py_on_historical_index_prices(index_prices.to_vec())
434 .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
435 }
436}
437
438#[pymethods]
439impl PyDataActor {
440 #[new]
441 #[pyo3(signature = (config=None))]
442 fn py_new(config: Option<DataActorConfig>) -> PyResult<Self> {
443 Ok(Self::new(config))
444 }
445
446 #[getter]
447 #[pyo3(name = "clock")]
448 fn py_clock(&self) -> PyResult<PyClock> {
449 if !self.core.is_registered() {
450 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
451 "Actor must be registered with a trader before accessing clock",
452 ))
453 } else {
454 Ok(self.clock.clone())
455 }
456 }
457
458 #[getter]
459 #[pyo3(name = "cache")]
460 fn py_cache(&self) -> PyResult<PyCache> {
461 if !self.core.is_registered() {
462 Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
463 "Actor must be registered with a trader before accessing cache",
464 ))
465 } else {
466 Ok(PyCache::from_rc(self.core.cache_rc()))
467 }
468 }
469
470 #[getter]
471 #[pyo3(name = "log")]
472 fn py_log(&self) -> PyLogger {
473 self.logger.clone()
474 }
475
476 #[getter]
477 #[pyo3(name = "actor_id")]
478 fn py_actor_id(&self) -> ActorId {
479 self.actor_id
480 }
481
482 #[getter]
483 #[pyo3(name = "trader_id")]
484 fn py_trader_id(&self) -> Option<TraderId> {
485 self.trader_id()
486 }
487
488 #[pyo3(name = "state")]
489 fn py_state(&self) -> ComponentState {
490 self.state()
491 }
492
493 #[pyo3(name = "is_ready")]
494 fn py_is_ready(&self) -> bool {
495 self.is_ready()
496 }
497
498 #[pyo3(name = "is_running")]
499 fn py_is_running(&self) -> bool {
500 self.is_running()
501 }
502
503 #[pyo3(name = "is_stopped")]
504 fn py_is_stopped(&self) -> bool {
505 self.is_stopped()
506 }
507
508 #[pyo3(name = "is_degraded")]
509 fn py_is_degraded(&self) -> bool {
510 self.is_degraded()
511 }
512
513 #[pyo3(name = "is_faulted")]
514 fn py_is_faulted(&self) -> bool {
515 self.is_faulted()
516 }
517
518 #[pyo3(name = "is_disposed")]
519 fn py_is_disposed(&self) -> bool {
520 self.is_disposed()
521 }
522
523 #[pyo3(name = "start")]
524 fn py_start(&mut self) -> PyResult<()> {
525 self.start().map_err(to_pyruntime_err)
526 }
527
528 #[pyo3(name = "stop")]
529 fn py_stop(&mut self) -> PyResult<()> {
530 self.stop().map_err(to_pyruntime_err)
531 }
532
533 #[pyo3(name = "resume")]
534 fn py_resume(&mut self) -> PyResult<()> {
535 self.resume().map_err(to_pyruntime_err)
536 }
537
538 #[pyo3(name = "reset")]
539 fn py_reset(&mut self) -> PyResult<()> {
540 self.reset().map_err(to_pyruntime_err)
541 }
542
543 #[pyo3(name = "dispose")]
544 fn py_dispose(&mut self) -> PyResult<()> {
545 self.dispose().map_err(to_pyruntime_err)
546 }
547
548 #[pyo3(name = "degrade")]
549 fn py_degrade(&mut self) -> PyResult<()> {
550 self.degrade().map_err(to_pyruntime_err)
551 }
552
553 #[pyo3(name = "fault")]
554 fn py_fault(&mut self) -> PyResult<()> {
555 self.fault().map_err(to_pyruntime_err)
556 }
557
558 #[pyo3(name = "shutdown_system")]
559 #[pyo3(signature = (reason=None))]
560 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
561 self.shutdown_system(reason);
562 Ok(())
563 }
564
565 #[pyo3(name = "on_start")]
566 fn py_on_start(&self) -> PyResult<()> {
567 if let Some(ref py_self) = self.py_self {
569 Python::attach(|py| py_self.call_method0(py, "on_start"))?;
570 }
571 Ok(())
572 }
573
574 #[pyo3(name = "on_stop")]
575 fn py_on_stop(&mut self) -> PyResult<()> {
576 if let Some(ref py_self) = self.py_self {
578 Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
579 }
580 Ok(())
581 }
582
583 #[pyo3(name = "on_resume")]
584 fn py_on_resume(&mut self) -> PyResult<()> {
585 if let Some(ref py_self) = self.py_self {
587 Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
588 }
589 Ok(())
590 }
591
592 #[pyo3(name = "on_reset")]
593 fn py_on_reset(&mut self) -> PyResult<()> {
594 if let Some(ref py_self) = self.py_self {
596 Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
597 }
598 Ok(())
599 }
600
601 #[pyo3(name = "on_dispose")]
602 fn py_on_dispose(&mut self) -> PyResult<()> {
603 if let Some(ref py_self) = self.py_self {
605 Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
606 }
607 Ok(())
608 }
609
610 #[pyo3(name = "on_degrade")]
611 fn py_on_degrade(&mut self) -> PyResult<()> {
612 if let Some(ref py_self) = self.py_self {
614 Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
615 }
616 Ok(())
617 }
618
619 #[pyo3(name = "on_fault")]
620 fn py_on_fault(&mut self) -> PyResult<()> {
621 if let Some(ref py_self) = self.py_self {
623 Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
624 }
625 Ok(())
626 }
627
628 #[allow(unused_variables)]
629 #[pyo3(name = "on_time_event")]
630 fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
631 if let Some(ref py_self) = self.py_self {
633 Python::attach(|py| {
634 py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
635 })?;
636 }
637 Ok(())
638 }
639
640 #[pyo3(name = "on_data")]
641 fn py_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
642 if let Some(ref py_self) = self.py_self {
644 Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
645 }
646 Ok(())
647 }
648
649 #[allow(unused_variables)]
650 #[pyo3(name = "on_signal")]
651 fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
652 if let Some(ref py_self) = self.py_self {
654 Python::attach(|py| {
655 py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
656 })?;
657 }
658 Ok(())
659 }
660
661 #[pyo3(name = "on_instrument")]
662 fn py_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
663 if let Some(ref py_self) = self.py_self {
665 Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
666 }
667 Ok(())
668 }
669
670 #[allow(unused_variables)]
671 #[pyo3(name = "on_quote")]
672 fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
673 if let Some(ref py_self) = self.py_self {
675 Python::attach(|py| {
676 py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
677 })?;
678 }
679 Ok(())
680 }
681
682 #[allow(unused_variables)]
683 #[pyo3(name = "on_trade")]
684 fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
685 if let Some(ref py_self) = self.py_self {
687 Python::attach(|py| {
688 py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
689 })?;
690 }
691 Ok(())
692 }
693
694 #[allow(unused_variables)]
695 #[pyo3(name = "on_bar")]
696 fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
697 if let Some(ref py_self) = self.py_self {
699 Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
700 }
701 Ok(())
702 }
703
704 #[allow(unused_variables)]
705 #[pyo3(name = "on_book_deltas")]
706 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
707 if let Some(ref py_self) = self.py_self {
709 Python::attach(|py| {
710 py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
711 })?;
712 }
713 Ok(())
714 }
715
716 #[allow(unused_variables)]
717 #[pyo3(name = "on_book")]
718 fn py_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
719 if let Some(ref py_self) = self.py_self {
721 Python::attach(|py| {
722 py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
723 })?;
724 }
725 Ok(())
726 }
727
728 #[allow(unused_variables)]
729 #[pyo3(name = "on_mark_price")]
730 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
731 if let Some(ref py_self) = self.py_self {
733 Python::attach(|py| {
734 py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
735 })?;
736 }
737 Ok(())
738 }
739
740 #[allow(unused_variables)]
741 #[pyo3(name = "on_index_price")]
742 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
743 if let Some(ref py_self) = self.py_self {
745 Python::attach(|py| {
746 py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
747 })?;
748 }
749 Ok(())
750 }
751
752 #[allow(unused_variables)]
753 #[pyo3(name = "on_funding_rate")]
754 fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
755 if let Some(ref py_self) = self.py_self {
757 Python::attach(|py| {
758 py_self.call_method1(
759 py,
760 "on_funding_rate",
761 (funding_rate.into_py_any_unwrap(py),),
762 )
763 })?;
764 }
765 Ok(())
766 }
767
768 #[allow(unused_variables)]
769 #[pyo3(name = "on_instrument_status")]
770 fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
771 if let Some(ref py_self) = self.py_self {
773 Python::attach(|py| {
774 py_self.call_method1(py, "on_instrument_status", (status.into_py_any_unwrap(py),))
775 })?;
776 }
777 Ok(())
778 }
779
780 #[allow(unused_variables)]
781 #[pyo3(name = "on_instrument_close")]
782 fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
783 if let Some(ref py_self) = self.py_self {
785 Python::attach(|py| {
786 py_self.call_method1(py, "on_instrument_close", (close.into_py_any_unwrap(py),))
787 })?;
788 }
789 Ok(())
790 }
791
792 #[cfg(feature = "defi")]
793 #[allow(unused_variables)]
794 #[pyo3(name = "on_block")]
795 fn py_on_block(&mut self, block: Block) -> PyResult<()> {
796 if let Some(ref py_self) = self.py_self {
798 Python::attach(|py| {
799 py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
800 })?;
801 }
802 Ok(())
803 }
804
805 #[cfg(feature = "defi")]
806 #[allow(unused_variables)]
807 #[pyo3(name = "on_pool")]
808 fn py_on_pool(&mut self, pool: Pool) -> PyResult<()> {
809 if let Some(ref py_self) = self.py_self {
811 Python::attach(|py| {
812 py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
813 })?;
814 }
815 Ok(())
816 }
817
818 #[cfg(feature = "defi")]
819 #[allow(unused_variables)]
820 #[pyo3(name = "on_pool_swap")]
821 fn py_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
822 if let Some(ref py_self) = self.py_self {
824 Python::attach(|py| {
825 py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
826 })?;
827 }
828 Ok(())
829 }
830
831 #[cfg(feature = "defi")]
832 #[allow(unused_variables)]
833 #[pyo3(name = "on_pool_liquidity_update")]
834 fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
835 if let Some(ref py_self) = self.py_self {
837 Python::attach(|py| {
838 py_self.call_method1(
839 py,
840 "on_pool_liquidity_update",
841 (update.into_py_any_unwrap(py),),
842 )
843 })?;
844 }
845 Ok(())
846 }
847
848 #[cfg(feature = "defi")]
849 #[allow(unused_variables)]
850 #[pyo3(name = "on_pool_fee_collect")]
851 fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) -> PyResult<()> {
852 if let Some(ref py_self) = self.py_self {
854 Python::attach(|py| {
855 py_self.call_method1(py, "on_pool_fee_collect", (update.into_py_any_unwrap(py),))
856 })?;
857 }
858 Ok(())
859 }
860
861 #[cfg(feature = "defi")]
862 #[allow(unused_variables)]
863 #[pyo3(name = "on_pool_flash")]
864 fn py_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
865 if let Some(ref py_self) = self.py_self {
867 Python::attach(|py| {
868 py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
869 })?;
870 }
871 Ok(())
872 }
873
874 #[pyo3(name = "subscribe_data")]
875 #[pyo3(signature = (data_type, client_id=None, params=None))]
876 fn py_subscribe_data(
877 &mut self,
878 data_type: DataType,
879 client_id: Option<ClientId>,
880 params: Option<IndexMap<String, String>>,
881 ) -> PyResult<()> {
882 self.subscribe_data(data_type, client_id, params);
883 Ok(())
884 }
885
886 #[pyo3(name = "subscribe_instruments")]
887 #[pyo3(signature = (venue, client_id=None, params=None))]
888 fn py_subscribe_instruments(
889 &mut self,
890 venue: Venue,
891 client_id: Option<ClientId>,
892 params: Option<IndexMap<String, String>>,
893 ) -> PyResult<()> {
894 self.subscribe_instruments(venue, client_id, params);
895 Ok(())
896 }
897
898 #[pyo3(name = "subscribe_instrument")]
899 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
900 fn py_subscribe_instrument(
901 &mut self,
902 instrument_id: InstrumentId,
903 client_id: Option<ClientId>,
904 params: Option<IndexMap<String, String>>,
905 ) -> PyResult<()> {
906 self.subscribe_instrument(instrument_id, client_id, params);
907 Ok(())
908 }
909
910 #[pyo3(name = "subscribe_book_deltas")]
911 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
912 fn py_subscribe_book_deltas(
913 &mut self,
914 instrument_id: InstrumentId,
915 book_type: BookType,
916 depth: Option<usize>,
917 client_id: Option<ClientId>,
918 managed: bool,
919 params: Option<IndexMap<String, String>>,
920 ) -> PyResult<()> {
921 let depth = depth.and_then(NonZeroUsize::new);
922 self.subscribe_book_deltas(instrument_id, book_type, depth, client_id, managed, params);
923 Ok(())
924 }
925
926 #[pyo3(name = "subscribe_book_at_interval")]
927 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
928 fn py_subscribe_book_at_interval(
929 &mut self,
930 instrument_id: InstrumentId,
931 book_type: BookType,
932 interval_ms: usize,
933 depth: Option<usize>,
934 client_id: Option<ClientId>,
935 params: Option<IndexMap<String, String>>,
936 ) -> PyResult<()> {
937 let depth = depth.and_then(NonZeroUsize::new);
938 let interval_ms = NonZeroUsize::new(interval_ms)
939 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
940
941 self.subscribe_book_at_interval(
942 instrument_id,
943 book_type,
944 depth,
945 interval_ms,
946 client_id,
947 params,
948 );
949 Ok(())
950 }
951
952 #[pyo3(name = "subscribe_quotes")]
953 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
954 fn py_subscribe_quotes(
955 &mut self,
956 instrument_id: InstrumentId,
957 client_id: Option<ClientId>,
958 params: Option<IndexMap<String, String>>,
959 ) -> PyResult<()> {
960 self.subscribe_quotes(instrument_id, client_id, params);
961 Ok(())
962 }
963
964 #[pyo3(name = "subscribe_trades")]
965 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
966 fn py_subscribe_trades(
967 &mut self,
968 instrument_id: InstrumentId,
969 client_id: Option<ClientId>,
970 params: Option<IndexMap<String, String>>,
971 ) -> PyResult<()> {
972 self.subscribe_trades(instrument_id, client_id, params);
973 Ok(())
974 }
975
976 #[pyo3(name = "subscribe_bars")]
977 #[pyo3(signature = (bar_type, client_id=None, params=None))]
978 fn py_subscribe_bars(
979 &mut self,
980 bar_type: BarType,
981 client_id: Option<ClientId>,
982 params: Option<IndexMap<String, String>>,
983 ) -> PyResult<()> {
984 self.subscribe_bars(bar_type, client_id, params);
985 Ok(())
986 }
987
988 #[pyo3(name = "subscribe_mark_prices")]
989 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
990 fn py_subscribe_mark_prices(
991 &mut self,
992 instrument_id: InstrumentId,
993 client_id: Option<ClientId>,
994 params: Option<IndexMap<String, String>>,
995 ) -> PyResult<()> {
996 self.subscribe_mark_prices(instrument_id, client_id, params);
997 Ok(())
998 }
999
1000 #[pyo3(name = "subscribe_index_prices")]
1001 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1002 fn py_subscribe_index_prices(
1003 &mut self,
1004 instrument_id: InstrumentId,
1005 client_id: Option<ClientId>,
1006 params: Option<IndexMap<String, String>>,
1007 ) -> PyResult<()> {
1008 self.subscribe_index_prices(instrument_id, client_id, params);
1009 Ok(())
1010 }
1011
1012 #[pyo3(name = "subscribe_instrument_status")]
1013 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1014 fn py_subscribe_instrument_status(
1015 &mut self,
1016 instrument_id: InstrumentId,
1017 client_id: Option<ClientId>,
1018 params: Option<IndexMap<String, String>>,
1019 ) -> PyResult<()> {
1020 self.subscribe_instrument_status(instrument_id, client_id, params);
1021 Ok(())
1022 }
1023
1024 #[pyo3(name = "subscribe_instrument_close")]
1025 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1026 fn py_subscribe_instrument_close(
1027 &mut self,
1028 instrument_id: InstrumentId,
1029 client_id: Option<ClientId>,
1030 params: Option<IndexMap<String, String>>,
1031 ) -> PyResult<()> {
1032 self.subscribe_instrument_close(instrument_id, client_id, params);
1033 Ok(())
1034 }
1035
1036 #[pyo3(name = "subscribe_order_fills")]
1037 #[pyo3(signature = (instrument_id))]
1038 fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1039 self.subscribe_order_fills(instrument_id);
1040 Ok(())
1041 }
1042
1043 #[cfg(feature = "defi")]
1044 #[pyo3(name = "subscribe_blocks")]
1045 #[pyo3(signature = (chain, client_id=None, params=None))]
1046 fn py_subscribe_blocks(
1047 &mut self,
1048 chain: Blockchain,
1049 client_id: Option<ClientId>,
1050 params: Option<IndexMap<String, String>>,
1051 ) -> PyResult<()> {
1052 self.subscribe_blocks(chain, client_id, params);
1053 Ok(())
1054 }
1055
1056 #[cfg(feature = "defi")]
1057 #[pyo3(name = "subscribe_pool")]
1058 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1059 fn py_subscribe_pool(
1060 &mut self,
1061 instrument_id: InstrumentId,
1062 client_id: Option<ClientId>,
1063 params: Option<IndexMap<String, String>>,
1064 ) -> PyResult<()> {
1065 self.subscribe_pool(instrument_id, client_id, params);
1066 Ok(())
1067 }
1068
1069 #[cfg(feature = "defi")]
1070 #[pyo3(name = "subscribe_pool_swaps")]
1071 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1072 fn py_subscribe_pool_swaps(
1073 &mut self,
1074 instrument_id: InstrumentId,
1075 client_id: Option<ClientId>,
1076 params: Option<IndexMap<String, String>>,
1077 ) -> PyResult<()> {
1078 self.subscribe_pool_swaps(instrument_id, client_id, params);
1079 Ok(())
1080 }
1081
1082 #[cfg(feature = "defi")]
1083 #[pyo3(name = "subscribe_pool_liquidity_updates")]
1084 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1085 fn py_subscribe_pool_liquidity_updates(
1086 &mut self,
1087 instrument_id: InstrumentId,
1088 client_id: Option<ClientId>,
1089 params: Option<IndexMap<String, String>>,
1090 ) -> PyResult<()> {
1091 self.subscribe_pool_liquidity_updates(instrument_id, client_id, params);
1092 Ok(())
1093 }
1094
1095 #[cfg(feature = "defi")]
1096 #[pyo3(name = "subscribe_pool_fee_collects")]
1097 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1098 fn py_subscribe_pool_fee_collects(
1099 &mut self,
1100 instrument_id: InstrumentId,
1101 client_id: Option<ClientId>,
1102 params: Option<IndexMap<String, String>>,
1103 ) -> PyResult<()> {
1104 self.subscribe_pool_fee_collects(instrument_id, client_id, params);
1105 Ok(())
1106 }
1107
1108 #[cfg(feature = "defi")]
1109 #[pyo3(name = "subscribe_pool_flash_events")]
1110 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1111 fn py_subscribe_pool_flash_events(
1112 &mut self,
1113 instrument_id: InstrumentId,
1114 client_id: Option<ClientId>,
1115 params: Option<IndexMap<String, String>>,
1116 ) -> PyResult<()> {
1117 self.subscribe_pool_flash_events(instrument_id, client_id, params);
1118 Ok(())
1119 }
1120
1121 #[pyo3(name = "request_data")]
1122 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1123 fn py_request_data(
1124 &mut self,
1125 data_type: DataType,
1126 client_id: ClientId,
1127 start: Option<u64>,
1128 end: Option<u64>,
1129 limit: Option<usize>,
1130 params: Option<IndexMap<String, String>>,
1131 ) -> PyResult<String> {
1132 let limit = limit.and_then(NonZeroUsize::new);
1133 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1134 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1135
1136 let request_id = self
1137 .request_data(data_type, client_id, start, end, limit, params)
1138 .map_err(to_pyvalue_err)?;
1139 Ok(request_id.to_string())
1140 }
1141
1142 #[pyo3(name = "request_instrument")]
1143 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1144 fn py_request_instrument(
1145 &mut self,
1146 instrument_id: InstrumentId,
1147 start: Option<u64>,
1148 end: Option<u64>,
1149 client_id: Option<ClientId>,
1150 params: Option<IndexMap<String, String>>,
1151 ) -> PyResult<String> {
1152 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1153 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1154
1155 let request_id = self
1156 .request_instrument(instrument_id, start, end, client_id, params)
1157 .map_err(to_pyvalue_err)?;
1158 Ok(request_id.to_string())
1159 }
1160
1161 #[pyo3(name = "request_instruments")]
1162 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1163 fn py_request_instruments(
1164 &mut self,
1165 venue: Option<Venue>,
1166 start: Option<u64>,
1167 end: Option<u64>,
1168 client_id: Option<ClientId>,
1169 params: Option<IndexMap<String, String>>,
1170 ) -> PyResult<String> {
1171 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1172 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1173
1174 let request_id = self
1175 .request_instruments(venue, start, end, client_id, params)
1176 .map_err(to_pyvalue_err)?;
1177 Ok(request_id.to_string())
1178 }
1179
1180 #[pyo3(name = "request_book_snapshot")]
1181 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1182 fn py_request_book_snapshot(
1183 &mut self,
1184 instrument_id: InstrumentId,
1185 depth: Option<usize>,
1186 client_id: Option<ClientId>,
1187 params: Option<IndexMap<String, String>>,
1188 ) -> PyResult<String> {
1189 let depth = depth.and_then(NonZeroUsize::new);
1190
1191 let request_id = self
1192 .request_book_snapshot(instrument_id, depth, client_id, params)
1193 .map_err(to_pyvalue_err)?;
1194 Ok(request_id.to_string())
1195 }
1196
1197 #[pyo3(name = "request_quotes")]
1198 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1199 fn py_request_quotes(
1200 &mut self,
1201 instrument_id: InstrumentId,
1202 start: Option<u64>,
1203 end: Option<u64>,
1204 limit: Option<usize>,
1205 client_id: Option<ClientId>,
1206 params: Option<IndexMap<String, String>>,
1207 ) -> PyResult<String> {
1208 let limit = limit.and_then(NonZeroUsize::new);
1209 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1210 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1211
1212 let request_id = self
1213 .request_quotes(instrument_id, start, end, limit, client_id, params)
1214 .map_err(to_pyvalue_err)?;
1215 Ok(request_id.to_string())
1216 }
1217
1218 #[pyo3(name = "request_trades")]
1219 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1220 fn py_request_trades(
1221 &mut self,
1222 instrument_id: InstrumentId,
1223 start: Option<u64>,
1224 end: Option<u64>,
1225 limit: Option<usize>,
1226 client_id: Option<ClientId>,
1227 params: Option<IndexMap<String, String>>,
1228 ) -> PyResult<String> {
1229 let limit = limit.and_then(NonZeroUsize::new);
1230 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1231 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1232
1233 let request_id = self
1234 .request_trades(instrument_id, start, end, limit, client_id, params)
1235 .map_err(to_pyvalue_err)?;
1236 Ok(request_id.to_string())
1237 }
1238
1239 #[pyo3(name = "request_bars")]
1240 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1241 fn py_request_bars(
1242 &mut self,
1243 bar_type: BarType,
1244 start: Option<u64>,
1245 end: Option<u64>,
1246 limit: Option<usize>,
1247 client_id: Option<ClientId>,
1248 params: Option<IndexMap<String, String>>,
1249 ) -> PyResult<String> {
1250 let limit = limit.and_then(NonZeroUsize::new);
1251 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1252 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1253
1254 let request_id = self
1255 .request_bars(bar_type, start, end, limit, client_id, params)
1256 .map_err(to_pyvalue_err)?;
1257 Ok(request_id.to_string())
1258 }
1259
1260 #[pyo3(name = "unsubscribe_data")]
1261 #[pyo3(signature = (data_type, client_id=None, params=None))]
1262 fn py_unsubscribe_data(
1263 &mut self,
1264 data_type: DataType,
1265 client_id: Option<ClientId>,
1266 params: Option<IndexMap<String, String>>,
1267 ) -> PyResult<()> {
1268 self.unsubscribe_data(data_type, client_id, params);
1269 Ok(())
1270 }
1271
1272 #[pyo3(name = "unsubscribe_instruments")]
1273 #[pyo3(signature = (venue, client_id=None, params=None))]
1274 fn py_unsubscribe_instruments(
1275 &mut self,
1276 venue: Venue,
1277 client_id: Option<ClientId>,
1278 params: Option<IndexMap<String, String>>,
1279 ) -> PyResult<()> {
1280 self.unsubscribe_instruments(venue, client_id, params);
1281 Ok(())
1282 }
1283
1284 #[pyo3(name = "unsubscribe_instrument")]
1285 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1286 fn py_unsubscribe_instrument(
1287 &mut self,
1288 instrument_id: InstrumentId,
1289 client_id: Option<ClientId>,
1290 params: Option<IndexMap<String, String>>,
1291 ) -> PyResult<()> {
1292 self.unsubscribe_instrument(instrument_id, client_id, params);
1293 Ok(())
1294 }
1295
1296 #[pyo3(name = "unsubscribe_book_deltas")]
1297 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1298 fn py_unsubscribe_book_deltas(
1299 &mut self,
1300 instrument_id: InstrumentId,
1301 client_id: Option<ClientId>,
1302 params: Option<IndexMap<String, String>>,
1303 ) -> PyResult<()> {
1304 self.unsubscribe_book_deltas(instrument_id, client_id, params);
1305 Ok(())
1306 }
1307
1308 #[pyo3(name = "unsubscribe_book_at_interval")]
1309 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1310 fn py_unsubscribe_book_at_interval(
1311 &mut self,
1312 instrument_id: InstrumentId,
1313 interval_ms: usize,
1314 client_id: Option<ClientId>,
1315 params: Option<IndexMap<String, String>>,
1316 ) -> PyResult<()> {
1317 let interval_ms = NonZeroUsize::new(interval_ms)
1318 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
1319
1320 self.unsubscribe_book_at_interval(instrument_id, interval_ms, client_id, params);
1321 Ok(())
1322 }
1323
1324 #[pyo3(name = "unsubscribe_quotes")]
1325 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1326 fn py_unsubscribe_quotes(
1327 &mut self,
1328 instrument_id: InstrumentId,
1329 client_id: Option<ClientId>,
1330 params: Option<IndexMap<String, String>>,
1331 ) -> PyResult<()> {
1332 self.unsubscribe_quotes(instrument_id, client_id, params);
1333 Ok(())
1334 }
1335
1336 #[pyo3(name = "unsubscribe_trades")]
1337 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1338 fn py_unsubscribe_trades(
1339 &mut self,
1340 instrument_id: InstrumentId,
1341 client_id: Option<ClientId>,
1342 params: Option<IndexMap<String, String>>,
1343 ) -> PyResult<()> {
1344 self.unsubscribe_trades(instrument_id, client_id, params);
1345 Ok(())
1346 }
1347
1348 #[pyo3(name = "unsubscribe_bars")]
1349 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1350 fn py_unsubscribe_bars(
1351 &mut self,
1352 bar_type: BarType,
1353 client_id: Option<ClientId>,
1354 params: Option<IndexMap<String, String>>,
1355 ) -> PyResult<()> {
1356 self.unsubscribe_bars(bar_type, client_id, params);
1357 Ok(())
1358 }
1359
1360 #[pyo3(name = "unsubscribe_mark_prices")]
1361 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1362 fn py_unsubscribe_mark_prices(
1363 &mut self,
1364 instrument_id: InstrumentId,
1365 client_id: Option<ClientId>,
1366 params: Option<IndexMap<String, String>>,
1367 ) -> PyResult<()> {
1368 self.unsubscribe_mark_prices(instrument_id, client_id, params);
1369 Ok(())
1370 }
1371
1372 #[pyo3(name = "unsubscribe_index_prices")]
1373 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1374 fn py_unsubscribe_index_prices(
1375 &mut self,
1376 instrument_id: InstrumentId,
1377 client_id: Option<ClientId>,
1378 params: Option<IndexMap<String, String>>,
1379 ) -> PyResult<()> {
1380 self.unsubscribe_index_prices(instrument_id, client_id, params);
1381 Ok(())
1382 }
1383
1384 #[pyo3(name = "unsubscribe_instrument_status")]
1385 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1386 fn py_unsubscribe_instrument_status(
1387 &mut self,
1388 instrument_id: InstrumentId,
1389 client_id: Option<ClientId>,
1390 params: Option<IndexMap<String, String>>,
1391 ) -> PyResult<()> {
1392 self.unsubscribe_instrument_status(instrument_id, client_id, params);
1393 Ok(())
1394 }
1395
1396 #[pyo3(name = "unsubscribe_instrument_close")]
1397 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1398 fn py_unsubscribe_instrument_close(
1399 &mut self,
1400 instrument_id: InstrumentId,
1401 client_id: Option<ClientId>,
1402 params: Option<IndexMap<String, String>>,
1403 ) -> PyResult<()> {
1404 self.unsubscribe_instrument_close(instrument_id, client_id, params);
1405 Ok(())
1406 }
1407
1408 #[pyo3(name = "unsubscribe_order_fills")]
1409 #[pyo3(signature = (instrument_id))]
1410 fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) -> PyResult<()> {
1411 self.unsubscribe_order_fills(instrument_id);
1412 Ok(())
1413 }
1414
1415 #[cfg(feature = "defi")]
1416 #[pyo3(name = "unsubscribe_blocks")]
1417 #[pyo3(signature = (chain, client_id=None, params=None))]
1418 fn py_unsubscribe_blocks(
1419 &mut self,
1420 chain: Blockchain,
1421 client_id: Option<ClientId>,
1422 params: Option<IndexMap<String, String>>,
1423 ) -> PyResult<()> {
1424 self.unsubscribe_blocks(chain, client_id, params);
1425 Ok(())
1426 }
1427
1428 #[cfg(feature = "defi")]
1429 #[pyo3(name = "unsubscribe_pool")]
1430 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1431 fn py_unsubscribe_pool(
1432 &mut self,
1433 instrument_id: InstrumentId,
1434 client_id: Option<ClientId>,
1435 params: Option<IndexMap<String, String>>,
1436 ) -> PyResult<()> {
1437 self.unsubscribe_pool(instrument_id, client_id, params);
1438 Ok(())
1439 }
1440
1441 #[cfg(feature = "defi")]
1442 #[pyo3(name = "unsubscribe_pool_swaps")]
1443 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1444 fn py_unsubscribe_pool_swaps(
1445 &mut self,
1446 instrument_id: InstrumentId,
1447 client_id: Option<ClientId>,
1448 params: Option<IndexMap<String, String>>,
1449 ) -> PyResult<()> {
1450 self.unsubscribe_pool_swaps(instrument_id, client_id, params);
1451 Ok(())
1452 }
1453
1454 #[cfg(feature = "defi")]
1455 #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
1456 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1457 fn py_unsubscribe_pool_liquidity_updates(
1458 &mut self,
1459 instrument_id: InstrumentId,
1460 client_id: Option<ClientId>,
1461 params: Option<IndexMap<String, String>>,
1462 ) -> PyResult<()> {
1463 self.unsubscribe_pool_liquidity_updates(instrument_id, client_id, params);
1464 Ok(())
1465 }
1466
1467 #[cfg(feature = "defi")]
1468 #[pyo3(name = "unsubscribe_pool_fee_collects")]
1469 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1470 fn py_unsubscribe_pool_fee_collects(
1471 &mut self,
1472 instrument_id: InstrumentId,
1473 client_id: Option<ClientId>,
1474 params: Option<IndexMap<String, String>>,
1475 ) -> PyResult<()> {
1476 self.unsubscribe_pool_fee_collects(instrument_id, client_id, params);
1477 Ok(())
1478 }
1479
1480 #[cfg(feature = "defi")]
1481 #[pyo3(name = "unsubscribe_pool_flash_events")]
1482 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1483 fn py_unsubscribe_pool_flash_events(
1484 &mut self,
1485 instrument_id: InstrumentId,
1486 client_id: Option<ClientId>,
1487 params: Option<IndexMap<String, String>>,
1488 ) -> PyResult<()> {
1489 self.unsubscribe_pool_flash_events(instrument_id, client_id, params);
1490 Ok(())
1491 }
1492
1493 #[allow(unused_variables)]
1494 #[pyo3(name = "on_historical_data")]
1495 fn py_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
1496 Ok(())
1498 }
1499
1500 #[allow(unused_variables)]
1501 #[pyo3(name = "on_historical_quotes")]
1502 fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
1503 Ok(())
1505 }
1506
1507 #[allow(unused_variables)]
1508 #[pyo3(name = "on_historical_trades")]
1509 fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
1510 Ok(())
1512 }
1513
1514 #[allow(unused_variables)]
1515 #[pyo3(name = "on_historical_bars")]
1516 fn py_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
1517 Ok(())
1519 }
1520
1521 #[allow(unused_variables)]
1522 #[pyo3(name = "on_historical_mark_prices")]
1523 fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) -> PyResult<()> {
1524 Ok(())
1526 }
1527
1528 #[allow(unused_variables)]
1529 #[pyo3(name = "on_historical_index_prices")]
1530 fn py_on_historical_index_prices(
1531 &mut self,
1532 index_prices: Vec<IndexPriceUpdate>,
1533 ) -> PyResult<()> {
1534 Ok(())
1536 }
1537}
1538
1539#[cfg(test)]
1543mod tests {
1544 use std::{
1545 any::Any,
1546 cell::RefCell,
1547 collections::HashMap,
1548 ops::{Deref, DerefMut},
1549 rc::Rc,
1550 str::FromStr,
1551 sync::{Arc, Mutex},
1552 };
1553
1554 use alloy_primitives::{I256, U160};
1555 use nautilus_core::{UUID4, UnixNanos};
1556 #[cfg(feature = "defi")]
1557 use nautilus_model::defi::{
1558 AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolLiquidityUpdate, PoolSwap, Token,
1559 };
1560 use nautilus_model::{
1561 data::{
1562 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
1563 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
1564 },
1565 enums::BookType,
1566 identifiers::{ClientId, TraderId, Venue},
1567 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
1568 orderbook::OrderBook,
1569 types::{Price, Quantity},
1570 };
1571 use rstest::{fixture, rstest};
1572 use ustr::Ustr;
1573
1574 use super::PyDataActor;
1575 use crate::{
1576 actor::{DataActor, data_actor::DataActorCore},
1577 cache::Cache,
1578 clock::TestClock,
1579 component::Component,
1580 enums::ComponentState,
1581 runner::{SyncDataCommandSender, set_data_cmd_sender},
1582 signal::Signal,
1583 timer::TimeEvent,
1584 };
1585
1586 #[fixture]
1587 fn clock() -> Rc<RefCell<TestClock>> {
1588 Rc::new(RefCell::new(TestClock::new()))
1589 }
1590
1591 #[fixture]
1592 fn cache() -> Rc<RefCell<Cache>> {
1593 Rc::new(RefCell::new(Cache::new(None, None)))
1594 }
1595
1596 #[fixture]
1597 fn trader_id() -> TraderId {
1598 TraderId::from("TRADER-001")
1599 }
1600
1601 #[fixture]
1602 fn client_id() -> ClientId {
1603 ClientId::new("TestClient")
1604 }
1605
1606 #[fixture]
1607 fn venue() -> Venue {
1608 Venue::from("SIM")
1609 }
1610
1611 #[fixture]
1612 fn data_type() -> DataType {
1613 DataType::new("TestData", None)
1614 }
1615
1616 #[fixture]
1617 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
1618 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
1619 }
1620
1621 fn create_unregistered_actor() -> PyDataActor {
1622 PyDataActor::new(None)
1623 }
1624
1625 fn create_registered_actor(
1626 clock: Rc<RefCell<TestClock>>,
1627 cache: Rc<RefCell<Cache>>,
1628 trader_id: TraderId,
1629 ) -> PyDataActor {
1630 let sender = SyncDataCommandSender;
1632 set_data_cmd_sender(Arc::new(sender));
1633
1634 let mut actor = PyDataActor::new(None);
1635 actor.register(trader_id, clock, cache).unwrap();
1636 actor
1637 }
1638
1639 #[rstest]
1640 fn test_new_actor_creation() {
1641 let actor = PyDataActor::new(None);
1642 assert!(actor.trader_id().is_none());
1643 }
1644
1645 #[rstest]
1646 fn test_clock_access_before_registration_raises_error() {
1647 let actor = PyDataActor::new(None);
1648
1649 let result = actor.py_clock();
1651 assert!(result.is_err());
1652
1653 let error = result.unwrap_err();
1654 pyo3::Python::initialize();
1655 pyo3::Python::attach(|py| {
1656 assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
1657 });
1658
1659 let error_msg = error.to_string();
1660 assert!(
1661 error_msg.contains("Actor must be registered with a trader before accessing clock")
1662 );
1663 }
1664
1665 #[rstest]
1666 fn test_unregistered_actor_methods_work() {
1667 let actor = create_unregistered_actor();
1668
1669 assert!(!actor.py_is_ready());
1670 assert!(!actor.py_is_running());
1671 assert!(!actor.py_is_stopped());
1672 assert!(!actor.py_is_disposed());
1673 assert!(!actor.py_is_degraded());
1674 assert!(!actor.py_is_faulted());
1675
1676 assert_eq!(actor.trader_id(), None);
1678 }
1679
1680 #[rstest]
1681 fn test_registration_success(
1682 clock: Rc<RefCell<TestClock>>,
1683 cache: Rc<RefCell<Cache>>,
1684 trader_id: TraderId,
1685 ) {
1686 let mut actor = create_unregistered_actor();
1687 actor.register(trader_id, clock, cache).unwrap();
1688 assert!(actor.trader_id().is_some());
1689 assert_eq!(actor.trader_id().unwrap(), trader_id);
1690 }
1691
1692 #[rstest]
1693 fn test_registered_actor_basic_properties(
1694 clock: Rc<RefCell<TestClock>>,
1695 cache: Rc<RefCell<Cache>>,
1696 trader_id: TraderId,
1697 ) {
1698 let actor = create_registered_actor(clock, cache, trader_id);
1699
1700 assert_eq!(actor.state(), ComponentState::Ready);
1701 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
1702 assert!(actor.py_is_ready());
1703 assert!(!actor.py_is_running());
1704 assert!(!actor.py_is_stopped());
1705 assert!(!actor.py_is_disposed());
1706 assert!(!actor.py_is_degraded());
1707 assert!(!actor.py_is_faulted());
1708 }
1709
1710 #[rstest]
1711 fn test_basic_subscription_methods_compile(
1712 clock: Rc<RefCell<TestClock>>,
1713 cache: Rc<RefCell<Cache>>,
1714 trader_id: TraderId,
1715 data_type: DataType,
1716 client_id: ClientId,
1717 audusd_sim: CurrencyPair,
1718 ) {
1719 let mut actor = create_registered_actor(clock, cache, trader_id);
1720
1721 let _ = actor.py_subscribe_data(data_type.clone(), Some(client_id), None);
1722 let _ = actor.py_subscribe_quotes(audusd_sim.id, Some(client_id), None);
1723 let _ = actor.py_unsubscribe_data(data_type, Some(client_id), None);
1724 let _ = actor.py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None);
1725 }
1726
1727 #[ignore = "TODO: Under development"]
1728 #[rstest]
1729 fn test_lifecycle_methods_pass_through(
1730 clock: Rc<RefCell<TestClock>>,
1731 cache: Rc<RefCell<Cache>>,
1732 trader_id: TraderId,
1733 ) {
1734 let mut actor = create_registered_actor(clock, cache, trader_id);
1735
1736 assert!(actor.py_start().is_ok());
1737 assert!(actor.py_stop().is_ok());
1738 assert!(actor.py_dispose().is_ok());
1739 }
1740
1741 #[rstest]
1742 fn test_shutdown_system_passes_through(
1743 clock: Rc<RefCell<TestClock>>,
1744 cache: Rc<RefCell<Cache>>,
1745 trader_id: TraderId,
1746 ) {
1747 let actor = create_registered_actor(clock, cache, trader_id);
1748
1749 assert!(
1750 actor
1751 .py_shutdown_system(Some("Test shutdown".to_string()))
1752 .is_ok()
1753 );
1754 assert!(actor.py_shutdown_system(None).is_ok());
1755 }
1756
1757 #[rstest]
1758 fn test_book_at_interval_invalid_interval_ms(
1759 clock: Rc<RefCell<TestClock>>,
1760 cache: Rc<RefCell<Cache>>,
1761 trader_id: TraderId,
1762 audusd_sim: CurrencyPair,
1763 ) {
1764 pyo3::Python::initialize();
1765 let mut actor = create_registered_actor(clock, cache, trader_id);
1766
1767 let result = actor.py_subscribe_book_at_interval(
1768 audusd_sim.id,
1769 BookType::L2_MBP,
1770 0,
1771 None,
1772 None,
1773 None,
1774 );
1775 assert!(result.is_err());
1776 assert_eq!(
1777 result.unwrap_err().to_string(),
1778 "ValueError: interval_ms must be > 0"
1779 );
1780
1781 let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
1782 assert!(result.is_err());
1783 assert_eq!(
1784 result.unwrap_err().to_string(),
1785 "ValueError: interval_ms must be > 0"
1786 );
1787 }
1788
1789 #[rstest]
1790 fn test_request_methods_signatures_exist() {
1791 let actor = create_unregistered_actor();
1792 assert!(actor.trader_id().is_none());
1793 }
1794
1795 #[rstest]
1796 fn test_data_actor_trait_implementation(
1797 clock: Rc<RefCell<TestClock>>,
1798 cache: Rc<RefCell<Cache>>,
1799 trader_id: TraderId,
1800 ) {
1801 let actor = create_registered_actor(clock, cache, trader_id);
1802 let state = actor.state();
1803 assert_eq!(state, ComponentState::Ready);
1804 }
1805
1806 static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
1810 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
1811
1812 #[derive(Debug)]
1814 struct TestDataActor {
1815 inner: PyDataActor,
1816 }
1817
1818 impl TestDataActor {
1819 fn new() -> Self {
1820 Self {
1821 inner: PyDataActor::new(None),
1822 }
1823 }
1824
1825 fn track_call(&self, handler_name: &str) {
1826 let mut tracker = CALL_TRACKER.lock().unwrap();
1827 *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
1828 }
1829
1830 fn get_call_count(&self, handler_name: &str) -> i32 {
1831 let tracker = CALL_TRACKER.lock().unwrap();
1832 tracker.get(handler_name).copied().unwrap_or(0)
1833 }
1834
1835 fn reset_tracker(&self) {
1836 let mut tracker = CALL_TRACKER.lock().unwrap();
1837 tracker.clear();
1838 }
1839 }
1840
1841 impl Deref for TestDataActor {
1842 type Target = DataActorCore;
1843 fn deref(&self) -> &Self::Target {
1844 &self.inner.core
1845 }
1846 }
1847
1848 impl DerefMut for TestDataActor {
1849 fn deref_mut(&mut self) -> &mut Self::Target {
1850 &mut self.inner.core
1851 }
1852 }
1853
1854 impl DataActor for TestDataActor {
1855 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
1856 self.track_call("on_time_event");
1857 self.inner.on_time_event(event)
1858 }
1859
1860 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
1861 self.track_call("on_data");
1862 self.inner.on_data(data)
1863 }
1864
1865 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
1866 self.track_call("on_signal");
1867 self.inner.on_signal(signal)
1868 }
1869
1870 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1871 self.track_call("on_instrument");
1872 self.inner.on_instrument(instrument)
1873 }
1874
1875 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
1876 self.track_call("on_quote");
1877 self.inner.on_quote(quote)
1878 }
1879
1880 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
1881 self.track_call("on_trade");
1882 self.inner.on_trade(trade)
1883 }
1884
1885 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
1886 self.track_call("on_bar");
1887 self.inner.on_bar(bar)
1888 }
1889
1890 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
1891 self.track_call("on_book");
1892 self.inner.on_book(book)
1893 }
1894
1895 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1896 self.track_call("on_book_deltas");
1897 self.inner.on_book_deltas(deltas)
1898 }
1899
1900 fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
1901 self.track_call("on_mark_price");
1902 self.inner.on_mark_price(update)
1903 }
1904
1905 fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
1906 self.track_call("on_index_price");
1907 self.inner.on_index_price(update)
1908 }
1909
1910 fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
1911 self.track_call("on_instrument_status");
1912 self.inner.on_instrument_status(update)
1913 }
1914
1915 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
1916 self.track_call("on_instrument_close");
1917 self.inner.on_instrument_close(update)
1918 }
1919
1920 #[cfg(feature = "defi")]
1921 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
1922 self.track_call("on_block");
1923 self.inner.on_block(block)
1924 }
1925
1926 #[cfg(feature = "defi")]
1927 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
1928 self.track_call("on_pool");
1929 self.inner.on_pool(pool)
1930 }
1931
1932 #[cfg(feature = "defi")]
1933 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
1934 self.track_call("on_pool_swap");
1935 self.inner.on_pool_swap(swap)
1936 }
1937
1938 #[cfg(feature = "defi")]
1939 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
1940 self.track_call("on_pool_liquidity_update");
1941 self.inner.on_pool_liquidity_update(update)
1942 }
1943 }
1944
1945 #[rstest]
1946 fn test_python_on_signal_handler(
1947 clock: Rc<RefCell<TestClock>>,
1948 cache: Rc<RefCell<Cache>>,
1949 trader_id: TraderId,
1950 ) {
1951 pyo3::Python::initialize();
1952 let mut test_actor = TestDataActor::new();
1953 test_actor.reset_tracker();
1954 test_actor
1955 .register(trader_id, clock.clone(), cache.clone())
1956 .unwrap();
1957
1958 let signal = Signal::new(
1959 Ustr::from("test_signal"),
1960 "1.0".to_string(),
1961 UnixNanos::default(),
1962 UnixNanos::default(),
1963 );
1964
1965 assert!(test_actor.on_signal(&signal).is_ok());
1966 assert_eq!(test_actor.get_call_count("on_signal"), 1);
1967 }
1968
1969 #[rstest]
1970 fn test_python_on_data_handler(
1971 clock: Rc<RefCell<TestClock>>,
1972 cache: Rc<RefCell<Cache>>,
1973 trader_id: TraderId,
1974 ) {
1975 pyo3::Python::initialize();
1976 let mut test_actor = TestDataActor::new();
1977 test_actor.reset_tracker();
1978 test_actor
1979 .register(trader_id, clock.clone(), cache.clone())
1980 .unwrap();
1981
1982 assert!(test_actor.on_data(&()).is_ok());
1983 assert_eq!(test_actor.get_call_count("on_data"), 1);
1984 }
1985
1986 #[rstest]
1987 fn test_python_on_time_event_handler(
1988 clock: Rc<RefCell<TestClock>>,
1989 cache: Rc<RefCell<Cache>>,
1990 trader_id: TraderId,
1991 ) {
1992 pyo3::Python::initialize();
1993 let mut test_actor = TestDataActor::new();
1994 test_actor.reset_tracker();
1995 test_actor
1996 .register(trader_id, clock.clone(), cache.clone())
1997 .unwrap();
1998
1999 let time_event = TimeEvent::new(
2000 Ustr::from("test_timer"),
2001 UUID4::new(),
2002 UnixNanos::default(),
2003 UnixNanos::default(),
2004 );
2005
2006 assert!(test_actor.on_time_event(&time_event).is_ok());
2007 assert_eq!(test_actor.get_call_count("on_time_event"), 1);
2008 }
2009
2010 #[rstest]
2011 fn test_python_on_instrument_handler(
2012 clock: Rc<RefCell<TestClock>>,
2013 cache: Rc<RefCell<Cache>>,
2014 trader_id: TraderId,
2015 audusd_sim: CurrencyPair,
2016 ) {
2017 pyo3::Python::initialize();
2018 let mut rust_actor = PyDataActor::new(None);
2019 rust_actor
2020 .register(trader_id, clock.clone(), cache.clone())
2021 .unwrap();
2022
2023 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2024
2025 assert!(rust_actor.on_instrument(&instrument).is_ok());
2026 }
2027
2028 #[rstest]
2029 fn test_python_on_quote_handler(
2030 clock: Rc<RefCell<TestClock>>,
2031 cache: Rc<RefCell<Cache>>,
2032 trader_id: TraderId,
2033 audusd_sim: CurrencyPair,
2034 ) {
2035 pyo3::Python::initialize();
2036 let mut rust_actor = PyDataActor::new(None);
2037 rust_actor
2038 .register(trader_id, clock.clone(), cache.clone())
2039 .unwrap();
2040
2041 let quote = QuoteTick::new(
2042 audusd_sim.id,
2043 Price::from("1.0000"),
2044 Price::from("1.0001"),
2045 Quantity::from("100000"),
2046 Quantity::from("100000"),
2047 UnixNanos::default(),
2048 UnixNanos::default(),
2049 );
2050
2051 assert!(rust_actor.on_quote("e).is_ok());
2052 }
2053
2054 #[rstest]
2055 fn test_python_on_trade_handler(
2056 clock: Rc<RefCell<TestClock>>,
2057 cache: Rc<RefCell<Cache>>,
2058 trader_id: TraderId,
2059 audusd_sim: CurrencyPair,
2060 ) {
2061 pyo3::Python::initialize();
2062 let mut rust_actor = PyDataActor::new(None);
2063 rust_actor
2064 .register(trader_id, clock.clone(), cache.clone())
2065 .unwrap();
2066
2067 let trade = TradeTick::new(
2068 audusd_sim.id,
2069 Price::from("1.0000"),
2070 Quantity::from("100000"),
2071 nautilus_model::enums::AggressorSide::Buyer,
2072 "T123".to_string().into(),
2073 UnixNanos::default(),
2074 UnixNanos::default(),
2075 );
2076
2077 assert!(rust_actor.on_trade(&trade).is_ok());
2078 }
2079
2080 #[rstest]
2081 fn test_python_on_bar_handler(
2082 clock: Rc<RefCell<TestClock>>,
2083 cache: Rc<RefCell<Cache>>,
2084 trader_id: TraderId,
2085 audusd_sim: CurrencyPair,
2086 ) {
2087 pyo3::Python::initialize();
2088 let mut rust_actor = PyDataActor::new(None);
2089 rust_actor
2090 .register(trader_id, clock.clone(), cache.clone())
2091 .unwrap();
2092
2093 let bar_type =
2094 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
2095 let bar = Bar::new(
2096 bar_type,
2097 Price::from("1.0000"),
2098 Price::from("1.0001"),
2099 Price::from("0.9999"),
2100 Price::from("1.0000"),
2101 Quantity::from("100000"),
2102 UnixNanos::default(),
2103 UnixNanos::default(),
2104 );
2105
2106 assert!(rust_actor.on_bar(&bar).is_ok());
2107 }
2108
2109 #[rstest]
2110 fn test_python_on_book_handler(
2111 clock: Rc<RefCell<TestClock>>,
2112 cache: Rc<RefCell<Cache>>,
2113 trader_id: TraderId,
2114 audusd_sim: CurrencyPair,
2115 ) {
2116 pyo3::Python::initialize();
2117 let mut rust_actor = PyDataActor::new(None);
2118 rust_actor
2119 .register(trader_id, clock.clone(), cache.clone())
2120 .unwrap();
2121
2122 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
2123 assert!(rust_actor.on_book(&book).is_ok());
2124 }
2125
2126 #[rstest]
2127 fn test_python_on_book_deltas_handler(
2128 clock: Rc<RefCell<TestClock>>,
2129 cache: Rc<RefCell<Cache>>,
2130 trader_id: TraderId,
2131 audusd_sim: CurrencyPair,
2132 ) {
2133 pyo3::Python::initialize();
2134 let mut rust_actor = PyDataActor::new(None);
2135 rust_actor
2136 .register(trader_id, clock.clone(), cache.clone())
2137 .unwrap();
2138
2139 let delta =
2140 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
2141 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
2142
2143 assert!(rust_actor.on_book_deltas(&deltas).is_ok());
2144 }
2145
2146 #[rstest]
2147 fn test_python_on_mark_price_handler(
2148 clock: Rc<RefCell<TestClock>>,
2149 cache: Rc<RefCell<Cache>>,
2150 trader_id: TraderId,
2151 audusd_sim: CurrencyPair,
2152 ) {
2153 pyo3::Python::initialize();
2154 let mut rust_actor = PyDataActor::new(None);
2155 rust_actor
2156 .register(trader_id, clock.clone(), cache.clone())
2157 .unwrap();
2158
2159 let mark_price = MarkPriceUpdate::new(
2160 audusd_sim.id,
2161 Price::from("1.0000"),
2162 UnixNanos::default(),
2163 UnixNanos::default(),
2164 );
2165
2166 assert!(rust_actor.on_mark_price(&mark_price).is_ok());
2167 }
2168
2169 #[rstest]
2170 fn test_python_on_index_price_handler(
2171 clock: Rc<RefCell<TestClock>>,
2172 cache: Rc<RefCell<Cache>>,
2173 trader_id: TraderId,
2174 audusd_sim: CurrencyPair,
2175 ) {
2176 pyo3::Python::initialize();
2177 let mut rust_actor = PyDataActor::new(None);
2178 rust_actor
2179 .register(trader_id, clock.clone(), cache.clone())
2180 .unwrap();
2181
2182 let index_price = IndexPriceUpdate::new(
2183 audusd_sim.id,
2184 Price::from("1.0000"),
2185 UnixNanos::default(),
2186 UnixNanos::default(),
2187 );
2188
2189 assert!(rust_actor.on_index_price(&index_price).is_ok());
2190 }
2191
2192 #[rstest]
2193 fn test_python_on_instrument_status_handler(
2194 clock: Rc<RefCell<TestClock>>,
2195 cache: Rc<RefCell<Cache>>,
2196 trader_id: TraderId,
2197 audusd_sim: CurrencyPair,
2198 ) {
2199 pyo3::Python::initialize();
2200 let mut rust_actor = PyDataActor::new(None);
2201 rust_actor
2202 .register(trader_id, clock.clone(), cache.clone())
2203 .unwrap();
2204
2205 let status = InstrumentStatus::new(
2206 audusd_sim.id,
2207 nautilus_model::enums::MarketStatusAction::Trading,
2208 UnixNanos::default(),
2209 UnixNanos::default(),
2210 None,
2211 None,
2212 None,
2213 None,
2214 None,
2215 );
2216
2217 assert!(rust_actor.on_instrument_status(&status).is_ok());
2218 }
2219
2220 #[rstest]
2221 fn test_python_on_instrument_close_handler(
2222 clock: Rc<RefCell<TestClock>>,
2223 cache: Rc<RefCell<Cache>>,
2224 trader_id: TraderId,
2225 audusd_sim: CurrencyPair,
2226 ) {
2227 pyo3::Python::initialize();
2228 let mut rust_actor = PyDataActor::new(None);
2229 rust_actor
2230 .register(trader_id, clock.clone(), cache.clone())
2231 .unwrap();
2232
2233 let close = InstrumentClose::new(
2234 audusd_sim.id,
2235 Price::from("1.0000"),
2236 nautilus_model::enums::InstrumentCloseType::EndOfSession,
2237 UnixNanos::default(),
2238 UnixNanos::default(),
2239 );
2240
2241 assert!(rust_actor.on_instrument_close(&close).is_ok());
2242 }
2243
2244 #[cfg(feature = "defi")]
2245 #[rstest]
2246 fn test_python_on_block_handler(
2247 clock: Rc<RefCell<TestClock>>,
2248 cache: Rc<RefCell<Cache>>,
2249 trader_id: TraderId,
2250 ) {
2251 pyo3::Python::initialize();
2252 let mut test_actor = TestDataActor::new();
2253 test_actor.reset_tracker();
2254 test_actor
2255 .register(trader_id, clock.clone(), cache.clone())
2256 .unwrap();
2257
2258 let block = Block::new(
2259 "0x1234567890abcdef".to_string(),
2260 "0xabcdef1234567890".to_string(),
2261 12345,
2262 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2263 21000,
2264 20000,
2265 UnixNanos::default(),
2266 Some(Blockchain::Ethereum),
2267 );
2268
2269 assert!(test_actor.on_block(&block).is_ok());
2270 assert_eq!(test_actor.get_call_count("on_block"), 1);
2271 }
2272
2273 #[cfg(feature = "defi")]
2274 #[rstest]
2275 fn test_python_on_pool_swap_handler(
2276 clock: Rc<RefCell<TestClock>>,
2277 cache: Rc<RefCell<Cache>>,
2278 trader_id: TraderId,
2279 ) {
2280 pyo3::Python::initialize();
2281 let mut rust_actor = PyDataActor::new(None);
2282 rust_actor
2283 .register(trader_id, clock.clone(), cache.clone())
2284 .unwrap();
2285
2286 let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
2287 let dex = Arc::new(Dex::new(
2288 Chain::new(Blockchain::Ethereum, 1),
2289 DexType::UniswapV3,
2290 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
2291 0,
2292 AmmType::CLAMM,
2293 "PoolCreated",
2294 "Swap",
2295 "Mint",
2296 "Burn",
2297 "Collect",
2298 ));
2299 let token0 = Token::new(
2300 chain.clone(),
2301 "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
2302 .parse()
2303 .unwrap(),
2304 "USDC".into(),
2305 "USD Coin".into(),
2306 6,
2307 );
2308 let token1 = Token::new(
2309 chain.clone(),
2310 "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
2311 .parse()
2312 .unwrap(),
2313 "WETH".into(),
2314 "Wrapped Ether".into(),
2315 18,
2316 );
2317 let pool = Arc::new(Pool::new(
2318 chain.clone(),
2319 dex.clone(),
2320 "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
2321 .parse()
2322 .unwrap(),
2323 12345,
2324 token0,
2325 token1,
2326 Some(500),
2327 Some(10),
2328 UnixNanos::default(),
2329 ));
2330
2331 let swap = PoolSwap::new(
2332 chain.clone(),
2333 dex.clone(),
2334 pool.instrument_id,
2335 pool.address,
2336 12345,
2337 "0xabc123".to_string(),
2338 0,
2339 0,
2340 None,
2341 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2342 .parse()
2343 .unwrap(),
2344 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
2345 .parse()
2346 .unwrap(),
2347 I256::from_str("1000000000000000000").unwrap(),
2348 I256::from_str("400000000000000").unwrap(),
2349 U160::from(59000000000000u128),
2350 1000000,
2351 100,
2352 Some(nautilus_model::enums::OrderSide::Buy),
2353 Some(Quantity::from("1000")),
2354 Some(Price::from("1.0")),
2355 );
2356
2357 assert!(rust_actor.on_pool_swap(&swap).is_ok());
2358 }
2359
2360 #[cfg(feature = "defi")]
2361 #[rstest]
2362 fn test_python_on_pool_liquidity_update_handler(
2363 clock: Rc<RefCell<TestClock>>,
2364 cache: Rc<RefCell<Cache>>,
2365 trader_id: TraderId,
2366 ) {
2367 pyo3::Python::initialize();
2368 let mut rust_actor = PyDataActor::new(None);
2369 rust_actor
2370 .register(trader_id, clock.clone(), cache.clone())
2371 .unwrap();
2372
2373 let block = Block::new(
2374 "0x1234567890abcdef".to_string(),
2375 "0xabcdef1234567890".to_string(),
2376 12345,
2377 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
2378 21000,
2379 20000,
2380 UnixNanos::default(),
2381 Some(Blockchain::Ethereum),
2382 );
2383
2384 assert!(rust_actor.on_block(&block).is_ok());
2388 }
2389}