1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::to_pyruntime_err;
21use nautilus_model::{
22 data::{Data, OrderBookDeltas_API},
23 enums::{OrderSide, OrderType, TimeInForce},
24 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
25 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
26 types::{Price, Quantity},
27};
28use pyo3::{IntoPyObjectExt, prelude::*};
29
30use crate::{
31 common::enums::{BybitEnvironment, BybitProductType},
32 python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
33 websocket::{
34 client::BybitWebSocketClient,
35 messages::{BybitWebSocketError, NautilusWsMessage},
36 },
37};
38
39#[pymethods]
40impl BybitWebSocketError {
41 fn __repr__(&self) -> String {
42 format!(
43 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
44 self.code, self.message, self.conn_id, self.topic
45 )
46 }
47
48 #[getter]
49 pub fn code(&self) -> i64 {
50 self.code
51 }
52
53 #[getter]
54 pub fn message(&self) -> &str {
55 &self.message
56 }
57
58 #[getter]
59 pub fn conn_id(&self) -> Option<&str> {
60 self.conn_id.as_deref()
61 }
62
63 #[getter]
64 pub fn topic(&self) -> Option<&str> {
65 self.topic.as_deref()
66 }
67
68 #[getter]
69 pub fn req_id(&self) -> Option<&str> {
70 self.req_id.as_deref()
71 }
72}
73
74#[pymethods]
75impl BybitWebSocketClient {
76 #[staticmethod]
77 #[pyo3(name = "new_public")]
78 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
79 fn py_new_public(
80 product_type: BybitProductType,
81 environment: BybitEnvironment,
82 url: Option<String>,
83 heartbeat: Option<u64>,
84 ) -> Self {
85 Self::new_public_with(product_type, environment, url, heartbeat)
86 }
87
88 #[staticmethod]
89 #[pyo3(name = "new_private")]
90 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
91 fn py_new_private(
92 environment: BybitEnvironment,
93 api_key: Option<String>,
94 api_secret: Option<String>,
95 url: Option<String>,
96 heartbeat: Option<u64>,
97 ) -> Self {
98 Self::new_private(environment, api_key, api_secret, url, heartbeat)
99 }
100
101 #[staticmethod]
102 #[pyo3(name = "new_trade")]
103 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
104 fn py_new_trade(
105 environment: BybitEnvironment,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 url: Option<String>,
109 heartbeat: Option<u64>,
110 ) -> Self {
111 Self::new_trade(environment, api_key, api_secret, url, heartbeat)
112 }
113
114 #[getter]
115 #[pyo3(name = "api_key_masked")]
116 #[must_use]
117 pub fn py_api_key_masked(&self) -> Option<String> {
118 self.credential().map(|c| c.api_key_masked())
119 }
120
121 #[pyo3(name = "is_active")]
122 fn py_is_active(&self) -> bool {
123 self.is_active()
124 }
125
126 #[pyo3(name = "is_closed")]
127 fn py_is_closed(&self) -> bool {
128 self.is_closed()
129 }
130
131 #[pyo3(name = "subscription_count")]
132 fn py_subscription_count(&self) -> usize {
133 self.subscription_count()
134 }
135
136 #[pyo3(name = "cache_instrument")]
137 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
138 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
139 Ok(())
140 }
141
142 #[pyo3(name = "set_account_id")]
143 fn py_set_account_id(&mut self, account_id: AccountId) {
144 self.set_account_id(account_id);
145 }
146
147 #[pyo3(name = "set_mm_level")]
148 fn py_set_mm_level(&self, mm_level: u8) {
149 self.set_mm_level(mm_level);
150 }
151
152 #[pyo3(name = "set_bars_timestamp_on_close")]
153 fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
154 self.set_bars_timestamp_on_close(value);
155 }
156
157 #[pyo3(name = "connect")]
158 fn py_connect<'py>(
159 &mut self,
160 py: Python<'py>,
161 callback: Py<PyAny>,
162 ) -> PyResult<Bound<'py, PyAny>> {
163 let mut client = self.clone();
164
165 pyo3_async_runtimes::tokio::future_into_py(py, async move {
166 client.connect().await.map_err(to_pyruntime_err)?;
167
168 let stream = client.stream();
169
170 get_runtime().spawn(async move {
171 tokio::pin!(stream);
172
173 while let Some(msg) = stream.next().await {
174 match msg {
175 NautilusWsMessage::Data(data_vec) => {
176 Python::attach(|py| {
177 for data in data_vec {
178 let py_obj = data_to_pycapsule(py, data);
179 call_python(py, &callback, py_obj);
180 }
181 });
182 }
183 NautilusWsMessage::Deltas(deltas) => {
184 Python::attach(|py| {
185 let py_obj = data_to_pycapsule(
186 py,
187 Data::Deltas(OrderBookDeltas_API::new(deltas)),
188 );
189 call_python(py, &callback, py_obj);
190 });
191 }
192 NautilusWsMessage::FundingRates(rates) => {
193 for rate in rates {
194 call_python_with_data(&callback, move |py| {
195 rate.into_py_any(py).map(|obj| obj.into_bound(py))
196 });
197 }
198 }
199 NautilusWsMessage::OrderStatusReports(reports) => {
200 for report in reports {
201 call_python_with_data(&callback, move |py| {
202 report.into_py_any(py).map(|obj| obj.into_bound(py))
203 });
204 }
205 }
206 NautilusWsMessage::FillReports(reports) => {
207 for report in reports {
208 call_python_with_data(&callback, move |py| {
209 report.into_py_any(py).map(|obj| obj.into_bound(py))
210 });
211 }
212 }
213 NautilusWsMessage::PositionStatusReport(report) => {
214 call_python_with_data(&callback, move |py| {
215 report.into_py_any(py).map(|obj| obj.into_bound(py))
216 });
217 }
218 NautilusWsMessage::AccountState(state) => {
219 call_python_with_data(&callback, move |py| {
220 state.into_py_any(py).map(|obj| obj.into_bound(py))
221 });
222 }
223 NautilusWsMessage::OrderRejected(event) => {
224 call_python_with_data(&callback, move |py| {
225 event.into_py_any(py).map(|obj| obj.into_bound(py))
226 });
227 }
228 NautilusWsMessage::OrderCancelRejected(event) => {
229 call_python_with_data(&callback, move |py| {
230 event.into_py_any(py).map(|obj| obj.into_bound(py))
231 });
232 }
233 NautilusWsMessage::OrderModifyRejected(event) => {
234 call_python_with_data(&callback, move |py| {
235 event.into_py_any(py).map(|obj| obj.into_bound(py))
236 });
237 }
238 NautilusWsMessage::Error(err) => {
239 call_python_with_data(&callback, move |py| {
240 err.into_py_any(py).map(|obj| obj.into_bound(py))
241 });
242 }
243 NautilusWsMessage::Reconnected => {
244 tracing::info!("WebSocket reconnected");
245 }
246 NautilusWsMessage::Authenticated => {
247 tracing::info!("WebSocket authenticated");
248 }
249 }
250 }
251 });
252
253 Ok(())
254 })
255 }
256
257 #[pyo3(name = "close")]
258 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
259 let mut client = self.clone();
260
261 pyo3_async_runtimes::tokio::future_into_py(py, async move {
262 if let Err(e) = client.close().await {
263 tracing::error!("Error on close: {e}");
264 }
265 Ok(())
266 })
267 }
268
269 #[pyo3(name = "subscribe")]
270 fn py_subscribe<'py>(
271 &self,
272 py: Python<'py>,
273 topics: Vec<String>,
274 ) -> PyResult<Bound<'py, PyAny>> {
275 let client = self.clone();
276
277 pyo3_async_runtimes::tokio::future_into_py(py, async move {
278 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
279 Ok(())
280 })
281 }
282
283 #[pyo3(name = "unsubscribe")]
284 fn py_unsubscribe<'py>(
285 &self,
286 py: Python<'py>,
287 topics: Vec<String>,
288 ) -> PyResult<Bound<'py, PyAny>> {
289 let client = self.clone();
290
291 pyo3_async_runtimes::tokio::future_into_py(py, async move {
292 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
293 Ok(())
294 })
295 }
296
297 #[pyo3(name = "subscribe_orderbook")]
298 fn py_subscribe_orderbook<'py>(
299 &self,
300 py: Python<'py>,
301 instrument_id: InstrumentId,
302 depth: u32,
303 ) -> PyResult<Bound<'py, PyAny>> {
304 let client = self.clone();
305
306 pyo3_async_runtimes::tokio::future_into_py(py, async move {
307 client
308 .subscribe_orderbook(instrument_id, depth)
309 .await
310 .map_err(to_pyruntime_err)?;
311 Ok(())
312 })
313 }
314
315 #[pyo3(name = "unsubscribe_orderbook")]
316 fn py_unsubscribe_orderbook<'py>(
317 &self,
318 py: Python<'py>,
319 instrument_id: InstrumentId,
320 depth: u32,
321 ) -> PyResult<Bound<'py, PyAny>> {
322 let client = self.clone();
323
324 pyo3_async_runtimes::tokio::future_into_py(py, async move {
325 client
326 .unsubscribe_orderbook(instrument_id, depth)
327 .await
328 .map_err(to_pyruntime_err)?;
329 Ok(())
330 })
331 }
332
333 #[pyo3(name = "subscribe_trades")]
334 fn py_subscribe_trades<'py>(
335 &self,
336 py: Python<'py>,
337 instrument_id: InstrumentId,
338 ) -> PyResult<Bound<'py, PyAny>> {
339 let client = self.clone();
340
341 pyo3_async_runtimes::tokio::future_into_py(py, async move {
342 client
343 .subscribe_trades(instrument_id)
344 .await
345 .map_err(to_pyruntime_err)?;
346 Ok(())
347 })
348 }
349
350 #[pyo3(name = "unsubscribe_trades")]
351 fn py_unsubscribe_trades<'py>(
352 &self,
353 py: Python<'py>,
354 instrument_id: InstrumentId,
355 ) -> PyResult<Bound<'py, PyAny>> {
356 let client = self.clone();
357
358 pyo3_async_runtimes::tokio::future_into_py(py, async move {
359 client
360 .unsubscribe_trades(instrument_id)
361 .await
362 .map_err(to_pyruntime_err)?;
363 Ok(())
364 })
365 }
366
367 #[pyo3(name = "subscribe_ticker")]
368 fn py_subscribe_ticker<'py>(
369 &self,
370 py: Python<'py>,
371 instrument_id: InstrumentId,
372 ) -> PyResult<Bound<'py, PyAny>> {
373 let client = self.clone();
374
375 pyo3_async_runtimes::tokio::future_into_py(py, async move {
376 client
377 .subscribe_ticker(instrument_id)
378 .await
379 .map_err(to_pyruntime_err)?;
380 Ok(())
381 })
382 }
383
384 #[pyo3(name = "unsubscribe_ticker")]
385 fn py_unsubscribe_ticker<'py>(
386 &self,
387 py: Python<'py>,
388 instrument_id: InstrumentId,
389 ) -> PyResult<Bound<'py, PyAny>> {
390 let client = self.clone();
391
392 pyo3_async_runtimes::tokio::future_into_py(py, async move {
393 client
394 .unsubscribe_ticker(instrument_id)
395 .await
396 .map_err(to_pyruntime_err)?;
397 Ok(())
398 })
399 }
400
401 #[pyo3(name = "subscribe_klines")]
402 fn py_subscribe_klines<'py>(
403 &self,
404 py: Python<'py>,
405 instrument_id: InstrumentId,
406 interval: String,
407 ) -> PyResult<Bound<'py, PyAny>> {
408 let client = self.clone();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 client
412 .subscribe_klines(instrument_id, interval)
413 .await
414 .map_err(to_pyruntime_err)?;
415 Ok(())
416 })
417 }
418
419 #[pyo3(name = "unsubscribe_klines")]
420 fn py_unsubscribe_klines<'py>(
421 &self,
422 py: Python<'py>,
423 instrument_id: InstrumentId,
424 interval: String,
425 ) -> PyResult<Bound<'py, PyAny>> {
426 let client = self.clone();
427
428 pyo3_async_runtimes::tokio::future_into_py(py, async move {
429 client
430 .unsubscribe_klines(instrument_id, interval)
431 .await
432 .map_err(to_pyruntime_err)?;
433 Ok(())
434 })
435 }
436
437 #[pyo3(name = "subscribe_orders")]
438 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
439 let client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
443 Ok(())
444 })
445 }
446
447 #[pyo3(name = "unsubscribe_orders")]
448 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
449 let client = self.clone();
450
451 pyo3_async_runtimes::tokio::future_into_py(py, async move {
452 client
453 .unsubscribe_orders()
454 .await
455 .map_err(to_pyruntime_err)?;
456 Ok(())
457 })
458 }
459
460 #[pyo3(name = "subscribe_executions")]
461 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
462 let client = self.clone();
463
464 pyo3_async_runtimes::tokio::future_into_py(py, async move {
465 client
466 .subscribe_executions()
467 .await
468 .map_err(to_pyruntime_err)?;
469 Ok(())
470 })
471 }
472
473 #[pyo3(name = "unsubscribe_executions")]
474 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
475 let client = self.clone();
476
477 pyo3_async_runtimes::tokio::future_into_py(py, async move {
478 client
479 .unsubscribe_executions()
480 .await
481 .map_err(to_pyruntime_err)?;
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "subscribe_positions")]
487 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
488 let client = self.clone();
489
490 pyo3_async_runtimes::tokio::future_into_py(py, async move {
491 client
492 .subscribe_positions()
493 .await
494 .map_err(to_pyruntime_err)?;
495 Ok(())
496 })
497 }
498
499 #[pyo3(name = "unsubscribe_positions")]
500 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
501 let client = self.clone();
502
503 pyo3_async_runtimes::tokio::future_into_py(py, async move {
504 client
505 .unsubscribe_positions()
506 .await
507 .map_err(to_pyruntime_err)?;
508 Ok(())
509 })
510 }
511
512 #[pyo3(name = "subscribe_wallet")]
513 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
514 let client = self.clone();
515
516 pyo3_async_runtimes::tokio::future_into_py(py, async move {
517 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
518 Ok(())
519 })
520 }
521
522 #[pyo3(name = "unsubscribe_wallet")]
523 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
524 let client = self.clone();
525
526 pyo3_async_runtimes::tokio::future_into_py(py, async move {
527 client
528 .unsubscribe_wallet()
529 .await
530 .map_err(to_pyruntime_err)?;
531 Ok(())
532 })
533 }
534
535 #[pyo3(name = "wait_until_active")]
536 fn py_wait_until_active<'py>(
537 &self,
538 py: Python<'py>,
539 timeout_secs: f64,
540 ) -> PyResult<Bound<'py, PyAny>> {
541 let client = self.clone();
542
543 pyo3_async_runtimes::tokio::future_into_py(py, async move {
544 client
545 .wait_until_active(timeout_secs)
546 .await
547 .map_err(to_pyruntime_err)?;
548 Ok(())
549 })
550 }
551
552 #[pyo3(name = "submit_order")]
553 #[pyo3(signature = (
554 product_type,
555 trader_id,
556 strategy_id,
557 instrument_id,
558 client_order_id,
559 order_side,
560 order_type,
561 quantity,
562 is_quote_quantity=false,
563 time_in_force=None,
564 price=None,
565 trigger_price=None,
566 post_only=None,
567 reduce_only=None,
568 is_leverage=false,
569 ))]
570 #[allow(clippy::too_many_arguments)]
571 fn py_submit_order<'py>(
572 &self,
573 py: Python<'py>,
574 product_type: BybitProductType,
575 trader_id: TraderId,
576 strategy_id: StrategyId,
577 instrument_id: InstrumentId,
578 client_order_id: ClientOrderId,
579 order_side: OrderSide,
580 order_type: OrderType,
581 quantity: Quantity,
582 is_quote_quantity: bool,
583 time_in_force: Option<TimeInForce>,
584 price: Option<Price>,
585 trigger_price: Option<Price>,
586 post_only: Option<bool>,
587 reduce_only: Option<bool>,
588 is_leverage: bool,
589 ) -> PyResult<Bound<'py, PyAny>> {
590 let client = self.clone();
591
592 pyo3_async_runtimes::tokio::future_into_py(py, async move {
593 client
594 .submit_order(
595 product_type,
596 trader_id,
597 strategy_id,
598 instrument_id,
599 client_order_id,
600 order_side,
601 order_type,
602 quantity,
603 is_quote_quantity,
604 time_in_force,
605 price,
606 trigger_price,
607 post_only,
608 reduce_only,
609 is_leverage,
610 )
611 .await
612 .map_err(to_pyruntime_err)?;
613 Ok(())
614 })
615 }
616
617 #[pyo3(name = "modify_order")]
618 #[pyo3(signature = (
619 product_type,
620 trader_id,
621 strategy_id,
622 instrument_id,
623 client_order_id,
624 venue_order_id=None,
625 quantity=None,
626 price=None,
627 ))]
628 #[allow(clippy::too_many_arguments)]
629 fn py_modify_order<'py>(
630 &self,
631 py: Python<'py>,
632 product_type: BybitProductType,
633 trader_id: TraderId,
634 strategy_id: StrategyId,
635 instrument_id: InstrumentId,
636 client_order_id: ClientOrderId,
637 venue_order_id: Option<VenueOrderId>,
638 quantity: Option<Quantity>,
639 price: Option<Price>,
640 ) -> PyResult<Bound<'py, PyAny>> {
641 let client = self.clone();
642
643 pyo3_async_runtimes::tokio::future_into_py(py, async move {
644 client
645 .modify_order(
646 product_type,
647 trader_id,
648 strategy_id,
649 instrument_id,
650 client_order_id,
651 venue_order_id,
652 quantity,
653 price,
654 )
655 .await
656 .map_err(to_pyruntime_err)?;
657 Ok(())
658 })
659 }
660
661 #[pyo3(name = "cancel_order")]
662 #[pyo3(signature = (
663 product_type,
664 trader_id,
665 strategy_id,
666 instrument_id,
667 client_order_id,
668 venue_order_id=None,
669 ))]
670 #[allow(clippy::too_many_arguments)]
671 fn py_cancel_order<'py>(
672 &self,
673 py: Python<'py>,
674 product_type: BybitProductType,
675 trader_id: TraderId,
676 strategy_id: StrategyId,
677 instrument_id: InstrumentId,
678 client_order_id: ClientOrderId,
679 venue_order_id: Option<VenueOrderId>,
680 ) -> PyResult<Bound<'py, PyAny>> {
681 let client = self.clone();
682
683 pyo3_async_runtimes::tokio::future_into_py(py, async move {
684 client
685 .cancel_order_by_id(
686 product_type,
687 trader_id,
688 strategy_id,
689 instrument_id,
690 client_order_id,
691 venue_order_id,
692 )
693 .await
694 .map_err(to_pyruntime_err)?;
695 Ok(())
696 })
697 }
698
699 #[pyo3(name = "build_place_order_params")]
700 #[pyo3(signature = (
701 product_type,
702 instrument_id,
703 client_order_id,
704 order_side,
705 order_type,
706 quantity,
707 is_quote_quantity=false,
708 time_in_force=None,
709 price=None,
710 trigger_price=None,
711 post_only=None,
712 reduce_only=None,
713 is_leverage=false,
714 ))]
715 #[allow(clippy::too_many_arguments)]
716 fn py_build_place_order_params(
717 &self,
718 product_type: BybitProductType,
719 instrument_id: InstrumentId,
720 client_order_id: ClientOrderId,
721 order_side: OrderSide,
722 order_type: OrderType,
723 quantity: Quantity,
724 is_quote_quantity: bool,
725 time_in_force: Option<TimeInForce>,
726 price: Option<Price>,
727 trigger_price: Option<Price>,
728 post_only: Option<bool>,
729 reduce_only: Option<bool>,
730 is_leverage: bool,
731 ) -> PyResult<BybitWsPlaceOrderParams> {
732 let params = self
733 .build_place_order_params(
734 product_type,
735 instrument_id,
736 client_order_id,
737 order_side,
738 order_type,
739 quantity,
740 is_quote_quantity,
741 time_in_force,
742 price,
743 trigger_price,
744 post_only,
745 reduce_only,
746 is_leverage,
747 )
748 .map_err(to_pyruntime_err)?;
749 Ok(params.into())
750 }
751
752 #[pyo3(name = "batch_cancel_orders")]
753 fn py_batch_cancel_orders<'py>(
754 &self,
755 py: Python<'py>,
756 trader_id: TraderId,
757 strategy_id: StrategyId,
758 orders: Vec<BybitWsCancelOrderParams>,
759 ) -> PyResult<Bound<'py, PyAny>> {
760 let client = self.clone();
761
762 pyo3_async_runtimes::tokio::future_into_py(py, async move {
763 let order_params: Vec<_> = orders
764 .into_iter()
765 .map(|p| p.try_into())
766 .collect::<Result<Vec<_>, _>>()
767 .map_err(to_pyruntime_err)?;
768
769 client
770 .batch_cancel_orders(trader_id, strategy_id, order_params)
771 .await
772 .map_err(to_pyruntime_err)?;
773
774 Ok(())
775 })
776 }
777
778 #[pyo3(name = "build_amend_order_params")]
779 #[allow(clippy::too_many_arguments)]
780 fn py_build_amend_order_params(
781 &self,
782 product_type: BybitProductType,
783 instrument_id: InstrumentId,
784 venue_order_id: Option<VenueOrderId>,
785 client_order_id: Option<ClientOrderId>,
786 quantity: Option<Quantity>,
787 price: Option<Price>,
788 ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
789 let params = self
790 .build_amend_order_params(
791 product_type,
792 instrument_id,
793 venue_order_id,
794 client_order_id,
795 quantity,
796 price,
797 )
798 .map_err(to_pyruntime_err)?;
799 Ok(params.into())
800 }
801
802 #[pyo3(name = "build_cancel_order_params")]
803 fn py_build_cancel_order_params(
804 &self,
805 product_type: BybitProductType,
806 instrument_id: InstrumentId,
807 venue_order_id: Option<VenueOrderId>,
808 client_order_id: Option<ClientOrderId>,
809 ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
810 let params = self
811 .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
812 .map_err(to_pyruntime_err)?;
813 Ok(params.into())
814 }
815
816 #[pyo3(name = "batch_modify_orders")]
817 fn py_batch_modify_orders<'py>(
818 &self,
819 py: Python<'py>,
820 trader_id: TraderId,
821 strategy_id: StrategyId,
822 orders: Vec<BybitWsAmendOrderParams>,
823 ) -> PyResult<Bound<'py, PyAny>> {
824 let client = self.clone();
825
826 pyo3_async_runtimes::tokio::future_into_py(py, async move {
827 let order_params: Vec<_> = orders
828 .into_iter()
829 .map(|p| p.try_into())
830 .collect::<Result<Vec<_>, _>>()
831 .map_err(to_pyruntime_err)?;
832
833 client
834 .batch_amend_orders(trader_id, strategy_id, order_params)
835 .await
836 .map_err(to_pyruntime_err)?;
837
838 Ok(())
839 })
840 }
841
842 #[pyo3(name = "batch_place_orders")]
843 fn py_batch_place_orders<'py>(
844 &self,
845 py: Python<'py>,
846 trader_id: TraderId,
847 strategy_id: StrategyId,
848 orders: Vec<BybitWsPlaceOrderParams>,
849 ) -> PyResult<Bound<'py, PyAny>> {
850 let client = self.clone();
851
852 pyo3_async_runtimes::tokio::future_into_py(py, async move {
853 let order_params: Vec<_> = orders
854 .into_iter()
855 .map(|p| p.try_into())
856 .collect::<Result<Vec<_>, _>>()
857 .map_err(to_pyruntime_err)?;
858
859 client
860 .batch_place_orders(trader_id, strategy_id, order_params)
861 .await
862 .map_err(to_pyruntime_err)?;
863
864 Ok(())
865 })
866 }
867}
868
869fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
870 if let Err(e) = callback.call1(py, (py_obj,)) {
871 tracing::error!("Error calling Python callback: {e}");
872 }
873}
874
875fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
876where
877 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
878{
879 Python::attach(|py| match data_fn(py) {
880 Ok(data) => {
881 if let Err(e) = callback.call1(py, (data,)) {
882 tracing::error!("Error calling Python callback: {e}");
883 }
884 }
885 Err(e) => {
886 tracing::error!("Error converting data to Python: {e}");
887 }
888 });
889}