1use std::{num::NonZero, sync::Arc};
19
20use futures_util::StreamExt;
21use nautilus_core::{nanos::UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
22use nautilus_model::{
23 data::{BarSpecification, BarType, Data, OrderBookDeltas_API},
24 enums::{AggregationSource, BarAggregation, PriceType},
25 identifiers::{AccountId, InstrumentId},
26 instruments::Instrument,
27 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
28};
29use pyo3::{IntoPyObjectExt, prelude::*};
30
31use crate::{
32 common::{
33 credential::Credential,
34 enums::{BybitEnvironment, BybitProductType},
35 parse::make_bybit_symbol,
36 },
37 websocket::{
38 client::BybitWebSocketClient,
39 messages::{BybitWebSocketError, BybitWebSocketMessage},
40 parse::{
41 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_ws_account_state,
42 parse_ws_fill_report, parse_ws_kline_bar, parse_ws_order_status_report,
43 parse_ws_position_status_report, parse_ws_trade_tick,
44 },
45 },
46};
47
48#[pymethods]
49impl BybitWebSocketError {
50 fn __repr__(&self) -> String {
51 format!(
52 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
53 self.code, self.message, self.conn_id, self.topic
54 )
55 }
56
57 #[getter]
58 pub fn code(&self) -> i64 {
59 self.code
60 }
61
62 #[getter]
63 pub fn message(&self) -> &str {
64 &self.message
65 }
66
67 #[getter]
68 pub fn conn_id(&self) -> Option<&str> {
69 self.conn_id.as_deref()
70 }
71
72 #[getter]
73 pub fn topic(&self) -> Option<&str> {
74 self.topic.as_deref()
75 }
76
77 #[getter]
78 pub fn req_id(&self) -> Option<&str> {
79 self.req_id.as_deref()
80 }
81}
82
83#[pymethods]
84impl BybitWebSocketClient {
85 #[staticmethod]
86 #[pyo3(name = "new_public")]
87 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
88 fn py_new_public(
89 product_type: BybitProductType,
90 environment: BybitEnvironment,
91 url: Option<String>,
92 heartbeat: Option<u64>,
93 ) -> Self {
94 Self::new_public_with(product_type, environment, url, heartbeat)
95 }
96
97 #[staticmethod]
98 #[pyo3(name = "new_private")]
99 #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
100 fn py_new_private(
101 environment: BybitEnvironment,
102 api_key: String,
103 api_secret: String,
104 url: Option<String>,
105 heartbeat: Option<u64>,
106 ) -> Self {
107 tracing::debug!(
108 "Creating private WebSocket client with API key: {}",
109 &api_key[..api_key.len().min(10)]
110 );
111 let credential = crate::common::credential::Credential::new(api_key, api_secret);
112 Self::new_private(environment, credential, url, heartbeat)
113 }
114
115 #[staticmethod]
116 #[pyo3(name = "new_trade")]
117 #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
118 fn py_new_trade(
119 environment: BybitEnvironment,
120 api_key: String,
121 api_secret: String,
122 url: Option<String>,
123 heartbeat: Option<u64>,
124 ) -> Self {
125 let credential = Credential::new(api_key, api_secret);
126 Self::new_trade(environment, credential, url, heartbeat)
127 }
128
129 #[pyo3(name = "is_active")]
130 fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
131 let client = self.clone();
132
133 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
134 }
135
136 #[pyo3(name = "subscription_count")]
137 fn py_subscription_count(&self) -> usize {
138 self.subscription_count()
139 }
140
141 #[pyo3(name = "add_instrument")]
142 fn py_add_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
143 self.add_instrument(pyobject_to_instrument_any(py, instrument)?);
144 Ok(())
145 }
146
147 #[pyo3(name = "set_account_id")]
148 fn py_set_account_id(&mut self, account_id: AccountId) {
149 self.set_account_id(account_id);
150 }
151
152 #[pyo3(name = "connect")]
153 fn py_connect<'py>(
154 &mut self,
155 py: Python<'py>,
156 callback: Py<PyAny>,
157 ) -> PyResult<Bound<'py, PyAny>> {
158 let mut client = self.clone();
159
160 pyo3_async_runtimes::tokio::future_into_py(py, async move {
161 client.connect().await.map_err(to_pyruntime_err)?;
162
163 let stream = client.stream();
164
165 let instruments = Arc::clone(client.instruments());
166 let account_id = client.account_id();
167 let product_type = client.product_type();
168 let quote_cache = Arc::clone(client.quote_cache());
169
170 tokio::spawn(async move {
171 tokio::pin!(stream);
172
173 let clock = get_atomic_clock_realtime();
174
175 while let Some(msg) = stream.next().await {
176 match msg {
177 BybitWebSocketMessage::Orderbook(msg) => {
178 let raw_symbol = msg.data.s;
179
180 let symbol = product_type
181 .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
182 .unwrap_or(raw_symbol);
183
184 if let Some(instrument_entry) = instruments
185 .iter()
186 .find(|e| e.key().symbol.as_str() == symbol.as_str())
187 {
188 let instrument = instrument_entry.value();
189 let ts_init = clock.get_time_ns();
190
191 match parse_orderbook_deltas(&msg, instrument, ts_init) {
192 Ok(deltas) => {
193 Python::attach(|py| {
194 let py_obj = data_to_pycapsule(
195 py,
196 Data::Deltas(OrderBookDeltas_API::new(deltas)),
197 );
198 call_python(py, &callback, py_obj);
199 });
200 }
201 Err(e) => {
202 tracing::error!("Error parsing orderbook deltas: {e}");
203 }
204 }
205 } else {
206 tracing::warn!(
207 raw_symbol = %raw_symbol,
208 full_symbol = %symbol,
209 "No instrument found for symbol"
210 );
211 }
212 }
213 BybitWebSocketMessage::TickerLinear(msg) => {
214 let raw_symbol = msg.data.symbol;
215
216 let symbol = product_type
217 .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
218 .unwrap_or(raw_symbol);
219
220 if let Some(instrument_entry) = instruments
221 .iter()
222 .find(|e| e.key().symbol.as_str() == symbol.as_str())
223 {
224 let instrument = instrument_entry.value();
225 let instrument_id = instrument.id();
226 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
227 .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
228 let ts_init = clock.get_time_ns();
229
230 match quote_cache.write().await.process_linear_ticker(
231 &msg.data,
232 instrument_id,
233 instrument,
234 ts_event,
235 ts_init,
236 ) {
237 Ok(quote) => {
238 Python::attach(|py| {
239 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
240 call_python(py, &callback, py_obj);
241 });
242 }
243 Err(e) => {
244 tracing::debug!("Skipping partial ticker update: {e}");
245 }
246 }
247 } else {
248 tracing::warn!(
249 raw_symbol = %raw_symbol,
250 full_symbol = %symbol,
251 "No instrument found for symbol"
252 );
253 }
254 }
255 BybitWebSocketMessage::TickerOption(msg) => {
256 let raw_symbol = &msg.data.symbol;
257
258 let symbol = product_type
259 .map(|pt| make_bybit_symbol(raw_symbol, pt))
260 .unwrap_or_else(|| raw_symbol.as_str().into());
261
262 if let Some(instrument_entry) = instruments
263 .iter()
264 .find(|e| e.key().symbol.as_str() == symbol.as_str())
265 {
266 let instrument = instrument_entry.value();
267 let instrument_id = instrument.id();
268 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
269 .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
270 let ts_init = clock.get_time_ns();
271
272 match quote_cache.write().await.process_option_ticker(
273 &msg.data,
274 instrument_id,
275 instrument,
276 ts_event,
277 ts_init,
278 ) {
279 Ok(quote) => {
280 Python::attach(|py| {
281 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
282 call_python(py, &callback, py_obj);
283 });
284 }
285 Err(e) => {
286 tracing::debug!("Skipping partial ticker update: {e}");
287 }
288 }
289 } else {
290 tracing::warn!(
291 raw_symbol = %raw_symbol,
292 full_symbol = %symbol,
293 "No instrument found for symbol"
294 );
295 }
296 }
297 BybitWebSocketMessage::Trade(msg) => {
298 for trade in &msg.data {
299 let raw_symbol = trade.s;
300
301 let symbol = product_type
302 .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
303 .unwrap_or(raw_symbol);
304
305 if let Some(instrument_entry) = instruments
306 .iter()
307 .find(|e| e.key().symbol.as_str() == symbol.as_str())
308 {
309 let instrument = instrument_entry.value();
310 let ts_init = clock.get_time_ns();
311
312 match parse_ws_trade_tick(trade, instrument, ts_init) {
313 Ok(tick) => {
314 Python::attach(|py| {
315 let py_obj =
316 data_to_pycapsule(py, Data::Trade(tick));
317 call_python(py, &callback, py_obj);
318 });
319 }
320 Err(e) => {
321 tracing::error!("Error parsing trade tick: {e}");
322 }
323 }
324 } else {
325 tracing::warn!(
326 raw_symbol = %raw_symbol,
327 full_symbol = %symbol,
328 "No instrument found for symbol"
329 );
330 }
331 }
332 }
333 BybitWebSocketMessage::Kline(msg) => {
334 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
335 Ok(parts) => parts,
336 Err(e) => {
337 tracing::warn!("Failed to parse kline topic: {e}");
338 continue;
339 }
340 };
341
342 let symbol = product_type
343 .map(|pt| make_bybit_symbol(raw_symbol, pt))
344 .unwrap_or_else(|| raw_symbol.into());
345
346 if let Some(instrument_entry) = instruments
347 .iter()
348 .find(|e| e.key().symbol.as_str() == symbol.as_str())
349 {
350 let instrument = instrument_entry.value();
351 let ts_init = clock.get_time_ns();
352
353 let (step, aggregation) = match interval_str.parse::<usize>() {
354 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
355 _ => {
356 tracing::warn!(
358 "Unsupported kline interval: {}",
359 interval_str
360 );
361 continue;
362 }
363 };
364
365 if let Some(non_zero_step) = NonZero::new(step) {
366 let bar_spec = BarSpecification {
367 step: non_zero_step,
368 aggregation,
369 price_type: PriceType::Last,
370 };
371 let bar_type = BarType::new(
372 instrument.id(),
373 bar_spec,
374 AggregationSource::External,
375 );
376
377 for kline in &msg.data {
378 match parse_ws_kline_bar(
379 kline, instrument, bar_type, false, ts_init,
380 ) {
381 Ok(bar) => {
382 Python::attach(|py| {
383 let py_obj =
384 data_to_pycapsule(py, Data::Bar(bar));
385 call_python(py, &callback, py_obj);
386 });
387 }
388 Err(e) => {
389 tracing::error!("Error parsing kline to bar: {e}");
390 }
391 }
392 }
393 } else {
394 tracing::error!("Invalid step value: {}", step);
395 }
396 } else {
397 tracing::warn!(
398 raw_symbol = %raw_symbol,
399 full_symbol = %symbol,
400 "No instrument found for symbol"
401 );
402 }
403 }
404
405 BybitWebSocketMessage::AccountOrder(msg) => {
406 if let Some(account_id) = account_id {
407 for order in &msg.data {
408 let raw_symbol = order.symbol;
409
410 let symbol =
411 make_bybit_symbol(raw_symbol.as_str(), order.category);
412
413 if let Some(instrument_entry) = instruments
414 .iter()
415 .find(|e| e.key().symbol.as_str() == symbol.as_str())
416 {
417 let instrument = instrument_entry.value();
418 let ts_init = clock.get_time_ns();
419
420 match parse_ws_order_status_report(
421 order, instrument, account_id, ts_init,
422 ) {
423 Ok(report) => {
424 Python::attach(|py| {
425 if let Ok(py_obj) = report.into_py_any(py) {
426 call_python(py, &callback, py_obj);
427 }
428 });
429 }
430 Err(e) => {
431 tracing::error!(
432 "Error parsing order status report: {e}"
433 );
434 }
435 }
436 } else {
437 tracing::warn!(
438 raw_symbol = %raw_symbol,
439 full_symbol = %symbol,
440 "No instrument found for symbol"
441 );
442 }
443 }
444 } else {
445 tracing::error!(
446 "Received AccountOrder message but account_id is not set"
447 );
448 }
449 }
450 BybitWebSocketMessage::AccountExecution(msg) => {
451 if let Some(account_id) = account_id {
452 for execution in &msg.data {
453 let raw_symbol = execution.symbol;
454 let symbol =
455 make_bybit_symbol(raw_symbol.as_str(), execution.category);
456
457 if let Some(instrument_entry) = instruments
458 .iter()
459 .find(|e| e.key().symbol.as_str() == symbol.as_str())
460 {
461 let instrument = instrument_entry.value();
462 let ts_init = clock.get_time_ns();
463
464 match parse_ws_fill_report(
465 execution, account_id, instrument, ts_init,
466 ) {
467 Ok(report) => {
468 Python::attach(|py| {
469 if let Ok(py_obj) = report.into_py_any(py) {
470 call_python(py, &callback, py_obj);
471 }
472 });
473 }
474 Err(e) => {
475 tracing::error!("Error parsing fill report: {e}");
476 }
477 }
478 } else {
479 tracing::warn!(
480 raw_symbol = %raw_symbol,
481 full_symbol = %symbol,
482 "No instrument found for symbol"
483 );
484 }
485 }
486 } else {
487 tracing::error!(
488 "Received AccountExecution message but account_id is not set"
489 );
490 }
491 }
492 BybitWebSocketMessage::AccountWallet(msg) => {
493 if let Some(account_id) = account_id {
494 for wallet in &msg.data {
495 let ts_event =
496 UnixNanos::from(msg.creation_time as u64 * 1_000_000);
497 let ts_init = clock.get_time_ns();
498
499 match parse_ws_account_state(
500 wallet, account_id, ts_event, ts_init,
501 ) {
502 Ok(state) => {
503 Python::attach(|py| {
504 if let Ok(py_obj) = state.into_py_any(py) {
505 call_python(py, &callback, py_obj);
506 }
507 });
508 }
509 Err(e) => {
510 tracing::error!("Error parsing account state: {e}");
511 }
512 }
513 }
514 } else {
515 tracing::error!(
516 "Received AccountWallet message but account_id is not set"
517 );
518 }
519 }
520 BybitWebSocketMessage::AccountPosition(msg) => {
521 if let Some(account_id) = account_id {
522 for position in &msg.data {
523 let raw_symbol = position.symbol;
524
525 if let Some(instrument_entry) = instruments.iter().find(|e| {
528 let inst_symbol = e.key().symbol.as_str();
529 inst_symbol.starts_with(raw_symbol.as_str())
531 && inst_symbol.len() > raw_symbol.len()
532 && inst_symbol.as_bytes().get(raw_symbol.len())
533 == Some(&b'-')
534 }) {
535 let instrument = instrument_entry.value();
536 let ts_init = clock.get_time_ns();
537
538 match parse_ws_position_status_report(
539 position, account_id, instrument, ts_init,
540 ) {
541 Ok(report) => {
542 Python::attach(|py| {
543 if let Ok(py_obj) = report.into_py_any(py) {
544 call_python(py, &callback, py_obj);
545 }
546 });
547 }
548 Err(e) => {
549 tracing::error!(
550 "Error parsing position status report: {e}"
551 );
552 }
553 }
554 } else {
555 tracing::warn!(
556 raw_symbol = %raw_symbol,
557 "No instrument found for symbol"
558 );
559 }
560 }
561 } else {
562 tracing::error!(
563 "Received AccountPosition message but account_id is not set"
564 );
565 }
566 }
567 BybitWebSocketMessage::Error(msg) => {
568 call_python_with_data(&callback, |py| {
569 msg.into_py_any(py).map(|obj| obj.into_bound(py))
570 });
571 }
572 BybitWebSocketMessage::Reconnected => {}
573 BybitWebSocketMessage::Pong => {}
574 BybitWebSocketMessage::Response(msg) => {
575 tracing::debug!("Received response message: {:?}", msg);
576 }
577 BybitWebSocketMessage::Auth(msg) => {
578 tracing::debug!("Received auth message: {:?}", msg);
579 }
580 BybitWebSocketMessage::Subscription(msg) => {
581 tracing::debug!("Received subscription message: {:?}", msg);
582 }
583 BybitWebSocketMessage::Raw(value) => {
584 tracing::debug!("Received raw/unhandled message, skipping: {value}");
585 }
586 }
587 }
588 });
589
590 Ok(())
591 })
592 }
593
594 #[pyo3(name = "close")]
595 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
596 let mut client = self.clone();
597
598 pyo3_async_runtimes::tokio::future_into_py(py, async move {
599 if let Err(e) = client.close().await {
600 tracing::error!("Error on close: {e}");
601 }
602 Ok(())
603 })
604 }
605
606 #[pyo3(name = "subscribe")]
607 fn py_subscribe<'py>(
608 &self,
609 py: Python<'py>,
610 topics: Vec<String>,
611 ) -> PyResult<Bound<'py, PyAny>> {
612 let client = self.clone();
613
614 pyo3_async_runtimes::tokio::future_into_py(py, async move {
615 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
616 Ok(())
617 })
618 }
619
620 #[pyo3(name = "unsubscribe")]
621 fn py_unsubscribe<'py>(
622 &self,
623 py: Python<'py>,
624 topics: Vec<String>,
625 ) -> PyResult<Bound<'py, PyAny>> {
626 let client = self.clone();
627
628 pyo3_async_runtimes::tokio::future_into_py(py, async move {
629 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
630 Ok(())
631 })
632 }
633
634 #[pyo3(name = "subscribe_orderbook")]
635 fn py_subscribe_orderbook<'py>(
636 &self,
637 py: Python<'py>,
638 instrument_id: InstrumentId,
639 depth: u32,
640 ) -> PyResult<Bound<'py, PyAny>> {
641 let client = self.clone();
642
643 pyo3_async_runtimes::tokio::future_into_py(py, async move {
644 client
645 .subscribe_orderbook(instrument_id, depth)
646 .await
647 .map_err(to_pyruntime_err)?;
648 Ok(())
649 })
650 }
651
652 #[pyo3(name = "unsubscribe_orderbook")]
653 fn py_unsubscribe_orderbook<'py>(
654 &self,
655 py: Python<'py>,
656 instrument_id: InstrumentId,
657 depth: u32,
658 ) -> PyResult<Bound<'py, PyAny>> {
659 let client = self.clone();
660
661 pyo3_async_runtimes::tokio::future_into_py(py, async move {
662 client
663 .unsubscribe_orderbook(instrument_id, depth)
664 .await
665 .map_err(to_pyruntime_err)?;
666 Ok(())
667 })
668 }
669
670 #[pyo3(name = "subscribe_trades")]
671 fn py_subscribe_trades<'py>(
672 &self,
673 py: Python<'py>,
674 instrument_id: InstrumentId,
675 ) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.clone();
677
678 pyo3_async_runtimes::tokio::future_into_py(py, async move {
679 client
680 .subscribe_trades(instrument_id)
681 .await
682 .map_err(to_pyruntime_err)?;
683 Ok(())
684 })
685 }
686
687 #[pyo3(name = "unsubscribe_trades")]
688 fn py_unsubscribe_trades<'py>(
689 &self,
690 py: Python<'py>,
691 instrument_id: InstrumentId,
692 ) -> PyResult<Bound<'py, PyAny>> {
693 let client = self.clone();
694
695 pyo3_async_runtimes::tokio::future_into_py(py, async move {
696 client
697 .unsubscribe_trades(instrument_id)
698 .await
699 .map_err(to_pyruntime_err)?;
700 Ok(())
701 })
702 }
703
704 #[pyo3(name = "subscribe_ticker")]
705 fn py_subscribe_ticker<'py>(
706 &self,
707 py: Python<'py>,
708 instrument_id: InstrumentId,
709 ) -> PyResult<Bound<'py, PyAny>> {
710 let client = self.clone();
711
712 pyo3_async_runtimes::tokio::future_into_py(py, async move {
713 client
714 .subscribe_ticker(instrument_id)
715 .await
716 .map_err(to_pyruntime_err)?;
717 Ok(())
718 })
719 }
720
721 #[pyo3(name = "unsubscribe_ticker")]
722 fn py_unsubscribe_ticker<'py>(
723 &self,
724 py: Python<'py>,
725 instrument_id: InstrumentId,
726 ) -> PyResult<Bound<'py, PyAny>> {
727 let client = self.clone();
728
729 pyo3_async_runtimes::tokio::future_into_py(py, async move {
730 client
731 .unsubscribe_ticker(instrument_id)
732 .await
733 .map_err(to_pyruntime_err)?;
734 Ok(())
735 })
736 }
737
738 #[pyo3(name = "subscribe_klines")]
739 fn py_subscribe_klines<'py>(
740 &self,
741 py: Python<'py>,
742 instrument_id: InstrumentId,
743 interval: String,
744 ) -> PyResult<Bound<'py, PyAny>> {
745 let client = self.clone();
746
747 pyo3_async_runtimes::tokio::future_into_py(py, async move {
748 client
749 .subscribe_klines(instrument_id, interval)
750 .await
751 .map_err(to_pyruntime_err)?;
752 Ok(())
753 })
754 }
755
756 #[pyo3(name = "unsubscribe_klines")]
757 fn py_unsubscribe_klines<'py>(
758 &self,
759 py: Python<'py>,
760 instrument_id: InstrumentId,
761 interval: String,
762 ) -> PyResult<Bound<'py, PyAny>> {
763 let client = self.clone();
764
765 pyo3_async_runtimes::tokio::future_into_py(py, async move {
766 client
767 .unsubscribe_klines(instrument_id, interval)
768 .await
769 .map_err(to_pyruntime_err)?;
770 Ok(())
771 })
772 }
773
774 #[pyo3(name = "subscribe_orders")]
775 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
776 let client = self.clone();
777
778 pyo3_async_runtimes::tokio::future_into_py(py, async move {
779 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
780 Ok(())
781 })
782 }
783
784 #[pyo3(name = "unsubscribe_orders")]
785 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
786 let client = self.clone();
787
788 pyo3_async_runtimes::tokio::future_into_py(py, async move {
789 client
790 .unsubscribe_orders()
791 .await
792 .map_err(to_pyruntime_err)?;
793 Ok(())
794 })
795 }
796
797 #[pyo3(name = "subscribe_executions")]
798 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
799 let client = self.clone();
800
801 pyo3_async_runtimes::tokio::future_into_py(py, async move {
802 client
803 .subscribe_executions()
804 .await
805 .map_err(to_pyruntime_err)?;
806 Ok(())
807 })
808 }
809
810 #[pyo3(name = "unsubscribe_executions")]
811 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
812 let client = self.clone();
813
814 pyo3_async_runtimes::tokio::future_into_py(py, async move {
815 client
816 .unsubscribe_executions()
817 .await
818 .map_err(to_pyruntime_err)?;
819 Ok(())
820 })
821 }
822
823 #[pyo3(name = "subscribe_positions")]
824 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
825 let client = self.clone();
826
827 pyo3_async_runtimes::tokio::future_into_py(py, async move {
828 client
829 .subscribe_positions()
830 .await
831 .map_err(to_pyruntime_err)?;
832 Ok(())
833 })
834 }
835
836 #[pyo3(name = "unsubscribe_positions")]
837 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
838 let client = self.clone();
839
840 pyo3_async_runtimes::tokio::future_into_py(py, async move {
841 client
842 .unsubscribe_positions()
843 .await
844 .map_err(to_pyruntime_err)?;
845 Ok(())
846 })
847 }
848
849 #[pyo3(name = "subscribe_wallet")]
850 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
851 let client = self.clone();
852
853 pyo3_async_runtimes::tokio::future_into_py(py, async move {
854 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
855 Ok(())
856 })
857 }
858
859 #[pyo3(name = "unsubscribe_wallet")]
860 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
861 let client = self.clone();
862
863 pyo3_async_runtimes::tokio::future_into_py(py, async move {
864 client
865 .unsubscribe_wallet()
866 .await
867 .map_err(to_pyruntime_err)?;
868 Ok(())
869 })
870 }
871
872 #[pyo3(name = "wait_until_active")]
873 fn py_wait_until_active<'py>(
874 &self,
875 py: Python<'py>,
876 timeout_secs: f64,
877 ) -> PyResult<Bound<'py, PyAny>> {
878 let client = self.clone();
879
880 pyo3_async_runtimes::tokio::future_into_py(py, async move {
881 client
882 .wait_until_active(timeout_secs)
883 .await
884 .map_err(to_pyruntime_err)?;
885 Ok(())
886 })
887 }
888
889 #[pyo3(name = "submit_order")]
890 #[pyo3(signature = (
891 product_type,
892 instrument_id,
893 client_order_id,
894 order_side,
895 order_type,
896 quantity,
897 time_in_force=None,
898 price=None,
899 trigger_price=None,
900 post_only=None,
901 reduce_only=None,
902 ))]
903 #[allow(clippy::too_many_arguments)]
904 fn py_submit_order<'py>(
905 &self,
906 py: Python<'py>,
907 product_type: crate::common::enums::BybitProductType,
908 instrument_id: nautilus_model::identifiers::InstrumentId,
909 client_order_id: nautilus_model::identifiers::ClientOrderId,
910 order_side: nautilus_model::enums::OrderSide,
911 order_type: nautilus_model::enums::OrderType,
912 quantity: nautilus_model::types::Quantity,
913 time_in_force: Option<nautilus_model::enums::TimeInForce>,
914 price: Option<nautilus_model::types::Price>,
915 trigger_price: Option<nautilus_model::types::Price>,
916 post_only: Option<bool>,
917 reduce_only: Option<bool>,
918 ) -> PyResult<Bound<'py, PyAny>> {
919 let client = self.clone();
920
921 pyo3_async_runtimes::tokio::future_into_py(py, async move {
922 client
923 .submit_order(
924 product_type,
925 instrument_id,
926 client_order_id,
927 order_side,
928 order_type,
929 quantity,
930 time_in_force,
931 price,
932 trigger_price,
933 post_only,
934 reduce_only,
935 )
936 .await
937 .map_err(to_pyruntime_err)?;
938 Ok(())
939 })
940 }
941
942 #[pyo3(name = "modify_order")]
943 #[pyo3(signature = (
944 product_type,
945 instrument_id,
946 venue_order_id=None,
947 client_order_id=None,
948 quantity=None,
949 price=None,
950 ))]
951 #[allow(clippy::too_many_arguments)]
952 fn py_modify_order<'py>(
953 &self,
954 py: Python<'py>,
955 product_type: crate::common::enums::BybitProductType,
956 instrument_id: nautilus_model::identifiers::InstrumentId,
957 venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
958 client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
959 quantity: Option<nautilus_model::types::Quantity>,
960 price: Option<nautilus_model::types::Price>,
961 ) -> PyResult<Bound<'py, PyAny>> {
962 let client = self.clone();
963
964 pyo3_async_runtimes::tokio::future_into_py(py, async move {
965 client
966 .modify_order(
967 product_type,
968 instrument_id,
969 venue_order_id,
970 client_order_id,
971 quantity,
972 price,
973 )
974 .await
975 .map_err(to_pyruntime_err)?;
976 Ok(())
977 })
978 }
979
980 #[pyo3(name = "cancel_order")]
981 #[pyo3(signature = (
982 product_type,
983 instrument_id,
984 venue_order_id=None,
985 client_order_id=None,
986 ))]
987 fn py_cancel_order<'py>(
988 &self,
989 py: Python<'py>,
990 product_type: crate::common::enums::BybitProductType,
991 instrument_id: nautilus_model::identifiers::InstrumentId,
992 venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
993 client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
994 ) -> PyResult<Bound<'py, PyAny>> {
995 let client = self.clone();
996
997 pyo3_async_runtimes::tokio::future_into_py(py, async move {
998 client
999 .cancel_order_by_id(product_type, instrument_id, venue_order_id, client_order_id)
1000 .await
1001 .map_err(to_pyruntime_err)?;
1002 Ok(())
1003 })
1004 }
1005}
1006
1007fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1008 if let Err(e) = callback.call1(py, (py_obj,)) {
1009 tracing::error!("Error calling Python callback: {e}");
1010 }
1011}
1012
1013fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
1014where
1015 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
1016{
1017 Python::attach(|py| match data_fn(py) {
1018 Ok(data) => {
1019 if let Err(e) = callback.call1(py, (data,)) {
1020 tracing::error!("Error calling Python callback: {e}");
1021 }
1022 }
1023 Err(e) => {
1024 tracing::error!("Error converting data to Python: {e}");
1025 }
1026 });
1027}