1use std::{num::NonZero, sync::Arc};
19
20use futures_util::StreamExt;
21use nautilus_core::{nanos::UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
22use nautilus_model::{
23 data::{BarSpecification, BarType, Data, OrderBookDeltas_API},
24 enums::{AggregationSource, BarAggregation, PriceType},
25 identifiers::{AccountId, InstrumentId},
26 instruments::Instrument,
27 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
28};
29use pyo3::{IntoPyObjectExt, prelude::*};
30
31use crate::{
32 common::{
33 credential::Credential,
34 enums::{BybitEnvironment, BybitProductType},
35 parse::make_bybit_symbol,
36 },
37 websocket::{
38 client::BybitWebSocketClient,
39 messages::{BybitWebSocketError, BybitWebSocketMessage},
40 parse::{
41 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_ws_account_state,
42 parse_ws_fill_report, parse_ws_kline_bar, parse_ws_order_status_report,
43 parse_ws_position_status_report, parse_ws_trade_tick,
44 },
45 },
46};
47
48#[pymethods]
49impl BybitWebSocketError {
50 fn __repr__(&self) -> String {
51 format!(
52 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
53 self.code, self.message, self.conn_id, self.topic
54 )
55 }
56
57 #[getter]
58 pub fn code(&self) -> i64 {
59 self.code
60 }
61
62 #[getter]
63 pub fn message(&self) -> &str {
64 &self.message
65 }
66
67 #[getter]
68 pub fn conn_id(&self) -> Option<&str> {
69 self.conn_id.as_deref()
70 }
71
72 #[getter]
73 pub fn topic(&self) -> Option<&str> {
74 self.topic.as_deref()
75 }
76
77 #[getter]
78 pub fn req_id(&self) -> Option<&str> {
79 self.req_id.as_deref()
80 }
81}
82
83#[pymethods]
84impl BybitWebSocketClient {
85 #[staticmethod]
86 #[pyo3(name = "new_public")]
87 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
88 fn py_new_public(
89 product_type: BybitProductType,
90 environment: BybitEnvironment,
91 url: Option<String>,
92 heartbeat: Option<u64>,
93 ) -> Self {
94 Self::new_public_with(product_type, environment, url, heartbeat)
95 }
96
97 #[staticmethod]
98 #[pyo3(name = "new_private")]
99 #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
100 fn py_new_private(
101 environment: BybitEnvironment,
102 api_key: String,
103 api_secret: String,
104 url: Option<String>,
105 heartbeat: Option<u64>,
106 ) -> Self {
107 tracing::debug!(
108 "Creating private WebSocket client with API key: {}",
109 &api_key[..api_key.len().min(10)]
110 );
111 let credential = crate::common::credential::Credential::new(api_key, api_secret);
112 Self::new_private(environment, credential, url, heartbeat)
113 }
114
115 #[staticmethod]
116 #[pyo3(name = "new_trade")]
117 #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
118 fn py_new_trade(
119 environment: BybitEnvironment,
120 api_key: String,
121 api_secret: String,
122 url: Option<String>,
123 heartbeat: Option<u64>,
124 ) -> Self {
125 let credential = Credential::new(api_key, api_secret);
126 Self::new_trade(environment, credential, url, heartbeat)
127 }
128
129 #[pyo3(name = "is_active")]
130 fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
131 let client = self.clone();
132
133 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
134 }
135
136 #[pyo3(name = "subscription_count")]
137 fn py_subscription_count(&self) -> usize {
138 self.subscription_count()
139 }
140
141 #[pyo3(name = "add_instrument")]
142 fn py_add_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
143 self.add_instrument(pyobject_to_instrument_any(py, instrument)?);
144 Ok(())
145 }
146
147 #[pyo3(name = "set_account_id")]
148 fn py_set_account_id(&mut self, account_id: AccountId) {
149 self.set_account_id(account_id);
150 }
151
152 #[pyo3(name = "connect")]
153 fn py_connect<'py>(
154 &mut self,
155 py: Python<'py>,
156 callback: Py<PyAny>,
157 ) -> PyResult<Bound<'py, PyAny>> {
158 let mut client = self.clone();
159
160 pyo3_async_runtimes::tokio::future_into_py(py, async move {
161 client.connect().await.map_err(to_pyruntime_err)?;
162
163 let stream = client.stream();
164
165 let instruments = Arc::clone(client.instruments());
166 let account_id = client.account_id();
167 let product_type = client.product_type();
168 let quote_cache = Arc::clone(client.quote_cache());
169
170 tokio::spawn(async move {
171 tokio::pin!(stream);
172
173 let clock = get_atomic_clock_realtime();
174
175 while let Some(msg) = stream.next().await {
176 match msg {
177 BybitWebSocketMessage::Orderbook(msg) => {
178 let raw_symbol = msg.data.s;
179
180 let symbol = product_type.map_or(raw_symbol, |pt| {
181 make_bybit_symbol(raw_symbol.as_str(), pt)
182 });
183
184 if let Some(instrument_entry) = instruments
185 .iter()
186 .find(|e| e.key().symbol.as_str() == symbol.as_str())
187 {
188 let instrument = instrument_entry.value();
189 let ts_init = clock.get_time_ns();
190
191 match parse_orderbook_deltas(&msg, instrument, ts_init) {
192 Ok(deltas) => {
193 Python::attach(|py| {
194 let py_obj = data_to_pycapsule(
195 py,
196 Data::Deltas(OrderBookDeltas_API::new(deltas)),
197 );
198 call_python(py, &callback, py_obj);
199 });
200 }
201 Err(e) => {
202 tracing::error!("Error parsing orderbook deltas: {e}");
203 }
204 }
205 } else {
206 tracing::warn!(
207 raw_symbol = %raw_symbol,
208 full_symbol = %symbol,
209 "No instrument found for symbol"
210 );
211 }
212 }
213 BybitWebSocketMessage::TickerLinear(msg) => {
214 let raw_symbol = msg.data.symbol;
215
216 let symbol = product_type.map_or(raw_symbol, |pt| {
217 make_bybit_symbol(raw_symbol.as_str(), pt)
218 });
219
220 if let Some(instrument_entry) = instruments
221 .iter()
222 .find(|e| e.key().symbol.as_str() == symbol.as_str())
223 {
224 let instrument = instrument_entry.value();
225 let instrument_id = instrument.id();
226 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
227 .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
228 let ts_init = clock.get_time_ns();
229
230 match quote_cache.write().await.process_linear_ticker(
231 &msg.data,
232 instrument_id,
233 instrument,
234 ts_event,
235 ts_init,
236 ) {
237 Ok(quote) => {
238 Python::attach(|py| {
239 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
240 call_python(py, &callback, py_obj);
241 });
242 }
243 Err(e) => {
244 tracing::debug!("Skipping partial ticker update: {e}");
245 }
246 }
247 } else {
248 tracing::warn!(
249 raw_symbol = %raw_symbol,
250 full_symbol = %symbol,
251 "No instrument found for symbol"
252 );
253 }
254 }
255 BybitWebSocketMessage::TickerOption(msg) => {
256 let raw_symbol = &msg.data.symbol;
257
258 let symbol = product_type.map_or_else(
259 || raw_symbol.as_str().into(),
260 |pt| make_bybit_symbol(raw_symbol, pt),
261 );
262
263 if let Some(instrument_entry) = instruments
264 .iter()
265 .find(|e| e.key().symbol.as_str() == symbol.as_str())
266 {
267 let instrument = instrument_entry.value();
268 let instrument_id = instrument.id();
269 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
270 .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
271 let ts_init = clock.get_time_ns();
272
273 match quote_cache.write().await.process_option_ticker(
274 &msg.data,
275 instrument_id,
276 instrument,
277 ts_event,
278 ts_init,
279 ) {
280 Ok(quote) => {
281 Python::attach(|py| {
282 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
283 call_python(py, &callback, py_obj);
284 });
285 }
286 Err(e) => {
287 tracing::debug!("Skipping partial ticker update: {e}");
288 }
289 }
290 } else {
291 tracing::warn!(
292 raw_symbol = %raw_symbol,
293 full_symbol = %symbol,
294 "No instrument found for symbol"
295 );
296 }
297 }
298 BybitWebSocketMessage::Trade(msg) => {
299 for trade in &msg.data {
300 let raw_symbol = trade.s;
301
302 let symbol = product_type.map_or(raw_symbol, |pt| {
303 make_bybit_symbol(raw_symbol.as_str(), pt)
304 });
305
306 if let Some(instrument_entry) = instruments
307 .iter()
308 .find(|e| e.key().symbol.as_str() == symbol.as_str())
309 {
310 let instrument = instrument_entry.value();
311 let ts_init = clock.get_time_ns();
312
313 match parse_ws_trade_tick(trade, instrument, ts_init) {
314 Ok(tick) => {
315 Python::attach(|py| {
316 let py_obj =
317 data_to_pycapsule(py, Data::Trade(tick));
318 call_python(py, &callback, py_obj);
319 });
320 }
321 Err(e) => {
322 tracing::error!("Error parsing trade tick: {e}");
323 }
324 }
325 } else {
326 tracing::warn!(
327 raw_symbol = %raw_symbol,
328 full_symbol = %symbol,
329 "No instrument found for symbol"
330 );
331 }
332 }
333 }
334 BybitWebSocketMessage::Kline(msg) => {
335 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
336 Ok(parts) => parts,
337 Err(e) => {
338 tracing::warn!("Failed to parse kline topic: {e}");
339 continue;
340 }
341 };
342
343 let symbol = product_type.map_or_else(
344 || raw_symbol.into(),
345 |pt| make_bybit_symbol(raw_symbol, pt),
346 );
347
348 if let Some(instrument_entry) = instruments
349 .iter()
350 .find(|e| e.key().symbol.as_str() == symbol.as_str())
351 {
352 let instrument = instrument_entry.value();
353 let ts_init = clock.get_time_ns();
354
355 let (step, aggregation) = match interval_str.parse::<usize>() {
356 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
357 _ => {
358 tracing::warn!(
360 "Unsupported kline interval: {}",
361 interval_str
362 );
363 continue;
364 }
365 };
366
367 if let Some(non_zero_step) = NonZero::new(step) {
368 let bar_spec = BarSpecification {
369 step: non_zero_step,
370 aggregation,
371 price_type: PriceType::Last,
372 };
373 let bar_type = BarType::new(
374 instrument.id(),
375 bar_spec,
376 AggregationSource::External,
377 );
378
379 for kline in &msg.data {
380 match parse_ws_kline_bar(
381 kline, instrument, bar_type, false, ts_init,
382 ) {
383 Ok(bar) => {
384 Python::attach(|py| {
385 let py_obj =
386 data_to_pycapsule(py, Data::Bar(bar));
387 call_python(py, &callback, py_obj);
388 });
389 }
390 Err(e) => {
391 tracing::error!("Error parsing kline to bar: {e}");
392 }
393 }
394 }
395 } else {
396 tracing::error!("Invalid step value: {}", step);
397 }
398 } else {
399 tracing::warn!(
400 raw_symbol = %raw_symbol,
401 full_symbol = %symbol,
402 "No instrument found for symbol"
403 );
404 }
405 }
406
407 BybitWebSocketMessage::AccountOrder(msg) => {
408 if let Some(account_id) = account_id {
409 for order in &msg.data {
410 let raw_symbol = order.symbol;
411
412 let symbol =
413 make_bybit_symbol(raw_symbol.as_str(), order.category);
414
415 if let Some(instrument_entry) = instruments
416 .iter()
417 .find(|e| e.key().symbol.as_str() == symbol.as_str())
418 {
419 let instrument = instrument_entry.value();
420 let ts_init = clock.get_time_ns();
421
422 match parse_ws_order_status_report(
423 order, instrument, account_id, ts_init,
424 ) {
425 Ok(report) => {
426 Python::attach(|py| {
427 if let Ok(py_obj) = report.into_py_any(py) {
428 call_python(py, &callback, py_obj);
429 }
430 });
431 }
432 Err(e) => {
433 tracing::error!(
434 "Error parsing order status report: {e}"
435 );
436 }
437 }
438 } else {
439 tracing::warn!(
440 raw_symbol = %raw_symbol,
441 full_symbol = %symbol,
442 "No instrument found for symbol"
443 );
444 }
445 }
446 } else {
447 tracing::error!(
448 "Received AccountOrder message but account_id is not set"
449 );
450 }
451 }
452 BybitWebSocketMessage::AccountExecution(msg) => {
453 if let Some(account_id) = account_id {
454 for execution in &msg.data {
455 let raw_symbol = execution.symbol;
456 let symbol =
457 make_bybit_symbol(raw_symbol.as_str(), execution.category);
458
459 if let Some(instrument_entry) = instruments
460 .iter()
461 .find(|e| e.key().symbol.as_str() == symbol.as_str())
462 {
463 let instrument = instrument_entry.value();
464 let ts_init = clock.get_time_ns();
465
466 match parse_ws_fill_report(
467 execution, account_id, instrument, ts_init,
468 ) {
469 Ok(report) => {
470 Python::attach(|py| {
471 if let Ok(py_obj) = report.into_py_any(py) {
472 call_python(py, &callback, py_obj);
473 }
474 });
475 }
476 Err(e) => {
477 tracing::error!("Error parsing fill report: {e}");
478 }
479 }
480 } else {
481 tracing::warn!(
482 raw_symbol = %raw_symbol,
483 full_symbol = %symbol,
484 "No instrument found for symbol"
485 );
486 }
487 }
488 } else {
489 tracing::error!(
490 "Received AccountExecution message but account_id is not set"
491 );
492 }
493 }
494 BybitWebSocketMessage::AccountWallet(msg) => {
495 if let Some(account_id) = account_id {
496 for wallet in &msg.data {
497 let ts_event =
498 UnixNanos::from(msg.creation_time as u64 * 1_000_000);
499 let ts_init = clock.get_time_ns();
500
501 match parse_ws_account_state(
502 wallet, account_id, ts_event, ts_init,
503 ) {
504 Ok(state) => {
505 Python::attach(|py| {
506 if let Ok(py_obj) = state.into_py_any(py) {
507 call_python(py, &callback, py_obj);
508 }
509 });
510 }
511 Err(e) => {
512 tracing::error!("Error parsing account state: {e}");
513 }
514 }
515 }
516 } else {
517 tracing::error!(
518 "Received AccountWallet message but account_id is not set"
519 );
520 }
521 }
522 BybitWebSocketMessage::AccountPosition(msg) => {
523 if let Some(account_id) = account_id {
524 for position in &msg.data {
525 let raw_symbol = position.symbol;
526
527 if let Some(instrument_entry) = instruments.iter().find(|e| {
530 let inst_symbol = e.key().symbol.as_str();
531 inst_symbol.starts_with(raw_symbol.as_str())
533 && inst_symbol.len() > raw_symbol.len()
534 && inst_symbol.as_bytes().get(raw_symbol.len())
535 == Some(&b'-')
536 }) {
537 let instrument = instrument_entry.value();
538 let ts_init = clock.get_time_ns();
539
540 match parse_ws_position_status_report(
541 position, account_id, instrument, ts_init,
542 ) {
543 Ok(report) => {
544 Python::attach(|py| {
545 if let Ok(py_obj) = report.into_py_any(py) {
546 call_python(py, &callback, py_obj);
547 }
548 });
549 }
550 Err(e) => {
551 tracing::error!(
552 "Error parsing position status report: {e}"
553 );
554 }
555 }
556 } else {
557 tracing::warn!(
558 raw_symbol = %raw_symbol,
559 "No instrument found for symbol"
560 );
561 }
562 }
563 } else {
564 tracing::error!(
565 "Received AccountPosition message but account_id is not set"
566 );
567 }
568 }
569 BybitWebSocketMessage::Error(msg) => {
570 call_python_with_data(&callback, |py| {
571 msg.into_py_any(py).map(|obj| obj.into_bound(py))
572 });
573 }
574 BybitWebSocketMessage::Reconnected => {}
575 BybitWebSocketMessage::Pong => {}
576 BybitWebSocketMessage::Response(msg) => {
577 tracing::debug!("Received response message: {:?}", msg);
578 }
579 BybitWebSocketMessage::Auth(msg) => {
580 tracing::debug!("Received auth message: {:?}", msg);
581 }
582 BybitWebSocketMessage::Subscription(msg) => {
583 tracing::debug!("Received subscription message: {:?}", msg);
584 }
585 BybitWebSocketMessage::Raw(value) => {
586 tracing::debug!("Received raw/unhandled message, skipping: {value}");
587 }
588 }
589 }
590 });
591
592 Ok(())
593 })
594 }
595
596 #[pyo3(name = "close")]
597 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
598 let mut client = self.clone();
599
600 pyo3_async_runtimes::tokio::future_into_py(py, async move {
601 if let Err(e) = client.close().await {
602 tracing::error!("Error on close: {e}");
603 }
604 Ok(())
605 })
606 }
607
608 #[pyo3(name = "subscribe")]
609 fn py_subscribe<'py>(
610 &self,
611 py: Python<'py>,
612 topics: Vec<String>,
613 ) -> PyResult<Bound<'py, PyAny>> {
614 let client = self.clone();
615
616 pyo3_async_runtimes::tokio::future_into_py(py, async move {
617 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
618 Ok(())
619 })
620 }
621
622 #[pyo3(name = "unsubscribe")]
623 fn py_unsubscribe<'py>(
624 &self,
625 py: Python<'py>,
626 topics: Vec<String>,
627 ) -> PyResult<Bound<'py, PyAny>> {
628 let client = self.clone();
629
630 pyo3_async_runtimes::tokio::future_into_py(py, async move {
631 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
632 Ok(())
633 })
634 }
635
636 #[pyo3(name = "subscribe_orderbook")]
637 fn py_subscribe_orderbook<'py>(
638 &self,
639 py: Python<'py>,
640 instrument_id: InstrumentId,
641 depth: u32,
642 ) -> PyResult<Bound<'py, PyAny>> {
643 let client = self.clone();
644
645 pyo3_async_runtimes::tokio::future_into_py(py, async move {
646 client
647 .subscribe_orderbook(instrument_id, depth)
648 .await
649 .map_err(to_pyruntime_err)?;
650 Ok(())
651 })
652 }
653
654 #[pyo3(name = "unsubscribe_orderbook")]
655 fn py_unsubscribe_orderbook<'py>(
656 &self,
657 py: Python<'py>,
658 instrument_id: InstrumentId,
659 depth: u32,
660 ) -> PyResult<Bound<'py, PyAny>> {
661 let client = self.clone();
662
663 pyo3_async_runtimes::tokio::future_into_py(py, async move {
664 client
665 .unsubscribe_orderbook(instrument_id, depth)
666 .await
667 .map_err(to_pyruntime_err)?;
668 Ok(())
669 })
670 }
671
672 #[pyo3(name = "subscribe_trades")]
673 fn py_subscribe_trades<'py>(
674 &self,
675 py: Python<'py>,
676 instrument_id: InstrumentId,
677 ) -> PyResult<Bound<'py, PyAny>> {
678 let client = self.clone();
679
680 pyo3_async_runtimes::tokio::future_into_py(py, async move {
681 client
682 .subscribe_trades(instrument_id)
683 .await
684 .map_err(to_pyruntime_err)?;
685 Ok(())
686 })
687 }
688
689 #[pyo3(name = "unsubscribe_trades")]
690 fn py_unsubscribe_trades<'py>(
691 &self,
692 py: Python<'py>,
693 instrument_id: InstrumentId,
694 ) -> PyResult<Bound<'py, PyAny>> {
695 let client = self.clone();
696
697 pyo3_async_runtimes::tokio::future_into_py(py, async move {
698 client
699 .unsubscribe_trades(instrument_id)
700 .await
701 .map_err(to_pyruntime_err)?;
702 Ok(())
703 })
704 }
705
706 #[pyo3(name = "subscribe_ticker")]
707 fn py_subscribe_ticker<'py>(
708 &self,
709 py: Python<'py>,
710 instrument_id: InstrumentId,
711 ) -> PyResult<Bound<'py, PyAny>> {
712 let client = self.clone();
713
714 pyo3_async_runtimes::tokio::future_into_py(py, async move {
715 client
716 .subscribe_ticker(instrument_id)
717 .await
718 .map_err(to_pyruntime_err)?;
719 Ok(())
720 })
721 }
722
723 #[pyo3(name = "unsubscribe_ticker")]
724 fn py_unsubscribe_ticker<'py>(
725 &self,
726 py: Python<'py>,
727 instrument_id: InstrumentId,
728 ) -> PyResult<Bound<'py, PyAny>> {
729 let client = self.clone();
730
731 pyo3_async_runtimes::tokio::future_into_py(py, async move {
732 client
733 .unsubscribe_ticker(instrument_id)
734 .await
735 .map_err(to_pyruntime_err)?;
736 Ok(())
737 })
738 }
739
740 #[pyo3(name = "subscribe_klines")]
741 fn py_subscribe_klines<'py>(
742 &self,
743 py: Python<'py>,
744 instrument_id: InstrumentId,
745 interval: String,
746 ) -> PyResult<Bound<'py, PyAny>> {
747 let client = self.clone();
748
749 pyo3_async_runtimes::tokio::future_into_py(py, async move {
750 client
751 .subscribe_klines(instrument_id, interval)
752 .await
753 .map_err(to_pyruntime_err)?;
754 Ok(())
755 })
756 }
757
758 #[pyo3(name = "unsubscribe_klines")]
759 fn py_unsubscribe_klines<'py>(
760 &self,
761 py: Python<'py>,
762 instrument_id: InstrumentId,
763 interval: String,
764 ) -> PyResult<Bound<'py, PyAny>> {
765 let client = self.clone();
766
767 pyo3_async_runtimes::tokio::future_into_py(py, async move {
768 client
769 .unsubscribe_klines(instrument_id, interval)
770 .await
771 .map_err(to_pyruntime_err)?;
772 Ok(())
773 })
774 }
775
776 #[pyo3(name = "subscribe_orders")]
777 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
778 let client = self.clone();
779
780 pyo3_async_runtimes::tokio::future_into_py(py, async move {
781 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
782 Ok(())
783 })
784 }
785
786 #[pyo3(name = "unsubscribe_orders")]
787 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
788 let client = self.clone();
789
790 pyo3_async_runtimes::tokio::future_into_py(py, async move {
791 client
792 .unsubscribe_orders()
793 .await
794 .map_err(to_pyruntime_err)?;
795 Ok(())
796 })
797 }
798
799 #[pyo3(name = "subscribe_executions")]
800 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
801 let client = self.clone();
802
803 pyo3_async_runtimes::tokio::future_into_py(py, async move {
804 client
805 .subscribe_executions()
806 .await
807 .map_err(to_pyruntime_err)?;
808 Ok(())
809 })
810 }
811
812 #[pyo3(name = "unsubscribe_executions")]
813 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
814 let client = self.clone();
815
816 pyo3_async_runtimes::tokio::future_into_py(py, async move {
817 client
818 .unsubscribe_executions()
819 .await
820 .map_err(to_pyruntime_err)?;
821 Ok(())
822 })
823 }
824
825 #[pyo3(name = "subscribe_positions")]
826 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
827 let client = self.clone();
828
829 pyo3_async_runtimes::tokio::future_into_py(py, async move {
830 client
831 .subscribe_positions()
832 .await
833 .map_err(to_pyruntime_err)?;
834 Ok(())
835 })
836 }
837
838 #[pyo3(name = "unsubscribe_positions")]
839 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
840 let client = self.clone();
841
842 pyo3_async_runtimes::tokio::future_into_py(py, async move {
843 client
844 .unsubscribe_positions()
845 .await
846 .map_err(to_pyruntime_err)?;
847 Ok(())
848 })
849 }
850
851 #[pyo3(name = "subscribe_wallet")]
852 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
853 let client = self.clone();
854
855 pyo3_async_runtimes::tokio::future_into_py(py, async move {
856 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
857 Ok(())
858 })
859 }
860
861 #[pyo3(name = "unsubscribe_wallet")]
862 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
863 let client = self.clone();
864
865 pyo3_async_runtimes::tokio::future_into_py(py, async move {
866 client
867 .unsubscribe_wallet()
868 .await
869 .map_err(to_pyruntime_err)?;
870 Ok(())
871 })
872 }
873
874 #[pyo3(name = "wait_until_active")]
875 fn py_wait_until_active<'py>(
876 &self,
877 py: Python<'py>,
878 timeout_secs: f64,
879 ) -> PyResult<Bound<'py, PyAny>> {
880 let client = self.clone();
881
882 pyo3_async_runtimes::tokio::future_into_py(py, async move {
883 client
884 .wait_until_active(timeout_secs)
885 .await
886 .map_err(to_pyruntime_err)?;
887 Ok(())
888 })
889 }
890
891 #[pyo3(name = "submit_order")]
892 #[pyo3(signature = (
893 product_type,
894 instrument_id,
895 client_order_id,
896 order_side,
897 order_type,
898 quantity,
899 time_in_force=None,
900 price=None,
901 trigger_price=None,
902 post_only=None,
903 reduce_only=None,
904 ))]
905 #[allow(clippy::too_many_arguments)]
906 fn py_submit_order<'py>(
907 &self,
908 py: Python<'py>,
909 product_type: crate::common::enums::BybitProductType,
910 instrument_id: nautilus_model::identifiers::InstrumentId,
911 client_order_id: nautilus_model::identifiers::ClientOrderId,
912 order_side: nautilus_model::enums::OrderSide,
913 order_type: nautilus_model::enums::OrderType,
914 quantity: nautilus_model::types::Quantity,
915 time_in_force: Option<nautilus_model::enums::TimeInForce>,
916 price: Option<nautilus_model::types::Price>,
917 trigger_price: Option<nautilus_model::types::Price>,
918 post_only: Option<bool>,
919 reduce_only: Option<bool>,
920 ) -> PyResult<Bound<'py, PyAny>> {
921 let client = self.clone();
922
923 pyo3_async_runtimes::tokio::future_into_py(py, async move {
924 client
925 .submit_order(
926 product_type,
927 instrument_id,
928 client_order_id,
929 order_side,
930 order_type,
931 quantity,
932 time_in_force,
933 price,
934 trigger_price,
935 post_only,
936 reduce_only,
937 )
938 .await
939 .map_err(to_pyruntime_err)?;
940 Ok(())
941 })
942 }
943
944 #[pyo3(name = "modify_order")]
945 #[pyo3(signature = (
946 product_type,
947 instrument_id,
948 venue_order_id=None,
949 client_order_id=None,
950 quantity=None,
951 price=None,
952 ))]
953 #[allow(clippy::too_many_arguments)]
954 fn py_modify_order<'py>(
955 &self,
956 py: Python<'py>,
957 product_type: crate::common::enums::BybitProductType,
958 instrument_id: nautilus_model::identifiers::InstrumentId,
959 venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
960 client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
961 quantity: Option<nautilus_model::types::Quantity>,
962 price: Option<nautilus_model::types::Price>,
963 ) -> PyResult<Bound<'py, PyAny>> {
964 let client = self.clone();
965
966 pyo3_async_runtimes::tokio::future_into_py(py, async move {
967 client
968 .modify_order(
969 product_type,
970 instrument_id,
971 venue_order_id,
972 client_order_id,
973 quantity,
974 price,
975 )
976 .await
977 .map_err(to_pyruntime_err)?;
978 Ok(())
979 })
980 }
981
982 #[pyo3(name = "cancel_order")]
983 #[pyo3(signature = (
984 product_type,
985 instrument_id,
986 venue_order_id=None,
987 client_order_id=None,
988 ))]
989 fn py_cancel_order<'py>(
990 &self,
991 py: Python<'py>,
992 product_type: crate::common::enums::BybitProductType,
993 instrument_id: nautilus_model::identifiers::InstrumentId,
994 venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
995 client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
996 ) -> PyResult<Bound<'py, PyAny>> {
997 let client = self.clone();
998
999 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1000 client
1001 .cancel_order_by_id(product_type, instrument_id, venue_order_id, client_order_id)
1002 .await
1003 .map_err(to_pyruntime_err)?;
1004 Ok(())
1005 })
1006 }
1007}
1008
1009fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1010 if let Err(e) = callback.call1(py, (py_obj,)) {
1011 tracing::error!("Error calling Python callback: {e}");
1012 }
1013}
1014
1015fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
1016where
1017 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
1018{
1019 Python::attach(|py| match data_fn(py) {
1020 Ok(data) => {
1021 if let Err(e) = callback.call1(py, (data,)) {
1022 tracing::error!("Error calling Python callback: {e}");
1023 }
1024 }
1025 Err(e) => {
1026 tracing::error!("Error converting data to Python: {e}");
1027 }
1028 });
1029}