nautilus_bybit/python/
websocket.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Python bindings for the Bybit WebSocket client.
17
18use futures_util::StreamExt;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{Data, OrderBookDeltas_API},
22    enums::{OrderSide, OrderType, TimeInForce},
23    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
24    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
25    types::{Price, Quantity},
26};
27use pyo3::{IntoPyObjectExt, prelude::*};
28
29use crate::{
30    common::enums::{BybitEnvironment, BybitProductType},
31    python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
32    websocket::{
33        client::BybitWebSocketClient,
34        messages::{BybitWebSocketError, NautilusWsMessage},
35    },
36};
37
38#[pymethods]
39impl BybitWebSocketError {
40    fn __repr__(&self) -> String {
41        format!(
42            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
43            self.code, self.message, self.conn_id, self.topic
44        )
45    }
46
47    #[getter]
48    pub fn code(&self) -> i64 {
49        self.code
50    }
51
52    #[getter]
53    pub fn message(&self) -> &str {
54        &self.message
55    }
56
57    #[getter]
58    pub fn conn_id(&self) -> Option<&str> {
59        self.conn_id.as_deref()
60    }
61
62    #[getter]
63    pub fn topic(&self) -> Option<&str> {
64        self.topic.as_deref()
65    }
66
67    #[getter]
68    pub fn req_id(&self) -> Option<&str> {
69        self.req_id.as_deref()
70    }
71}
72
73#[pymethods]
74impl BybitWebSocketClient {
75    #[staticmethod]
76    #[pyo3(name = "new_public")]
77    #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
78    fn py_new_public(
79        product_type: BybitProductType,
80        environment: BybitEnvironment,
81        url: Option<String>,
82        heartbeat: Option<u64>,
83    ) -> Self {
84        Self::new_public_with(product_type, environment, url, heartbeat)
85    }
86
87    #[staticmethod]
88    #[pyo3(name = "new_private")]
89    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
90    fn py_new_private(
91        environment: BybitEnvironment,
92        api_key: Option<String>,
93        api_secret: Option<String>,
94        url: Option<String>,
95        heartbeat: Option<u64>,
96    ) -> Self {
97        Self::new_private(environment, api_key, api_secret, url, heartbeat)
98    }
99
100    #[staticmethod]
101    #[pyo3(name = "new_trade")]
102    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
103    fn py_new_trade(
104        environment: BybitEnvironment,
105        api_key: Option<String>,
106        api_secret: Option<String>,
107        url: Option<String>,
108        heartbeat: Option<u64>,
109    ) -> Self {
110        Self::new_trade(environment, api_key, api_secret, url, heartbeat)
111    }
112
113    #[getter]
114    #[pyo3(name = "api_key_masked")]
115    #[must_use]
116    pub fn py_api_key_masked(&self) -> Option<String> {
117        self.credential().map(|c| c.api_key_masked())
118    }
119
120    #[pyo3(name = "is_active")]
121    fn py_is_active(&self) -> bool {
122        self.is_active()
123    }
124
125    #[pyo3(name = "is_closed")]
126    fn py_is_closed(&self) -> bool {
127        self.is_closed()
128    }
129
130    #[pyo3(name = "subscription_count")]
131    fn py_subscription_count(&self) -> usize {
132        self.subscription_count()
133    }
134
135    #[pyo3(name = "cache_instrument")]
136    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138        Ok(())
139    }
140
141    #[pyo3(name = "set_account_id")]
142    fn py_set_account_id(&mut self, account_id: AccountId) {
143        self.set_account_id(account_id);
144    }
145
146    #[pyo3(name = "set_mm_level")]
147    fn py_set_mm_level(&self, mm_level: u8) {
148        self.set_mm_level(mm_level);
149    }
150
151    #[pyo3(name = "connect")]
152    fn py_connect<'py>(
153        &mut self,
154        py: Python<'py>,
155        callback: Py<PyAny>,
156    ) -> PyResult<Bound<'py, PyAny>> {
157        let mut client = self.clone();
158
159        pyo3_async_runtimes::tokio::future_into_py(py, async move {
160            client.connect().await.map_err(to_pyruntime_err)?;
161
162            let stream = client.stream();
163
164            tokio::spawn(async move {
165                tokio::pin!(stream);
166
167                while let Some(msg) = stream.next().await {
168                    match msg {
169                        NautilusWsMessage::Data(data_vec) => {
170                            Python::attach(|py| {
171                                for data in data_vec {
172                                    let py_obj = data_to_pycapsule(py, data);
173                                    call_python(py, &callback, py_obj);
174                                }
175                            });
176                        }
177                        NautilusWsMessage::Deltas(deltas) => {
178                            Python::attach(|py| {
179                                let py_obj = data_to_pycapsule(
180                                    py,
181                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
182                                );
183                                call_python(py, &callback, py_obj);
184                            });
185                        }
186                        NautilusWsMessage::FundingRates(rates) => {
187                            for rate in rates {
188                                call_python_with_data(&callback, move |py| {
189                                    rate.into_py_any(py).map(|obj| obj.into_bound(py))
190                                });
191                            }
192                        }
193                        NautilusWsMessage::OrderStatusReports(reports) => {
194                            for report in reports {
195                                call_python_with_data(&callback, move |py| {
196                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
197                                });
198                            }
199                        }
200                        NautilusWsMessage::FillReports(reports) => {
201                            for report in reports {
202                                call_python_with_data(&callback, move |py| {
203                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
204                                });
205                            }
206                        }
207                        NautilusWsMessage::PositionStatusReport(report) => {
208                            call_python_with_data(&callback, move |py| {
209                                report.into_py_any(py).map(|obj| obj.into_bound(py))
210                            });
211                        }
212                        NautilusWsMessage::AccountState(state) => {
213                            call_python_with_data(&callback, move |py| {
214                                state.into_py_any(py).map(|obj| obj.into_bound(py))
215                            });
216                        }
217                        NautilusWsMessage::OrderRejected(event) => {
218                            call_python_with_data(&callback, move |py| {
219                                event.into_py_any(py).map(|obj| obj.into_bound(py))
220                            });
221                        }
222                        NautilusWsMessage::OrderCancelRejected(event) => {
223                            call_python_with_data(&callback, move |py| {
224                                event.into_py_any(py).map(|obj| obj.into_bound(py))
225                            });
226                        }
227                        NautilusWsMessage::OrderModifyRejected(event) => {
228                            call_python_with_data(&callback, move |py| {
229                                event.into_py_any(py).map(|obj| obj.into_bound(py))
230                            });
231                        }
232                        NautilusWsMessage::Error(err) => {
233                            call_python_with_data(&callback, move |py| {
234                                err.into_py_any(py).map(|obj| obj.into_bound(py))
235                            });
236                        }
237                        NautilusWsMessage::Reconnected => {
238                            tracing::info!("WebSocket reconnected");
239                        }
240                        NautilusWsMessage::Authenticated => {
241                            tracing::info!("WebSocket authenticated");
242                        }
243                    }
244                }
245            });
246
247            Ok(())
248        })
249    }
250
251    #[pyo3(name = "close")]
252    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
253        let mut client = self.clone();
254
255        pyo3_async_runtimes::tokio::future_into_py(py, async move {
256            if let Err(e) = client.close().await {
257                tracing::error!("Error on close: {e}");
258            }
259            Ok(())
260        })
261    }
262
263    #[pyo3(name = "subscribe")]
264    fn py_subscribe<'py>(
265        &self,
266        py: Python<'py>,
267        topics: Vec<String>,
268    ) -> PyResult<Bound<'py, PyAny>> {
269        let client = self.clone();
270
271        pyo3_async_runtimes::tokio::future_into_py(py, async move {
272            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
273            Ok(())
274        })
275    }
276
277    #[pyo3(name = "unsubscribe")]
278    fn py_unsubscribe<'py>(
279        &self,
280        py: Python<'py>,
281        topics: Vec<String>,
282    ) -> PyResult<Bound<'py, PyAny>> {
283        let client = self.clone();
284
285        pyo3_async_runtimes::tokio::future_into_py(py, async move {
286            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
287            Ok(())
288        })
289    }
290
291    #[pyo3(name = "subscribe_orderbook")]
292    fn py_subscribe_orderbook<'py>(
293        &self,
294        py: Python<'py>,
295        instrument_id: InstrumentId,
296        depth: u32,
297    ) -> PyResult<Bound<'py, PyAny>> {
298        let client = self.clone();
299
300        pyo3_async_runtimes::tokio::future_into_py(py, async move {
301            client
302                .subscribe_orderbook(instrument_id, depth)
303                .await
304                .map_err(to_pyruntime_err)?;
305            Ok(())
306        })
307    }
308
309    #[pyo3(name = "unsubscribe_orderbook")]
310    fn py_unsubscribe_orderbook<'py>(
311        &self,
312        py: Python<'py>,
313        instrument_id: InstrumentId,
314        depth: u32,
315    ) -> PyResult<Bound<'py, PyAny>> {
316        let client = self.clone();
317
318        pyo3_async_runtimes::tokio::future_into_py(py, async move {
319            client
320                .unsubscribe_orderbook(instrument_id, depth)
321                .await
322                .map_err(to_pyruntime_err)?;
323            Ok(())
324        })
325    }
326
327    #[pyo3(name = "subscribe_trades")]
328    fn py_subscribe_trades<'py>(
329        &self,
330        py: Python<'py>,
331        instrument_id: InstrumentId,
332    ) -> PyResult<Bound<'py, PyAny>> {
333        let client = self.clone();
334
335        pyo3_async_runtimes::tokio::future_into_py(py, async move {
336            client
337                .subscribe_trades(instrument_id)
338                .await
339                .map_err(to_pyruntime_err)?;
340            Ok(())
341        })
342    }
343
344    #[pyo3(name = "unsubscribe_trades")]
345    fn py_unsubscribe_trades<'py>(
346        &self,
347        py: Python<'py>,
348        instrument_id: InstrumentId,
349    ) -> PyResult<Bound<'py, PyAny>> {
350        let client = self.clone();
351
352        pyo3_async_runtimes::tokio::future_into_py(py, async move {
353            client
354                .unsubscribe_trades(instrument_id)
355                .await
356                .map_err(to_pyruntime_err)?;
357            Ok(())
358        })
359    }
360
361    #[pyo3(name = "subscribe_ticker")]
362    fn py_subscribe_ticker<'py>(
363        &self,
364        py: Python<'py>,
365        instrument_id: InstrumentId,
366    ) -> PyResult<Bound<'py, PyAny>> {
367        let client = self.clone();
368
369        pyo3_async_runtimes::tokio::future_into_py(py, async move {
370            client
371                .subscribe_ticker(instrument_id)
372                .await
373                .map_err(to_pyruntime_err)?;
374            Ok(())
375        })
376    }
377
378    #[pyo3(name = "unsubscribe_ticker")]
379    fn py_unsubscribe_ticker<'py>(
380        &self,
381        py: Python<'py>,
382        instrument_id: InstrumentId,
383    ) -> PyResult<Bound<'py, PyAny>> {
384        let client = self.clone();
385
386        pyo3_async_runtimes::tokio::future_into_py(py, async move {
387            client
388                .unsubscribe_ticker(instrument_id)
389                .await
390                .map_err(to_pyruntime_err)?;
391            Ok(())
392        })
393    }
394
395    #[pyo3(name = "subscribe_klines")]
396    fn py_subscribe_klines<'py>(
397        &self,
398        py: Python<'py>,
399        instrument_id: InstrumentId,
400        interval: String,
401    ) -> PyResult<Bound<'py, PyAny>> {
402        let client = self.clone();
403
404        pyo3_async_runtimes::tokio::future_into_py(py, async move {
405            client
406                .subscribe_klines(instrument_id, interval)
407                .await
408                .map_err(to_pyruntime_err)?;
409            Ok(())
410        })
411    }
412
413    #[pyo3(name = "unsubscribe_klines")]
414    fn py_unsubscribe_klines<'py>(
415        &self,
416        py: Python<'py>,
417        instrument_id: InstrumentId,
418        interval: String,
419    ) -> PyResult<Bound<'py, PyAny>> {
420        let client = self.clone();
421
422        pyo3_async_runtimes::tokio::future_into_py(py, async move {
423            client
424                .unsubscribe_klines(instrument_id, interval)
425                .await
426                .map_err(to_pyruntime_err)?;
427            Ok(())
428        })
429    }
430
431    #[pyo3(name = "subscribe_orders")]
432    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
433        let client = self.clone();
434
435        pyo3_async_runtimes::tokio::future_into_py(py, async move {
436            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
437            Ok(())
438        })
439    }
440
441    #[pyo3(name = "unsubscribe_orders")]
442    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
443        let client = self.clone();
444
445        pyo3_async_runtimes::tokio::future_into_py(py, async move {
446            client
447                .unsubscribe_orders()
448                .await
449                .map_err(to_pyruntime_err)?;
450            Ok(())
451        })
452    }
453
454    #[pyo3(name = "subscribe_executions")]
455    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            client
460                .subscribe_executions()
461                .await
462                .map_err(to_pyruntime_err)?;
463            Ok(())
464        })
465    }
466
467    #[pyo3(name = "unsubscribe_executions")]
468    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
469        let client = self.clone();
470
471        pyo3_async_runtimes::tokio::future_into_py(py, async move {
472            client
473                .unsubscribe_executions()
474                .await
475                .map_err(to_pyruntime_err)?;
476            Ok(())
477        })
478    }
479
480    #[pyo3(name = "subscribe_positions")]
481    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
482        let client = self.clone();
483
484        pyo3_async_runtimes::tokio::future_into_py(py, async move {
485            client
486                .subscribe_positions()
487                .await
488                .map_err(to_pyruntime_err)?;
489            Ok(())
490        })
491    }
492
493    #[pyo3(name = "unsubscribe_positions")]
494    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
495        let client = self.clone();
496
497        pyo3_async_runtimes::tokio::future_into_py(py, async move {
498            client
499                .unsubscribe_positions()
500                .await
501                .map_err(to_pyruntime_err)?;
502            Ok(())
503        })
504    }
505
506    #[pyo3(name = "subscribe_wallet")]
507    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
508        let client = self.clone();
509
510        pyo3_async_runtimes::tokio::future_into_py(py, async move {
511            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
512            Ok(())
513        })
514    }
515
516    #[pyo3(name = "unsubscribe_wallet")]
517    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
518        let client = self.clone();
519
520        pyo3_async_runtimes::tokio::future_into_py(py, async move {
521            client
522                .unsubscribe_wallet()
523                .await
524                .map_err(to_pyruntime_err)?;
525            Ok(())
526        })
527    }
528
529    #[pyo3(name = "wait_until_active")]
530    fn py_wait_until_active<'py>(
531        &self,
532        py: Python<'py>,
533        timeout_secs: f64,
534    ) -> PyResult<Bound<'py, PyAny>> {
535        let client = self.clone();
536
537        pyo3_async_runtimes::tokio::future_into_py(py, async move {
538            client
539                .wait_until_active(timeout_secs)
540                .await
541                .map_err(to_pyruntime_err)?;
542            Ok(())
543        })
544    }
545
546    #[pyo3(name = "submit_order")]
547    #[pyo3(signature = (
548        product_type,
549        trader_id,
550        strategy_id,
551        instrument_id,
552        client_order_id,
553        order_side,
554        order_type,
555        quantity,
556        is_quote_quantity=false,
557        time_in_force=None,
558        price=None,
559        trigger_price=None,
560        post_only=None,
561        reduce_only=None,
562        is_leverage=false,
563    ))]
564    #[allow(clippy::too_many_arguments)]
565    fn py_submit_order<'py>(
566        &self,
567        py: Python<'py>,
568        product_type: BybitProductType,
569        trader_id: TraderId,
570        strategy_id: StrategyId,
571        instrument_id: InstrumentId,
572        client_order_id: ClientOrderId,
573        order_side: OrderSide,
574        order_type: OrderType,
575        quantity: Quantity,
576        is_quote_quantity: bool,
577        time_in_force: Option<TimeInForce>,
578        price: Option<Price>,
579        trigger_price: Option<Price>,
580        post_only: Option<bool>,
581        reduce_only: Option<bool>,
582        is_leverage: bool,
583    ) -> PyResult<Bound<'py, PyAny>> {
584        let client = self.clone();
585
586        pyo3_async_runtimes::tokio::future_into_py(py, async move {
587            client
588                .submit_order(
589                    product_type,
590                    trader_id,
591                    strategy_id,
592                    instrument_id,
593                    client_order_id,
594                    order_side,
595                    order_type,
596                    quantity,
597                    is_quote_quantity,
598                    time_in_force,
599                    price,
600                    trigger_price,
601                    post_only,
602                    reduce_only,
603                    is_leverage,
604                )
605                .await
606                .map_err(to_pyruntime_err)?;
607            Ok(())
608        })
609    }
610
611    #[pyo3(name = "modify_order")]
612    #[pyo3(signature = (
613        product_type,
614        trader_id,
615        strategy_id,
616        instrument_id,
617        client_order_id,
618        venue_order_id=None,
619        quantity=None,
620        price=None,
621    ))]
622    #[allow(clippy::too_many_arguments)]
623    fn py_modify_order<'py>(
624        &self,
625        py: Python<'py>,
626        product_type: BybitProductType,
627        trader_id: TraderId,
628        strategy_id: StrategyId,
629        instrument_id: InstrumentId,
630        client_order_id: ClientOrderId,
631        venue_order_id: Option<VenueOrderId>,
632        quantity: Option<Quantity>,
633        price: Option<Price>,
634    ) -> PyResult<Bound<'py, PyAny>> {
635        let client = self.clone();
636
637        pyo3_async_runtimes::tokio::future_into_py(py, async move {
638            client
639                .modify_order(
640                    product_type,
641                    trader_id,
642                    strategy_id,
643                    instrument_id,
644                    client_order_id,
645                    venue_order_id,
646                    quantity,
647                    price,
648                )
649                .await
650                .map_err(to_pyruntime_err)?;
651            Ok(())
652        })
653    }
654
655    #[pyo3(name = "cancel_order")]
656    #[pyo3(signature = (
657        product_type,
658        trader_id,
659        strategy_id,
660        instrument_id,
661        client_order_id,
662        venue_order_id=None,
663    ))]
664    #[allow(clippy::too_many_arguments)]
665    fn py_cancel_order<'py>(
666        &self,
667        py: Python<'py>,
668        product_type: BybitProductType,
669        trader_id: TraderId,
670        strategy_id: StrategyId,
671        instrument_id: InstrumentId,
672        client_order_id: ClientOrderId,
673        venue_order_id: Option<VenueOrderId>,
674    ) -> PyResult<Bound<'py, PyAny>> {
675        let client = self.clone();
676
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            client
679                .cancel_order_by_id(
680                    product_type,
681                    trader_id,
682                    strategy_id,
683                    instrument_id,
684                    client_order_id,
685                    venue_order_id,
686                )
687                .await
688                .map_err(to_pyruntime_err)?;
689            Ok(())
690        })
691    }
692
693    #[pyo3(name = "build_place_order_params")]
694    #[pyo3(signature = (
695        product_type,
696        instrument_id,
697        client_order_id,
698        order_side,
699        order_type,
700        quantity,
701        is_quote_quantity=false,
702        time_in_force=None,
703        price=None,
704        trigger_price=None,
705        post_only=None,
706        reduce_only=None,
707        is_leverage=false,
708    ))]
709    #[allow(clippy::too_many_arguments)]
710    fn py_build_place_order_params(
711        &self,
712        product_type: BybitProductType,
713        instrument_id: InstrumentId,
714        client_order_id: ClientOrderId,
715        order_side: OrderSide,
716        order_type: OrderType,
717        quantity: Quantity,
718        is_quote_quantity: bool,
719        time_in_force: Option<TimeInForce>,
720        price: Option<Price>,
721        trigger_price: Option<Price>,
722        post_only: Option<bool>,
723        reduce_only: Option<bool>,
724        is_leverage: bool,
725    ) -> PyResult<BybitWsPlaceOrderParams> {
726        let params = self
727            .build_place_order_params(
728                product_type,
729                instrument_id,
730                client_order_id,
731                order_side,
732                order_type,
733                quantity,
734                is_quote_quantity,
735                time_in_force,
736                price,
737                trigger_price,
738                post_only,
739                reduce_only,
740                is_leverage,
741            )
742            .map_err(to_pyruntime_err)?;
743        Ok(params.into())
744    }
745
746    #[pyo3(name = "batch_cancel_orders")]
747    fn py_batch_cancel_orders<'py>(
748        &self,
749        py: Python<'py>,
750        trader_id: TraderId,
751        strategy_id: StrategyId,
752        orders: Vec<BybitWsCancelOrderParams>,
753    ) -> PyResult<Bound<'py, PyAny>> {
754        let client = self.clone();
755
756        pyo3_async_runtimes::tokio::future_into_py(py, async move {
757            let order_params: Vec<_> = orders
758                .into_iter()
759                .map(|p| p.try_into())
760                .collect::<Result<Vec<_>, _>>()
761                .map_err(to_pyruntime_err)?;
762
763            client
764                .batch_cancel_orders(trader_id, strategy_id, order_params)
765                .await
766                .map_err(to_pyruntime_err)?;
767
768            Ok(())
769        })
770    }
771
772    #[pyo3(name = "build_amend_order_params")]
773    #[allow(clippy::too_many_arguments)]
774    fn py_build_amend_order_params(
775        &self,
776        product_type: BybitProductType,
777        instrument_id: InstrumentId,
778        venue_order_id: Option<VenueOrderId>,
779        client_order_id: Option<ClientOrderId>,
780        quantity: Option<Quantity>,
781        price: Option<Price>,
782    ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
783        let params = self
784            .build_amend_order_params(
785                product_type,
786                instrument_id,
787                venue_order_id,
788                client_order_id,
789                quantity,
790                price,
791            )
792            .map_err(to_pyruntime_err)?;
793        Ok(params.into())
794    }
795
796    #[pyo3(name = "build_cancel_order_params")]
797    fn py_build_cancel_order_params(
798        &self,
799        product_type: BybitProductType,
800        instrument_id: InstrumentId,
801        venue_order_id: Option<VenueOrderId>,
802        client_order_id: Option<ClientOrderId>,
803    ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
804        let params = self
805            .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
806            .map_err(to_pyruntime_err)?;
807        Ok(params.into())
808    }
809
810    #[pyo3(name = "batch_modify_orders")]
811    fn py_batch_modify_orders<'py>(
812        &self,
813        py: Python<'py>,
814        trader_id: TraderId,
815        strategy_id: StrategyId,
816        orders: Vec<BybitWsAmendOrderParams>,
817    ) -> PyResult<Bound<'py, PyAny>> {
818        let client = self.clone();
819
820        pyo3_async_runtimes::tokio::future_into_py(py, async move {
821            let order_params: Vec<_> = orders
822                .into_iter()
823                .map(|p| p.try_into())
824                .collect::<Result<Vec<_>, _>>()
825                .map_err(to_pyruntime_err)?;
826
827            client
828                .batch_amend_orders(trader_id, strategy_id, order_params)
829                .await
830                .map_err(to_pyruntime_err)?;
831
832            Ok(())
833        })
834    }
835
836    #[pyo3(name = "batch_place_orders")]
837    fn py_batch_place_orders<'py>(
838        &self,
839        py: Python<'py>,
840        trader_id: TraderId,
841        strategy_id: StrategyId,
842        orders: Vec<BybitWsPlaceOrderParams>,
843    ) -> PyResult<Bound<'py, PyAny>> {
844        let client = self.clone();
845
846        pyo3_async_runtimes::tokio::future_into_py(py, async move {
847            let order_params: Vec<_> = orders
848                .into_iter()
849                .map(|p| p.try_into())
850                .collect::<Result<Vec<_>, _>>()
851                .map_err(to_pyruntime_err)?;
852
853            client
854                .batch_place_orders(trader_id, strategy_id, order_params)
855                .await
856                .map_err(to_pyruntime_err)?;
857
858            Ok(())
859        })
860    }
861}
862
863fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
864    if let Err(e) = callback.call1(py, (py_obj,)) {
865        tracing::error!("Error calling Python callback: {e}");
866    }
867}
868
869fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
870where
871    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
872{
873    Python::attach(|py| match data_fn(py) {
874        Ok(data) => {
875            if let Err(e) = callback.call1(py, (data,)) {
876                tracing::error!("Error calling Python callback: {e}");
877            }
878        }
879        Err(e) => {
880            tracing::error!("Error converting data to Python: {e}");
881        }
882    });
883}