1use std::{
19 any::Any,
20 num::NonZeroUsize,
21 ops::{Deref, DerefMut},
22};
23
24use indexmap::IndexMap;
25use nautilus_core::{
26 nanos::UnixNanos,
27 python::{to_pyruntime_err, to_pyvalue_err},
28};
29#[cfg(feature = "defi")]
30use nautilus_model::defi::{Block, Pool, PoolLiquidityUpdate, PoolSwap};
31use nautilus_model::{
32 data::{
33 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
34 OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
35 },
36 enums::BookType,
37 identifiers::{ActorId, ClientId, InstrumentId, TraderId, Venue},
38 instruments::InstrumentAny,
39 orderbook::OrderBook,
40 python::instruments::instrument_any_to_pyobject,
41};
42use pyo3::{exceptions::PyValueError, prelude::*};
43
44use crate::{
45 actor::{
46 DataActor,
47 data_actor::{DataActorConfig, DataActorCore},
48 },
49 component::Component,
50 enums::ComponentState,
51 signal::Signal,
52 timer::TimeEvent,
53};
54
55#[allow(non_camel_case_types)]
56#[pyo3::pyclass(
57 module = "nautilus_trader.core.nautilus_pyo3.common",
58 name = "DataActor",
59 unsendable,
60 subclass
61)]
62#[derive(Debug)]
63pub struct PyDataActor {
64 core: DataActorCore,
65}
66
67impl Deref for PyDataActor {
68 type Target = DataActorCore;
69
70 fn deref(&self) -> &Self::Target {
71 &self.core
72 }
73}
74
75impl DerefMut for PyDataActor {
76 fn deref_mut(&mut self) -> &mut Self::Target {
77 &mut self.core
78 }
79}
80
81impl DataActor for PyDataActor {
82 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
83 self.py_on_time_event(event.clone())
84 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
85 }
86
87 #[allow(unused_variables)]
88 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
89 Python::with_gil(|py| {
90 let py_data = py.None();
93
94 self.py_on_data(py_data)
95 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
96 })
97 }
98
99 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
100 self.py_on_signal(signal)
101 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
102 }
103
104 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
105 Python::with_gil(|py| {
106 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
107 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
108 self.py_on_instrument(py_instrument)
109 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
110 })
111 }
112
113 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
114 self.py_on_quote(*quote)
115 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
116 }
117
118 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
119 self.py_on_trade(*tick)
120 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
121 }
122
123 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
124 self.py_on_bar(*bar)
125 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
126 }
127
128 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
129 self.py_on_book_deltas(deltas.clone())
130 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
131 }
132
133 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
134 self.py_on_book(order_book)
135 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
136 }
137
138 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
139 self.py_on_mark_price(*mark_price)
140 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
141 }
142
143 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
144 self.py_on_index_price(*index_price)
145 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
146 }
147
148 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
149 self.py_on_instrument_status(*data)
150 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
151 }
152
153 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
154 self.py_on_instrument_close(*update)
155 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
156 }
157
158 #[cfg(feature = "defi")]
159 fn on_block(&mut self, _block: &Block) -> anyhow::Result<()> {
160 self.py_on_block()
162 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
163 }
164
165 #[cfg(feature = "defi")]
166 fn on_pool(&mut self, _pool: &Pool) -> anyhow::Result<()> {
167 self.py_on_pool()
169 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
170 }
171
172 #[cfg(feature = "defi")]
173 fn on_pool_swap(&mut self, _swap: &PoolSwap) -> anyhow::Result<()> {
174 self.py_on_pool_swap()
176 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
177 }
178
179 #[cfg(feature = "defi")]
180 fn on_pool_liquidity_update(&mut self, _update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
181 self.py_on_pool_liquidity_update()
183 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
184 }
185}
186
187#[pymethods]
188impl PyDataActor {
189 #[new]
190 #[pyo3(signature = (_config=None))]
191 fn py_new(_config: Option<PyObject>) -> PyResult<Self> {
192 let config = DataActorConfig::default();
194
195 Ok(Self {
196 core: DataActorCore::new(config),
197 })
198 }
199
200 #[getter]
201 #[pyo3(name = "actor_id")]
202 fn py_actor_id(&self) -> ActorId {
203 self.actor_id
204 }
205
206 #[getter]
207 #[pyo3(name = "trader_id")]
208 fn py_trader_id(&self) -> Option<TraderId> {
209 self.trader_id()
210 }
211
212 #[pyo3(name = "state")]
213 fn py_state(&self) -> ComponentState {
214 self.state()
215 }
216
217 #[pyo3(name = "is_ready")]
218 fn py_is_ready(&self) -> bool {
219 self.is_ready()
220 }
221
222 #[pyo3(name = "is_running")]
223 fn py_is_running(&self) -> bool {
224 self.is_running()
225 }
226
227 #[pyo3(name = "is_stopped")]
228 fn py_is_stopped(&self) -> bool {
229 self.is_stopped()
230 }
231
232 #[pyo3(name = "is_degraded")]
233 fn py_is_degraded(&self) -> bool {
234 self.is_degraded()
235 }
236
237 #[pyo3(name = "is_faulted")]
238 fn py_is_faulted(&self) -> bool {
239 self.is_faulted()
240 }
241
242 #[pyo3(name = "is_disposed")]
243 fn py_is_disposed(&self) -> bool {
244 self.is_disposed()
245 }
246
247 #[pyo3(name = "start")]
248 fn py_start(&mut self) -> PyResult<()> {
249 self.start().map_err(to_pyruntime_err)
250 }
251
252 #[pyo3(name = "stop")]
253 fn py_stop(&mut self) -> PyResult<()> {
254 self.stop().map_err(to_pyruntime_err)
255 }
256
257 #[pyo3(name = "resume")]
258 fn py_resume(&mut self) -> PyResult<()> {
259 self.resume().map_err(to_pyruntime_err)
260 }
261
262 #[pyo3(name = "reset")]
263 fn py_reset(&mut self) -> PyResult<()> {
264 self.reset().map_err(to_pyruntime_err)
265 }
266
267 #[pyo3(name = "dispose")]
268 fn py_dispose(&mut self) -> PyResult<()> {
269 self.dispose().map_err(to_pyruntime_err)
270 }
271
272 #[pyo3(name = "degrade")]
273 fn py_degrade(&mut self) -> PyResult<()> {
274 self.degrade().map_err(to_pyruntime_err)
275 }
276
277 #[pyo3(name = "fault")]
278 fn py_fault(&mut self) -> PyResult<()> {
279 self.fault().map_err(to_pyruntime_err)
280 }
281
282 #[pyo3(name = "shutdown_system")]
283 #[pyo3(signature = (reason=None))]
284 fn py_shutdown_system(&self, reason: Option<String>) -> PyResult<()> {
285 self.core.shutdown_system(reason);
286 Ok(())
287 }
288
289 #[allow(unused_variables)]
290 #[pyo3(name = "on_time_event")]
291 fn py_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
292 Ok(())
294 }
295
296 #[allow(unused_variables)]
297 #[pyo3(name = "on_data")]
298 fn py_on_data(&mut self, data: PyObject) -> PyResult<()> {
299 Ok(())
301 }
302
303 #[allow(unused_variables)]
304 #[pyo3(name = "on_signal")]
305 fn py_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
306 Ok(())
308 }
309
310 #[allow(unused_variables)]
311 #[pyo3(name = "on_instrument")]
312 fn py_on_instrument(&mut self, instrument: PyObject) -> PyResult<()> {
313 Ok(())
315 }
316
317 #[allow(unused_variables)]
318 #[pyo3(name = "on_quote")]
319 fn py_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
320 Ok(())
322 }
323
324 #[allow(unused_variables)]
325 #[pyo3(name = "on_trade")]
326 fn py_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
327 Ok(())
329 }
330
331 #[allow(unused_variables)]
332 #[pyo3(name = "on_bar")]
333 fn py_on_bar(&mut self, bar: Bar) -> PyResult<()> {
334 Ok(())
336 }
337
338 #[allow(unused_variables)]
339 #[pyo3(name = "on_book_deltas")]
340 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
341 Ok(())
343 }
344
345 #[allow(unused_variables)]
346 #[pyo3(name = "on_book")]
347 fn py_on_book(&mut self, order_book: &OrderBook) -> PyResult<()> {
348 Ok(())
350 }
351
352 #[allow(unused_variables)]
353 #[pyo3(name = "on_mark_price")]
354 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
355 Ok(())
357 }
358
359 #[allow(unused_variables)]
360 #[pyo3(name = "on_index_price")]
361 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
362 Ok(())
364 }
365
366 #[allow(unused_variables)]
367 #[pyo3(name = "on_instrument_status")]
368 fn py_on_instrument_status(&mut self, status: InstrumentStatus) -> PyResult<()> {
369 Ok(())
371 }
372
373 #[allow(unused_variables)]
374 #[pyo3(name = "on_instrument_close")]
375 fn py_on_instrument_close(&mut self, close: InstrumentClose) -> PyResult<()> {
376 Ok(())
378 }
379
380 #[cfg(feature = "defi")]
381 #[allow(unused_variables)]
382 #[pyo3(name = "on_block")]
383 fn py_on_block(&mut self) -> PyResult<()> {
384 Ok(())
387 }
388
389 #[cfg(feature = "defi")]
390 #[allow(unused_variables)]
391 #[pyo3(name = "on_pool")]
392 fn py_on_pool(&mut self) -> PyResult<()> {
393 Ok(())
396 }
397
398 #[cfg(feature = "defi")]
399 #[allow(unused_variables)]
400 #[pyo3(name = "on_pool_swap")]
401 fn py_on_pool_swap(&mut self) -> PyResult<()> {
402 Ok(())
405 }
406
407 #[cfg(feature = "defi")]
408 #[allow(unused_variables)]
409 #[pyo3(name = "on_pool_liquidity_update")]
410 fn py_on_pool_liquidity_update(&mut self) -> PyResult<()> {
411 Ok(())
414 }
415
416 #[pyo3(name = "subscribe_data")]
417 #[pyo3(signature = (data_type, client_id=None, params=None))]
418 fn py_subscribe_data(
419 &mut self,
420 data_type: DataType,
421 client_id: Option<ClientId>,
422 params: Option<IndexMap<String, String>>,
423 ) -> PyResult<()> {
424 self.subscribe_data(data_type, client_id, params);
425 Ok(())
426 }
427
428 #[pyo3(name = "subscribe_instruments")]
429 #[pyo3(signature = (venue, client_id=None, params=None))]
430 fn py_subscribe_instruments(
431 &mut self,
432 venue: Venue,
433 client_id: Option<ClientId>,
434 params: Option<IndexMap<String, String>>,
435 ) -> PyResult<()> {
436 self.subscribe_instruments(venue, client_id, params);
437 Ok(())
438 }
439
440 #[pyo3(name = "subscribe_instrument")]
441 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
442 fn py_subscribe_instrument(
443 &mut self,
444 instrument_id: InstrumentId,
445 client_id: Option<ClientId>,
446 params: Option<IndexMap<String, String>>,
447 ) -> PyResult<()> {
448 self.subscribe_instrument(instrument_id, client_id, params);
449 Ok(())
450 }
451
452 #[pyo3(name = "subscribe_book_deltas")]
453 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
454 fn py_subscribe_book_deltas(
455 &mut self,
456 instrument_id: InstrumentId,
457 book_type: BookType,
458 depth: Option<usize>,
459 client_id: Option<ClientId>,
460 managed: bool,
461 params: Option<IndexMap<String, String>>,
462 ) -> PyResult<()> {
463 let depth = depth.and_then(NonZeroUsize::new);
464 self.subscribe_book_deltas(instrument_id, book_type, depth, client_id, managed, params);
465 Ok(())
466 }
467
468 #[pyo3(name = "subscribe_book_at_interval")]
469 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
470 fn py_subscribe_book_at_interval(
471 &mut self,
472 instrument_id: InstrumentId,
473 book_type: BookType,
474 interval_ms: usize,
475 depth: Option<usize>,
476 client_id: Option<ClientId>,
477 params: Option<IndexMap<String, String>>,
478 ) -> PyResult<()> {
479 let depth = depth.and_then(NonZeroUsize::new);
480 let interval_ms = NonZeroUsize::new(interval_ms)
481 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
482
483 self.subscribe_book_at_interval(
484 instrument_id,
485 book_type,
486 depth,
487 interval_ms,
488 client_id,
489 params,
490 );
491 Ok(())
492 }
493
494 #[pyo3(name = "subscribe_quotes")]
495 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
496 fn py_subscribe_quotes(
497 &mut self,
498 instrument_id: InstrumentId,
499 client_id: Option<ClientId>,
500 params: Option<IndexMap<String, String>>,
501 ) -> PyResult<()> {
502 self.subscribe_quotes(instrument_id, client_id, params);
503 Ok(())
504 }
505
506 #[pyo3(name = "subscribe_trades")]
507 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
508 fn py_subscribe_trades(
509 &mut self,
510 instrument_id: InstrumentId,
511 client_id: Option<ClientId>,
512 params: Option<IndexMap<String, String>>,
513 ) -> PyResult<()> {
514 self.subscribe_trades(instrument_id, client_id, params);
515 Ok(())
516 }
517
518 #[pyo3(name = "subscribe_bars")]
519 #[pyo3(signature = (bar_type, client_id=None, await_partial=false, params=None))]
520 fn py_subscribe_bars(
521 &mut self,
522 bar_type: BarType,
523 client_id: Option<ClientId>,
524 await_partial: bool,
525 params: Option<IndexMap<String, String>>,
526 ) -> PyResult<()> {
527 self.subscribe_bars(bar_type, client_id, await_partial, params);
528 Ok(())
529 }
530
531 #[pyo3(name = "subscribe_mark_prices")]
532 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
533 fn py_subscribe_mark_prices(
534 &mut self,
535 instrument_id: InstrumentId,
536 client_id: Option<ClientId>,
537 params: Option<IndexMap<String, String>>,
538 ) -> PyResult<()> {
539 self.subscribe_mark_prices(instrument_id, client_id, params);
540 Ok(())
541 }
542
543 #[pyo3(name = "subscribe_index_prices")]
544 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
545 fn py_subscribe_index_prices(
546 &mut self,
547 instrument_id: InstrumentId,
548 client_id: Option<ClientId>,
549 params: Option<IndexMap<String, String>>,
550 ) -> PyResult<()> {
551 self.subscribe_index_prices(instrument_id, client_id, params);
552 Ok(())
553 }
554
555 #[pyo3(name = "subscribe_instrument_status")]
556 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
557 fn py_subscribe_instrument_status(
558 &mut self,
559 instrument_id: InstrumentId,
560 client_id: Option<ClientId>,
561 params: Option<IndexMap<String, String>>,
562 ) -> PyResult<()> {
563 self.subscribe_instrument_status(instrument_id, client_id, params);
564 Ok(())
565 }
566
567 #[pyo3(name = "subscribe_instrument_close")]
568 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
569 fn py_subscribe_instrument_close(
570 &mut self,
571 instrument_id: InstrumentId,
572 client_id: Option<ClientId>,
573 params: Option<IndexMap<String, String>>,
574 ) -> PyResult<()> {
575 self.subscribe_instrument_close(instrument_id, client_id, params);
576 Ok(())
577 }
578
579 #[pyo3(name = "request_data")]
581 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
582 fn py_request_data(
583 &mut self,
584 data_type: DataType,
585 client_id: ClientId,
586 start: Option<u64>,
587 end: Option<u64>,
588 limit: Option<usize>,
589 params: Option<IndexMap<String, String>>,
590 ) -> PyResult<String> {
591 let limit = limit.and_then(NonZeroUsize::new);
592 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
593 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
594
595 let request_id = self
596 .request_data(data_type, client_id, start, end, limit, params)
597 .map_err(to_pyvalue_err)?;
598 Ok(request_id.to_string())
599 }
600
601 #[pyo3(name = "request_instrument")]
602 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
603 fn py_request_instrument(
604 &mut self,
605 instrument_id: InstrumentId,
606 start: Option<u64>,
607 end: Option<u64>,
608 client_id: Option<ClientId>,
609 params: Option<IndexMap<String, String>>,
610 ) -> PyResult<String> {
611 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
612 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
613
614 let request_id = self
615 .request_instrument(instrument_id, start, end, client_id, params)
616 .map_err(to_pyvalue_err)?;
617 Ok(request_id.to_string())
618 }
619
620 #[pyo3(name = "request_instruments")]
621 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
622 fn py_request_instruments(
623 &mut self,
624 venue: Option<Venue>,
625 start: Option<u64>,
626 end: Option<u64>,
627 client_id: Option<ClientId>,
628 params: Option<IndexMap<String, String>>,
629 ) -> PyResult<String> {
630 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
631 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
632
633 let request_id = self
634 .request_instruments(venue, start, end, client_id, params)
635 .map_err(to_pyvalue_err)?;
636 Ok(request_id.to_string())
637 }
638
639 #[pyo3(name = "request_book_snapshot")]
640 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
641 fn py_request_book_snapshot(
642 &mut self,
643 instrument_id: InstrumentId,
644 depth: Option<usize>,
645 client_id: Option<ClientId>,
646 params: Option<IndexMap<String, String>>,
647 ) -> PyResult<String> {
648 let depth = depth.and_then(NonZeroUsize::new);
649
650 let request_id = self
651 .request_book_snapshot(instrument_id, depth, client_id, params)
652 .map_err(to_pyvalue_err)?;
653 Ok(request_id.to_string())
654 }
655
656 #[pyo3(name = "request_quotes")]
657 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
658 fn py_request_quotes(
659 &mut self,
660 instrument_id: InstrumentId,
661 start: Option<u64>,
662 end: Option<u64>,
663 limit: Option<usize>,
664 client_id: Option<ClientId>,
665 params: Option<IndexMap<String, String>>,
666 ) -> PyResult<String> {
667 let limit = limit.and_then(NonZeroUsize::new);
668 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
669 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
670
671 let request_id = self
672 .request_quotes(instrument_id, start, end, limit, client_id, params)
673 .map_err(to_pyvalue_err)?;
674 Ok(request_id.to_string())
675 }
676
677 #[pyo3(name = "request_trades")]
678 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
679 fn py_request_trades(
680 &mut self,
681 instrument_id: InstrumentId,
682 start: Option<u64>,
683 end: Option<u64>,
684 limit: Option<usize>,
685 client_id: Option<ClientId>,
686 params: Option<IndexMap<String, String>>,
687 ) -> PyResult<String> {
688 let limit = limit.and_then(NonZeroUsize::new);
689 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
690 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
691
692 let request_id = self
693 .request_trades(instrument_id, start, end, limit, client_id, params)
694 .map_err(to_pyvalue_err)?;
695 Ok(request_id.to_string())
696 }
697
698 #[pyo3(name = "request_bars")]
699 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
700 fn py_request_bars(
701 &mut self,
702 bar_type: BarType,
703 start: Option<u64>,
704 end: Option<u64>,
705 limit: Option<usize>,
706 client_id: Option<ClientId>,
707 params: Option<IndexMap<String, String>>,
708 ) -> PyResult<String> {
709 let limit = limit.and_then(NonZeroUsize::new);
710 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
711 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
712
713 let request_id = self
714 .request_bars(bar_type, start, end, limit, client_id, params)
715 .map_err(to_pyvalue_err)?;
716 Ok(request_id.to_string())
717 }
718
719 #[pyo3(name = "unsubscribe_data")]
721 #[pyo3(signature = (data_type, client_id=None, params=None))]
722 fn py_unsubscribe_data(
723 &mut self,
724 data_type: DataType,
725 client_id: Option<ClientId>,
726 params: Option<IndexMap<String, String>>,
727 ) -> PyResult<()> {
728 self.unsubscribe_data(data_type, client_id, params);
729 Ok(())
730 }
731
732 #[pyo3(name = "unsubscribe_instruments")]
733 #[pyo3(signature = (venue, client_id=None, params=None))]
734 fn py_unsubscribe_instruments(
735 &mut self,
736 venue: Venue,
737 client_id: Option<ClientId>,
738 params: Option<IndexMap<String, String>>,
739 ) -> PyResult<()> {
740 self.unsubscribe_instruments(venue, client_id, params);
741 Ok(())
742 }
743
744 #[pyo3(name = "unsubscribe_instrument")]
745 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
746 fn py_unsubscribe_instrument(
747 &mut self,
748 instrument_id: InstrumentId,
749 client_id: Option<ClientId>,
750 params: Option<IndexMap<String, String>>,
751 ) -> PyResult<()> {
752 self.unsubscribe_instrument(instrument_id, client_id, params);
753 Ok(())
754 }
755
756 #[pyo3(name = "unsubscribe_book_deltas")]
757 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
758 fn py_unsubscribe_book_deltas(
759 &mut self,
760 instrument_id: InstrumentId,
761 client_id: Option<ClientId>,
762 params: Option<IndexMap<String, String>>,
763 ) -> PyResult<()> {
764 self.unsubscribe_book_deltas(instrument_id, client_id, params);
765 Ok(())
766 }
767
768 #[pyo3(name = "unsubscribe_book_at_interval")]
769 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
770 fn py_unsubscribe_book_at_interval(
771 &mut self,
772 instrument_id: InstrumentId,
773 interval_ms: usize,
774 client_id: Option<ClientId>,
775 params: Option<IndexMap<String, String>>,
776 ) -> PyResult<()> {
777 let interval_ms = NonZeroUsize::new(interval_ms)
778 .ok_or_else(|| PyErr::new::<PyValueError, _>("interval_ms must be > 0"))?;
779
780 self.unsubscribe_book_at_interval(instrument_id, interval_ms, client_id, params);
781 Ok(())
782 }
783
784 #[pyo3(name = "unsubscribe_quotes")]
785 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
786 fn py_unsubscribe_quotes(
787 &mut self,
788 instrument_id: InstrumentId,
789 client_id: Option<ClientId>,
790 params: Option<IndexMap<String, String>>,
791 ) -> PyResult<()> {
792 self.unsubscribe_quotes(instrument_id, client_id, params);
793 Ok(())
794 }
795
796 #[pyo3(name = "unsubscribe_trades")]
797 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
798 fn py_unsubscribe_trades(
799 &mut self,
800 instrument_id: InstrumentId,
801 client_id: Option<ClientId>,
802 params: Option<IndexMap<String, String>>,
803 ) -> PyResult<()> {
804 self.unsubscribe_trades(instrument_id, client_id, params);
805 Ok(())
806 }
807
808 #[pyo3(name = "unsubscribe_bars")]
809 #[pyo3(signature = (bar_type, client_id=None, params=None))]
810 fn py_unsubscribe_bars(
811 &mut self,
812 bar_type: BarType,
813 client_id: Option<ClientId>,
814 params: Option<IndexMap<String, String>>,
815 ) -> PyResult<()> {
816 self.unsubscribe_bars(bar_type, client_id, params);
817 Ok(())
818 }
819
820 #[pyo3(name = "unsubscribe_mark_prices")]
821 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
822 fn py_unsubscribe_mark_prices(
823 &mut self,
824 instrument_id: InstrumentId,
825 client_id: Option<ClientId>,
826 params: Option<IndexMap<String, String>>,
827 ) -> PyResult<()> {
828 self.unsubscribe_mark_prices(instrument_id, client_id, params);
829 Ok(())
830 }
831
832 #[pyo3(name = "unsubscribe_index_prices")]
833 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
834 fn py_unsubscribe_index_prices(
835 &mut self,
836 instrument_id: InstrumentId,
837 client_id: Option<ClientId>,
838 params: Option<IndexMap<String, String>>,
839 ) -> PyResult<()> {
840 self.unsubscribe_index_prices(instrument_id, client_id, params);
841 Ok(())
842 }
843
844 #[pyo3(name = "unsubscribe_instrument_status")]
845 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
846 fn py_unsubscribe_instrument_status(
847 &mut self,
848 instrument_id: InstrumentId,
849 client_id: Option<ClientId>,
850 params: Option<IndexMap<String, String>>,
851 ) -> PyResult<()> {
852 self.unsubscribe_instrument_status(instrument_id, client_id, params);
853 Ok(())
854 }
855
856 #[pyo3(name = "unsubscribe_instrument_close")]
857 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
858 fn py_unsubscribe_instrument_close(
859 &mut self,
860 instrument_id: InstrumentId,
861 client_id: Option<ClientId>,
862 params: Option<IndexMap<String, String>>,
863 ) -> PyResult<()> {
864 self.unsubscribe_instrument_close(instrument_id, client_id, params);
865 Ok(())
866 }
867}
868
869#[cfg(test)]
873mod tests {
874 use std::{
875 any::Any,
876 cell::RefCell,
877 collections::HashMap,
878 ops::{Deref, DerefMut},
879 rc::Rc,
880 str::FromStr,
881 sync::{Arc, Mutex},
882 };
883
884 use nautilus_core::{UUID4, UnixNanos};
885 #[cfg(feature = "defi")]
886 use nautilus_model::defi::{
887 AmmType, Block, Blockchain, Chain, Dex, Pool, PoolLiquidityUpdate, PoolSwap, Token,
888 };
889 use nautilus_model::{
890 data::{
891 Bar, BarType, DataType, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
892 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
893 },
894 enums::BookType,
895 identifiers::{ClientId, TraderId, Venue},
896 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
897 orderbook::OrderBook,
898 types::{Price, Quantity},
899 };
900 use rstest::{fixture, rstest};
901 use ustr::Ustr;
902
903 use super::PyDataActor;
904 use crate::{
905 actor::{DataActor, data_actor::DataActorCore},
906 cache::Cache,
907 clock::TestClock,
908 component::Component,
909 enums::ComponentState,
910 runner::{SyncDataCommandSender, set_data_cmd_sender},
911 signal::Signal,
912 timer::TimeEvent,
913 };
914
915 #[fixture]
916 fn clock() -> Rc<RefCell<TestClock>> {
917 Rc::new(RefCell::new(TestClock::new()))
918 }
919
920 #[fixture]
921 fn cache() -> Rc<RefCell<Cache>> {
922 Rc::new(RefCell::new(Cache::new(None, None)))
923 }
924
925 #[fixture]
926 fn trader_id() -> TraderId {
927 TraderId::from("TRADER-001")
928 }
929
930 #[fixture]
931 fn client_id() -> ClientId {
932 ClientId::new("TestClient")
933 }
934
935 #[fixture]
936 fn venue() -> Venue {
937 Venue::from("SIM")
938 }
939
940 #[fixture]
941 fn data_type() -> DataType {
942 DataType::new("TestData", None)
943 }
944
945 #[fixture]
946 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
947 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
948 }
949
950 fn create_unregistered_actor() -> PyDataActor {
951 PyDataActor::py_new(None).unwrap()
952 }
953
954 fn create_registered_actor(
955 clock: Rc<RefCell<TestClock>>,
956 cache: Rc<RefCell<Cache>>,
957 trader_id: TraderId,
958 ) -> PyDataActor {
959 let sender = SyncDataCommandSender;
961 set_data_cmd_sender(Arc::new(sender));
962
963 let mut actor = PyDataActor::py_new(None).unwrap();
964 actor.register(trader_id, clock, cache).unwrap();
965 actor
966 }
967
968 #[rstest]
969 fn test_new_actor_creation() {
970 let actor = PyDataActor::py_new(None).unwrap();
971 assert!(actor.trader_id().is_none());
972 }
973
974 #[rstest]
975 fn test_unregistered_actor_methods_work() {
976 let actor = create_unregistered_actor();
977
978 assert!(!actor.py_is_ready());
979 assert!(!actor.py_is_running());
980 assert!(!actor.py_is_stopped());
981 assert!(!actor.py_is_disposed());
982 assert!(!actor.py_is_degraded());
983 assert!(!actor.py_is_faulted());
984
985 assert_eq!(actor.trader_id(), None);
987 }
988
989 #[rstest]
990 fn test_registration_success(
991 clock: Rc<RefCell<TestClock>>,
992 cache: Rc<RefCell<Cache>>,
993 trader_id: TraderId,
994 ) {
995 let mut actor = create_unregistered_actor();
996 actor.register(trader_id, clock, cache).unwrap();
997 assert!(actor.trader_id().is_some());
998 assert_eq!(actor.trader_id().unwrap(), trader_id);
999 }
1000
1001 #[rstest]
1002 fn test_registered_actor_basic_properties(
1003 clock: Rc<RefCell<TestClock>>,
1004 cache: Rc<RefCell<Cache>>,
1005 trader_id: TraderId,
1006 ) {
1007 let actor = create_registered_actor(clock, cache, trader_id);
1008
1009 assert_eq!(actor.state(), ComponentState::Ready);
1010 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
1011 assert!(actor.py_is_ready());
1012 assert!(!actor.py_is_running());
1013 assert!(!actor.py_is_stopped());
1014 assert!(!actor.py_is_disposed());
1015 assert!(!actor.py_is_degraded());
1016 assert!(!actor.py_is_faulted());
1017 }
1018
1019 #[rstest]
1020 fn test_basic_subscription_methods_compile(
1021 clock: Rc<RefCell<TestClock>>,
1022 cache: Rc<RefCell<Cache>>,
1023 trader_id: TraderId,
1024 data_type: DataType,
1025 client_id: ClientId,
1026 audusd_sim: CurrencyPair,
1027 ) {
1028 let mut actor = create_registered_actor(clock, cache, trader_id);
1029
1030 let _ = actor.py_subscribe_data(data_type.clone(), Some(client_id), None);
1031 let _ = actor.py_subscribe_quotes(audusd_sim.id, Some(client_id), None);
1032 let _ = actor.py_unsubscribe_data(data_type, Some(client_id), None);
1033 let _ = actor.py_unsubscribe_quotes(audusd_sim.id, Some(client_id), None);
1034 }
1035
1036 #[rstest]
1037 fn test_lifecycle_methods_pass_through(
1038 clock: Rc<RefCell<TestClock>>,
1039 cache: Rc<RefCell<Cache>>,
1040 trader_id: TraderId,
1041 ) {
1042 let mut actor = create_registered_actor(clock, cache, trader_id);
1043
1044 assert!(actor.py_start().is_ok());
1045 assert!(actor.py_stop().is_ok());
1046 assert!(actor.py_dispose().is_ok());
1047 }
1048
1049 #[rstest]
1050 fn test_shutdown_system_passes_through(
1051 clock: Rc<RefCell<TestClock>>,
1052 cache: Rc<RefCell<Cache>>,
1053 trader_id: TraderId,
1054 ) {
1055 let actor = create_registered_actor(clock, cache, trader_id);
1056
1057 assert!(
1058 actor
1059 .py_shutdown_system(Some("Test shutdown".to_string()))
1060 .is_ok()
1061 );
1062 assert!(actor.py_shutdown_system(None).is_ok());
1063 }
1064
1065 #[rstest]
1066 fn test_book_at_interval_invalid_interval_ms(
1067 clock: Rc<RefCell<TestClock>>,
1068 cache: Rc<RefCell<Cache>>,
1069 trader_id: TraderId,
1070 audusd_sim: CurrencyPair,
1071 ) {
1072 pyo3::prepare_freethreaded_python();
1073
1074 let mut actor = create_registered_actor(clock, cache, trader_id);
1075
1076 let result = actor.py_subscribe_book_at_interval(
1077 audusd_sim.id,
1078 BookType::L2_MBP,
1079 0,
1080 None,
1081 None,
1082 None,
1083 );
1084 assert!(result.is_err());
1085 assert_eq!(
1086 result.unwrap_err().to_string(),
1087 "ValueError: interval_ms must be > 0"
1088 );
1089
1090 let result = actor.py_unsubscribe_book_at_interval(audusd_sim.id, 0, None, None);
1091 assert!(result.is_err());
1092 assert_eq!(
1093 result.unwrap_err().to_string(),
1094 "ValueError: interval_ms must be > 0"
1095 );
1096 }
1097
1098 #[rstest]
1099 fn test_request_methods_signatures_exist() {
1100 let actor = create_unregistered_actor();
1101 assert!(actor.trader_id().is_none());
1102 }
1103
1104 #[rstest]
1105 fn test_data_actor_trait_implementation(
1106 clock: Rc<RefCell<TestClock>>,
1107 cache: Rc<RefCell<Cache>>,
1108 trader_id: TraderId,
1109 ) {
1110 let actor = create_registered_actor(clock, cache, trader_id);
1111 let state = actor.state();
1112 assert_eq!(state, ComponentState::Ready);
1113 }
1114
1115 static CALL_TRACKER: std::sync::LazyLock<Arc<Mutex<HashMap<String, i32>>>> =
1119 std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
1120
1121 #[derive(Debug)]
1123 struct TestDataActor {
1124 inner: PyDataActor,
1125 }
1126
1127 impl TestDataActor {
1128 fn new() -> Self {
1129 Self {
1130 inner: PyDataActor::py_new(None).unwrap(),
1131 }
1132 }
1133
1134 fn track_call(&self, handler_name: &str) {
1135 let mut tracker = CALL_TRACKER.lock().unwrap();
1136 *tracker.entry(handler_name.to_string()).or_insert(0) += 1;
1137 }
1138
1139 fn get_call_count(&self, handler_name: &str) -> i32 {
1140 let tracker = CALL_TRACKER.lock().unwrap();
1141 tracker.get(handler_name).copied().unwrap_or(0)
1142 }
1143
1144 fn reset_tracker(&self) {
1145 let mut tracker = CALL_TRACKER.lock().unwrap();
1146 tracker.clear();
1147 }
1148 }
1149
1150 impl Deref for TestDataActor {
1151 type Target = DataActorCore;
1152 fn deref(&self) -> &Self::Target {
1153 &self.inner.core
1154 }
1155 }
1156
1157 impl DerefMut for TestDataActor {
1158 fn deref_mut(&mut self) -> &mut Self::Target {
1159 &mut self.inner.core
1160 }
1161 }
1162
1163 impl DataActor for TestDataActor {
1164 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
1165 self.track_call("on_time_event");
1166 self.inner.on_time_event(event)
1167 }
1168
1169 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
1170 self.track_call("on_data");
1171 self.inner.on_data(data)
1172 }
1173
1174 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
1175 self.track_call("on_signal");
1176 self.inner.on_signal(signal)
1177 }
1178
1179 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
1180 self.track_call("on_instrument");
1181 self.inner.on_instrument(instrument)
1182 }
1183
1184 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
1185 self.track_call("on_quote");
1186 self.inner.on_quote(quote)
1187 }
1188
1189 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
1190 self.track_call("on_trade");
1191 self.inner.on_trade(trade)
1192 }
1193
1194 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
1195 self.track_call("on_bar");
1196 self.inner.on_bar(bar)
1197 }
1198
1199 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
1200 self.track_call("on_book");
1201 self.inner.on_book(book)
1202 }
1203
1204 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1205 self.track_call("on_book_deltas");
1206 self.inner.on_book_deltas(deltas)
1207 }
1208
1209 fn on_mark_price(&mut self, update: &MarkPriceUpdate) -> anyhow::Result<()> {
1210 self.track_call("on_mark_price");
1211 self.inner.on_mark_price(update)
1212 }
1213
1214 fn on_index_price(&mut self, update: &IndexPriceUpdate) -> anyhow::Result<()> {
1215 self.track_call("on_index_price");
1216 self.inner.on_index_price(update)
1217 }
1218
1219 fn on_instrument_status(&mut self, update: &InstrumentStatus) -> anyhow::Result<()> {
1220 self.track_call("on_instrument_status");
1221 self.inner.on_instrument_status(update)
1222 }
1223
1224 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
1225 self.track_call("on_instrument_close");
1226 self.inner.on_instrument_close(update)
1227 }
1228
1229 #[cfg(feature = "defi")]
1230 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
1231 self.track_call("on_block");
1232 self.inner.on_block(block)
1233 }
1234
1235 #[cfg(feature = "defi")]
1236 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
1237 self.track_call("on_pool");
1238 self.inner.on_pool(pool)
1239 }
1240
1241 #[cfg(feature = "defi")]
1242 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
1243 self.track_call("on_pool_swap");
1244 self.inner.on_pool_swap(swap)
1245 }
1246
1247 #[cfg(feature = "defi")]
1248 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
1249 self.track_call("on_pool_liquidity_update");
1250 self.inner.on_pool_liquidity_update(update)
1251 }
1252 }
1253
1254 #[rstest]
1255 fn test_python_on_signal_handler(
1256 clock: Rc<RefCell<TestClock>>,
1257 cache: Rc<RefCell<Cache>>,
1258 trader_id: TraderId,
1259 ) {
1260 pyo3::prepare_freethreaded_python();
1261
1262 let mut test_actor = TestDataActor::new();
1263 test_actor.reset_tracker();
1264 test_actor
1265 .register(trader_id, clock.clone(), cache.clone())
1266 .unwrap();
1267
1268 let signal = Signal::new(
1269 Ustr::from("test_signal"),
1270 "1.0".to_string(),
1271 UnixNanos::default(),
1272 UnixNanos::default(),
1273 );
1274
1275 assert!(test_actor.on_signal(&signal).is_ok());
1276 assert_eq!(test_actor.get_call_count("on_signal"), 1);
1277 }
1278
1279 #[rstest]
1280 fn test_python_on_data_handler(
1281 clock: Rc<RefCell<TestClock>>,
1282 cache: Rc<RefCell<Cache>>,
1283 trader_id: TraderId,
1284 ) {
1285 pyo3::prepare_freethreaded_python();
1286
1287 let mut test_actor = TestDataActor::new();
1288 test_actor.reset_tracker();
1289 test_actor
1290 .register(trader_id, clock.clone(), cache.clone())
1291 .unwrap();
1292
1293 assert!(test_actor.on_data(&()).is_ok());
1294 assert_eq!(test_actor.get_call_count("on_data"), 1);
1295 }
1296
1297 #[rstest]
1298 fn test_python_on_time_event_handler(
1299 clock: Rc<RefCell<TestClock>>,
1300 cache: Rc<RefCell<Cache>>,
1301 trader_id: TraderId,
1302 ) {
1303 pyo3::prepare_freethreaded_python();
1304
1305 let mut test_actor = TestDataActor::new();
1306 test_actor.reset_tracker();
1307 test_actor
1308 .register(trader_id, clock.clone(), cache.clone())
1309 .unwrap();
1310
1311 let time_event = TimeEvent::new(
1312 Ustr::from("test_timer"),
1313 UUID4::new(),
1314 UnixNanos::default(),
1315 UnixNanos::default(),
1316 );
1317
1318 assert!(test_actor.on_time_event(&time_event).is_ok());
1319 assert_eq!(test_actor.get_call_count("on_time_event"), 1);
1320 }
1321
1322 #[rstest]
1323 fn test_python_on_instrument_handler(
1324 clock: Rc<RefCell<TestClock>>,
1325 cache: Rc<RefCell<Cache>>,
1326 trader_id: TraderId,
1327 audusd_sim: CurrencyPair,
1328 ) {
1329 pyo3::prepare_freethreaded_python();
1330
1331 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1332 rust_actor
1333 .register(trader_id, clock.clone(), cache.clone())
1334 .unwrap();
1335
1336 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1337
1338 assert!(rust_actor.on_instrument(&instrument).is_ok());
1339 }
1340
1341 #[rstest]
1342 fn test_python_on_quote_handler(
1343 clock: Rc<RefCell<TestClock>>,
1344 cache: Rc<RefCell<Cache>>,
1345 trader_id: TraderId,
1346 audusd_sim: CurrencyPair,
1347 ) {
1348 pyo3::prepare_freethreaded_python();
1349
1350 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1351 rust_actor
1352 .register(trader_id, clock.clone(), cache.clone())
1353 .unwrap();
1354
1355 let quote = QuoteTick::new(
1356 audusd_sim.id,
1357 Price::from("1.0000"),
1358 Price::from("1.0001"),
1359 Quantity::from("100000"),
1360 Quantity::from("100000"),
1361 UnixNanos::default(),
1362 UnixNanos::default(),
1363 );
1364
1365 assert!(rust_actor.on_quote("e).is_ok());
1366 }
1367
1368 #[rstest]
1369 fn test_python_on_trade_handler(
1370 clock: Rc<RefCell<TestClock>>,
1371 cache: Rc<RefCell<Cache>>,
1372 trader_id: TraderId,
1373 audusd_sim: CurrencyPair,
1374 ) {
1375 pyo3::prepare_freethreaded_python();
1376
1377 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1378 rust_actor
1379 .register(trader_id, clock.clone(), cache.clone())
1380 .unwrap();
1381
1382 let trade = TradeTick::new(
1383 audusd_sim.id,
1384 Price::from("1.0000"),
1385 Quantity::from("100000"),
1386 nautilus_model::enums::AggressorSide::Buyer,
1387 "T123".to_string().into(),
1388 UnixNanos::default(),
1389 UnixNanos::default(),
1390 );
1391
1392 assert!(rust_actor.on_trade(&trade).is_ok());
1393 }
1394
1395 #[rstest]
1396 fn test_python_on_bar_handler(
1397 clock: Rc<RefCell<TestClock>>,
1398 cache: Rc<RefCell<Cache>>,
1399 trader_id: TraderId,
1400 audusd_sim: CurrencyPair,
1401 ) {
1402 pyo3::prepare_freethreaded_python();
1403
1404 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1405 rust_actor
1406 .register(trader_id, clock.clone(), cache.clone())
1407 .unwrap();
1408
1409 let bar_type =
1410 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap();
1411 let bar = Bar::new(
1412 bar_type,
1413 Price::from("1.0000"),
1414 Price::from("1.0001"),
1415 Price::from("0.9999"),
1416 Price::from("1.0000"),
1417 Quantity::from("100000"),
1418 UnixNanos::default(),
1419 UnixNanos::default(),
1420 );
1421
1422 assert!(rust_actor.on_bar(&bar).is_ok());
1423 }
1424
1425 #[rstest]
1426 fn test_python_on_book_handler(
1427 clock: Rc<RefCell<TestClock>>,
1428 cache: Rc<RefCell<Cache>>,
1429 trader_id: TraderId,
1430 audusd_sim: CurrencyPair,
1431 ) {
1432 pyo3::prepare_freethreaded_python();
1433
1434 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1435 rust_actor
1436 .register(trader_id, clock.clone(), cache.clone())
1437 .unwrap();
1438
1439 let book = OrderBook::new(audusd_sim.id, BookType::L2_MBP);
1440 assert!(rust_actor.on_book(&book).is_ok());
1441 }
1442
1443 #[rstest]
1444 fn test_python_on_book_deltas_handler(
1445 clock: Rc<RefCell<TestClock>>,
1446 cache: Rc<RefCell<Cache>>,
1447 trader_id: TraderId,
1448 audusd_sim: CurrencyPair,
1449 ) {
1450 pyo3::prepare_freethreaded_python();
1451
1452 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1453 rust_actor
1454 .register(trader_id, clock.clone(), cache.clone())
1455 .unwrap();
1456
1457 let delta =
1458 OrderBookDelta::clear(audusd_sim.id, 0, UnixNanos::default(), UnixNanos::default());
1459 let deltas = OrderBookDeltas::new(audusd_sim.id, vec![delta]);
1460
1461 assert!(rust_actor.on_book_deltas(&deltas).is_ok());
1462 }
1463
1464 #[rstest]
1465 fn test_python_on_mark_price_handler(
1466 clock: Rc<RefCell<TestClock>>,
1467 cache: Rc<RefCell<Cache>>,
1468 trader_id: TraderId,
1469 audusd_sim: CurrencyPair,
1470 ) {
1471 pyo3::prepare_freethreaded_python();
1472
1473 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1474 rust_actor
1475 .register(trader_id, clock.clone(), cache.clone())
1476 .unwrap();
1477
1478 let mark_price = MarkPriceUpdate::new(
1479 audusd_sim.id,
1480 Price::from("1.0000"),
1481 UnixNanos::default(),
1482 UnixNanos::default(),
1483 );
1484
1485 assert!(rust_actor.on_mark_price(&mark_price).is_ok());
1486 }
1487
1488 #[rstest]
1489 fn test_python_on_index_price_handler(
1490 clock: Rc<RefCell<TestClock>>,
1491 cache: Rc<RefCell<Cache>>,
1492 trader_id: TraderId,
1493 audusd_sim: CurrencyPair,
1494 ) {
1495 pyo3::prepare_freethreaded_python();
1496
1497 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1498 rust_actor
1499 .register(trader_id, clock.clone(), cache.clone())
1500 .unwrap();
1501
1502 let index_price = IndexPriceUpdate::new(
1503 audusd_sim.id,
1504 Price::from("1.0000"),
1505 UnixNanos::default(),
1506 UnixNanos::default(),
1507 );
1508
1509 assert!(rust_actor.on_index_price(&index_price).is_ok());
1510 }
1511
1512 #[rstest]
1513 fn test_python_on_instrument_status_handler(
1514 clock: Rc<RefCell<TestClock>>,
1515 cache: Rc<RefCell<Cache>>,
1516 trader_id: TraderId,
1517 audusd_sim: CurrencyPair,
1518 ) {
1519 pyo3::prepare_freethreaded_python();
1520
1521 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1522 rust_actor
1523 .register(trader_id, clock.clone(), cache.clone())
1524 .unwrap();
1525
1526 let status = InstrumentStatus::new(
1527 audusd_sim.id,
1528 nautilus_model::enums::MarketStatusAction::Trading,
1529 UnixNanos::default(),
1530 UnixNanos::default(),
1531 None,
1532 None,
1533 None,
1534 None,
1535 None,
1536 );
1537
1538 assert!(rust_actor.on_instrument_status(&status).is_ok());
1539 }
1540
1541 #[rstest]
1542 fn test_python_on_instrument_close_handler(
1543 clock: Rc<RefCell<TestClock>>,
1544 cache: Rc<RefCell<Cache>>,
1545 trader_id: TraderId,
1546 audusd_sim: CurrencyPair,
1547 ) {
1548 pyo3::prepare_freethreaded_python();
1549
1550 let mut rust_actor = PyDataActor::py_new(None).unwrap();
1551 rust_actor
1552 .register(trader_id, clock.clone(), cache.clone())
1553 .unwrap();
1554
1555 let close = InstrumentClose::new(
1556 audusd_sim.id,
1557 Price::from("1.0000"),
1558 nautilus_model::enums::InstrumentCloseType::EndOfSession,
1559 UnixNanos::default(),
1560 UnixNanos::default(),
1561 );
1562
1563 assert!(rust_actor.on_instrument_close(&close).is_ok());
1564 }
1565
/// Dispatches a `Block` to a registered `TestDataActor` and expects the handler to succeed
/// with exactly one recorded `on_block` call.
#[cfg(feature = "defi")]
#[rstest]
fn test_python_on_block_handler(
    clock: Rc<RefCell<TestClock>>,
    cache: Rc<RefCell<Cache>>,
    trader_id: TraderId,
) {
    pyo3::prepare_freethreaded_python();

    let mut actor = TestDataActor::new();
    actor.reset_tracker();
    actor
        .register(trader_id, clock.clone(), cache.clone())
        .unwrap();

    let sample_block = Block::new(
        "0x1234567890abcdef".to_string(),
        "0xabcdef1234567890".to_string(),
        12345,
        "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
        21000,
        20000,
        UnixNanos::default(),
        Some(Blockchain::Ethereum),
    );

    let outcome = actor.on_block(&sample_block);
    assert!(outcome.is_ok());
    assert_eq!(actor.get_call_count("on_block"), 1);
}
1595
/// Expects `on_pool_swap` to succeed on a registered `PyDataActor` for a swap on a
/// USDC/WETH pool assembled from an Ethereum chain and a "Uniswap V3" DEX definition.
#[cfg(feature = "defi")]
#[rstest]
fn test_python_on_pool_swap_handler(
    clock: Rc<RefCell<TestClock>>,
    cache: Rc<RefCell<Cache>>,
    trader_id: TraderId,
) {
    pyo3::prepare_freethreaded_python();

    let mut actor = PyDataActor::py_new(None).unwrap();
    actor
        .register(trader_id, clock.clone(), cache.clone())
        .unwrap();

    // Shared chain and DEX handles reused by the tokens, the pool and the swap.
    let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
    let dex = Arc::new(Dex::new(
        Chain::new(Blockchain::Ethereum, 1),
        "Uniswap V3",
        "0x1f98431c8ad98523631ae4a59f267346ea31f984",
        AmmType::CLAMM,
        "PoolCreated",
        "Swap",
        "Mint",
        "Burn",
    ));

    let usdc = Token::new(
        chain.clone(),
        "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
            .parse()
            .unwrap(),
        "USDC".into(),
        "USD Coin".into(),
        6,
    );
    let weth = Token::new(
        chain.clone(),
        "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
            .parse()
            .unwrap(),
        "WETH".into(),
        "Wrapped Ether".into(),
        18,
    );

    let pool = Arc::new(Pool::new(
        chain.clone(),
        Dex::clone(&dex),
        "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
            .parse()
            .unwrap(),
        12345,
        usdc,
        weth,
        500,
        10,
        UnixNanos::default(),
    ));

    let pool_swap = PoolSwap::new(
        chain.clone(),
        dex.clone(),
        pool.clone(),
        12345,
        "0xabc123".to_string(),
        0,
        0,
        UnixNanos::default(),
        "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
            .parse()
            .unwrap(),
        nautilus_model::enums::OrderSide::Buy,
        Quantity::from("1000"),
        Price::from("1.0"),
    );

    let outcome = actor.on_pool_swap(&pool_swap);
    assert!(outcome.is_ok());
}
1672
/// Expects `on_block` to succeed on a registered `PyDataActor` for a sample `Block`.
///
/// NOTE(review): despite its name, this test never constructs a `PoolLiquidityUpdate` nor
/// calls `on_pool_liquidity_update`; it only dispatches a `Block`. Confirm whether dedicated
/// coverage of the liquidity-update handler is still intended.
#[cfg(feature = "defi")]
#[rstest]
fn test_python_on_pool_liquidity_update_handler(
    clock: Rc<RefCell<TestClock>>,
    cache: Rc<RefCell<Cache>>,
    trader_id: TraderId,
) {
    pyo3::prepare_freethreaded_python();

    let mut actor = PyDataActor::py_new(None).unwrap();
    actor
        .register(trader_id, clock.clone(), cache.clone())
        .unwrap();

    let sample_block = Block::new(
        "0x1234567890abcdef".to_string(),
        "0xabcdef1234567890".to_string(),
        12345,
        "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
        21000,
        20000,
        UnixNanos::default(),
        Some(Blockchain::Ethereum),
    );

    let outcome = actor.on_block(&sample_block);
    assert!(outcome.is_ok());
}
1703}