nautilus_bybit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Bybit WebSocket client.
17
18use std::{num::NonZero, sync::Arc};
19
20use futures_util::StreamExt;
21use nautilus_core::{nanos::UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
22use nautilus_model::{
23    data::{BarSpecification, BarType, Data, OrderBookDeltas_API},
24    enums::{AggregationSource, BarAggregation, PriceType},
25    identifiers::{AccountId, InstrumentId},
26    instruments::Instrument,
27    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
28};
29use pyo3::{IntoPyObjectExt, prelude::*};
30
31use crate::{
32    common::{
33        credential::Credential,
34        enums::{BybitEnvironment, BybitProductType},
35        parse::make_bybit_symbol,
36    },
37    websocket::{
38        client::BybitWebSocketClient,
39        messages::{BybitWebSocketError, BybitWebSocketMessage},
40        parse::{
41            parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_ws_account_state,
42            parse_ws_fill_report, parse_ws_kline_bar, parse_ws_order_status_report,
43            parse_ws_position_status_report, parse_ws_trade_tick,
44        },
45    },
46};
47
48#[pymethods]
49impl BybitWebSocketError {
50    fn __repr__(&self) -> String {
51        format!(
52            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
53            self.code, self.message, self.conn_id, self.topic
54        )
55    }
56
57    #[getter]
58    pub fn code(&self) -> i64 {
59        self.code
60    }
61
62    #[getter]
63    pub fn message(&self) -> &str {
64        &self.message
65    }
66
67    #[getter]
68    pub fn conn_id(&self) -> Option<&str> {
69        self.conn_id.as_deref()
70    }
71
72    #[getter]
73    pub fn topic(&self) -> Option<&str> {
74        self.topic.as_deref()
75    }
76
77    #[getter]
78    pub fn req_id(&self) -> Option<&str> {
79        self.req_id.as_deref()
80    }
81}
82
83#[pymethods]
84impl BybitWebSocketClient {
85    #[staticmethod]
86    #[pyo3(name = "new_public")]
87    #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
88    fn py_new_public(
89        product_type: BybitProductType,
90        environment: BybitEnvironment,
91        url: Option<String>,
92        heartbeat: Option<u64>,
93    ) -> Self {
94        Self::new_public_with(product_type, environment, url, heartbeat)
95    }
96
97    #[staticmethod]
98    #[pyo3(name = "new_private")]
99    #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
100    fn py_new_private(
101        environment: BybitEnvironment,
102        api_key: String,
103        api_secret: String,
104        url: Option<String>,
105        heartbeat: Option<u64>,
106    ) -> Self {
107        tracing::debug!(
108            "Creating private WebSocket client with API key: {}",
109            &api_key[..api_key.len().min(10)]
110        );
111        let credential = crate::common::credential::Credential::new(api_key, api_secret);
112        Self::new_private(environment, credential, url, heartbeat)
113    }
114
115    #[staticmethod]
116    #[pyo3(name = "new_trade")]
117    #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
118    fn py_new_trade(
119        environment: BybitEnvironment,
120        api_key: String,
121        api_secret: String,
122        url: Option<String>,
123        heartbeat: Option<u64>,
124    ) -> Self {
125        let credential = Credential::new(api_key, api_secret);
126        Self::new_trade(environment, credential, url, heartbeat)
127    }
128
129    #[pyo3(name = "is_active")]
130    fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
131        let client = self.clone();
132
133        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
134    }
135
136    #[pyo3(name = "subscription_count")]
137    fn py_subscription_count(&self) -> usize {
138        self.subscription_count()
139    }
140
141    #[pyo3(name = "add_instrument")]
142    fn py_add_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
143        self.add_instrument(pyobject_to_instrument_any(py, instrument)?);
144        Ok(())
145    }
146
147    #[pyo3(name = "set_account_id")]
148    fn py_set_account_id(&mut self, account_id: AccountId) {
149        self.set_account_id(account_id);
150    }
151
152    #[pyo3(name = "connect")]
153    fn py_connect<'py>(
154        &mut self,
155        py: Python<'py>,
156        callback: Py<PyAny>,
157    ) -> PyResult<Bound<'py, PyAny>> {
158        let mut client = self.clone();
159
160        pyo3_async_runtimes::tokio::future_into_py(py, async move {
161            client.connect().await.map_err(to_pyruntime_err)?;
162
163            let stream = client.stream();
164
165            let instruments = Arc::clone(client.instruments());
166            let account_id = client.account_id();
167            let product_type = client.product_type();
168            let quote_cache = Arc::clone(client.quote_cache());
169
170            tokio::spawn(async move {
171                tokio::pin!(stream);
172
173                let clock = get_atomic_clock_realtime();
174
175                while let Some(msg) = stream.next().await {
176                    match msg {
177                        BybitWebSocketMessage::Orderbook(msg) => {
178                            let raw_symbol = msg.data.s;
179
180                            let symbol = product_type
181                                .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
182                                .unwrap_or(raw_symbol);
183
184                            if let Some(instrument_entry) = instruments
185                                .iter()
186                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
187                            {
188                                let instrument = instrument_entry.value();
189                                let ts_init = clock.get_time_ns();
190
191                                match parse_orderbook_deltas(&msg, instrument, ts_init) {
192                                    Ok(deltas) => {
193                                        Python::attach(|py| {
194                                            let py_obj = data_to_pycapsule(
195                                                py,
196                                                Data::Deltas(OrderBookDeltas_API::new(deltas)),
197                                            );
198                                            call_python(py, &callback, py_obj);
199                                        });
200                                    }
201                                    Err(e) => {
202                                        tracing::error!("Error parsing orderbook deltas: {e}");
203                                    }
204                                }
205                            } else {
206                                tracing::warn!(
207                                    raw_symbol = %raw_symbol,
208                                    full_symbol = %symbol,
209                                    "No instrument found for symbol"
210                                );
211                            }
212                        }
213                        BybitWebSocketMessage::TickerLinear(msg) => {
214                            let raw_symbol = msg.data.symbol;
215
216                            let symbol = product_type
217                                .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
218                                .unwrap_or(raw_symbol);
219
220                            if let Some(instrument_entry) = instruments
221                                .iter()
222                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
223                            {
224                                let instrument = instrument_entry.value();
225                                let instrument_id = instrument.id();
226                                let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
227                                    .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
228                                let ts_init = clock.get_time_ns();
229
230                                match quote_cache.write().await.process_linear_ticker(
231                                    &msg.data,
232                                    instrument_id,
233                                    instrument,
234                                    ts_event,
235                                    ts_init,
236                                ) {
237                                    Ok(quote) => {
238                                        Python::attach(|py| {
239                                            let py_obj = data_to_pycapsule(py, Data::Quote(quote));
240                                            call_python(py, &callback, py_obj);
241                                        });
242                                    }
243                                    Err(e) => {
244                                        tracing::debug!("Skipping partial ticker update: {e}");
245                                    }
246                                }
247                            } else {
248                                tracing::warn!(
249                                    raw_symbol = %raw_symbol,
250                                    full_symbol = %symbol,
251                                    "No instrument found for symbol"
252                                );
253                            }
254                        }
255                        BybitWebSocketMessage::TickerOption(msg) => {
256                            let raw_symbol = &msg.data.symbol;
257
258                            let symbol = product_type
259                                .map(|pt| make_bybit_symbol(raw_symbol, pt))
260                                .unwrap_or_else(|| raw_symbol.as_str().into());
261
262                            if let Some(instrument_entry) = instruments
263                                .iter()
264                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
265                            {
266                                let instrument = instrument_entry.value();
267                                let instrument_id = instrument.id();
268                                let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
269                                    .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
270                                let ts_init = clock.get_time_ns();
271
272                                match quote_cache.write().await.process_option_ticker(
273                                    &msg.data,
274                                    instrument_id,
275                                    instrument,
276                                    ts_event,
277                                    ts_init,
278                                ) {
279                                    Ok(quote) => {
280                                        Python::attach(|py| {
281                                            let py_obj = data_to_pycapsule(py, Data::Quote(quote));
282                                            call_python(py, &callback, py_obj);
283                                        });
284                                    }
285                                    Err(e) => {
286                                        tracing::debug!("Skipping partial ticker update: {e}");
287                                    }
288                                }
289                            } else {
290                                tracing::warn!(
291                                    raw_symbol = %raw_symbol,
292                                    full_symbol = %symbol,
293                                    "No instrument found for symbol"
294                                );
295                            }
296                        }
297                        BybitWebSocketMessage::Trade(msg) => {
298                            for trade in &msg.data {
299                                let raw_symbol = trade.s;
300
301                                let symbol = product_type
302                                    .map(|pt| make_bybit_symbol(raw_symbol.as_str(), pt))
303                                    .unwrap_or(raw_symbol);
304
305                                if let Some(instrument_entry) = instruments
306                                    .iter()
307                                    .find(|e| e.key().symbol.as_str() == symbol.as_str())
308                                {
309                                    let instrument = instrument_entry.value();
310                                    let ts_init = clock.get_time_ns();
311
312                                    match parse_ws_trade_tick(trade, instrument, ts_init) {
313                                        Ok(tick) => {
314                                            Python::attach(|py| {
315                                                let py_obj =
316                                                    data_to_pycapsule(py, Data::Trade(tick));
317                                                call_python(py, &callback, py_obj);
318                                            });
319                                        }
320                                        Err(e) => {
321                                            tracing::error!("Error parsing trade tick: {e}");
322                                        }
323                                    }
324                                } else {
325                                    tracing::warn!(
326                                        raw_symbol = %raw_symbol,
327                                        full_symbol = %symbol,
328                                        "No instrument found for symbol"
329                                    );
330                                }
331                            }
332                        }
333                        BybitWebSocketMessage::Kline(msg) => {
334                            let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
335                                Ok(parts) => parts,
336                                Err(e) => {
337                                    tracing::warn!("Failed to parse kline topic: {e}");
338                                    continue;
339                                }
340                            };
341
342                            let symbol = product_type
343                                .map(|pt| make_bybit_symbol(raw_symbol, pt))
344                                .unwrap_or_else(|| raw_symbol.into());
345
346                            if let Some(instrument_entry) = instruments
347                                .iter()
348                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
349                            {
350                                let instrument = instrument_entry.value();
351                                let ts_init = clock.get_time_ns();
352
353                                let (step, aggregation) = match interval_str.parse::<usize>() {
354                                    Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
355                                    _ => {
356                                        // Handle other intervals (D, W, M) if needed
357                                        tracing::warn!(
358                                            "Unsupported kline interval: {}",
359                                            interval_str
360                                        );
361                                        continue;
362                                    }
363                                };
364
365                                if let Some(non_zero_step) = NonZero::new(step) {
366                                    let bar_spec = BarSpecification {
367                                        step: non_zero_step,
368                                        aggregation,
369                                        price_type: PriceType::Last,
370                                    };
371                                    let bar_type = BarType::new(
372                                        instrument.id(),
373                                        bar_spec,
374                                        AggregationSource::External,
375                                    );
376
377                                    for kline in &msg.data {
378                                        match parse_ws_kline_bar(
379                                            kline, instrument, bar_type, false, ts_init,
380                                        ) {
381                                            Ok(bar) => {
382                                                Python::attach(|py| {
383                                                    let py_obj =
384                                                        data_to_pycapsule(py, Data::Bar(bar));
385                                                    call_python(py, &callback, py_obj);
386                                                });
387                                            }
388                                            Err(e) => {
389                                                tracing::error!("Error parsing kline to bar: {e}");
390                                            }
391                                        }
392                                    }
393                                } else {
394                                    tracing::error!("Invalid step value: {}", step);
395                                }
396                            } else {
397                                tracing::warn!(
398                                    raw_symbol = %raw_symbol,
399                                    full_symbol = %symbol,
400                                    "No instrument found for symbol"
401                                );
402                            }
403                        }
404
405                        BybitWebSocketMessage::AccountOrder(msg) => {
406                            if let Some(account_id) = account_id {
407                                for order in &msg.data {
408                                    let raw_symbol = order.symbol;
409
410                                    let symbol =
411                                        make_bybit_symbol(raw_symbol.as_str(), order.category);
412
413                                    if let Some(instrument_entry) = instruments
414                                        .iter()
415                                        .find(|e| e.key().symbol.as_str() == symbol.as_str())
416                                    {
417                                        let instrument = instrument_entry.value();
418                                        let ts_init = clock.get_time_ns();
419
420                                        match parse_ws_order_status_report(
421                                            order, instrument, account_id, ts_init,
422                                        ) {
423                                            Ok(report) => {
424                                                Python::attach(|py| {
425                                                    if let Ok(py_obj) = report.into_py_any(py) {
426                                                        call_python(py, &callback, py_obj);
427                                                    }
428                                                });
429                                            }
430                                            Err(e) => {
431                                                tracing::error!(
432                                                    "Error parsing order status report: {e}"
433                                                );
434                                            }
435                                        }
436                                    } else {
437                                        tracing::warn!(
438                                            raw_symbol = %raw_symbol,
439                                            full_symbol = %symbol,
440                                            "No instrument found for symbol"
441                                        );
442                                    }
443                                }
444                            } else {
445                                tracing::error!(
446                                    "Received AccountOrder message but account_id is not set"
447                                );
448                            }
449                        }
450                        BybitWebSocketMessage::AccountExecution(msg) => {
451                            if let Some(account_id) = account_id {
452                                for execution in &msg.data {
453                                    let raw_symbol = execution.symbol;
454                                    let symbol =
455                                        make_bybit_symbol(raw_symbol.as_str(), execution.category);
456
457                                    if let Some(instrument_entry) = instruments
458                                        .iter()
459                                        .find(|e| e.key().symbol.as_str() == symbol.as_str())
460                                    {
461                                        let instrument = instrument_entry.value();
462                                        let ts_init = clock.get_time_ns();
463
464                                        match parse_ws_fill_report(
465                                            execution, account_id, instrument, ts_init,
466                                        ) {
467                                            Ok(report) => {
468                                                Python::attach(|py| {
469                                                    if let Ok(py_obj) = report.into_py_any(py) {
470                                                        call_python(py, &callback, py_obj);
471                                                    }
472                                                });
473                                            }
474                                            Err(e) => {
475                                                tracing::error!("Error parsing fill report: {e}");
476                                            }
477                                        }
478                                    } else {
479                                        tracing::warn!(
480                                            raw_symbol = %raw_symbol,
481                                            full_symbol = %symbol,
482                                            "No instrument found for symbol"
483                                        );
484                                    }
485                                }
486                            } else {
487                                tracing::error!(
488                                    "Received AccountExecution message but account_id is not set"
489                                );
490                            }
491                        }
492                        BybitWebSocketMessage::AccountWallet(msg) => {
493                            if let Some(account_id) = account_id {
494                                for wallet in &msg.data {
495                                    let ts_event =
496                                        UnixNanos::from(msg.creation_time as u64 * 1_000_000);
497                                    let ts_init = clock.get_time_ns();
498
499                                    match parse_ws_account_state(
500                                        wallet, account_id, ts_event, ts_init,
501                                    ) {
502                                        Ok(state) => {
503                                            Python::attach(|py| {
504                                                if let Ok(py_obj) = state.into_py_any(py) {
505                                                    call_python(py, &callback, py_obj);
506                                                }
507                                            });
508                                        }
509                                        Err(e) => {
510                                            tracing::error!("Error parsing account state: {e}");
511                                        }
512                                    }
513                                }
514                            } else {
515                                tracing::error!(
516                                    "Received AccountWallet message but account_id is not set"
517                                );
518                            }
519                        }
520                        BybitWebSocketMessage::AccountPosition(msg) => {
521                            if let Some(account_id) = account_id {
522                                for position in &msg.data {
523                                    let raw_symbol = position.symbol;
524
525                                    // For positions, find instrument by matching raw symbol prefix
526                                    // since position messages don't include product type category
527                                    if let Some(instrument_entry) = instruments.iter().find(|e| {
528                                        let inst_symbol = e.key().symbol.as_str();
529                                        // Check if instrument symbol starts with raw_symbol and has hyphen
530                                        inst_symbol.starts_with(raw_symbol.as_str())
531                                            && inst_symbol.len() > raw_symbol.len()
532                                            && inst_symbol.as_bytes().get(raw_symbol.len())
533                                                == Some(&b'-')
534                                    }) {
535                                        let instrument = instrument_entry.value();
536                                        let ts_init = clock.get_time_ns();
537
538                                        match parse_ws_position_status_report(
539                                            position, account_id, instrument, ts_init,
540                                        ) {
541                                            Ok(report) => {
542                                                Python::attach(|py| {
543                                                    if let Ok(py_obj) = report.into_py_any(py) {
544                                                        call_python(py, &callback, py_obj);
545                                                    }
546                                                });
547                                            }
548                                            Err(e) => {
549                                                tracing::error!(
550                                                    "Error parsing position status report: {e}"
551                                                );
552                                            }
553                                        }
554                                    } else {
555                                        tracing::warn!(
556                                            raw_symbol = %raw_symbol,
557                                            "No instrument found for symbol"
558                                        );
559                                    }
560                                }
561                            } else {
562                                tracing::error!(
563                                    "Received AccountPosition message but account_id is not set"
564                                );
565                            }
566                        }
567                        BybitWebSocketMessage::Error(msg) => {
568                            call_python_with_data(&callback, |py| {
569                                msg.into_py_any(py).map(|obj| obj.into_bound(py))
570                            });
571                        }
572                        BybitWebSocketMessage::Reconnected => {}
573                        BybitWebSocketMessage::Pong => {}
574                        BybitWebSocketMessage::Response(msg) => {
575                            tracing::debug!("Received response message: {:?}", msg);
576                        }
577                        BybitWebSocketMessage::Auth(msg) => {
578                            tracing::debug!("Received auth message: {:?}", msg);
579                        }
580                        BybitWebSocketMessage::Subscription(msg) => {
581                            tracing::debug!("Received subscription message: {:?}", msg);
582                        }
583                        BybitWebSocketMessage::Raw(value) => {
584                            tracing::debug!("Received raw/unhandled message, skipping: {value}");
585                        }
586                    }
587                }
588            });
589
590            Ok(())
591        })
592    }
593
594    #[pyo3(name = "close")]
595    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
596        let mut client = self.clone();
597
598        pyo3_async_runtimes::tokio::future_into_py(py, async move {
599            if let Err(e) = client.close().await {
600                tracing::error!("Error on close: {e}");
601            }
602            Ok(())
603        })
604    }
605
606    #[pyo3(name = "subscribe")]
607    fn py_subscribe<'py>(
608        &self,
609        py: Python<'py>,
610        topics: Vec<String>,
611    ) -> PyResult<Bound<'py, PyAny>> {
612        let client = self.clone();
613
614        pyo3_async_runtimes::tokio::future_into_py(py, async move {
615            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
616            Ok(())
617        })
618    }
619
620    #[pyo3(name = "unsubscribe")]
621    fn py_unsubscribe<'py>(
622        &self,
623        py: Python<'py>,
624        topics: Vec<String>,
625    ) -> PyResult<Bound<'py, PyAny>> {
626        let client = self.clone();
627
628        pyo3_async_runtimes::tokio::future_into_py(py, async move {
629            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
630            Ok(())
631        })
632    }
633
634    #[pyo3(name = "subscribe_orderbook")]
635    fn py_subscribe_orderbook<'py>(
636        &self,
637        py: Python<'py>,
638        instrument_id: InstrumentId,
639        depth: u32,
640    ) -> PyResult<Bound<'py, PyAny>> {
641        let client = self.clone();
642
643        pyo3_async_runtimes::tokio::future_into_py(py, async move {
644            client
645                .subscribe_orderbook(instrument_id, depth)
646                .await
647                .map_err(to_pyruntime_err)?;
648            Ok(())
649        })
650    }
651
652    #[pyo3(name = "unsubscribe_orderbook")]
653    fn py_unsubscribe_orderbook<'py>(
654        &self,
655        py: Python<'py>,
656        instrument_id: InstrumentId,
657        depth: u32,
658    ) -> PyResult<Bound<'py, PyAny>> {
659        let client = self.clone();
660
661        pyo3_async_runtimes::tokio::future_into_py(py, async move {
662            client
663                .unsubscribe_orderbook(instrument_id, depth)
664                .await
665                .map_err(to_pyruntime_err)?;
666            Ok(())
667        })
668    }
669
670    #[pyo3(name = "subscribe_trades")]
671    fn py_subscribe_trades<'py>(
672        &self,
673        py: Python<'py>,
674        instrument_id: InstrumentId,
675    ) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.clone();
677
678        pyo3_async_runtimes::tokio::future_into_py(py, async move {
679            client
680                .subscribe_trades(instrument_id)
681                .await
682                .map_err(to_pyruntime_err)?;
683            Ok(())
684        })
685    }
686
687    #[pyo3(name = "unsubscribe_trades")]
688    fn py_unsubscribe_trades<'py>(
689        &self,
690        py: Python<'py>,
691        instrument_id: InstrumentId,
692    ) -> PyResult<Bound<'py, PyAny>> {
693        let client = self.clone();
694
695        pyo3_async_runtimes::tokio::future_into_py(py, async move {
696            client
697                .unsubscribe_trades(instrument_id)
698                .await
699                .map_err(to_pyruntime_err)?;
700            Ok(())
701        })
702    }
703
704    #[pyo3(name = "subscribe_ticker")]
705    fn py_subscribe_ticker<'py>(
706        &self,
707        py: Python<'py>,
708        instrument_id: InstrumentId,
709    ) -> PyResult<Bound<'py, PyAny>> {
710        let client = self.clone();
711
712        pyo3_async_runtimes::tokio::future_into_py(py, async move {
713            client
714                .subscribe_ticker(instrument_id)
715                .await
716                .map_err(to_pyruntime_err)?;
717            Ok(())
718        })
719    }
720
721    #[pyo3(name = "unsubscribe_ticker")]
722    fn py_unsubscribe_ticker<'py>(
723        &self,
724        py: Python<'py>,
725        instrument_id: InstrumentId,
726    ) -> PyResult<Bound<'py, PyAny>> {
727        let client = self.clone();
728
729        pyo3_async_runtimes::tokio::future_into_py(py, async move {
730            client
731                .unsubscribe_ticker(instrument_id)
732                .await
733                .map_err(to_pyruntime_err)?;
734            Ok(())
735        })
736    }
737
738    #[pyo3(name = "subscribe_klines")]
739    fn py_subscribe_klines<'py>(
740        &self,
741        py: Python<'py>,
742        instrument_id: InstrumentId,
743        interval: String,
744    ) -> PyResult<Bound<'py, PyAny>> {
745        let client = self.clone();
746
747        pyo3_async_runtimes::tokio::future_into_py(py, async move {
748            client
749                .subscribe_klines(instrument_id, interval)
750                .await
751                .map_err(to_pyruntime_err)?;
752            Ok(())
753        })
754    }
755
756    #[pyo3(name = "unsubscribe_klines")]
757    fn py_unsubscribe_klines<'py>(
758        &self,
759        py: Python<'py>,
760        instrument_id: InstrumentId,
761        interval: String,
762    ) -> PyResult<Bound<'py, PyAny>> {
763        let client = self.clone();
764
765        pyo3_async_runtimes::tokio::future_into_py(py, async move {
766            client
767                .unsubscribe_klines(instrument_id, interval)
768                .await
769                .map_err(to_pyruntime_err)?;
770            Ok(())
771        })
772    }
773
774    #[pyo3(name = "subscribe_orders")]
775    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
776        let client = self.clone();
777
778        pyo3_async_runtimes::tokio::future_into_py(py, async move {
779            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
780            Ok(())
781        })
782    }
783
784    #[pyo3(name = "unsubscribe_orders")]
785    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
786        let client = self.clone();
787
788        pyo3_async_runtimes::tokio::future_into_py(py, async move {
789            client
790                .unsubscribe_orders()
791                .await
792                .map_err(to_pyruntime_err)?;
793            Ok(())
794        })
795    }
796
797    #[pyo3(name = "subscribe_executions")]
798    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
799        let client = self.clone();
800
801        pyo3_async_runtimes::tokio::future_into_py(py, async move {
802            client
803                .subscribe_executions()
804                .await
805                .map_err(to_pyruntime_err)?;
806            Ok(())
807        })
808    }
809
810    #[pyo3(name = "unsubscribe_executions")]
811    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
812        let client = self.clone();
813
814        pyo3_async_runtimes::tokio::future_into_py(py, async move {
815            client
816                .unsubscribe_executions()
817                .await
818                .map_err(to_pyruntime_err)?;
819            Ok(())
820        })
821    }
822
823    #[pyo3(name = "subscribe_positions")]
824    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
825        let client = self.clone();
826
827        pyo3_async_runtimes::tokio::future_into_py(py, async move {
828            client
829                .subscribe_positions()
830                .await
831                .map_err(to_pyruntime_err)?;
832            Ok(())
833        })
834    }
835
836    #[pyo3(name = "unsubscribe_positions")]
837    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
838        let client = self.clone();
839
840        pyo3_async_runtimes::tokio::future_into_py(py, async move {
841            client
842                .unsubscribe_positions()
843                .await
844                .map_err(to_pyruntime_err)?;
845            Ok(())
846        })
847    }
848
849    #[pyo3(name = "subscribe_wallet")]
850    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
851        let client = self.clone();
852
853        pyo3_async_runtimes::tokio::future_into_py(py, async move {
854            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
855            Ok(())
856        })
857    }
858
859    #[pyo3(name = "unsubscribe_wallet")]
860    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
861        let client = self.clone();
862
863        pyo3_async_runtimes::tokio::future_into_py(py, async move {
864            client
865                .unsubscribe_wallet()
866                .await
867                .map_err(to_pyruntime_err)?;
868            Ok(())
869        })
870    }
871
872    #[pyo3(name = "wait_until_active")]
873    fn py_wait_until_active<'py>(
874        &self,
875        py: Python<'py>,
876        timeout_secs: f64,
877    ) -> PyResult<Bound<'py, PyAny>> {
878        let client = self.clone();
879
880        pyo3_async_runtimes::tokio::future_into_py(py, async move {
881            client
882                .wait_until_active(timeout_secs)
883                .await
884                .map_err(to_pyruntime_err)?;
885            Ok(())
886        })
887    }
888
889    #[pyo3(name = "submit_order")]
890    #[pyo3(signature = (
891        product_type,
892        instrument_id,
893        client_order_id,
894        order_side,
895        order_type,
896        quantity,
897        time_in_force=None,
898        price=None,
899        trigger_price=None,
900        post_only=None,
901        reduce_only=None,
902    ))]
903    #[allow(clippy::too_many_arguments)]
904    fn py_submit_order<'py>(
905        &self,
906        py: Python<'py>,
907        product_type: crate::common::enums::BybitProductType,
908        instrument_id: nautilus_model::identifiers::InstrumentId,
909        client_order_id: nautilus_model::identifiers::ClientOrderId,
910        order_side: nautilus_model::enums::OrderSide,
911        order_type: nautilus_model::enums::OrderType,
912        quantity: nautilus_model::types::Quantity,
913        time_in_force: Option<nautilus_model::enums::TimeInForce>,
914        price: Option<nautilus_model::types::Price>,
915        trigger_price: Option<nautilus_model::types::Price>,
916        post_only: Option<bool>,
917        reduce_only: Option<bool>,
918    ) -> PyResult<Bound<'py, PyAny>> {
919        let client = self.clone();
920
921        pyo3_async_runtimes::tokio::future_into_py(py, async move {
922            client
923                .submit_order(
924                    product_type,
925                    instrument_id,
926                    client_order_id,
927                    order_side,
928                    order_type,
929                    quantity,
930                    time_in_force,
931                    price,
932                    trigger_price,
933                    post_only,
934                    reduce_only,
935                )
936                .await
937                .map_err(to_pyruntime_err)?;
938            Ok(())
939        })
940    }
941
942    #[pyo3(name = "modify_order")]
943    #[pyo3(signature = (
944        product_type,
945        instrument_id,
946        venue_order_id=None,
947        client_order_id=None,
948        quantity=None,
949        price=None,
950    ))]
951    #[allow(clippy::too_many_arguments)]
952    fn py_modify_order<'py>(
953        &self,
954        py: Python<'py>,
955        product_type: crate::common::enums::BybitProductType,
956        instrument_id: nautilus_model::identifiers::InstrumentId,
957        venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
958        client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
959        quantity: Option<nautilus_model::types::Quantity>,
960        price: Option<nautilus_model::types::Price>,
961    ) -> PyResult<Bound<'py, PyAny>> {
962        let client = self.clone();
963
964        pyo3_async_runtimes::tokio::future_into_py(py, async move {
965            client
966                .modify_order(
967                    product_type,
968                    instrument_id,
969                    venue_order_id,
970                    client_order_id,
971                    quantity,
972                    price,
973                )
974                .await
975                .map_err(to_pyruntime_err)?;
976            Ok(())
977        })
978    }
979
980    #[pyo3(name = "cancel_order")]
981    #[pyo3(signature = (
982        product_type,
983        instrument_id,
984        venue_order_id=None,
985        client_order_id=None,
986    ))]
987    fn py_cancel_order<'py>(
988        &self,
989        py: Python<'py>,
990        product_type: crate::common::enums::BybitProductType,
991        instrument_id: nautilus_model::identifiers::InstrumentId,
992        venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
993        client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
994    ) -> PyResult<Bound<'py, PyAny>> {
995        let client = self.clone();
996
997        pyo3_async_runtimes::tokio::future_into_py(py, async move {
998            client
999                .cancel_order_by_id(product_type, instrument_id, venue_order_id, client_order_id)
1000                .await
1001                .map_err(to_pyruntime_err)?;
1002            Ok(())
1003        })
1004    }
1005}
1006
1007fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1008    if let Err(e) = callback.call1(py, (py_obj,)) {
1009        tracing::error!("Error calling Python callback: {e}");
1010    }
1011}
1012
1013fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
1014where
1015    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
1016{
1017    Python::attach(|py| match data_fn(py) {
1018        Ok(data) => {
1019            if let Err(e) = callback.call1(py, (data,)) {
1020                tracing::error!("Error calling Python callback: {e}");
1021            }
1022        }
1023        Err(e) => {
1024            tracing::error!("Error converting data to Python: {e}");
1025        }
1026    });
1027}