nautilus_bybit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Bybit WebSocket client.
17
18use futures_util::StreamExt;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{Data, OrderBookDeltas_API},
22    enums::{OrderSide, OrderType, TimeInForce},
23    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
24    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
25    types::{Price, Quantity},
26};
27use pyo3::{IntoPyObjectExt, prelude::*};
28
29use crate::{
30    common::{
31        credential::Credential,
32        enums::{BybitEnvironment, BybitProductType},
33    },
34    python::params::{BybitWsAmendOrderParams, BybitWsPlaceOrderParams},
35    websocket::{
36        client::BybitWebSocketClient,
37        messages::{BybitWebSocketError, NautilusWsMessage},
38    },
39};
40
41#[pymethods]
42impl BybitWebSocketError {
43    fn __repr__(&self) -> String {
44        format!(
45            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
46            self.code, self.message, self.conn_id, self.topic
47        )
48    }
49
50    #[getter]
51    pub fn code(&self) -> i64 {
52        self.code
53    }
54
55    #[getter]
56    pub fn message(&self) -> &str {
57        &self.message
58    }
59
60    #[getter]
61    pub fn conn_id(&self) -> Option<&str> {
62        self.conn_id.as_deref()
63    }
64
65    #[getter]
66    pub fn topic(&self) -> Option<&str> {
67        self.topic.as_deref()
68    }
69
70    #[getter]
71    pub fn req_id(&self) -> Option<&str> {
72        self.req_id.as_deref()
73    }
74}
75
76#[pymethods]
77impl BybitWebSocketClient {
78    #[staticmethod]
79    #[pyo3(name = "new_public")]
80    #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
81    fn py_new_public(
82        product_type: BybitProductType,
83        environment: BybitEnvironment,
84        url: Option<String>,
85        heartbeat: Option<u64>,
86    ) -> Self {
87        Self::new_public_with(product_type, environment, url, heartbeat)
88    }
89
90    #[staticmethod]
91    #[pyo3(name = "new_private")]
92    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
93    fn py_new_private(
94        environment: BybitEnvironment,
95        api_key: Option<String>,
96        api_secret: Option<String>,
97        url: Option<String>,
98        heartbeat: Option<u64>,
99    ) -> Self {
100        // If both api_key and api_secret are None, try to load from environment
101        let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
102            let (key_var, secret_var) = match environment {
103                BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
104                _ => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
105            };
106            let env_key = std::env::var(key_var).ok().unwrap_or_default();
107            let env_secret = std::env::var(secret_var).ok().unwrap_or_default();
108            (env_key, env_secret)
109        } else {
110            (api_key.unwrap_or_default(), api_secret.unwrap_or_default())
111        };
112
113        tracing::debug!(
114            "Creating private WebSocket client with API key: {}",
115            &final_api_key[..final_api_key.len().min(10)]
116        );
117        let credential = Credential::new(final_api_key, final_api_secret);
118        Self::new_private(environment, credential, url, heartbeat)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "new_trade")]
123    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
124    fn py_new_trade(
125        environment: BybitEnvironment,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        url: Option<String>,
129        heartbeat: Option<u64>,
130    ) -> Self {
131        // If both api_key and api_secret are None, try to load from environment
132        let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
133            let (key_var, secret_var) = match environment {
134                BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
135                _ => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
136            };
137            let env_key = std::env::var(key_var).ok().unwrap_or_default();
138            let env_secret = std::env::var(secret_var).ok().unwrap_or_default();
139            (env_key, env_secret)
140        } else {
141            (api_key.unwrap_or_default(), api_secret.unwrap_or_default())
142        };
143
144        let credential = Credential::new(final_api_key, final_api_secret);
145        Self::new_trade(environment, credential, url, heartbeat)
146    }
147
148    #[pyo3(name = "is_active")]
149    fn py_is_active(&self) -> bool {
150        self.is_active()
151    }
152
153    #[pyo3(name = "is_closed")]
154    fn py_is_closed(&self) -> bool {
155        self.is_closed()
156    }
157
158    #[pyo3(name = "subscription_count")]
159    fn py_subscription_count(&self) -> usize {
160        self.subscription_count()
161    }
162
163    #[getter]
164    #[pyo3(name = "api_key_masked")]
165    #[must_use]
166    pub fn py_api_key_masked(&self) -> Option<String> {
167        self.credential().map(|c| c.api_key_masked())
168    }
169
170    #[pyo3(name = "cache_instrument")]
171    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
172        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
173        Ok(())
174    }
175
176    #[pyo3(name = "set_account_id")]
177    fn py_set_account_id(&mut self, account_id: AccountId) {
178        self.set_account_id(account_id);
179    }
180
181    #[pyo3(name = "set_mm_level")]
182    fn py_set_mm_level(&self, mm_level: u8) {
183        self.set_mm_level(mm_level);
184    }
185
186    #[pyo3(name = "connect")]
187    fn py_connect<'py>(
188        &mut self,
189        py: Python<'py>,
190        callback: Py<PyAny>,
191    ) -> PyResult<Bound<'py, PyAny>> {
192        let mut client = self.clone();
193
194        pyo3_async_runtimes::tokio::future_into_py(py, async move {
195            client.connect().await.map_err(to_pyruntime_err)?;
196
197            let stream = client.stream();
198
199            tokio::spawn(async move {
200                tokio::pin!(stream);
201
202                while let Some(msg) = stream.next().await {
203                    match msg {
204                        NautilusWsMessage::Data(data_vec) => {
205                            Python::attach(|py| {
206                                for data in data_vec {
207                                    let py_obj = data_to_pycapsule(py, data);
208                                    call_python(py, &callback, py_obj);
209                                }
210                            });
211                        }
212                        NautilusWsMessage::Deltas(deltas) => {
213                            Python::attach(|py| {
214                                let py_obj = data_to_pycapsule(
215                                    py,
216                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
217                                );
218                                call_python(py, &callback, py_obj);
219                            });
220                        }
221                        NautilusWsMessage::FundingRates(rates) => {
222                            for rate in rates {
223                                call_python_with_data(&callback, move |py| {
224                                    rate.into_py_any(py).map(|obj| obj.into_bound(py))
225                                });
226                            }
227                        }
228                        NautilusWsMessage::OrderStatusReports(reports) => {
229                            for report in reports {
230                                call_python_with_data(&callback, move |py| {
231                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
232                                });
233                            }
234                        }
235                        NautilusWsMessage::FillReports(reports) => {
236                            for report in reports {
237                                call_python_with_data(&callback, move |py| {
238                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
239                                });
240                            }
241                        }
242                        NautilusWsMessage::PositionStatusReport(report) => {
243                            call_python_with_data(&callback, move |py| {
244                                report.into_py_any(py).map(|obj| obj.into_bound(py))
245                            });
246                        }
247                        NautilusWsMessage::AccountState(state) => {
248                            call_python_with_data(&callback, move |py| {
249                                state.into_py_any(py).map(|obj| obj.into_bound(py))
250                            });
251                        }
252                        NautilusWsMessage::OrderRejected(event) => {
253                            call_python_with_data(&callback, move |py| {
254                                event.into_py_any(py).map(|obj| obj.into_bound(py))
255                            });
256                        }
257                        NautilusWsMessage::OrderCancelRejected(event) => {
258                            call_python_with_data(&callback, move |py| {
259                                event.into_py_any(py).map(|obj| obj.into_bound(py))
260                            });
261                        }
262                        NautilusWsMessage::OrderModifyRejected(event) => {
263                            call_python_with_data(&callback, move |py| {
264                                event.into_py_any(py).map(|obj| obj.into_bound(py))
265                            });
266                        }
267                        NautilusWsMessage::Error(err) => {
268                            call_python_with_data(&callback, move |py| {
269                                err.into_py_any(py).map(|obj| obj.into_bound(py))
270                            });
271                        }
272                        NautilusWsMessage::Reconnected => {
273                            tracing::info!("WebSocket reconnected");
274                        }
275                        NautilusWsMessage::Authenticated => {
276                            tracing::info!("WebSocket authenticated");
277                        }
278                    }
279                }
280            });
281
282            Ok(())
283        })
284    }
285
286    #[pyo3(name = "close")]
287    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288        let mut client = self.clone();
289
290        pyo3_async_runtimes::tokio::future_into_py(py, async move {
291            if let Err(e) = client.close().await {
292                tracing::error!("Error on close: {e}");
293            }
294            Ok(())
295        })
296    }
297
298    #[pyo3(name = "subscribe")]
299    fn py_subscribe<'py>(
300        &self,
301        py: Python<'py>,
302        topics: Vec<String>,
303    ) -> PyResult<Bound<'py, PyAny>> {
304        let client = self.clone();
305
306        pyo3_async_runtimes::tokio::future_into_py(py, async move {
307            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
308            Ok(())
309        })
310    }
311
312    #[pyo3(name = "unsubscribe")]
313    fn py_unsubscribe<'py>(
314        &self,
315        py: Python<'py>,
316        topics: Vec<String>,
317    ) -> PyResult<Bound<'py, PyAny>> {
318        let client = self.clone();
319
320        pyo3_async_runtimes::tokio::future_into_py(py, async move {
321            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "subscribe_orderbook")]
327    fn py_subscribe_orderbook<'py>(
328        &self,
329        py: Python<'py>,
330        instrument_id: InstrumentId,
331        depth: u32,
332    ) -> PyResult<Bound<'py, PyAny>> {
333        let client = self.clone();
334
335        pyo3_async_runtimes::tokio::future_into_py(py, async move {
336            client
337                .subscribe_orderbook(instrument_id, depth)
338                .await
339                .map_err(to_pyruntime_err)?;
340            Ok(())
341        })
342    }
343
344    #[pyo3(name = "unsubscribe_orderbook")]
345    fn py_unsubscribe_orderbook<'py>(
346        &self,
347        py: Python<'py>,
348        instrument_id: InstrumentId,
349        depth: u32,
350    ) -> PyResult<Bound<'py, PyAny>> {
351        let client = self.clone();
352
353        pyo3_async_runtimes::tokio::future_into_py(py, async move {
354            client
355                .unsubscribe_orderbook(instrument_id, depth)
356                .await
357                .map_err(to_pyruntime_err)?;
358            Ok(())
359        })
360    }
361
362    #[pyo3(name = "subscribe_trades")]
363    fn py_subscribe_trades<'py>(
364        &self,
365        py: Python<'py>,
366        instrument_id: InstrumentId,
367    ) -> PyResult<Bound<'py, PyAny>> {
368        let client = self.clone();
369
370        pyo3_async_runtimes::tokio::future_into_py(py, async move {
371            client
372                .subscribe_trades(instrument_id)
373                .await
374                .map_err(to_pyruntime_err)?;
375            Ok(())
376        })
377    }
378
379    #[pyo3(name = "unsubscribe_trades")]
380    fn py_unsubscribe_trades<'py>(
381        &self,
382        py: Python<'py>,
383        instrument_id: InstrumentId,
384    ) -> PyResult<Bound<'py, PyAny>> {
385        let client = self.clone();
386
387        pyo3_async_runtimes::tokio::future_into_py(py, async move {
388            client
389                .unsubscribe_trades(instrument_id)
390                .await
391                .map_err(to_pyruntime_err)?;
392            Ok(())
393        })
394    }
395
396    #[pyo3(name = "subscribe_ticker")]
397    fn py_subscribe_ticker<'py>(
398        &self,
399        py: Python<'py>,
400        instrument_id: InstrumentId,
401    ) -> PyResult<Bound<'py, PyAny>> {
402        let client = self.clone();
403
404        pyo3_async_runtimes::tokio::future_into_py(py, async move {
405            client
406                .subscribe_ticker(instrument_id)
407                .await
408                .map_err(to_pyruntime_err)?;
409            Ok(())
410        })
411    }
412
413    #[pyo3(name = "unsubscribe_ticker")]
414    fn py_unsubscribe_ticker<'py>(
415        &self,
416        py: Python<'py>,
417        instrument_id: InstrumentId,
418    ) -> PyResult<Bound<'py, PyAny>> {
419        let client = self.clone();
420
421        pyo3_async_runtimes::tokio::future_into_py(py, async move {
422            client
423                .unsubscribe_ticker(instrument_id)
424                .await
425                .map_err(to_pyruntime_err)?;
426            Ok(())
427        })
428    }
429
430    #[pyo3(name = "subscribe_klines")]
431    fn py_subscribe_klines<'py>(
432        &self,
433        py: Python<'py>,
434        instrument_id: InstrumentId,
435        interval: String,
436    ) -> PyResult<Bound<'py, PyAny>> {
437        let client = self.clone();
438
439        pyo3_async_runtimes::tokio::future_into_py(py, async move {
440            client
441                .subscribe_klines(instrument_id, interval)
442                .await
443                .map_err(to_pyruntime_err)?;
444            Ok(())
445        })
446    }
447
448    #[pyo3(name = "unsubscribe_klines")]
449    fn py_unsubscribe_klines<'py>(
450        &self,
451        py: Python<'py>,
452        instrument_id: InstrumentId,
453        interval: String,
454    ) -> PyResult<Bound<'py, PyAny>> {
455        let client = self.clone();
456
457        pyo3_async_runtimes::tokio::future_into_py(py, async move {
458            client
459                .unsubscribe_klines(instrument_id, interval)
460                .await
461                .map_err(to_pyruntime_err)?;
462            Ok(())
463        })
464    }
465
466    #[pyo3(name = "subscribe_orders")]
467    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
468        let client = self.clone();
469
470        pyo3_async_runtimes::tokio::future_into_py(py, async move {
471            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
472            Ok(())
473        })
474    }
475
476    #[pyo3(name = "unsubscribe_orders")]
477    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
478        let client = self.clone();
479
480        pyo3_async_runtimes::tokio::future_into_py(py, async move {
481            client
482                .unsubscribe_orders()
483                .await
484                .map_err(to_pyruntime_err)?;
485            Ok(())
486        })
487    }
488
489    #[pyo3(name = "subscribe_executions")]
490    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
491        let client = self.clone();
492
493        pyo3_async_runtimes::tokio::future_into_py(py, async move {
494            client
495                .subscribe_executions()
496                .await
497                .map_err(to_pyruntime_err)?;
498            Ok(())
499        })
500    }
501
502    #[pyo3(name = "unsubscribe_executions")]
503    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
504        let client = self.clone();
505
506        pyo3_async_runtimes::tokio::future_into_py(py, async move {
507            client
508                .unsubscribe_executions()
509                .await
510                .map_err(to_pyruntime_err)?;
511            Ok(())
512        })
513    }
514
515    #[pyo3(name = "subscribe_positions")]
516    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
517        let client = self.clone();
518
519        pyo3_async_runtimes::tokio::future_into_py(py, async move {
520            client
521                .subscribe_positions()
522                .await
523                .map_err(to_pyruntime_err)?;
524            Ok(())
525        })
526    }
527
528    #[pyo3(name = "unsubscribe_positions")]
529    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
530        let client = self.clone();
531
532        pyo3_async_runtimes::tokio::future_into_py(py, async move {
533            client
534                .unsubscribe_positions()
535                .await
536                .map_err(to_pyruntime_err)?;
537            Ok(())
538        })
539    }
540
541    #[pyo3(name = "subscribe_wallet")]
542    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
543        let client = self.clone();
544
545        pyo3_async_runtimes::tokio::future_into_py(py, async move {
546            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "unsubscribe_wallet")]
552    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553        let client = self.clone();
554
555        pyo3_async_runtimes::tokio::future_into_py(py, async move {
556            client
557                .unsubscribe_wallet()
558                .await
559                .map_err(to_pyruntime_err)?;
560            Ok(())
561        })
562    }
563
564    #[pyo3(name = "wait_until_active")]
565    fn py_wait_until_active<'py>(
566        &self,
567        py: Python<'py>,
568        timeout_secs: f64,
569    ) -> PyResult<Bound<'py, PyAny>> {
570        let client = self.clone();
571
572        pyo3_async_runtimes::tokio::future_into_py(py, async move {
573            client
574                .wait_until_active(timeout_secs)
575                .await
576                .map_err(to_pyruntime_err)?;
577            Ok(())
578        })
579    }
580
581    #[pyo3(name = "submit_order")]
582    #[pyo3(signature = (
583        product_type,
584        trader_id,
585        strategy_id,
586        instrument_id,
587        client_order_id,
588        order_side,
589        order_type,
590        quantity,
591        is_quote_quantity=false,
592        time_in_force=None,
593        price=None,
594        trigger_price=None,
595        post_only=None,
596        reduce_only=None,
597        is_leverage=false,
598    ))]
599    #[allow(clippy::too_many_arguments)]
600    fn py_submit_order<'py>(
601        &self,
602        py: Python<'py>,
603        product_type: BybitProductType,
604        trader_id: TraderId,
605        strategy_id: StrategyId,
606        instrument_id: InstrumentId,
607        client_order_id: ClientOrderId,
608        order_side: OrderSide,
609        order_type: OrderType,
610        quantity: Quantity,
611        is_quote_quantity: bool,
612        time_in_force: Option<TimeInForce>,
613        price: Option<Price>,
614        trigger_price: Option<Price>,
615        post_only: Option<bool>,
616        reduce_only: Option<bool>,
617        is_leverage: bool,
618    ) -> PyResult<Bound<'py, PyAny>> {
619        let client = self.clone();
620
621        pyo3_async_runtimes::tokio::future_into_py(py, async move {
622            client
623                .submit_order(
624                    product_type,
625                    trader_id,
626                    strategy_id,
627                    instrument_id,
628                    client_order_id,
629                    order_side,
630                    order_type,
631                    quantity,
632                    is_quote_quantity,
633                    time_in_force,
634                    price,
635                    trigger_price,
636                    post_only,
637                    reduce_only,
638                    is_leverage,
639                )
640                .await
641                .map_err(to_pyruntime_err)?;
642            Ok(())
643        })
644    }
645
646    #[pyo3(name = "modify_order")]
647    #[pyo3(signature = (
648        product_type,
649        trader_id,
650        strategy_id,
651        instrument_id,
652        client_order_id,
653        venue_order_id=None,
654        quantity=None,
655        price=None,
656    ))]
657    #[allow(clippy::too_many_arguments)]
658    fn py_modify_order<'py>(
659        &self,
660        py: Python<'py>,
661        product_type: BybitProductType,
662        trader_id: TraderId,
663        strategy_id: StrategyId,
664        instrument_id: InstrumentId,
665        client_order_id: ClientOrderId,
666        venue_order_id: Option<VenueOrderId>,
667        quantity: Option<Quantity>,
668        price: Option<Price>,
669    ) -> PyResult<Bound<'py, PyAny>> {
670        let client = self.clone();
671
672        pyo3_async_runtimes::tokio::future_into_py(py, async move {
673            client
674                .modify_order(
675                    product_type,
676                    trader_id,
677                    strategy_id,
678                    instrument_id,
679                    client_order_id,
680                    venue_order_id,
681                    quantity,
682                    price,
683                )
684                .await
685                .map_err(to_pyruntime_err)?;
686            Ok(())
687        })
688    }
689
690    #[pyo3(name = "cancel_order")]
691    #[pyo3(signature = (
692        product_type,
693        trader_id,
694        strategy_id,
695        instrument_id,
696        client_order_id,
697        venue_order_id=None,
698    ))]
699    #[allow(clippy::too_many_arguments)]
700    fn py_cancel_order<'py>(
701        &self,
702        py: Python<'py>,
703        product_type: BybitProductType,
704        trader_id: TraderId,
705        strategy_id: StrategyId,
706        instrument_id: InstrumentId,
707        client_order_id: ClientOrderId,
708        venue_order_id: Option<VenueOrderId>,
709    ) -> PyResult<Bound<'py, PyAny>> {
710        let client = self.clone();
711
712        pyo3_async_runtimes::tokio::future_into_py(py, async move {
713            client
714                .cancel_order_by_id(
715                    product_type,
716                    trader_id,
717                    strategy_id,
718                    instrument_id,
719                    client_order_id,
720                    venue_order_id,
721                )
722                .await
723                .map_err(to_pyruntime_err)?;
724            Ok(())
725        })
726    }
727
728    #[pyo3(name = "build_place_order_params")]
729    #[pyo3(signature = (
730        product_type,
731        instrument_id,
732        client_order_id,
733        order_side,
734        order_type,
735        quantity,
736        is_quote_quantity=false,
737        time_in_force=None,
738        price=None,
739        trigger_price=None,
740        post_only=None,
741        reduce_only=None,
742        is_leverage=false,
743    ))]
744    #[allow(clippy::too_many_arguments)]
745    fn py_build_place_order_params(
746        &self,
747        product_type: BybitProductType,
748        instrument_id: InstrumentId,
749        client_order_id: ClientOrderId,
750        order_side: OrderSide,
751        order_type: OrderType,
752        quantity: Quantity,
753        is_quote_quantity: bool,
754        time_in_force: Option<TimeInForce>,
755        price: Option<Price>,
756        trigger_price: Option<Price>,
757        post_only: Option<bool>,
758        reduce_only: Option<bool>,
759        is_leverage: bool,
760    ) -> PyResult<BybitWsPlaceOrderParams> {
761        let params = self
762            .build_place_order_params(
763                product_type,
764                instrument_id,
765                client_order_id,
766                order_side,
767                order_type,
768                quantity,
769                is_quote_quantity,
770                time_in_force,
771                price,
772                trigger_price,
773                post_only,
774                reduce_only,
775                is_leverage,
776            )
777            .map_err(to_pyruntime_err)?;
778        Ok(params.into())
779    }
780
781    #[pyo3(name = "batch_cancel_orders")]
782    #[pyo3(signature = (
783        product_type,
784        trader_id,
785        strategy_id,
786        instrument_ids,
787        venue_order_ids,
788        client_order_ids,
789    ))]
790    #[allow(clippy::too_many_arguments)]
791    fn py_batch_cancel_orders<'py>(
792        &self,
793        py: Python<'py>,
794        product_type: BybitProductType,
795        trader_id: TraderId,
796        strategy_id: StrategyId,
797        instrument_ids: Vec<InstrumentId>,
798        venue_order_ids: Vec<Option<VenueOrderId>>,
799        client_order_ids: Vec<Option<ClientOrderId>>,
800    ) -> PyResult<Bound<'py, PyAny>> {
801        let client = self.clone();
802
803        pyo3_async_runtimes::tokio::future_into_py(py, async move {
804            client
805                .batch_cancel_orders_by_id(
806                    product_type,
807                    trader_id,
808                    strategy_id,
809                    instrument_ids,
810                    venue_order_ids,
811                    client_order_ids,
812                )
813                .await
814                .map_err(to_pyruntime_err)?;
815            Ok(())
816        })
817    }
818
819    #[pyo3(name = "build_amend_order_params")]
820    #[allow(clippy::too_many_arguments)]
821    fn py_build_amend_order_params(
822        &self,
823        product_type: BybitProductType,
824        instrument_id: InstrumentId,
825        venue_order_id: Option<VenueOrderId>,
826        client_order_id: Option<ClientOrderId>,
827        quantity: Option<Quantity>,
828        price: Option<Price>,
829    ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
830        let params = self
831            .build_amend_order_params(
832                product_type,
833                instrument_id,
834                venue_order_id,
835                client_order_id,
836                quantity,
837                price,
838            )
839            .map_err(to_pyruntime_err)?;
840        Ok(params.into())
841    }
842
843    #[pyo3(name = "batch_modify_orders")]
844    fn py_batch_modify_orders<'py>(
845        &self,
846        py: Python<'py>,
847        trader_id: TraderId,
848        strategy_id: StrategyId,
849        orders: Vec<BybitWsAmendOrderParams>,
850    ) -> PyResult<Bound<'py, PyAny>> {
851        let client = self.clone();
852
853        pyo3_async_runtimes::tokio::future_into_py(py, async move {
854            let order_params: Vec<_> = orders
855                .into_iter()
856                .map(|p| p.try_into())
857                .collect::<Result<Vec<_>, _>>()
858                .map_err(to_pyruntime_err)?;
859
860            client
861                .batch_amend_orders(trader_id, strategy_id, order_params)
862                .await
863                .map_err(to_pyruntime_err)?;
864
865            Ok(())
866        })
867    }
868
869    #[pyo3(name = "batch_place_orders")]
870    fn py_batch_place_orders<'py>(
871        &self,
872        py: Python<'py>,
873        trader_id: TraderId,
874        strategy_id: StrategyId,
875        orders: Vec<BybitWsPlaceOrderParams>,
876    ) -> PyResult<Bound<'py, PyAny>> {
877        let client = self.clone();
878
879        pyo3_async_runtimes::tokio::future_into_py(py, async move {
880            let order_params: Vec<_> = orders
881                .into_iter()
882                .map(|p| p.try_into())
883                .collect::<Result<Vec<_>, _>>()
884                .map_err(to_pyruntime_err)?;
885
886            client
887                .batch_place_orders(trader_id, strategy_id, order_params)
888                .await
889                .map_err(to_pyruntime_err)?;
890
891            Ok(())
892        })
893    }
894}
895
896fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
897    if let Err(e) = callback.call1(py, (py_obj,)) {
898        tracing::error!("Error calling Python callback: {e}");
899    }
900}
901
902fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
903where
904    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
905{
906    Python::attach(|py| match data_fn(py) {
907        Ok(data) => {
908            if let Err(e) = callback.call1(py, (data,)) {
909                tracing::error!("Error calling Python callback: {e}");
910            }
911        }
912        Err(e) => {
913            tracing::error!("Error converting data to Python: {e}");
914        }
915    });
916}