1use futures_util::StreamExt;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{Data, OrderBookDeltas_API},
22 enums::{OrderSide, OrderType, TimeInForce},
23 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
24 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
25 types::{Price, Quantity},
26};
27use pyo3::{IntoPyObjectExt, prelude::*};
28
29use crate::{
30 common::enums::{BybitEnvironment, BybitProductType},
31 python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
32 websocket::{
33 client::BybitWebSocketClient,
34 messages::{BybitWebSocketError, NautilusWsMessage},
35 },
36};
37
38#[pymethods]
39impl BybitWebSocketError {
40 fn __repr__(&self) -> String {
41 format!(
42 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
43 self.code, self.message, self.conn_id, self.topic
44 )
45 }
46
47 #[getter]
48 pub fn code(&self) -> i64 {
49 self.code
50 }
51
52 #[getter]
53 pub fn message(&self) -> &str {
54 &self.message
55 }
56
57 #[getter]
58 pub fn conn_id(&self) -> Option<&str> {
59 self.conn_id.as_deref()
60 }
61
62 #[getter]
63 pub fn topic(&self) -> Option<&str> {
64 self.topic.as_deref()
65 }
66
67 #[getter]
68 pub fn req_id(&self) -> Option<&str> {
69 self.req_id.as_deref()
70 }
71}
72
73#[pymethods]
74impl BybitWebSocketClient {
75 #[staticmethod]
76 #[pyo3(name = "new_public")]
77 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
78 fn py_new_public(
79 product_type: BybitProductType,
80 environment: BybitEnvironment,
81 url: Option<String>,
82 heartbeat: Option<u64>,
83 ) -> Self {
84 Self::new_public_with(product_type, environment, url, heartbeat)
85 }
86
87 #[staticmethod]
88 #[pyo3(name = "new_private")]
89 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
90 fn py_new_private(
91 environment: BybitEnvironment,
92 api_key: Option<String>,
93 api_secret: Option<String>,
94 url: Option<String>,
95 heartbeat: Option<u64>,
96 ) -> Self {
97 Self::new_private(environment, api_key, api_secret, url, heartbeat)
98 }
99
100 #[staticmethod]
101 #[pyo3(name = "new_trade")]
102 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
103 fn py_new_trade(
104 environment: BybitEnvironment,
105 api_key: Option<String>,
106 api_secret: Option<String>,
107 url: Option<String>,
108 heartbeat: Option<u64>,
109 ) -> Self {
110 Self::new_trade(environment, api_key, api_secret, url, heartbeat)
111 }
112
113 #[getter]
114 #[pyo3(name = "api_key_masked")]
115 #[must_use]
116 pub fn py_api_key_masked(&self) -> Option<String> {
117 self.credential().map(|c| c.api_key_masked())
118 }
119
120 #[pyo3(name = "is_active")]
121 fn py_is_active(&self) -> bool {
122 self.is_active()
123 }
124
125 #[pyo3(name = "is_closed")]
126 fn py_is_closed(&self) -> bool {
127 self.is_closed()
128 }
129
130 #[pyo3(name = "subscription_count")]
131 fn py_subscription_count(&self) -> usize {
132 self.subscription_count()
133 }
134
135 #[pyo3(name = "cache_instrument")]
136 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138 Ok(())
139 }
140
141 #[pyo3(name = "set_account_id")]
142 fn py_set_account_id(&mut self, account_id: AccountId) {
143 self.set_account_id(account_id);
144 }
145
146 #[pyo3(name = "set_mm_level")]
147 fn py_set_mm_level(&self, mm_level: u8) {
148 self.set_mm_level(mm_level);
149 }
150
151 #[pyo3(name = "connect")]
152 fn py_connect<'py>(
153 &mut self,
154 py: Python<'py>,
155 callback: Py<PyAny>,
156 ) -> PyResult<Bound<'py, PyAny>> {
157 let mut client = self.clone();
158
159 pyo3_async_runtimes::tokio::future_into_py(py, async move {
160 client.connect().await.map_err(to_pyruntime_err)?;
161
162 let stream = client.stream();
163
164 tokio::spawn(async move {
165 tokio::pin!(stream);
166
167 while let Some(msg) = stream.next().await {
168 match msg {
169 NautilusWsMessage::Data(data_vec) => {
170 Python::attach(|py| {
171 for data in data_vec {
172 let py_obj = data_to_pycapsule(py, data);
173 call_python(py, &callback, py_obj);
174 }
175 });
176 }
177 NautilusWsMessage::Deltas(deltas) => {
178 Python::attach(|py| {
179 let py_obj = data_to_pycapsule(
180 py,
181 Data::Deltas(OrderBookDeltas_API::new(deltas)),
182 );
183 call_python(py, &callback, py_obj);
184 });
185 }
186 NautilusWsMessage::FundingRates(rates) => {
187 for rate in rates {
188 call_python_with_data(&callback, move |py| {
189 rate.into_py_any(py).map(|obj| obj.into_bound(py))
190 });
191 }
192 }
193 NautilusWsMessage::OrderStatusReports(reports) => {
194 for report in reports {
195 call_python_with_data(&callback, move |py| {
196 report.into_py_any(py).map(|obj| obj.into_bound(py))
197 });
198 }
199 }
200 NautilusWsMessage::FillReports(reports) => {
201 for report in reports {
202 call_python_with_data(&callback, move |py| {
203 report.into_py_any(py).map(|obj| obj.into_bound(py))
204 });
205 }
206 }
207 NautilusWsMessage::PositionStatusReport(report) => {
208 call_python_with_data(&callback, move |py| {
209 report.into_py_any(py).map(|obj| obj.into_bound(py))
210 });
211 }
212 NautilusWsMessage::AccountState(state) => {
213 call_python_with_data(&callback, move |py| {
214 state.into_py_any(py).map(|obj| obj.into_bound(py))
215 });
216 }
217 NautilusWsMessage::OrderRejected(event) => {
218 call_python_with_data(&callback, move |py| {
219 event.into_py_any(py).map(|obj| obj.into_bound(py))
220 });
221 }
222 NautilusWsMessage::OrderCancelRejected(event) => {
223 call_python_with_data(&callback, move |py| {
224 event.into_py_any(py).map(|obj| obj.into_bound(py))
225 });
226 }
227 NautilusWsMessage::OrderModifyRejected(event) => {
228 call_python_with_data(&callback, move |py| {
229 event.into_py_any(py).map(|obj| obj.into_bound(py))
230 });
231 }
232 NautilusWsMessage::Error(err) => {
233 call_python_with_data(&callback, move |py| {
234 err.into_py_any(py).map(|obj| obj.into_bound(py))
235 });
236 }
237 NautilusWsMessage::Reconnected => {
238 tracing::info!("WebSocket reconnected");
239 }
240 NautilusWsMessage::Authenticated => {
241 tracing::info!("WebSocket authenticated");
242 }
243 }
244 }
245 });
246
247 Ok(())
248 })
249 }
250
251 #[pyo3(name = "close")]
252 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
253 let mut client = self.clone();
254
255 pyo3_async_runtimes::tokio::future_into_py(py, async move {
256 if let Err(e) = client.close().await {
257 tracing::error!("Error on close: {e}");
258 }
259 Ok(())
260 })
261 }
262
263 #[pyo3(name = "subscribe")]
264 fn py_subscribe<'py>(
265 &self,
266 py: Python<'py>,
267 topics: Vec<String>,
268 ) -> PyResult<Bound<'py, PyAny>> {
269 let client = self.clone();
270
271 pyo3_async_runtimes::tokio::future_into_py(py, async move {
272 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
273 Ok(())
274 })
275 }
276
277 #[pyo3(name = "unsubscribe")]
278 fn py_unsubscribe<'py>(
279 &self,
280 py: Python<'py>,
281 topics: Vec<String>,
282 ) -> PyResult<Bound<'py, PyAny>> {
283 let client = self.clone();
284
285 pyo3_async_runtimes::tokio::future_into_py(py, async move {
286 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
287 Ok(())
288 })
289 }
290
291 #[pyo3(name = "subscribe_orderbook")]
292 fn py_subscribe_orderbook<'py>(
293 &self,
294 py: Python<'py>,
295 instrument_id: InstrumentId,
296 depth: u32,
297 ) -> PyResult<Bound<'py, PyAny>> {
298 let client = self.clone();
299
300 pyo3_async_runtimes::tokio::future_into_py(py, async move {
301 client
302 .subscribe_orderbook(instrument_id, depth)
303 .await
304 .map_err(to_pyruntime_err)?;
305 Ok(())
306 })
307 }
308
309 #[pyo3(name = "unsubscribe_orderbook")]
310 fn py_unsubscribe_orderbook<'py>(
311 &self,
312 py: Python<'py>,
313 instrument_id: InstrumentId,
314 depth: u32,
315 ) -> PyResult<Bound<'py, PyAny>> {
316 let client = self.clone();
317
318 pyo3_async_runtimes::tokio::future_into_py(py, async move {
319 client
320 .unsubscribe_orderbook(instrument_id, depth)
321 .await
322 .map_err(to_pyruntime_err)?;
323 Ok(())
324 })
325 }
326
327 #[pyo3(name = "subscribe_trades")]
328 fn py_subscribe_trades<'py>(
329 &self,
330 py: Python<'py>,
331 instrument_id: InstrumentId,
332 ) -> PyResult<Bound<'py, PyAny>> {
333 let client = self.clone();
334
335 pyo3_async_runtimes::tokio::future_into_py(py, async move {
336 client
337 .subscribe_trades(instrument_id)
338 .await
339 .map_err(to_pyruntime_err)?;
340 Ok(())
341 })
342 }
343
344 #[pyo3(name = "unsubscribe_trades")]
345 fn py_unsubscribe_trades<'py>(
346 &self,
347 py: Python<'py>,
348 instrument_id: InstrumentId,
349 ) -> PyResult<Bound<'py, PyAny>> {
350 let client = self.clone();
351
352 pyo3_async_runtimes::tokio::future_into_py(py, async move {
353 client
354 .unsubscribe_trades(instrument_id)
355 .await
356 .map_err(to_pyruntime_err)?;
357 Ok(())
358 })
359 }
360
361 #[pyo3(name = "subscribe_ticker")]
362 fn py_subscribe_ticker<'py>(
363 &self,
364 py: Python<'py>,
365 instrument_id: InstrumentId,
366 ) -> PyResult<Bound<'py, PyAny>> {
367 let client = self.clone();
368
369 pyo3_async_runtimes::tokio::future_into_py(py, async move {
370 client
371 .subscribe_ticker(instrument_id)
372 .await
373 .map_err(to_pyruntime_err)?;
374 Ok(())
375 })
376 }
377
378 #[pyo3(name = "unsubscribe_ticker")]
379 fn py_unsubscribe_ticker<'py>(
380 &self,
381 py: Python<'py>,
382 instrument_id: InstrumentId,
383 ) -> PyResult<Bound<'py, PyAny>> {
384 let client = self.clone();
385
386 pyo3_async_runtimes::tokio::future_into_py(py, async move {
387 client
388 .unsubscribe_ticker(instrument_id)
389 .await
390 .map_err(to_pyruntime_err)?;
391 Ok(())
392 })
393 }
394
395 #[pyo3(name = "subscribe_klines")]
396 fn py_subscribe_klines<'py>(
397 &self,
398 py: Python<'py>,
399 instrument_id: InstrumentId,
400 interval: String,
401 ) -> PyResult<Bound<'py, PyAny>> {
402 let client = self.clone();
403
404 pyo3_async_runtimes::tokio::future_into_py(py, async move {
405 client
406 .subscribe_klines(instrument_id, interval)
407 .await
408 .map_err(to_pyruntime_err)?;
409 Ok(())
410 })
411 }
412
413 #[pyo3(name = "unsubscribe_klines")]
414 fn py_unsubscribe_klines<'py>(
415 &self,
416 py: Python<'py>,
417 instrument_id: InstrumentId,
418 interval: String,
419 ) -> PyResult<Bound<'py, PyAny>> {
420 let client = self.clone();
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move {
423 client
424 .unsubscribe_klines(instrument_id, interval)
425 .await
426 .map_err(to_pyruntime_err)?;
427 Ok(())
428 })
429 }
430
431 #[pyo3(name = "subscribe_orders")]
432 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
433 let client = self.clone();
434
435 pyo3_async_runtimes::tokio::future_into_py(py, async move {
436 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
437 Ok(())
438 })
439 }
440
441 #[pyo3(name = "unsubscribe_orders")]
442 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
443 let client = self.clone();
444
445 pyo3_async_runtimes::tokio::future_into_py(py, async move {
446 client
447 .unsubscribe_orders()
448 .await
449 .map_err(to_pyruntime_err)?;
450 Ok(())
451 })
452 }
453
454 #[pyo3(name = "subscribe_executions")]
455 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 client
460 .subscribe_executions()
461 .await
462 .map_err(to_pyruntime_err)?;
463 Ok(())
464 })
465 }
466
467 #[pyo3(name = "unsubscribe_executions")]
468 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
469 let client = self.clone();
470
471 pyo3_async_runtimes::tokio::future_into_py(py, async move {
472 client
473 .unsubscribe_executions()
474 .await
475 .map_err(to_pyruntime_err)?;
476 Ok(())
477 })
478 }
479
480 #[pyo3(name = "subscribe_positions")]
481 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
482 let client = self.clone();
483
484 pyo3_async_runtimes::tokio::future_into_py(py, async move {
485 client
486 .subscribe_positions()
487 .await
488 .map_err(to_pyruntime_err)?;
489 Ok(())
490 })
491 }
492
493 #[pyo3(name = "unsubscribe_positions")]
494 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
495 let client = self.clone();
496
497 pyo3_async_runtimes::tokio::future_into_py(py, async move {
498 client
499 .unsubscribe_positions()
500 .await
501 .map_err(to_pyruntime_err)?;
502 Ok(())
503 })
504 }
505
506 #[pyo3(name = "subscribe_wallet")]
507 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
508 let client = self.clone();
509
510 pyo3_async_runtimes::tokio::future_into_py(py, async move {
511 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
512 Ok(())
513 })
514 }
515
516 #[pyo3(name = "unsubscribe_wallet")]
517 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
518 let client = self.clone();
519
520 pyo3_async_runtimes::tokio::future_into_py(py, async move {
521 client
522 .unsubscribe_wallet()
523 .await
524 .map_err(to_pyruntime_err)?;
525 Ok(())
526 })
527 }
528
529 #[pyo3(name = "wait_until_active")]
530 fn py_wait_until_active<'py>(
531 &self,
532 py: Python<'py>,
533 timeout_secs: f64,
534 ) -> PyResult<Bound<'py, PyAny>> {
535 let client = self.clone();
536
537 pyo3_async_runtimes::tokio::future_into_py(py, async move {
538 client
539 .wait_until_active(timeout_secs)
540 .await
541 .map_err(to_pyruntime_err)?;
542 Ok(())
543 })
544 }
545
546 #[pyo3(name = "submit_order")]
547 #[pyo3(signature = (
548 product_type,
549 trader_id,
550 strategy_id,
551 instrument_id,
552 client_order_id,
553 order_side,
554 order_type,
555 quantity,
556 is_quote_quantity=false,
557 time_in_force=None,
558 price=None,
559 trigger_price=None,
560 post_only=None,
561 reduce_only=None,
562 is_leverage=false,
563 ))]
564 #[allow(clippy::too_many_arguments)]
565 fn py_submit_order<'py>(
566 &self,
567 py: Python<'py>,
568 product_type: BybitProductType,
569 trader_id: TraderId,
570 strategy_id: StrategyId,
571 instrument_id: InstrumentId,
572 client_order_id: ClientOrderId,
573 order_side: OrderSide,
574 order_type: OrderType,
575 quantity: Quantity,
576 is_quote_quantity: bool,
577 time_in_force: Option<TimeInForce>,
578 price: Option<Price>,
579 trigger_price: Option<Price>,
580 post_only: Option<bool>,
581 reduce_only: Option<bool>,
582 is_leverage: bool,
583 ) -> PyResult<Bound<'py, PyAny>> {
584 let client = self.clone();
585
586 pyo3_async_runtimes::tokio::future_into_py(py, async move {
587 client
588 .submit_order(
589 product_type,
590 trader_id,
591 strategy_id,
592 instrument_id,
593 client_order_id,
594 order_side,
595 order_type,
596 quantity,
597 is_quote_quantity,
598 time_in_force,
599 price,
600 trigger_price,
601 post_only,
602 reduce_only,
603 is_leverage,
604 )
605 .await
606 .map_err(to_pyruntime_err)?;
607 Ok(())
608 })
609 }
610
611 #[pyo3(name = "modify_order")]
612 #[pyo3(signature = (
613 product_type,
614 trader_id,
615 strategy_id,
616 instrument_id,
617 client_order_id,
618 venue_order_id=None,
619 quantity=None,
620 price=None,
621 ))]
622 #[allow(clippy::too_many_arguments)]
623 fn py_modify_order<'py>(
624 &self,
625 py: Python<'py>,
626 product_type: BybitProductType,
627 trader_id: TraderId,
628 strategy_id: StrategyId,
629 instrument_id: InstrumentId,
630 client_order_id: ClientOrderId,
631 venue_order_id: Option<VenueOrderId>,
632 quantity: Option<Quantity>,
633 price: Option<Price>,
634 ) -> PyResult<Bound<'py, PyAny>> {
635 let client = self.clone();
636
637 pyo3_async_runtimes::tokio::future_into_py(py, async move {
638 client
639 .modify_order(
640 product_type,
641 trader_id,
642 strategy_id,
643 instrument_id,
644 client_order_id,
645 venue_order_id,
646 quantity,
647 price,
648 )
649 .await
650 .map_err(to_pyruntime_err)?;
651 Ok(())
652 })
653 }
654
655 #[pyo3(name = "cancel_order")]
656 #[pyo3(signature = (
657 product_type,
658 trader_id,
659 strategy_id,
660 instrument_id,
661 client_order_id,
662 venue_order_id=None,
663 ))]
664 #[allow(clippy::too_many_arguments)]
665 fn py_cancel_order<'py>(
666 &self,
667 py: Python<'py>,
668 product_type: BybitProductType,
669 trader_id: TraderId,
670 strategy_id: StrategyId,
671 instrument_id: InstrumentId,
672 client_order_id: ClientOrderId,
673 venue_order_id: Option<VenueOrderId>,
674 ) -> PyResult<Bound<'py, PyAny>> {
675 let client = self.clone();
676
677 pyo3_async_runtimes::tokio::future_into_py(py, async move {
678 client
679 .cancel_order_by_id(
680 product_type,
681 trader_id,
682 strategy_id,
683 instrument_id,
684 client_order_id,
685 venue_order_id,
686 )
687 .await
688 .map_err(to_pyruntime_err)?;
689 Ok(())
690 })
691 }
692
693 #[pyo3(name = "build_place_order_params")]
694 #[pyo3(signature = (
695 product_type,
696 instrument_id,
697 client_order_id,
698 order_side,
699 order_type,
700 quantity,
701 is_quote_quantity=false,
702 time_in_force=None,
703 price=None,
704 trigger_price=None,
705 post_only=None,
706 reduce_only=None,
707 is_leverage=false,
708 ))]
709 #[allow(clippy::too_many_arguments)]
710 fn py_build_place_order_params(
711 &self,
712 product_type: BybitProductType,
713 instrument_id: InstrumentId,
714 client_order_id: ClientOrderId,
715 order_side: OrderSide,
716 order_type: OrderType,
717 quantity: Quantity,
718 is_quote_quantity: bool,
719 time_in_force: Option<TimeInForce>,
720 price: Option<Price>,
721 trigger_price: Option<Price>,
722 post_only: Option<bool>,
723 reduce_only: Option<bool>,
724 is_leverage: bool,
725 ) -> PyResult<BybitWsPlaceOrderParams> {
726 let params = self
727 .build_place_order_params(
728 product_type,
729 instrument_id,
730 client_order_id,
731 order_side,
732 order_type,
733 quantity,
734 is_quote_quantity,
735 time_in_force,
736 price,
737 trigger_price,
738 post_only,
739 reduce_only,
740 is_leverage,
741 )
742 .map_err(to_pyruntime_err)?;
743 Ok(params.into())
744 }
745
746 #[pyo3(name = "batch_cancel_orders")]
747 fn py_batch_cancel_orders<'py>(
748 &self,
749 py: Python<'py>,
750 trader_id: TraderId,
751 strategy_id: StrategyId,
752 orders: Vec<BybitWsCancelOrderParams>,
753 ) -> PyResult<Bound<'py, PyAny>> {
754 let client = self.clone();
755
756 pyo3_async_runtimes::tokio::future_into_py(py, async move {
757 let order_params: Vec<_> = orders
758 .into_iter()
759 .map(|p| p.try_into())
760 .collect::<Result<Vec<_>, _>>()
761 .map_err(to_pyruntime_err)?;
762
763 client
764 .batch_cancel_orders(trader_id, strategy_id, order_params)
765 .await
766 .map_err(to_pyruntime_err)?;
767
768 Ok(())
769 })
770 }
771
772 #[pyo3(name = "build_amend_order_params")]
773 #[allow(clippy::too_many_arguments)]
774 fn py_build_amend_order_params(
775 &self,
776 product_type: BybitProductType,
777 instrument_id: InstrumentId,
778 venue_order_id: Option<VenueOrderId>,
779 client_order_id: Option<ClientOrderId>,
780 quantity: Option<Quantity>,
781 price: Option<Price>,
782 ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
783 let params = self
784 .build_amend_order_params(
785 product_type,
786 instrument_id,
787 venue_order_id,
788 client_order_id,
789 quantity,
790 price,
791 )
792 .map_err(to_pyruntime_err)?;
793 Ok(params.into())
794 }
795
796 #[pyo3(name = "build_cancel_order_params")]
797 fn py_build_cancel_order_params(
798 &self,
799 product_type: BybitProductType,
800 instrument_id: InstrumentId,
801 venue_order_id: Option<VenueOrderId>,
802 client_order_id: Option<ClientOrderId>,
803 ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
804 let params = self
805 .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
806 .map_err(to_pyruntime_err)?;
807 Ok(params.into())
808 }
809
810 #[pyo3(name = "batch_modify_orders")]
811 fn py_batch_modify_orders<'py>(
812 &self,
813 py: Python<'py>,
814 trader_id: TraderId,
815 strategy_id: StrategyId,
816 orders: Vec<BybitWsAmendOrderParams>,
817 ) -> PyResult<Bound<'py, PyAny>> {
818 let client = self.clone();
819
820 pyo3_async_runtimes::tokio::future_into_py(py, async move {
821 let order_params: Vec<_> = orders
822 .into_iter()
823 .map(|p| p.try_into())
824 .collect::<Result<Vec<_>, _>>()
825 .map_err(to_pyruntime_err)?;
826
827 client
828 .batch_amend_orders(trader_id, strategy_id, order_params)
829 .await
830 .map_err(to_pyruntime_err)?;
831
832 Ok(())
833 })
834 }
835
836 #[pyo3(name = "batch_place_orders")]
837 fn py_batch_place_orders<'py>(
838 &self,
839 py: Python<'py>,
840 trader_id: TraderId,
841 strategy_id: StrategyId,
842 orders: Vec<BybitWsPlaceOrderParams>,
843 ) -> PyResult<Bound<'py, PyAny>> {
844 let client = self.clone();
845
846 pyo3_async_runtimes::tokio::future_into_py(py, async move {
847 let order_params: Vec<_> = orders
848 .into_iter()
849 .map(|p| p.try_into())
850 .collect::<Result<Vec<_>, _>>()
851 .map_err(to_pyruntime_err)?;
852
853 client
854 .batch_place_orders(trader_id, strategy_id, order_params)
855 .await
856 .map_err(to_pyruntime_err)?;
857
858 Ok(())
859 })
860 }
861}
862
863fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
864 if let Err(e) = callback.call1(py, (py_obj,)) {
865 tracing::error!("Error calling Python callback: {e}");
866 }
867}
868
869fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
870where
871 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
872{
873 Python::attach(|py| match data_fn(py) {
874 Ok(data) => {
875 if let Err(e) = callback.call1(py, (data,)) {
876 tracing::error!("Error calling Python callback: {e}");
877 }
878 }
879 Err(e) => {
880 tracing::error!("Error converting data to Python: {e}");
881 }
882 });
883}