Skip to main content

nautilus_bybit/python/
websocket.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the Bybit WebSocket client.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
21use nautilus_model::{
22    data::{BarType, Data, OrderBookDeltas_API},
23    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce},
24    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
25    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
26    types::{Price, Quantity},
27};
28use pyo3::{IntoPyObjectExt, prelude::*};
29
30use crate::{
31    common::enums::{BybitEnvironment, BybitProductType},
32    python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
33    websocket::{
34        client::BybitWebSocketClient,
35        messages::{BybitWebSocketError, NautilusWsMessage},
36    },
37};
38
39fn validate_bar_type(bar_type: &BarType) -> anyhow::Result<()> {
40    let spec = bar_type.spec();
41
42    if spec.price_type != PriceType::Last {
43        anyhow::bail!(
44            "Invalid bar type: Bybit bars only support LAST price type, received {}",
45            spec.price_type
46        );
47    }
48    if bar_type.aggregation_source() != AggregationSource::External {
49        anyhow::bail!(
50            "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
51            bar_type.aggregation_source()
52        );
53    }
54
55    let step = spec.step.get();
56    if spec.aggregation == BarAggregation::Minute && step >= 60 {
57        let hours = step / 60;
58        anyhow::bail!("Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead");
59    }
60
61    Ok(())
62}
63
64#[pymethods]
65impl BybitWebSocketError {
66    fn __repr__(&self) -> String {
67        format!(
68            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
69            self.code, self.message, self.conn_id, self.topic
70        )
71    }
72
73    #[getter]
74    pub fn code(&self) -> i64 {
75        self.code
76    }
77
78    #[getter]
79    pub fn message(&self) -> &str {
80        &self.message
81    }
82
83    #[getter]
84    pub fn conn_id(&self) -> Option<&str> {
85        self.conn_id.as_deref()
86    }
87
88    #[getter]
89    pub fn topic(&self) -> Option<&str> {
90        self.topic.as_deref()
91    }
92
93    #[getter]
94    pub fn req_id(&self) -> Option<&str> {
95        self.req_id.as_deref()
96    }
97}
98
99#[pymethods]
100impl BybitWebSocketClient {
101    #[staticmethod]
102    #[pyo3(name = "new_public")]
103    #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
104    fn py_new_public(
105        product_type: BybitProductType,
106        environment: BybitEnvironment,
107        url: Option<String>,
108        heartbeat: Option<u64>,
109    ) -> Self {
110        Self::new_public_with(product_type, environment, url, heartbeat)
111    }
112
113    #[staticmethod]
114    #[pyo3(name = "new_private")]
115    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
116    fn py_new_private(
117        environment: BybitEnvironment,
118        api_key: Option<String>,
119        api_secret: Option<String>,
120        url: Option<String>,
121        heartbeat: Option<u64>,
122    ) -> Self {
123        Self::new_private(environment, api_key, api_secret, url, heartbeat)
124    }
125
126    #[staticmethod]
127    #[pyo3(name = "new_trade")]
128    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
129    fn py_new_trade(
130        environment: BybitEnvironment,
131        api_key: Option<String>,
132        api_secret: Option<String>,
133        url: Option<String>,
134        heartbeat: Option<u64>,
135    ) -> Self {
136        Self::new_trade(environment, api_key, api_secret, url, heartbeat)
137    }
138
139    #[getter]
140    #[pyo3(name = "api_key_masked")]
141    #[must_use]
142    pub fn py_api_key_masked(&self) -> Option<String> {
143        self.credential().map(|c| c.api_key_masked())
144    }
145
146    #[pyo3(name = "is_active")]
147    fn py_is_active(&self) -> bool {
148        self.is_active()
149    }
150
151    #[pyo3(name = "is_closed")]
152    fn py_is_closed(&self) -> bool {
153        self.is_closed()
154    }
155
156    #[pyo3(name = "subscription_count")]
157    fn py_subscription_count(&self) -> usize {
158        self.subscription_count()
159    }
160
161    #[pyo3(name = "cache_instrument")]
162    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
163        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
164        Ok(())
165    }
166
167    #[pyo3(name = "set_account_id")]
168    fn py_set_account_id(&mut self, account_id: AccountId) {
169        self.set_account_id(account_id);
170    }
171
172    #[pyo3(name = "set_mm_level")]
173    fn py_set_mm_level(&self, mm_level: u8) {
174        self.set_mm_level(mm_level);
175    }
176
177    #[pyo3(name = "set_bars_timestamp_on_close")]
178    fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
179        self.set_bars_timestamp_on_close(value);
180    }
181
182    #[pyo3(name = "connect")]
183    fn py_connect<'py>(
184        &mut self,
185        py: Python<'py>,
186        callback: Py<PyAny>,
187    ) -> PyResult<Bound<'py, PyAny>> {
188        let mut client = self.clone();
189
190        pyo3_async_runtimes::tokio::future_into_py(py, async move {
191            client.connect().await.map_err(to_pyruntime_err)?;
192
193            let stream = client.stream();
194
195            get_runtime().spawn(async move {
196                tokio::pin!(stream);
197
198                while let Some(msg) = stream.next().await {
199                    match msg {
200                        NautilusWsMessage::Data(data_vec) => {
201                            Python::attach(|py| {
202                                for data in data_vec {
203                                    let py_obj = data_to_pycapsule(py, data);
204                                    call_python(py, &callback, py_obj);
205                                }
206                            });
207                        }
208                        NautilusWsMessage::Deltas(deltas) => {
209                            Python::attach(|py| {
210                                let py_obj = data_to_pycapsule(
211                                    py,
212                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
213                                );
214                                call_python(py, &callback, py_obj);
215                            });
216                        }
217                        NautilusWsMessage::FundingRates(rates) => {
218                            for rate in rates {
219                                call_python_with_data(&callback, move |py| {
220                                    rate.into_py_any(py).map(|obj| obj.into_bound(py))
221                                });
222                            }
223                        }
224                        NautilusWsMessage::MarkPrices(prices) => {
225                            for price in prices {
226                                call_python_with_data(&callback, move |py| {
227                                    price.into_py_any(py).map(|obj| obj.into_bound(py))
228                                });
229                            }
230                        }
231                        NautilusWsMessage::IndexPrices(prices) => {
232                            for price in prices {
233                                call_python_with_data(&callback, move |py| {
234                                    price.into_py_any(py).map(|obj| obj.into_bound(py))
235                                });
236                            }
237                        }
238                        NautilusWsMessage::OrderStatusReports(reports) => {
239                            for report in reports {
240                                call_python_with_data(&callback, move |py| {
241                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
242                                });
243                            }
244                        }
245                        NautilusWsMessage::FillReports(reports) => {
246                            for report in reports {
247                                call_python_with_data(&callback, move |py| {
248                                    report.into_py_any(py).map(|obj| obj.into_bound(py))
249                                });
250                            }
251                        }
252                        NautilusWsMessage::PositionStatusReport(report) => {
253                            call_python_with_data(&callback, move |py| {
254                                report.into_py_any(py).map(|obj| obj.into_bound(py))
255                            });
256                        }
257                        NautilusWsMessage::AccountState(state) => {
258                            call_python_with_data(&callback, move |py| {
259                                state.into_py_any(py).map(|obj| obj.into_bound(py))
260                            });
261                        }
262                        NautilusWsMessage::OrderRejected(event) => {
263                            call_python_with_data(&callback, move |py| {
264                                event.into_py_any(py).map(|obj| obj.into_bound(py))
265                            });
266                        }
267                        NautilusWsMessage::OrderCancelRejected(event) => {
268                            call_python_with_data(&callback, move |py| {
269                                event.into_py_any(py).map(|obj| obj.into_bound(py))
270                            });
271                        }
272                        NautilusWsMessage::OrderModifyRejected(event) => {
273                            call_python_with_data(&callback, move |py| {
274                                event.into_py_any(py).map(|obj| obj.into_bound(py))
275                            });
276                        }
277                        NautilusWsMessage::Error(err) => {
278                            call_python_with_data(&callback, move |py| {
279                                err.into_py_any(py).map(|obj| obj.into_bound(py))
280                            });
281                        }
282                        NautilusWsMessage::Reconnected => {
283                            log::info!("WebSocket reconnected");
284                        }
285                        NautilusWsMessage::Authenticated => {
286                            log::info!("WebSocket authenticated");
287                        }
288                    }
289                }
290            });
291
292            Ok(())
293        })
294    }
295
296    #[pyo3(name = "close")]
297    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
298        let mut client = self.clone();
299
300        pyo3_async_runtimes::tokio::future_into_py(py, async move {
301            if let Err(e) = client.close().await {
302                log::error!("Error on close: {e}");
303            }
304            Ok(())
305        })
306    }
307
308    #[pyo3(name = "subscribe")]
309    fn py_subscribe<'py>(
310        &self,
311        py: Python<'py>,
312        topics: Vec<String>,
313    ) -> PyResult<Bound<'py, PyAny>> {
314        let client = self.clone();
315
316        pyo3_async_runtimes::tokio::future_into_py(py, async move {
317            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
318            Ok(())
319        })
320    }
321
322    #[pyo3(name = "unsubscribe")]
323    fn py_unsubscribe<'py>(
324        &self,
325        py: Python<'py>,
326        topics: Vec<String>,
327    ) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
332            Ok(())
333        })
334    }
335
336    #[pyo3(name = "subscribe_orderbook")]
337    fn py_subscribe_orderbook<'py>(
338        &self,
339        py: Python<'py>,
340        instrument_id: InstrumentId,
341        depth: u32,
342    ) -> PyResult<Bound<'py, PyAny>> {
343        let client = self.clone();
344
345        pyo3_async_runtimes::tokio::future_into_py(py, async move {
346            client
347                .subscribe_orderbook(instrument_id, depth)
348                .await
349                .map_err(to_pyruntime_err)?;
350            Ok(())
351        })
352    }
353
354    #[pyo3(name = "unsubscribe_orderbook")]
355    fn py_unsubscribe_orderbook<'py>(
356        &self,
357        py: Python<'py>,
358        instrument_id: InstrumentId,
359        depth: u32,
360    ) -> PyResult<Bound<'py, PyAny>> {
361        let client = self.clone();
362
363        pyo3_async_runtimes::tokio::future_into_py(py, async move {
364            client
365                .unsubscribe_orderbook(instrument_id, depth)
366                .await
367                .map_err(to_pyruntime_err)?;
368            Ok(())
369        })
370    }
371
372    #[pyo3(name = "subscribe_trades")]
373    fn py_subscribe_trades<'py>(
374        &self,
375        py: Python<'py>,
376        instrument_id: InstrumentId,
377    ) -> PyResult<Bound<'py, PyAny>> {
378        let client = self.clone();
379
380        pyo3_async_runtimes::tokio::future_into_py(py, async move {
381            client
382                .subscribe_trades(instrument_id)
383                .await
384                .map_err(to_pyruntime_err)?;
385            Ok(())
386        })
387    }
388
389    #[pyo3(name = "unsubscribe_trades")]
390    fn py_unsubscribe_trades<'py>(
391        &self,
392        py: Python<'py>,
393        instrument_id: InstrumentId,
394    ) -> PyResult<Bound<'py, PyAny>> {
395        let client = self.clone();
396
397        pyo3_async_runtimes::tokio::future_into_py(py, async move {
398            client
399                .unsubscribe_trades(instrument_id)
400                .await
401                .map_err(to_pyruntime_err)?;
402            Ok(())
403        })
404    }
405
406    #[pyo3(name = "subscribe_ticker")]
407    fn py_subscribe_ticker<'py>(
408        &self,
409        py: Python<'py>,
410        instrument_id: InstrumentId,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            client
416                .subscribe_ticker(instrument_id)
417                .await
418                .map_err(to_pyruntime_err)?;
419            Ok(())
420        })
421    }
422
423    #[pyo3(name = "unsubscribe_ticker")]
424    fn py_unsubscribe_ticker<'py>(
425        &self,
426        py: Python<'py>,
427        instrument_id: InstrumentId,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let client = self.clone();
430
431        pyo3_async_runtimes::tokio::future_into_py(py, async move {
432            client
433                .unsubscribe_ticker(instrument_id)
434                .await
435                .map_err(to_pyruntime_err)?;
436            Ok(())
437        })
438    }
439
440    #[pyo3(name = "subscribe_bars")]
441    fn py_subscribe_bars<'py>(
442        &self,
443        py: Python<'py>,
444        bar_type: BarType,
445    ) -> PyResult<Bound<'py, PyAny>> {
446        validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
447
448        let client = self.clone();
449        pyo3_async_runtimes::tokio::future_into_py(py, async move {
450            client
451                .subscribe_bars(bar_type)
452                .await
453                .map_err(to_pyruntime_err)?;
454            Ok(())
455        })
456    }
457
458    #[pyo3(name = "unsubscribe_bars")]
459    fn py_unsubscribe_bars<'py>(
460        &self,
461        py: Python<'py>,
462        bar_type: BarType,
463    ) -> PyResult<Bound<'py, PyAny>> {
464        validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
465
466        let client = self.clone();
467        pyo3_async_runtimes::tokio::future_into_py(py, async move {
468            client
469                .unsubscribe_bars(bar_type)
470                .await
471                .map_err(to_pyruntime_err)?;
472            Ok(())
473        })
474    }
475
476    #[pyo3(name = "subscribe_orders")]
477    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
478        let client = self.clone();
479
480        pyo3_async_runtimes::tokio::future_into_py(py, async move {
481            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "unsubscribe_orders")]
487    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
488        let client = self.clone();
489
490        pyo3_async_runtimes::tokio::future_into_py(py, async move {
491            client
492                .unsubscribe_orders()
493                .await
494                .map_err(to_pyruntime_err)?;
495            Ok(())
496        })
497    }
498
499    #[pyo3(name = "subscribe_executions")]
500    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
501        let client = self.clone();
502
503        pyo3_async_runtimes::tokio::future_into_py(py, async move {
504            client
505                .subscribe_executions()
506                .await
507                .map_err(to_pyruntime_err)?;
508            Ok(())
509        })
510    }
511
512    #[pyo3(name = "unsubscribe_executions")]
513    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
514        let client = self.clone();
515
516        pyo3_async_runtimes::tokio::future_into_py(py, async move {
517            client
518                .unsubscribe_executions()
519                .await
520                .map_err(to_pyruntime_err)?;
521            Ok(())
522        })
523    }
524
525    #[pyo3(name = "subscribe_positions")]
526    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
527        let client = self.clone();
528
529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
530            client
531                .subscribe_positions()
532                .await
533                .map_err(to_pyruntime_err)?;
534            Ok(())
535        })
536    }
537
538    #[pyo3(name = "unsubscribe_positions")]
539    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540        let client = self.clone();
541
542        pyo3_async_runtimes::tokio::future_into_py(py, async move {
543            client
544                .unsubscribe_positions()
545                .await
546                .map_err(to_pyruntime_err)?;
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "subscribe_wallet")]
552    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553        let client = self.clone();
554
555        pyo3_async_runtimes::tokio::future_into_py(py, async move {
556            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
557            Ok(())
558        })
559    }
560
561    #[pyo3(name = "unsubscribe_wallet")]
562    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
563        let client = self.clone();
564
565        pyo3_async_runtimes::tokio::future_into_py(py, async move {
566            client
567                .unsubscribe_wallet()
568                .await
569                .map_err(to_pyruntime_err)?;
570            Ok(())
571        })
572    }
573
574    #[pyo3(name = "wait_until_active")]
575    fn py_wait_until_active<'py>(
576        &self,
577        py: Python<'py>,
578        timeout_secs: f64,
579    ) -> PyResult<Bound<'py, PyAny>> {
580        let client = self.clone();
581
582        pyo3_async_runtimes::tokio::future_into_py(py, async move {
583            client
584                .wait_until_active(timeout_secs)
585                .await
586                .map_err(to_pyruntime_err)?;
587            Ok(())
588        })
589    }
590
591    #[pyo3(name = "submit_order")]
592    #[pyo3(signature = (
593        product_type,
594        trader_id,
595        strategy_id,
596        instrument_id,
597        client_order_id,
598        order_side,
599        order_type,
600        quantity,
601        is_quote_quantity=false,
602        time_in_force=None,
603        price=None,
604        trigger_price=None,
605        post_only=None,
606        reduce_only=None,
607        is_leverage=false,
608    ))]
609    #[allow(clippy::too_many_arguments)]
610    fn py_submit_order<'py>(
611        &self,
612        py: Python<'py>,
613        product_type: BybitProductType,
614        trader_id: TraderId,
615        strategy_id: StrategyId,
616        instrument_id: InstrumentId,
617        client_order_id: ClientOrderId,
618        order_side: OrderSide,
619        order_type: OrderType,
620        quantity: Quantity,
621        is_quote_quantity: bool,
622        time_in_force: Option<TimeInForce>,
623        price: Option<Price>,
624        trigger_price: Option<Price>,
625        post_only: Option<bool>,
626        reduce_only: Option<bool>,
627        is_leverage: bool,
628    ) -> PyResult<Bound<'py, PyAny>> {
629        let client = self.clone();
630
631        pyo3_async_runtimes::tokio::future_into_py(py, async move {
632            client
633                .submit_order(
634                    product_type,
635                    trader_id,
636                    strategy_id,
637                    instrument_id,
638                    client_order_id,
639                    order_side,
640                    order_type,
641                    quantity,
642                    is_quote_quantity,
643                    time_in_force,
644                    price,
645                    trigger_price,
646                    post_only,
647                    reduce_only,
648                    is_leverage,
649                )
650                .await
651                .map_err(to_pyruntime_err)?;
652            Ok(())
653        })
654    }
655
656    #[pyo3(name = "modify_order")]
657    #[pyo3(signature = (
658        product_type,
659        trader_id,
660        strategy_id,
661        instrument_id,
662        client_order_id,
663        venue_order_id=None,
664        quantity=None,
665        price=None,
666    ))]
667    #[allow(clippy::too_many_arguments)]
668    fn py_modify_order<'py>(
669        &self,
670        py: Python<'py>,
671        product_type: BybitProductType,
672        trader_id: TraderId,
673        strategy_id: StrategyId,
674        instrument_id: InstrumentId,
675        client_order_id: ClientOrderId,
676        venue_order_id: Option<VenueOrderId>,
677        quantity: Option<Quantity>,
678        price: Option<Price>,
679    ) -> PyResult<Bound<'py, PyAny>> {
680        let client = self.clone();
681
682        pyo3_async_runtimes::tokio::future_into_py(py, async move {
683            client
684                .modify_order(
685                    product_type,
686                    trader_id,
687                    strategy_id,
688                    instrument_id,
689                    client_order_id,
690                    venue_order_id,
691                    quantity,
692                    price,
693                )
694                .await
695                .map_err(to_pyruntime_err)?;
696            Ok(())
697        })
698    }
699
700    #[pyo3(name = "cancel_order")]
701    #[pyo3(signature = (
702        product_type,
703        trader_id,
704        strategy_id,
705        instrument_id,
706        client_order_id,
707        venue_order_id=None,
708    ))]
709    #[allow(clippy::too_many_arguments)]
710    fn py_cancel_order<'py>(
711        &self,
712        py: Python<'py>,
713        product_type: BybitProductType,
714        trader_id: TraderId,
715        strategy_id: StrategyId,
716        instrument_id: InstrumentId,
717        client_order_id: ClientOrderId,
718        venue_order_id: Option<VenueOrderId>,
719    ) -> PyResult<Bound<'py, PyAny>> {
720        let client = self.clone();
721
722        pyo3_async_runtimes::tokio::future_into_py(py, async move {
723            client
724                .cancel_order_by_id(
725                    product_type,
726                    trader_id,
727                    strategy_id,
728                    instrument_id,
729                    client_order_id,
730                    venue_order_id,
731                )
732                .await
733                .map_err(to_pyruntime_err)?;
734            Ok(())
735        })
736    }
737
738    #[pyo3(name = "build_place_order_params")]
739    #[pyo3(signature = (
740        product_type,
741        instrument_id,
742        client_order_id,
743        order_side,
744        order_type,
745        quantity,
746        is_quote_quantity=false,
747        time_in_force=None,
748        price=None,
749        trigger_price=None,
750        post_only=None,
751        reduce_only=None,
752        is_leverage=false,
753        take_profit=None,
754        stop_loss=None,
755    ))]
756    #[allow(clippy::too_many_arguments)]
757    fn py_build_place_order_params(
758        &self,
759        product_type: BybitProductType,
760        instrument_id: InstrumentId,
761        client_order_id: ClientOrderId,
762        order_side: OrderSide,
763        order_type: OrderType,
764        quantity: Quantity,
765        is_quote_quantity: bool,
766        time_in_force: Option<TimeInForce>,
767        price: Option<Price>,
768        trigger_price: Option<Price>,
769        post_only: Option<bool>,
770        reduce_only: Option<bool>,
771        is_leverage: bool,
772        take_profit: Option<Price>,
773        stop_loss: Option<Price>,
774    ) -> PyResult<BybitWsPlaceOrderParams> {
775        let params = self
776            .build_place_order_params(
777                product_type,
778                instrument_id,
779                client_order_id,
780                order_side,
781                order_type,
782                quantity,
783                is_quote_quantity,
784                time_in_force,
785                price,
786                trigger_price,
787                post_only,
788                reduce_only,
789                is_leverage,
790                take_profit,
791                stop_loss,
792            )
793            .map_err(to_pyruntime_err)?;
794        Ok(params.into())
795    }
796
797    #[pyo3(name = "batch_cancel_orders")]
798    fn py_batch_cancel_orders<'py>(
799        &self,
800        py: Python<'py>,
801        trader_id: TraderId,
802        strategy_id: StrategyId,
803        orders: Vec<BybitWsCancelOrderParams>,
804    ) -> PyResult<Bound<'py, PyAny>> {
805        let client = self.clone();
806
807        pyo3_async_runtimes::tokio::future_into_py(py, async move {
808            let order_params: Vec<_> = orders
809                .into_iter()
810                .map(|p| p.try_into())
811                .collect::<Result<Vec<_>, _>>()
812                .map_err(to_pyruntime_err)?;
813
814            client
815                .batch_cancel_orders(trader_id, strategy_id, order_params)
816                .await
817                .map_err(to_pyruntime_err)?;
818
819            Ok(())
820        })
821    }
822
823    #[pyo3(name = "build_amend_order_params")]
824    #[allow(clippy::too_many_arguments)]
825    fn py_build_amend_order_params(
826        &self,
827        product_type: BybitProductType,
828        instrument_id: InstrumentId,
829        venue_order_id: Option<VenueOrderId>,
830        client_order_id: Option<ClientOrderId>,
831        quantity: Option<Quantity>,
832        price: Option<Price>,
833    ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
834        let params = self
835            .build_amend_order_params(
836                product_type,
837                instrument_id,
838                venue_order_id,
839                client_order_id,
840                quantity,
841                price,
842            )
843            .map_err(to_pyruntime_err)?;
844        Ok(params.into())
845    }
846
847    #[pyo3(name = "build_cancel_order_params")]
848    fn py_build_cancel_order_params(
849        &self,
850        product_type: BybitProductType,
851        instrument_id: InstrumentId,
852        venue_order_id: Option<VenueOrderId>,
853        client_order_id: Option<ClientOrderId>,
854    ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
855        let params = self
856            .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
857            .map_err(to_pyruntime_err)?;
858        Ok(params.into())
859    }
860
861    #[pyo3(name = "batch_modify_orders")]
862    fn py_batch_modify_orders<'py>(
863        &self,
864        py: Python<'py>,
865        trader_id: TraderId,
866        strategy_id: StrategyId,
867        orders: Vec<BybitWsAmendOrderParams>,
868    ) -> PyResult<Bound<'py, PyAny>> {
869        let client = self.clone();
870
871        pyo3_async_runtimes::tokio::future_into_py(py, async move {
872            let order_params: Vec<_> = orders
873                .into_iter()
874                .map(|p| p.try_into())
875                .collect::<Result<Vec<_>, _>>()
876                .map_err(to_pyruntime_err)?;
877
878            client
879                .batch_amend_orders(trader_id, strategy_id, order_params)
880                .await
881                .map_err(to_pyruntime_err)?;
882
883            Ok(())
884        })
885    }
886
887    #[pyo3(name = "batch_place_orders")]
888    fn py_batch_place_orders<'py>(
889        &self,
890        py: Python<'py>,
891        trader_id: TraderId,
892        strategy_id: StrategyId,
893        orders: Vec<BybitWsPlaceOrderParams>,
894    ) -> PyResult<Bound<'py, PyAny>> {
895        let client = self.clone();
896
897        pyo3_async_runtimes::tokio::future_into_py(py, async move {
898            let order_params: Vec<_> = orders
899                .into_iter()
900                .map(|p| p.try_into())
901                .collect::<Result<Vec<_>, _>>()
902                .map_err(to_pyruntime_err)?;
903
904            client
905                .batch_place_orders(trader_id, strategy_id, order_params)
906                .await
907                .map_err(to_pyruntime_err)?;
908
909            Ok(())
910        })
911    }
912}
913
914fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
915where
916    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
917{
918    Python::attach(|py| match data_fn(py) {
919        Ok(data) => {
920            if let Err(e) = callback.call1(py, (data,)) {
921                log::error!("Error calling Python callback: {e}");
922            }
923        }
924        Err(e) => {
925            log::error!("Error converting data to Python: {e}");
926        }
927    });
928}