1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
21use nautilus_model::{
22 data::{BarType, Data, OrderBookDeltas_API},
23 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce},
24 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
25 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
26 types::{Price, Quantity},
27};
28use pyo3::{IntoPyObjectExt, prelude::*};
29
30use crate::{
31 common::enums::{BybitEnvironment, BybitProductType},
32 python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
33 websocket::{
34 client::BybitWebSocketClient,
35 messages::{BybitWebSocketError, NautilusWsMessage},
36 },
37};
38
39fn validate_bar_type(bar_type: &BarType) -> anyhow::Result<()> {
40 let spec = bar_type.spec();
41
42 if spec.price_type != PriceType::Last {
43 anyhow::bail!(
44 "Invalid bar type: Bybit bars only support LAST price type, received {}",
45 spec.price_type
46 );
47 }
48 if bar_type.aggregation_source() != AggregationSource::External {
49 anyhow::bail!(
50 "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
51 bar_type.aggregation_source()
52 );
53 }
54
55 let step = spec.step.get();
56 if spec.aggregation == BarAggregation::Minute && step >= 60 {
57 let hours = step / 60;
58 anyhow::bail!("Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead");
59 }
60
61 Ok(())
62}
63
64#[pymethods]
65impl BybitWebSocketError {
66 fn __repr__(&self) -> String {
67 format!(
68 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
69 self.code, self.message, self.conn_id, self.topic
70 )
71 }
72
73 #[getter]
74 pub fn code(&self) -> i64 {
75 self.code
76 }
77
78 #[getter]
79 pub fn message(&self) -> &str {
80 &self.message
81 }
82
83 #[getter]
84 pub fn conn_id(&self) -> Option<&str> {
85 self.conn_id.as_deref()
86 }
87
88 #[getter]
89 pub fn topic(&self) -> Option<&str> {
90 self.topic.as_deref()
91 }
92
93 #[getter]
94 pub fn req_id(&self) -> Option<&str> {
95 self.req_id.as_deref()
96 }
97}
98
99#[pymethods]
100impl BybitWebSocketClient {
101 #[staticmethod]
102 #[pyo3(name = "new_public")]
103 #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
104 fn py_new_public(
105 product_type: BybitProductType,
106 environment: BybitEnvironment,
107 url: Option<String>,
108 heartbeat: Option<u64>,
109 ) -> Self {
110 Self::new_public_with(product_type, environment, url, heartbeat)
111 }
112
113 #[staticmethod]
114 #[pyo3(name = "new_private")]
115 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
116 fn py_new_private(
117 environment: BybitEnvironment,
118 api_key: Option<String>,
119 api_secret: Option<String>,
120 url: Option<String>,
121 heartbeat: Option<u64>,
122 ) -> Self {
123 Self::new_private(environment, api_key, api_secret, url, heartbeat)
124 }
125
126 #[staticmethod]
127 #[pyo3(name = "new_trade")]
128 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=None))]
129 fn py_new_trade(
130 environment: BybitEnvironment,
131 api_key: Option<String>,
132 api_secret: Option<String>,
133 url: Option<String>,
134 heartbeat: Option<u64>,
135 ) -> Self {
136 Self::new_trade(environment, api_key, api_secret, url, heartbeat)
137 }
138
139 #[getter]
140 #[pyo3(name = "api_key_masked")]
141 #[must_use]
142 pub fn py_api_key_masked(&self) -> Option<String> {
143 self.credential().map(|c| c.api_key_masked())
144 }
145
146 #[pyo3(name = "is_active")]
147 fn py_is_active(&self) -> bool {
148 self.is_active()
149 }
150
151 #[pyo3(name = "is_closed")]
152 fn py_is_closed(&self) -> bool {
153 self.is_closed()
154 }
155
156 #[pyo3(name = "subscription_count")]
157 fn py_subscription_count(&self) -> usize {
158 self.subscription_count()
159 }
160
161 #[pyo3(name = "cache_instrument")]
162 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
163 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
164 Ok(())
165 }
166
167 #[pyo3(name = "set_account_id")]
168 fn py_set_account_id(&mut self, account_id: AccountId) {
169 self.set_account_id(account_id);
170 }
171
172 #[pyo3(name = "set_mm_level")]
173 fn py_set_mm_level(&self, mm_level: u8) {
174 self.set_mm_level(mm_level);
175 }
176
177 #[pyo3(name = "set_bars_timestamp_on_close")]
178 fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
179 self.set_bars_timestamp_on_close(value);
180 }
181
182 #[pyo3(name = "connect")]
183 fn py_connect<'py>(
184 &mut self,
185 py: Python<'py>,
186 callback: Py<PyAny>,
187 ) -> PyResult<Bound<'py, PyAny>> {
188 let mut client = self.clone();
189
190 pyo3_async_runtimes::tokio::future_into_py(py, async move {
191 client.connect().await.map_err(to_pyruntime_err)?;
192
193 let stream = client.stream();
194
195 get_runtime().spawn(async move {
196 tokio::pin!(stream);
197
198 while let Some(msg) = stream.next().await {
199 match msg {
200 NautilusWsMessage::Data(data_vec) => {
201 Python::attach(|py| {
202 for data in data_vec {
203 let py_obj = data_to_pycapsule(py, data);
204 call_python(py, &callback, py_obj);
205 }
206 });
207 }
208 NautilusWsMessage::Deltas(deltas) => {
209 Python::attach(|py| {
210 let py_obj = data_to_pycapsule(
211 py,
212 Data::Deltas(OrderBookDeltas_API::new(deltas)),
213 );
214 call_python(py, &callback, py_obj);
215 });
216 }
217 NautilusWsMessage::FundingRates(rates) => {
218 for rate in rates {
219 call_python_with_data(&callback, move |py| {
220 rate.into_py_any(py).map(|obj| obj.into_bound(py))
221 });
222 }
223 }
224 NautilusWsMessage::MarkPrices(prices) => {
225 for price in prices {
226 call_python_with_data(&callback, move |py| {
227 price.into_py_any(py).map(|obj| obj.into_bound(py))
228 });
229 }
230 }
231 NautilusWsMessage::IndexPrices(prices) => {
232 for price in prices {
233 call_python_with_data(&callback, move |py| {
234 price.into_py_any(py).map(|obj| obj.into_bound(py))
235 });
236 }
237 }
238 NautilusWsMessage::OrderStatusReports(reports) => {
239 for report in reports {
240 call_python_with_data(&callback, move |py| {
241 report.into_py_any(py).map(|obj| obj.into_bound(py))
242 });
243 }
244 }
245 NautilusWsMessage::FillReports(reports) => {
246 for report in reports {
247 call_python_with_data(&callback, move |py| {
248 report.into_py_any(py).map(|obj| obj.into_bound(py))
249 });
250 }
251 }
252 NautilusWsMessage::PositionStatusReport(report) => {
253 call_python_with_data(&callback, move |py| {
254 report.into_py_any(py).map(|obj| obj.into_bound(py))
255 });
256 }
257 NautilusWsMessage::AccountState(state) => {
258 call_python_with_data(&callback, move |py| {
259 state.into_py_any(py).map(|obj| obj.into_bound(py))
260 });
261 }
262 NautilusWsMessage::OrderRejected(event) => {
263 call_python_with_data(&callback, move |py| {
264 event.into_py_any(py).map(|obj| obj.into_bound(py))
265 });
266 }
267 NautilusWsMessage::OrderCancelRejected(event) => {
268 call_python_with_data(&callback, move |py| {
269 event.into_py_any(py).map(|obj| obj.into_bound(py))
270 });
271 }
272 NautilusWsMessage::OrderModifyRejected(event) => {
273 call_python_with_data(&callback, move |py| {
274 event.into_py_any(py).map(|obj| obj.into_bound(py))
275 });
276 }
277 NautilusWsMessage::Error(err) => {
278 call_python_with_data(&callback, move |py| {
279 err.into_py_any(py).map(|obj| obj.into_bound(py))
280 });
281 }
282 NautilusWsMessage::Reconnected => {
283 log::info!("WebSocket reconnected");
284 }
285 NautilusWsMessage::Authenticated => {
286 log::info!("WebSocket authenticated");
287 }
288 }
289 }
290 });
291
292 Ok(())
293 })
294 }
295
296 #[pyo3(name = "close")]
297 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
298 let mut client = self.clone();
299
300 pyo3_async_runtimes::tokio::future_into_py(py, async move {
301 if let Err(e) = client.close().await {
302 log::error!("Error on close: {e}");
303 }
304 Ok(())
305 })
306 }
307
308 #[pyo3(name = "subscribe")]
309 fn py_subscribe<'py>(
310 &self,
311 py: Python<'py>,
312 topics: Vec<String>,
313 ) -> PyResult<Bound<'py, PyAny>> {
314 let client = self.clone();
315
316 pyo3_async_runtimes::tokio::future_into_py(py, async move {
317 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
318 Ok(())
319 })
320 }
321
322 #[pyo3(name = "unsubscribe")]
323 fn py_unsubscribe<'py>(
324 &self,
325 py: Python<'py>,
326 topics: Vec<String>,
327 ) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
332 Ok(())
333 })
334 }
335
336 #[pyo3(name = "subscribe_orderbook")]
337 fn py_subscribe_orderbook<'py>(
338 &self,
339 py: Python<'py>,
340 instrument_id: InstrumentId,
341 depth: u32,
342 ) -> PyResult<Bound<'py, PyAny>> {
343 let client = self.clone();
344
345 pyo3_async_runtimes::tokio::future_into_py(py, async move {
346 client
347 .subscribe_orderbook(instrument_id, depth)
348 .await
349 .map_err(to_pyruntime_err)?;
350 Ok(())
351 })
352 }
353
354 #[pyo3(name = "unsubscribe_orderbook")]
355 fn py_unsubscribe_orderbook<'py>(
356 &self,
357 py: Python<'py>,
358 instrument_id: InstrumentId,
359 depth: u32,
360 ) -> PyResult<Bound<'py, PyAny>> {
361 let client = self.clone();
362
363 pyo3_async_runtimes::tokio::future_into_py(py, async move {
364 client
365 .unsubscribe_orderbook(instrument_id, depth)
366 .await
367 .map_err(to_pyruntime_err)?;
368 Ok(())
369 })
370 }
371
372 #[pyo3(name = "subscribe_trades")]
373 fn py_subscribe_trades<'py>(
374 &self,
375 py: Python<'py>,
376 instrument_id: InstrumentId,
377 ) -> PyResult<Bound<'py, PyAny>> {
378 let client = self.clone();
379
380 pyo3_async_runtimes::tokio::future_into_py(py, async move {
381 client
382 .subscribe_trades(instrument_id)
383 .await
384 .map_err(to_pyruntime_err)?;
385 Ok(())
386 })
387 }
388
389 #[pyo3(name = "unsubscribe_trades")]
390 fn py_unsubscribe_trades<'py>(
391 &self,
392 py: Python<'py>,
393 instrument_id: InstrumentId,
394 ) -> PyResult<Bound<'py, PyAny>> {
395 let client = self.clone();
396
397 pyo3_async_runtimes::tokio::future_into_py(py, async move {
398 client
399 .unsubscribe_trades(instrument_id)
400 .await
401 .map_err(to_pyruntime_err)?;
402 Ok(())
403 })
404 }
405
406 #[pyo3(name = "subscribe_ticker")]
407 fn py_subscribe_ticker<'py>(
408 &self,
409 py: Python<'py>,
410 instrument_id: InstrumentId,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 client
416 .subscribe_ticker(instrument_id)
417 .await
418 .map_err(to_pyruntime_err)?;
419 Ok(())
420 })
421 }
422
423 #[pyo3(name = "unsubscribe_ticker")]
424 fn py_unsubscribe_ticker<'py>(
425 &self,
426 py: Python<'py>,
427 instrument_id: InstrumentId,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let client = self.clone();
430
431 pyo3_async_runtimes::tokio::future_into_py(py, async move {
432 client
433 .unsubscribe_ticker(instrument_id)
434 .await
435 .map_err(to_pyruntime_err)?;
436 Ok(())
437 })
438 }
439
440 #[pyo3(name = "subscribe_bars")]
441 fn py_subscribe_bars<'py>(
442 &self,
443 py: Python<'py>,
444 bar_type: BarType,
445 ) -> PyResult<Bound<'py, PyAny>> {
446 validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
447
448 let client = self.clone();
449 pyo3_async_runtimes::tokio::future_into_py(py, async move {
450 client
451 .subscribe_bars(bar_type)
452 .await
453 .map_err(to_pyruntime_err)?;
454 Ok(())
455 })
456 }
457
458 #[pyo3(name = "unsubscribe_bars")]
459 fn py_unsubscribe_bars<'py>(
460 &self,
461 py: Python<'py>,
462 bar_type: BarType,
463 ) -> PyResult<Bound<'py, PyAny>> {
464 validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
465
466 let client = self.clone();
467 pyo3_async_runtimes::tokio::future_into_py(py, async move {
468 client
469 .unsubscribe_bars(bar_type)
470 .await
471 .map_err(to_pyruntime_err)?;
472 Ok(())
473 })
474 }
475
476 #[pyo3(name = "subscribe_orders")]
477 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
478 let client = self.clone();
479
480 pyo3_async_runtimes::tokio::future_into_py(py, async move {
481 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "unsubscribe_orders")]
487 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
488 let client = self.clone();
489
490 pyo3_async_runtimes::tokio::future_into_py(py, async move {
491 client
492 .unsubscribe_orders()
493 .await
494 .map_err(to_pyruntime_err)?;
495 Ok(())
496 })
497 }
498
499 #[pyo3(name = "subscribe_executions")]
500 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
501 let client = self.clone();
502
503 pyo3_async_runtimes::tokio::future_into_py(py, async move {
504 client
505 .subscribe_executions()
506 .await
507 .map_err(to_pyruntime_err)?;
508 Ok(())
509 })
510 }
511
512 #[pyo3(name = "unsubscribe_executions")]
513 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
514 let client = self.clone();
515
516 pyo3_async_runtimes::tokio::future_into_py(py, async move {
517 client
518 .unsubscribe_executions()
519 .await
520 .map_err(to_pyruntime_err)?;
521 Ok(())
522 })
523 }
524
525 #[pyo3(name = "subscribe_positions")]
526 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
527 let client = self.clone();
528
529 pyo3_async_runtimes::tokio::future_into_py(py, async move {
530 client
531 .subscribe_positions()
532 .await
533 .map_err(to_pyruntime_err)?;
534 Ok(())
535 })
536 }
537
538 #[pyo3(name = "unsubscribe_positions")]
539 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540 let client = self.clone();
541
542 pyo3_async_runtimes::tokio::future_into_py(py, async move {
543 client
544 .unsubscribe_positions()
545 .await
546 .map_err(to_pyruntime_err)?;
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "subscribe_wallet")]
552 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553 let client = self.clone();
554
555 pyo3_async_runtimes::tokio::future_into_py(py, async move {
556 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
557 Ok(())
558 })
559 }
560
561 #[pyo3(name = "unsubscribe_wallet")]
562 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
563 let client = self.clone();
564
565 pyo3_async_runtimes::tokio::future_into_py(py, async move {
566 client
567 .unsubscribe_wallet()
568 .await
569 .map_err(to_pyruntime_err)?;
570 Ok(())
571 })
572 }
573
574 #[pyo3(name = "wait_until_active")]
575 fn py_wait_until_active<'py>(
576 &self,
577 py: Python<'py>,
578 timeout_secs: f64,
579 ) -> PyResult<Bound<'py, PyAny>> {
580 let client = self.clone();
581
582 pyo3_async_runtimes::tokio::future_into_py(py, async move {
583 client
584 .wait_until_active(timeout_secs)
585 .await
586 .map_err(to_pyruntime_err)?;
587 Ok(())
588 })
589 }
590
591 #[pyo3(name = "submit_order")]
592 #[pyo3(signature = (
593 product_type,
594 trader_id,
595 strategy_id,
596 instrument_id,
597 client_order_id,
598 order_side,
599 order_type,
600 quantity,
601 is_quote_quantity=false,
602 time_in_force=None,
603 price=None,
604 trigger_price=None,
605 post_only=None,
606 reduce_only=None,
607 is_leverage=false,
608 ))]
609 #[allow(clippy::too_many_arguments)]
610 fn py_submit_order<'py>(
611 &self,
612 py: Python<'py>,
613 product_type: BybitProductType,
614 trader_id: TraderId,
615 strategy_id: StrategyId,
616 instrument_id: InstrumentId,
617 client_order_id: ClientOrderId,
618 order_side: OrderSide,
619 order_type: OrderType,
620 quantity: Quantity,
621 is_quote_quantity: bool,
622 time_in_force: Option<TimeInForce>,
623 price: Option<Price>,
624 trigger_price: Option<Price>,
625 post_only: Option<bool>,
626 reduce_only: Option<bool>,
627 is_leverage: bool,
628 ) -> PyResult<Bound<'py, PyAny>> {
629 let client = self.clone();
630
631 pyo3_async_runtimes::tokio::future_into_py(py, async move {
632 client
633 .submit_order(
634 product_type,
635 trader_id,
636 strategy_id,
637 instrument_id,
638 client_order_id,
639 order_side,
640 order_type,
641 quantity,
642 is_quote_quantity,
643 time_in_force,
644 price,
645 trigger_price,
646 post_only,
647 reduce_only,
648 is_leverage,
649 )
650 .await
651 .map_err(to_pyruntime_err)?;
652 Ok(())
653 })
654 }
655
656 #[pyo3(name = "modify_order")]
657 #[pyo3(signature = (
658 product_type,
659 trader_id,
660 strategy_id,
661 instrument_id,
662 client_order_id,
663 venue_order_id=None,
664 quantity=None,
665 price=None,
666 ))]
667 #[allow(clippy::too_many_arguments)]
668 fn py_modify_order<'py>(
669 &self,
670 py: Python<'py>,
671 product_type: BybitProductType,
672 trader_id: TraderId,
673 strategy_id: StrategyId,
674 instrument_id: InstrumentId,
675 client_order_id: ClientOrderId,
676 venue_order_id: Option<VenueOrderId>,
677 quantity: Option<Quantity>,
678 price: Option<Price>,
679 ) -> PyResult<Bound<'py, PyAny>> {
680 let client = self.clone();
681
682 pyo3_async_runtimes::tokio::future_into_py(py, async move {
683 client
684 .modify_order(
685 product_type,
686 trader_id,
687 strategy_id,
688 instrument_id,
689 client_order_id,
690 venue_order_id,
691 quantity,
692 price,
693 )
694 .await
695 .map_err(to_pyruntime_err)?;
696 Ok(())
697 })
698 }
699
700 #[pyo3(name = "cancel_order")]
701 #[pyo3(signature = (
702 product_type,
703 trader_id,
704 strategy_id,
705 instrument_id,
706 client_order_id,
707 venue_order_id=None,
708 ))]
709 #[allow(clippy::too_many_arguments)]
710 fn py_cancel_order<'py>(
711 &self,
712 py: Python<'py>,
713 product_type: BybitProductType,
714 trader_id: TraderId,
715 strategy_id: StrategyId,
716 instrument_id: InstrumentId,
717 client_order_id: ClientOrderId,
718 venue_order_id: Option<VenueOrderId>,
719 ) -> PyResult<Bound<'py, PyAny>> {
720 let client = self.clone();
721
722 pyo3_async_runtimes::tokio::future_into_py(py, async move {
723 client
724 .cancel_order_by_id(
725 product_type,
726 trader_id,
727 strategy_id,
728 instrument_id,
729 client_order_id,
730 venue_order_id,
731 )
732 .await
733 .map_err(to_pyruntime_err)?;
734 Ok(())
735 })
736 }
737
738 #[pyo3(name = "build_place_order_params")]
739 #[pyo3(signature = (
740 product_type,
741 instrument_id,
742 client_order_id,
743 order_side,
744 order_type,
745 quantity,
746 is_quote_quantity=false,
747 time_in_force=None,
748 price=None,
749 trigger_price=None,
750 post_only=None,
751 reduce_only=None,
752 is_leverage=false,
753 take_profit=None,
754 stop_loss=None,
755 ))]
756 #[allow(clippy::too_many_arguments)]
757 fn py_build_place_order_params(
758 &self,
759 product_type: BybitProductType,
760 instrument_id: InstrumentId,
761 client_order_id: ClientOrderId,
762 order_side: OrderSide,
763 order_type: OrderType,
764 quantity: Quantity,
765 is_quote_quantity: bool,
766 time_in_force: Option<TimeInForce>,
767 price: Option<Price>,
768 trigger_price: Option<Price>,
769 post_only: Option<bool>,
770 reduce_only: Option<bool>,
771 is_leverage: bool,
772 take_profit: Option<Price>,
773 stop_loss: Option<Price>,
774 ) -> PyResult<BybitWsPlaceOrderParams> {
775 let params = self
776 .build_place_order_params(
777 product_type,
778 instrument_id,
779 client_order_id,
780 order_side,
781 order_type,
782 quantity,
783 is_quote_quantity,
784 time_in_force,
785 price,
786 trigger_price,
787 post_only,
788 reduce_only,
789 is_leverage,
790 take_profit,
791 stop_loss,
792 )
793 .map_err(to_pyruntime_err)?;
794 Ok(params.into())
795 }
796
797 #[pyo3(name = "batch_cancel_orders")]
798 fn py_batch_cancel_orders<'py>(
799 &self,
800 py: Python<'py>,
801 trader_id: TraderId,
802 strategy_id: StrategyId,
803 orders: Vec<BybitWsCancelOrderParams>,
804 ) -> PyResult<Bound<'py, PyAny>> {
805 let client = self.clone();
806
807 pyo3_async_runtimes::tokio::future_into_py(py, async move {
808 let order_params: Vec<_> = orders
809 .into_iter()
810 .map(|p| p.try_into())
811 .collect::<Result<Vec<_>, _>>()
812 .map_err(to_pyruntime_err)?;
813
814 client
815 .batch_cancel_orders(trader_id, strategy_id, order_params)
816 .await
817 .map_err(to_pyruntime_err)?;
818
819 Ok(())
820 })
821 }
822
823 #[pyo3(name = "build_amend_order_params")]
824 #[allow(clippy::too_many_arguments)]
825 fn py_build_amend_order_params(
826 &self,
827 product_type: BybitProductType,
828 instrument_id: InstrumentId,
829 venue_order_id: Option<VenueOrderId>,
830 client_order_id: Option<ClientOrderId>,
831 quantity: Option<Quantity>,
832 price: Option<Price>,
833 ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
834 let params = self
835 .build_amend_order_params(
836 product_type,
837 instrument_id,
838 venue_order_id,
839 client_order_id,
840 quantity,
841 price,
842 )
843 .map_err(to_pyruntime_err)?;
844 Ok(params.into())
845 }
846
847 #[pyo3(name = "build_cancel_order_params")]
848 fn py_build_cancel_order_params(
849 &self,
850 product_type: BybitProductType,
851 instrument_id: InstrumentId,
852 venue_order_id: Option<VenueOrderId>,
853 client_order_id: Option<ClientOrderId>,
854 ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
855 let params = self
856 .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
857 .map_err(to_pyruntime_err)?;
858 Ok(params.into())
859 }
860
861 #[pyo3(name = "batch_modify_orders")]
862 fn py_batch_modify_orders<'py>(
863 &self,
864 py: Python<'py>,
865 trader_id: TraderId,
866 strategy_id: StrategyId,
867 orders: Vec<BybitWsAmendOrderParams>,
868 ) -> PyResult<Bound<'py, PyAny>> {
869 let client = self.clone();
870
871 pyo3_async_runtimes::tokio::future_into_py(py, async move {
872 let order_params: Vec<_> = orders
873 .into_iter()
874 .map(|p| p.try_into())
875 .collect::<Result<Vec<_>, _>>()
876 .map_err(to_pyruntime_err)?;
877
878 client
879 .batch_amend_orders(trader_id, strategy_id, order_params)
880 .await
881 .map_err(to_pyruntime_err)?;
882
883 Ok(())
884 })
885 }
886
887 #[pyo3(name = "batch_place_orders")]
888 fn py_batch_place_orders<'py>(
889 &self,
890 py: Python<'py>,
891 trader_id: TraderId,
892 strategy_id: StrategyId,
893 orders: Vec<BybitWsPlaceOrderParams>,
894 ) -> PyResult<Bound<'py, PyAny>> {
895 let client = self.clone();
896
897 pyo3_async_runtimes::tokio::future_into_py(py, async move {
898 let order_params: Vec<_> = orders
899 .into_iter()
900 .map(|p| p.try_into())
901 .collect::<Result<Vec<_>, _>>()
902 .map_err(to_pyruntime_err)?;
903
904 client
905 .batch_place_orders(trader_id, strategy_id, order_params)
906 .await
907 .map_err(to_pyruntime_err)?;
908
909 Ok(())
910 })
911 }
912}
913
914fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
915where
916 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
917{
918 Python::attach(|py| match data_fn(py) {
919 Ok(data) => {
920 if let Err(e) = callback.call1(py, (data,)) {
921 log::error!("Error calling Python callback: {e}");
922 }
923 }
924 Err(e) => {
925 log::error!("Error converting data to Python: {e}");
926 }
927 });
928}