1use futures_util::StreamExt;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{Data, OrderBookDeltas_API},
22 enums::{OrderSide, OrderType, TimeInForce},
23 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
24 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
25 types::{Price, Quantity},
26};
27use pyo3::{IntoPyObjectExt, prelude::*};
28
29use crate::{
30 common::{
31 credential::Credential,
32 enums::{BybitEnvironment, BybitProductType},
33 },
34 python::params::{BybitWsAmendOrderParams, BybitWsPlaceOrderParams},
35 websocket::{
36 client::BybitWebSocketClient,
37 messages::{BybitWebSocketError, NautilusWsMessage},
38 },
39};
40
41#[pymethods]
42impl BybitWebSocketError {
43 fn __repr__(&self) -> String {
44 format!(
45 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
46 self.code, self.message, self.conn_id, self.topic
47 )
48 }
49
50 #[getter]
51 pub fn code(&self) -> i64 {
52 self.code
53 }
54
55 #[getter]
56 pub fn message(&self) -> &str {
57 &self.message
58 }
59
60 #[getter]
61 pub fn conn_id(&self) -> Option<&str> {
62 self.conn_id.as_deref()
63 }
64
65 #[getter]
66 pub fn topic(&self) -> Option<&str> {
67 self.topic.as_deref()
68 }
69
70 #[getter]
71 pub fn req_id(&self) -> Option<&str> {
72 self.req_id.as_deref()
73 }
74}
75
76#[pymethods]
77impl BybitWebSocketClient {
78 #[staticmethod]
79 #[pyo3(name = "new_public")]
80 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
81 fn py_new_public(
82 product_type: BybitProductType,
83 environment: BybitEnvironment,
84 url: Option<String>,
85 heartbeat: Option<u64>,
86 ) -> Self {
87 Self::new_public_with(product_type, environment, url, heartbeat)
88 }
89
90 #[staticmethod]
91 #[pyo3(name = "new_private")]
92 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
93 fn py_new_private(
94 environment: BybitEnvironment,
95 api_key: Option<String>,
96 api_secret: Option<String>,
97 url: Option<String>,
98 heartbeat: Option<u64>,
99 ) -> Self {
100 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
102 let (key_var, secret_var) = match environment {
103 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
104 _ => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
105 };
106 let env_key = std::env::var(key_var).ok().unwrap_or_default();
107 let env_secret = std::env::var(secret_var).ok().unwrap_or_default();
108 (env_key, env_secret)
109 } else {
110 (api_key.unwrap_or_default(), api_secret.unwrap_or_default())
111 };
112
113 tracing::debug!(
114 "Creating private WebSocket client with API key: {}",
115 &final_api_key[..final_api_key.len().min(10)]
116 );
117 let credential = Credential::new(final_api_key, final_api_secret);
118 Self::new_private(environment, credential, url, heartbeat)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "new_trade")]
123 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
124 fn py_new_trade(
125 environment: BybitEnvironment,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 url: Option<String>,
129 heartbeat: Option<u64>,
130 ) -> Self {
131 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
133 let (key_var, secret_var) = match environment {
134 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
135 _ => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
136 };
137 let env_key = std::env::var(key_var).ok().unwrap_or_default();
138 let env_secret = std::env::var(secret_var).ok().unwrap_or_default();
139 (env_key, env_secret)
140 } else {
141 (api_key.unwrap_or_default(), api_secret.unwrap_or_default())
142 };
143
144 let credential = Credential::new(final_api_key, final_api_secret);
145 Self::new_trade(environment, credential, url, heartbeat)
146 }
147
148 #[pyo3(name = "is_active")]
149 fn py_is_active(&self) -> bool {
150 self.is_active()
151 }
152
153 #[pyo3(name = "is_closed")]
154 fn py_is_closed(&self) -> bool {
155 self.is_closed()
156 }
157
158 #[pyo3(name = "subscription_count")]
159 fn py_subscription_count(&self) -> usize {
160 self.subscription_count()
161 }
162
163 #[getter]
164 #[pyo3(name = "api_key_masked")]
165 #[must_use]
166 pub fn py_api_key_masked(&self) -> Option<String> {
167 self.credential().map(|c| c.api_key_masked())
168 }
169
170 #[pyo3(name = "cache_instrument")]
171 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
172 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
173 Ok(())
174 }
175
176 #[pyo3(name = "set_account_id")]
177 fn py_set_account_id(&mut self, account_id: AccountId) {
178 self.set_account_id(account_id);
179 }
180
181 #[pyo3(name = "set_mm_level")]
182 fn py_set_mm_level(&self, mm_level: u8) {
183 self.set_mm_level(mm_level);
184 }
185
186 #[pyo3(name = "connect")]
187 fn py_connect<'py>(
188 &mut self,
189 py: Python<'py>,
190 callback: Py<PyAny>,
191 ) -> PyResult<Bound<'py, PyAny>> {
192 let mut client = self.clone();
193
194 pyo3_async_runtimes::tokio::future_into_py(py, async move {
195 client.connect().await.map_err(to_pyruntime_err)?;
196
197 let stream = client.stream();
198
199 tokio::spawn(async move {
200 tokio::pin!(stream);
201
202 while let Some(msg) = stream.next().await {
203 match msg {
204 NautilusWsMessage::Data(data_vec) => {
205 Python::attach(|py| {
206 for data in data_vec {
207 let py_obj = data_to_pycapsule(py, data);
208 call_python(py, &callback, py_obj);
209 }
210 });
211 }
212 NautilusWsMessage::Deltas(deltas) => {
213 Python::attach(|py| {
214 let py_obj = data_to_pycapsule(
215 py,
216 Data::Deltas(OrderBookDeltas_API::new(deltas)),
217 );
218 call_python(py, &callback, py_obj);
219 });
220 }
221 NautilusWsMessage::FundingRates(rates) => {
222 for rate in rates {
223 call_python_with_data(&callback, move |py| {
224 rate.into_py_any(py).map(|obj| obj.into_bound(py))
225 });
226 }
227 }
228 NautilusWsMessage::OrderStatusReports(reports) => {
229 for report in reports {
230 call_python_with_data(&callback, move |py| {
231 report.into_py_any(py).map(|obj| obj.into_bound(py))
232 });
233 }
234 }
235 NautilusWsMessage::FillReports(reports) => {
236 for report in reports {
237 call_python_with_data(&callback, move |py| {
238 report.into_py_any(py).map(|obj| obj.into_bound(py))
239 });
240 }
241 }
242 NautilusWsMessage::PositionStatusReport(report) => {
243 call_python_with_data(&callback, move |py| {
244 report.into_py_any(py).map(|obj| obj.into_bound(py))
245 });
246 }
247 NautilusWsMessage::AccountState(state) => {
248 call_python_with_data(&callback, move |py| {
249 state.into_py_any(py).map(|obj| obj.into_bound(py))
250 });
251 }
252 NautilusWsMessage::OrderRejected(event) => {
253 call_python_with_data(&callback, move |py| {
254 event.into_py_any(py).map(|obj| obj.into_bound(py))
255 });
256 }
257 NautilusWsMessage::OrderCancelRejected(event) => {
258 call_python_with_data(&callback, move |py| {
259 event.into_py_any(py).map(|obj| obj.into_bound(py))
260 });
261 }
262 NautilusWsMessage::OrderModifyRejected(event) => {
263 call_python_with_data(&callback, move |py| {
264 event.into_py_any(py).map(|obj| obj.into_bound(py))
265 });
266 }
267 NautilusWsMessage::Error(err) => {
268 call_python_with_data(&callback, move |py| {
269 err.into_py_any(py).map(|obj| obj.into_bound(py))
270 });
271 }
272 NautilusWsMessage::Reconnected => {
273 tracing::info!("WebSocket reconnected");
274 }
275 NautilusWsMessage::Authenticated => {
276 tracing::info!("WebSocket authenticated");
277 }
278 }
279 }
280 });
281
282 Ok(())
283 })
284 }
285
286 #[pyo3(name = "close")]
287 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288 let mut client = self.clone();
289
290 pyo3_async_runtimes::tokio::future_into_py(py, async move {
291 if let Err(e) = client.close().await {
292 tracing::error!("Error on close: {e}");
293 }
294 Ok(())
295 })
296 }
297
298 #[pyo3(name = "subscribe")]
299 fn py_subscribe<'py>(
300 &self,
301 py: Python<'py>,
302 topics: Vec<String>,
303 ) -> PyResult<Bound<'py, PyAny>> {
304 let client = self.clone();
305
306 pyo3_async_runtimes::tokio::future_into_py(py, async move {
307 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "unsubscribe")]
313 fn py_unsubscribe<'py>(
314 &self,
315 py: Python<'py>,
316 topics: Vec<String>,
317 ) -> PyResult<Bound<'py, PyAny>> {
318 let client = self.clone();
319
320 pyo3_async_runtimes::tokio::future_into_py(py, async move {
321 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "subscribe_orderbook")]
327 fn py_subscribe_orderbook<'py>(
328 &self,
329 py: Python<'py>,
330 instrument_id: InstrumentId,
331 depth: u32,
332 ) -> PyResult<Bound<'py, PyAny>> {
333 let client = self.clone();
334
335 pyo3_async_runtimes::tokio::future_into_py(py, async move {
336 client
337 .subscribe_orderbook(instrument_id, depth)
338 .await
339 .map_err(to_pyruntime_err)?;
340 Ok(())
341 })
342 }
343
344 #[pyo3(name = "unsubscribe_orderbook")]
345 fn py_unsubscribe_orderbook<'py>(
346 &self,
347 py: Python<'py>,
348 instrument_id: InstrumentId,
349 depth: u32,
350 ) -> PyResult<Bound<'py, PyAny>> {
351 let client = self.clone();
352
353 pyo3_async_runtimes::tokio::future_into_py(py, async move {
354 client
355 .unsubscribe_orderbook(instrument_id, depth)
356 .await
357 .map_err(to_pyruntime_err)?;
358 Ok(())
359 })
360 }
361
362 #[pyo3(name = "subscribe_trades")]
363 fn py_subscribe_trades<'py>(
364 &self,
365 py: Python<'py>,
366 instrument_id: InstrumentId,
367 ) -> PyResult<Bound<'py, PyAny>> {
368 let client = self.clone();
369
370 pyo3_async_runtimes::tokio::future_into_py(py, async move {
371 client
372 .subscribe_trades(instrument_id)
373 .await
374 .map_err(to_pyruntime_err)?;
375 Ok(())
376 })
377 }
378
379 #[pyo3(name = "unsubscribe_trades")]
380 fn py_unsubscribe_trades<'py>(
381 &self,
382 py: Python<'py>,
383 instrument_id: InstrumentId,
384 ) -> PyResult<Bound<'py, PyAny>> {
385 let client = self.clone();
386
387 pyo3_async_runtimes::tokio::future_into_py(py, async move {
388 client
389 .unsubscribe_trades(instrument_id)
390 .await
391 .map_err(to_pyruntime_err)?;
392 Ok(())
393 })
394 }
395
396 #[pyo3(name = "subscribe_ticker")]
397 fn py_subscribe_ticker<'py>(
398 &self,
399 py: Python<'py>,
400 instrument_id: InstrumentId,
401 ) -> PyResult<Bound<'py, PyAny>> {
402 let client = self.clone();
403
404 pyo3_async_runtimes::tokio::future_into_py(py, async move {
405 client
406 .subscribe_ticker(instrument_id)
407 .await
408 .map_err(to_pyruntime_err)?;
409 Ok(())
410 })
411 }
412
413 #[pyo3(name = "unsubscribe_ticker")]
414 fn py_unsubscribe_ticker<'py>(
415 &self,
416 py: Python<'py>,
417 instrument_id: InstrumentId,
418 ) -> PyResult<Bound<'py, PyAny>> {
419 let client = self.clone();
420
421 pyo3_async_runtimes::tokio::future_into_py(py, async move {
422 client
423 .unsubscribe_ticker(instrument_id)
424 .await
425 .map_err(to_pyruntime_err)?;
426 Ok(())
427 })
428 }
429
430 #[pyo3(name = "subscribe_klines")]
431 fn py_subscribe_klines<'py>(
432 &self,
433 py: Python<'py>,
434 instrument_id: InstrumentId,
435 interval: String,
436 ) -> PyResult<Bound<'py, PyAny>> {
437 let client = self.clone();
438
439 pyo3_async_runtimes::tokio::future_into_py(py, async move {
440 client
441 .subscribe_klines(instrument_id, interval)
442 .await
443 .map_err(to_pyruntime_err)?;
444 Ok(())
445 })
446 }
447
448 #[pyo3(name = "unsubscribe_klines")]
449 fn py_unsubscribe_klines<'py>(
450 &self,
451 py: Python<'py>,
452 instrument_id: InstrumentId,
453 interval: String,
454 ) -> PyResult<Bound<'py, PyAny>> {
455 let client = self.clone();
456
457 pyo3_async_runtimes::tokio::future_into_py(py, async move {
458 client
459 .unsubscribe_klines(instrument_id, interval)
460 .await
461 .map_err(to_pyruntime_err)?;
462 Ok(())
463 })
464 }
465
466 #[pyo3(name = "subscribe_orders")]
467 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
468 let client = self.clone();
469
470 pyo3_async_runtimes::tokio::future_into_py(py, async move {
471 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
472 Ok(())
473 })
474 }
475
476 #[pyo3(name = "unsubscribe_orders")]
477 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
478 let client = self.clone();
479
480 pyo3_async_runtimes::tokio::future_into_py(py, async move {
481 client
482 .unsubscribe_orders()
483 .await
484 .map_err(to_pyruntime_err)?;
485 Ok(())
486 })
487 }
488
489 #[pyo3(name = "subscribe_executions")]
490 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
491 let client = self.clone();
492
493 pyo3_async_runtimes::tokio::future_into_py(py, async move {
494 client
495 .subscribe_executions()
496 .await
497 .map_err(to_pyruntime_err)?;
498 Ok(())
499 })
500 }
501
502 #[pyo3(name = "unsubscribe_executions")]
503 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
504 let client = self.clone();
505
506 pyo3_async_runtimes::tokio::future_into_py(py, async move {
507 client
508 .unsubscribe_executions()
509 .await
510 .map_err(to_pyruntime_err)?;
511 Ok(())
512 })
513 }
514
515 #[pyo3(name = "subscribe_positions")]
516 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
517 let client = self.clone();
518
519 pyo3_async_runtimes::tokio::future_into_py(py, async move {
520 client
521 .subscribe_positions()
522 .await
523 .map_err(to_pyruntime_err)?;
524 Ok(())
525 })
526 }
527
528 #[pyo3(name = "unsubscribe_positions")]
529 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
530 let client = self.clone();
531
532 pyo3_async_runtimes::tokio::future_into_py(py, async move {
533 client
534 .unsubscribe_positions()
535 .await
536 .map_err(to_pyruntime_err)?;
537 Ok(())
538 })
539 }
540
541 #[pyo3(name = "subscribe_wallet")]
542 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
543 let client = self.clone();
544
545 pyo3_async_runtimes::tokio::future_into_py(py, async move {
546 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "unsubscribe_wallet")]
552 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553 let client = self.clone();
554
555 pyo3_async_runtimes::tokio::future_into_py(py, async move {
556 client
557 .unsubscribe_wallet()
558 .await
559 .map_err(to_pyruntime_err)?;
560 Ok(())
561 })
562 }
563
564 #[pyo3(name = "wait_until_active")]
565 fn py_wait_until_active<'py>(
566 &self,
567 py: Python<'py>,
568 timeout_secs: f64,
569 ) -> PyResult<Bound<'py, PyAny>> {
570 let client = self.clone();
571
572 pyo3_async_runtimes::tokio::future_into_py(py, async move {
573 client
574 .wait_until_active(timeout_secs)
575 .await
576 .map_err(to_pyruntime_err)?;
577 Ok(())
578 })
579 }
580
581 #[pyo3(name = "submit_order")]
582 #[pyo3(signature = (
583 product_type,
584 trader_id,
585 strategy_id,
586 instrument_id,
587 client_order_id,
588 order_side,
589 order_type,
590 quantity,
591 is_quote_quantity=false,
592 time_in_force=None,
593 price=None,
594 trigger_price=None,
595 post_only=None,
596 reduce_only=None,
597 is_leverage=false,
598 ))]
599 #[allow(clippy::too_many_arguments)]
600 fn py_submit_order<'py>(
601 &self,
602 py: Python<'py>,
603 product_type: BybitProductType,
604 trader_id: TraderId,
605 strategy_id: StrategyId,
606 instrument_id: InstrumentId,
607 client_order_id: ClientOrderId,
608 order_side: OrderSide,
609 order_type: OrderType,
610 quantity: Quantity,
611 is_quote_quantity: bool,
612 time_in_force: Option<TimeInForce>,
613 price: Option<Price>,
614 trigger_price: Option<Price>,
615 post_only: Option<bool>,
616 reduce_only: Option<bool>,
617 is_leverage: bool,
618 ) -> PyResult<Bound<'py, PyAny>> {
619 let client = self.clone();
620
621 pyo3_async_runtimes::tokio::future_into_py(py, async move {
622 client
623 .submit_order(
624 product_type,
625 trader_id,
626 strategy_id,
627 instrument_id,
628 client_order_id,
629 order_side,
630 order_type,
631 quantity,
632 is_quote_quantity,
633 time_in_force,
634 price,
635 trigger_price,
636 post_only,
637 reduce_only,
638 is_leverage,
639 )
640 .await
641 .map_err(to_pyruntime_err)?;
642 Ok(())
643 })
644 }
645
646 #[pyo3(name = "modify_order")]
647 #[pyo3(signature = (
648 product_type,
649 trader_id,
650 strategy_id,
651 instrument_id,
652 client_order_id,
653 venue_order_id=None,
654 quantity=None,
655 price=None,
656 ))]
657 #[allow(clippy::too_many_arguments)]
658 fn py_modify_order<'py>(
659 &self,
660 py: Python<'py>,
661 product_type: BybitProductType,
662 trader_id: TraderId,
663 strategy_id: StrategyId,
664 instrument_id: InstrumentId,
665 client_order_id: ClientOrderId,
666 venue_order_id: Option<VenueOrderId>,
667 quantity: Option<Quantity>,
668 price: Option<Price>,
669 ) -> PyResult<Bound<'py, PyAny>> {
670 let client = self.clone();
671
672 pyo3_async_runtimes::tokio::future_into_py(py, async move {
673 client
674 .modify_order(
675 product_type,
676 trader_id,
677 strategy_id,
678 instrument_id,
679 client_order_id,
680 venue_order_id,
681 quantity,
682 price,
683 )
684 .await
685 .map_err(to_pyruntime_err)?;
686 Ok(())
687 })
688 }
689
690 #[pyo3(name = "cancel_order")]
691 #[pyo3(signature = (
692 product_type,
693 trader_id,
694 strategy_id,
695 instrument_id,
696 client_order_id,
697 venue_order_id=None,
698 ))]
699 #[allow(clippy::too_many_arguments)]
700 fn py_cancel_order<'py>(
701 &self,
702 py: Python<'py>,
703 product_type: BybitProductType,
704 trader_id: TraderId,
705 strategy_id: StrategyId,
706 instrument_id: InstrumentId,
707 client_order_id: ClientOrderId,
708 venue_order_id: Option<VenueOrderId>,
709 ) -> PyResult<Bound<'py, PyAny>> {
710 let client = self.clone();
711
712 pyo3_async_runtimes::tokio::future_into_py(py, async move {
713 client
714 .cancel_order_by_id(
715 product_type,
716 trader_id,
717 strategy_id,
718 instrument_id,
719 client_order_id,
720 venue_order_id,
721 )
722 .await
723 .map_err(to_pyruntime_err)?;
724 Ok(())
725 })
726 }
727
728 #[pyo3(name = "build_place_order_params")]
729 #[pyo3(signature = (
730 product_type,
731 instrument_id,
732 client_order_id,
733 order_side,
734 order_type,
735 quantity,
736 is_quote_quantity=false,
737 time_in_force=None,
738 price=None,
739 trigger_price=None,
740 post_only=None,
741 reduce_only=None,
742 is_leverage=false,
743 ))]
744 #[allow(clippy::too_many_arguments)]
745 fn py_build_place_order_params(
746 &self,
747 product_type: BybitProductType,
748 instrument_id: InstrumentId,
749 client_order_id: ClientOrderId,
750 order_side: OrderSide,
751 order_type: OrderType,
752 quantity: Quantity,
753 is_quote_quantity: bool,
754 time_in_force: Option<TimeInForce>,
755 price: Option<Price>,
756 trigger_price: Option<Price>,
757 post_only: Option<bool>,
758 reduce_only: Option<bool>,
759 is_leverage: bool,
760 ) -> PyResult<BybitWsPlaceOrderParams> {
761 let params = self
762 .build_place_order_params(
763 product_type,
764 instrument_id,
765 client_order_id,
766 order_side,
767 order_type,
768 quantity,
769 is_quote_quantity,
770 time_in_force,
771 price,
772 trigger_price,
773 post_only,
774 reduce_only,
775 is_leverage,
776 )
777 .map_err(to_pyruntime_err)?;
778 Ok(params.into())
779 }
780
781 #[pyo3(name = "batch_cancel_orders")]
782 #[pyo3(signature = (
783 product_type,
784 trader_id,
785 strategy_id,
786 instrument_ids,
787 venue_order_ids,
788 client_order_ids,
789 ))]
790 #[allow(clippy::too_many_arguments)]
791 fn py_batch_cancel_orders<'py>(
792 &self,
793 py: Python<'py>,
794 product_type: BybitProductType,
795 trader_id: TraderId,
796 strategy_id: StrategyId,
797 instrument_ids: Vec<InstrumentId>,
798 venue_order_ids: Vec<Option<VenueOrderId>>,
799 client_order_ids: Vec<Option<ClientOrderId>>,
800 ) -> PyResult<Bound<'py, PyAny>> {
801 let client = self.clone();
802
803 pyo3_async_runtimes::tokio::future_into_py(py, async move {
804 client
805 .batch_cancel_orders_by_id(
806 product_type,
807 trader_id,
808 strategy_id,
809 instrument_ids,
810 venue_order_ids,
811 client_order_ids,
812 )
813 .await
814 .map_err(to_pyruntime_err)?;
815 Ok(())
816 })
817 }
818
819 #[pyo3(name = "build_amend_order_params")]
820 #[allow(clippy::too_many_arguments)]
821 fn py_build_amend_order_params(
822 &self,
823 product_type: BybitProductType,
824 instrument_id: InstrumentId,
825 venue_order_id: Option<VenueOrderId>,
826 client_order_id: Option<ClientOrderId>,
827 quantity: Option<Quantity>,
828 price: Option<Price>,
829 ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
830 let params = self
831 .build_amend_order_params(
832 product_type,
833 instrument_id,
834 venue_order_id,
835 client_order_id,
836 quantity,
837 price,
838 )
839 .map_err(to_pyruntime_err)?;
840 Ok(params.into())
841 }
842
843 #[pyo3(name = "batch_modify_orders")]
844 fn py_batch_modify_orders<'py>(
845 &self,
846 py: Python<'py>,
847 trader_id: TraderId,
848 strategy_id: StrategyId,
849 orders: Vec<BybitWsAmendOrderParams>,
850 ) -> PyResult<Bound<'py, PyAny>> {
851 let client = self.clone();
852
853 pyo3_async_runtimes::tokio::future_into_py(py, async move {
854 let order_params: Vec<_> = orders
855 .into_iter()
856 .map(|p| p.try_into())
857 .collect::<Result<Vec<_>, _>>()
858 .map_err(to_pyruntime_err)?;
859
860 client
861 .batch_amend_orders(trader_id, strategy_id, order_params)
862 .await
863 .map_err(to_pyruntime_err)?;
864
865 Ok(())
866 })
867 }
868
869 #[pyo3(name = "batch_place_orders")]
870 fn py_batch_place_orders<'py>(
871 &self,
872 py: Python<'py>,
873 trader_id: TraderId,
874 strategy_id: StrategyId,
875 orders: Vec<BybitWsPlaceOrderParams>,
876 ) -> PyResult<Bound<'py, PyAny>> {
877 let client = self.clone();
878
879 pyo3_async_runtimes::tokio::future_into_py(py, async move {
880 let order_params: Vec<_> = orders
881 .into_iter()
882 .map(|p| p.try_into())
883 .collect::<Result<Vec<_>, _>>()
884 .map_err(to_pyruntime_err)?;
885
886 client
887 .batch_place_orders(trader_id, strategy_id, order_params)
888 .await
889 .map_err(to_pyruntime_err)?;
890
891 Ok(())
892 })
893 }
894}
895
896fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
897 if let Err(e) = callback.call1(py, (py_obj,)) {
898 tracing::error!("Error calling Python callback: {e}");
899 }
900}
901
902fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
903where
904 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
905{
906 Python::attach(|py| match data_fn(py) {
907 Ok(data) => {
908 if let Err(e) = callback.call1(py, (data,)) {
909 tracing::error!("Error calling Python callback: {e}");
910 }
911 }
912 Err(e) => {
913 tracing::error!("Error converting data to Python: {e}");
914 }
915 });
916}