nautilus_bybit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Bybit WebSocket client.
17
18use std::{num::NonZero, sync::Arc};
19
20use futures_util::StreamExt;
21use nautilus_core::{nanos::UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
22use nautilus_model::{
23    data::{BarSpecification, BarType, Data, OrderBookDeltas_API},
24    enums::{AggregationSource, BarAggregation, PriceType},
25    identifiers::{AccountId, InstrumentId},
26    instruments::Instrument,
27    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
28};
29use pyo3::{IntoPyObjectExt, prelude::*};
30
31use crate::{
32    common::{
33        credential::Credential,
34        enums::{BybitEnvironment, BybitProductType},
35        parse::make_bybit_symbol,
36    },
37    websocket::{
38        client::BybitWebSocketClient,
39        messages::{BybitWebSocketError, BybitWebSocketMessage},
40        parse::{
41            parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_ws_account_state,
42            parse_ws_fill_report, parse_ws_kline_bar, parse_ws_order_status_report,
43            parse_ws_position_status_report, parse_ws_trade_tick,
44        },
45    },
46};
47
48#[pymethods]
49impl BybitWebSocketError {
50    fn __repr__(&self) -> String {
51        format!(
52            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
53            self.code, self.message, self.conn_id, self.topic
54        )
55    }
56
57    #[getter]
58    pub fn code(&self) -> i64 {
59        self.code
60    }
61
62    #[getter]
63    pub fn message(&self) -> &str {
64        &self.message
65    }
66
67    #[getter]
68    pub fn conn_id(&self) -> Option<&str> {
69        self.conn_id.as_deref()
70    }
71
72    #[getter]
73    pub fn topic(&self) -> Option<&str> {
74        self.topic.as_deref()
75    }
76
77    #[getter]
78    pub fn req_id(&self) -> Option<&str> {
79        self.req_id.as_deref()
80    }
81}
82
83#[pymethods]
84impl BybitWebSocketClient {
85    #[staticmethod]
86    #[pyo3(name = "new_public")]
87    #[pyo3(signature = (product_type, environment, url=None, heartbeat=None))]
88    fn py_new_public(
89        product_type: BybitProductType,
90        environment: BybitEnvironment,
91        url: Option<String>,
92        heartbeat: Option<u64>,
93    ) -> Self {
94        Self::new_public_with(product_type, environment, url, heartbeat)
95    }
96
97    #[staticmethod]
98    #[pyo3(name = "new_private")]
99    #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
100    fn py_new_private(
101        environment: BybitEnvironment,
102        api_key: String,
103        api_secret: String,
104        url: Option<String>,
105        heartbeat: Option<u64>,
106    ) -> Self {
107        tracing::debug!(
108            "Creating private WebSocket client with API key: {}",
109            &api_key[..api_key.len().min(10)]
110        );
111        let credential = crate::common::credential::Credential::new(api_key, api_secret);
112        Self::new_private(environment, credential, url, heartbeat)
113    }
114
115    #[staticmethod]
116    #[pyo3(name = "new_trade")]
117    #[pyo3(signature = (environment, api_key, api_secret, url=None, heartbeat=None))]
118    fn py_new_trade(
119        environment: BybitEnvironment,
120        api_key: String,
121        api_secret: String,
122        url: Option<String>,
123        heartbeat: Option<u64>,
124    ) -> Self {
125        let credential = Credential::new(api_key, api_secret);
126        Self::new_trade(environment, credential, url, heartbeat)
127    }
128
129    #[pyo3(name = "is_active")]
130    fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
131        let client = self.clone();
132
133        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
134    }
135
136    #[pyo3(name = "subscription_count")]
137    fn py_subscription_count(&self) -> usize {
138        self.subscription_count()
139    }
140
141    #[pyo3(name = "add_instrument")]
142    fn py_add_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
143        self.add_instrument(pyobject_to_instrument_any(py, instrument)?);
144        Ok(())
145    }
146
147    #[pyo3(name = "set_account_id")]
148    fn py_set_account_id(&mut self, account_id: AccountId) {
149        self.set_account_id(account_id);
150    }
151
152    #[pyo3(name = "connect")]
153    fn py_connect<'py>(
154        &mut self,
155        py: Python<'py>,
156        callback: Py<PyAny>,
157    ) -> PyResult<Bound<'py, PyAny>> {
158        let mut client = self.clone();
159
160        pyo3_async_runtimes::tokio::future_into_py(py, async move {
161            client.connect().await.map_err(to_pyruntime_err)?;
162
163            let stream = client.stream();
164
165            let instruments = Arc::clone(client.instruments());
166            let account_id = client.account_id();
167            let product_type = client.product_type();
168            let quote_cache = Arc::clone(client.quote_cache());
169
170            tokio::spawn(async move {
171                tokio::pin!(stream);
172
173                let clock = get_atomic_clock_realtime();
174
175                while let Some(msg) = stream.next().await {
176                    match msg {
177                        BybitWebSocketMessage::Orderbook(msg) => {
178                            let raw_symbol = msg.data.s;
179
180                            let symbol = product_type.map_or(raw_symbol, |pt| {
181                                make_bybit_symbol(raw_symbol.as_str(), pt)
182                            });
183
184                            if let Some(instrument_entry) = instruments
185                                .iter()
186                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
187                            {
188                                let instrument = instrument_entry.value();
189                                let ts_init = clock.get_time_ns();
190
191                                match parse_orderbook_deltas(&msg, instrument, ts_init) {
192                                    Ok(deltas) => {
193                                        Python::attach(|py| {
194                                            let py_obj = data_to_pycapsule(
195                                                py,
196                                                Data::Deltas(OrderBookDeltas_API::new(deltas)),
197                                            );
198                                            call_python(py, &callback, py_obj);
199                                        });
200                                    }
201                                    Err(e) => {
202                                        tracing::error!("Error parsing orderbook deltas: {e}");
203                                    }
204                                }
205                            } else {
206                                tracing::warn!(
207                                    raw_symbol = %raw_symbol,
208                                    full_symbol = %symbol,
209                                    "No instrument found for symbol"
210                                );
211                            }
212                        }
213                        BybitWebSocketMessage::TickerLinear(msg) => {
214                            let raw_symbol = msg.data.symbol;
215
216                            let symbol = product_type.map_or(raw_symbol, |pt| {
217                                make_bybit_symbol(raw_symbol.as_str(), pt)
218                            });
219
220                            if let Some(instrument_entry) = instruments
221                                .iter()
222                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
223                            {
224                                let instrument = instrument_entry.value();
225                                let instrument_id = instrument.id();
226                                let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
227                                    .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
228                                let ts_init = clock.get_time_ns();
229
230                                match quote_cache.write().await.process_linear_ticker(
231                                    &msg.data,
232                                    instrument_id,
233                                    instrument,
234                                    ts_event,
235                                    ts_init,
236                                ) {
237                                    Ok(quote) => {
238                                        Python::attach(|py| {
239                                            let py_obj = data_to_pycapsule(py, Data::Quote(quote));
240                                            call_python(py, &callback, py_obj);
241                                        });
242                                    }
243                                    Err(e) => {
244                                        tracing::debug!("Skipping partial ticker update: {e}");
245                                    }
246                                }
247                            } else {
248                                tracing::warn!(
249                                    raw_symbol = %raw_symbol,
250                                    full_symbol = %symbol,
251                                    "No instrument found for symbol"
252                                );
253                            }
254                        }
255                        BybitWebSocketMessage::TickerOption(msg) => {
256                            let raw_symbol = &msg.data.symbol;
257
258                            let symbol = product_type.map_or_else(
259                                || raw_symbol.as_str().into(),
260                                |pt| make_bybit_symbol(raw_symbol, pt),
261                            );
262
263                            if let Some(instrument_entry) = instruments
264                                .iter()
265                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
266                            {
267                                let instrument = instrument_entry.value();
268                                let instrument_id = instrument.id();
269                                let ts_event = parse_millis_i64(msg.ts, "ticker.ts")
270                                    .unwrap_or_else(|_| get_atomic_clock_realtime().get_time_ns());
271                                let ts_init = clock.get_time_ns();
272
273                                match quote_cache.write().await.process_option_ticker(
274                                    &msg.data,
275                                    instrument_id,
276                                    instrument,
277                                    ts_event,
278                                    ts_init,
279                                ) {
280                                    Ok(quote) => {
281                                        Python::attach(|py| {
282                                            let py_obj = data_to_pycapsule(py, Data::Quote(quote));
283                                            call_python(py, &callback, py_obj);
284                                        });
285                                    }
286                                    Err(e) => {
287                                        tracing::debug!("Skipping partial ticker update: {e}");
288                                    }
289                                }
290                            } else {
291                                tracing::warn!(
292                                    raw_symbol = %raw_symbol,
293                                    full_symbol = %symbol,
294                                    "No instrument found for symbol"
295                                );
296                            }
297                        }
298                        BybitWebSocketMessage::Trade(msg) => {
299                            for trade in &msg.data {
300                                let raw_symbol = trade.s;
301
302                                let symbol = product_type.map_or(raw_symbol, |pt| {
303                                    make_bybit_symbol(raw_symbol.as_str(), pt)
304                                });
305
306                                if let Some(instrument_entry) = instruments
307                                    .iter()
308                                    .find(|e| e.key().symbol.as_str() == symbol.as_str())
309                                {
310                                    let instrument = instrument_entry.value();
311                                    let ts_init = clock.get_time_ns();
312
313                                    match parse_ws_trade_tick(trade, instrument, ts_init) {
314                                        Ok(tick) => {
315                                            Python::attach(|py| {
316                                                let py_obj =
317                                                    data_to_pycapsule(py, Data::Trade(tick));
318                                                call_python(py, &callback, py_obj);
319                                            });
320                                        }
321                                        Err(e) => {
322                                            tracing::error!("Error parsing trade tick: {e}");
323                                        }
324                                    }
325                                } else {
326                                    tracing::warn!(
327                                        raw_symbol = %raw_symbol,
328                                        full_symbol = %symbol,
329                                        "No instrument found for symbol"
330                                    );
331                                }
332                            }
333                        }
334                        BybitWebSocketMessage::Kline(msg) => {
335                            let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
336                                Ok(parts) => parts,
337                                Err(e) => {
338                                    tracing::warn!("Failed to parse kline topic: {e}");
339                                    continue;
340                                }
341                            };
342
343                            let symbol = product_type.map_or_else(
344                                || raw_symbol.into(),
345                                |pt| make_bybit_symbol(raw_symbol, pt),
346                            );
347
348                            if let Some(instrument_entry) = instruments
349                                .iter()
350                                .find(|e| e.key().symbol.as_str() == symbol.as_str())
351                            {
352                                let instrument = instrument_entry.value();
353                                let ts_init = clock.get_time_ns();
354
355                                let (step, aggregation) = match interval_str.parse::<usize>() {
356                                    Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
357                                    _ => {
358                                        // Handle other intervals (D, W, M) if needed
359                                        tracing::warn!(
360                                            "Unsupported kline interval: {}",
361                                            interval_str
362                                        );
363                                        continue;
364                                    }
365                                };
366
367                                if let Some(non_zero_step) = NonZero::new(step) {
368                                    let bar_spec = BarSpecification {
369                                        step: non_zero_step,
370                                        aggregation,
371                                        price_type: PriceType::Last,
372                                    };
373                                    let bar_type = BarType::new(
374                                        instrument.id(),
375                                        bar_spec,
376                                        AggregationSource::External,
377                                    );
378
379                                    for kline in &msg.data {
380                                        match parse_ws_kline_bar(
381                                            kline, instrument, bar_type, false, ts_init,
382                                        ) {
383                                            Ok(bar) => {
384                                                Python::attach(|py| {
385                                                    let py_obj =
386                                                        data_to_pycapsule(py, Data::Bar(bar));
387                                                    call_python(py, &callback, py_obj);
388                                                });
389                                            }
390                                            Err(e) => {
391                                                tracing::error!("Error parsing kline to bar: {e}");
392                                            }
393                                        }
394                                    }
395                                } else {
396                                    tracing::error!("Invalid step value: {}", step);
397                                }
398                            } else {
399                                tracing::warn!(
400                                    raw_symbol = %raw_symbol,
401                                    full_symbol = %symbol,
402                                    "No instrument found for symbol"
403                                );
404                            }
405                        }
406
407                        BybitWebSocketMessage::AccountOrder(msg) => {
408                            if let Some(account_id) = account_id {
409                                for order in &msg.data {
410                                    let raw_symbol = order.symbol;
411
412                                    let symbol =
413                                        make_bybit_symbol(raw_symbol.as_str(), order.category);
414
415                                    if let Some(instrument_entry) = instruments
416                                        .iter()
417                                        .find(|e| e.key().symbol.as_str() == symbol.as_str())
418                                    {
419                                        let instrument = instrument_entry.value();
420                                        let ts_init = clock.get_time_ns();
421
422                                        match parse_ws_order_status_report(
423                                            order, instrument, account_id, ts_init,
424                                        ) {
425                                            Ok(report) => {
426                                                Python::attach(|py| {
427                                                    if let Ok(py_obj) = report.into_py_any(py) {
428                                                        call_python(py, &callback, py_obj);
429                                                    }
430                                                });
431                                            }
432                                            Err(e) => {
433                                                tracing::error!(
434                                                    "Error parsing order status report: {e}"
435                                                );
436                                            }
437                                        }
438                                    } else {
439                                        tracing::warn!(
440                                            raw_symbol = %raw_symbol,
441                                            full_symbol = %symbol,
442                                            "No instrument found for symbol"
443                                        );
444                                    }
445                                }
446                            } else {
447                                tracing::error!(
448                                    "Received AccountOrder message but account_id is not set"
449                                );
450                            }
451                        }
452                        BybitWebSocketMessage::AccountExecution(msg) => {
453                            if let Some(account_id) = account_id {
454                                for execution in &msg.data {
455                                    let raw_symbol = execution.symbol;
456                                    let symbol =
457                                        make_bybit_symbol(raw_symbol.as_str(), execution.category);
458
459                                    if let Some(instrument_entry) = instruments
460                                        .iter()
461                                        .find(|e| e.key().symbol.as_str() == symbol.as_str())
462                                    {
463                                        let instrument = instrument_entry.value();
464                                        let ts_init = clock.get_time_ns();
465
466                                        match parse_ws_fill_report(
467                                            execution, account_id, instrument, ts_init,
468                                        ) {
469                                            Ok(report) => {
470                                                Python::attach(|py| {
471                                                    if let Ok(py_obj) = report.into_py_any(py) {
472                                                        call_python(py, &callback, py_obj);
473                                                    }
474                                                });
475                                            }
476                                            Err(e) => {
477                                                tracing::error!("Error parsing fill report: {e}");
478                                            }
479                                        }
480                                    } else {
481                                        tracing::warn!(
482                                            raw_symbol = %raw_symbol,
483                                            full_symbol = %symbol,
484                                            "No instrument found for symbol"
485                                        );
486                                    }
487                                }
488                            } else {
489                                tracing::error!(
490                                    "Received AccountExecution message but account_id is not set"
491                                );
492                            }
493                        }
494                        BybitWebSocketMessage::AccountWallet(msg) => {
495                            if let Some(account_id) = account_id {
496                                for wallet in &msg.data {
497                                    let ts_event =
498                                        UnixNanos::from(msg.creation_time as u64 * 1_000_000);
499                                    let ts_init = clock.get_time_ns();
500
501                                    match parse_ws_account_state(
502                                        wallet, account_id, ts_event, ts_init,
503                                    ) {
504                                        Ok(state) => {
505                                            Python::attach(|py| {
506                                                if let Ok(py_obj) = state.into_py_any(py) {
507                                                    call_python(py, &callback, py_obj);
508                                                }
509                                            });
510                                        }
511                                        Err(e) => {
512                                            tracing::error!("Error parsing account state: {e}");
513                                        }
514                                    }
515                                }
516                            } else {
517                                tracing::error!(
518                                    "Received AccountWallet message but account_id is not set"
519                                );
520                            }
521                        }
522                        BybitWebSocketMessage::AccountPosition(msg) => {
523                            if let Some(account_id) = account_id {
524                                for position in &msg.data {
525                                    let raw_symbol = position.symbol;
526
527                                    // For positions, find instrument by matching raw symbol prefix
528                                    // since position messages don't include product type category
529                                    if let Some(instrument_entry) = instruments.iter().find(|e| {
530                                        let inst_symbol = e.key().symbol.as_str();
531                                        // Check if instrument symbol starts with raw_symbol and has hyphen
532                                        inst_symbol.starts_with(raw_symbol.as_str())
533                                            && inst_symbol.len() > raw_symbol.len()
534                                            && inst_symbol.as_bytes().get(raw_symbol.len())
535                                                == Some(&b'-')
536                                    }) {
537                                        let instrument = instrument_entry.value();
538                                        let ts_init = clock.get_time_ns();
539
540                                        match parse_ws_position_status_report(
541                                            position, account_id, instrument, ts_init,
542                                        ) {
543                                            Ok(report) => {
544                                                Python::attach(|py| {
545                                                    if let Ok(py_obj) = report.into_py_any(py) {
546                                                        call_python(py, &callback, py_obj);
547                                                    }
548                                                });
549                                            }
550                                            Err(e) => {
551                                                tracing::error!(
552                                                    "Error parsing position status report: {e}"
553                                                );
554                                            }
555                                        }
556                                    } else {
557                                        tracing::warn!(
558                                            raw_symbol = %raw_symbol,
559                                            "No instrument found for symbol"
560                                        );
561                                    }
562                                }
563                            } else {
564                                tracing::error!(
565                                    "Received AccountPosition message but account_id is not set"
566                                );
567                            }
568                        }
569                        BybitWebSocketMessage::Error(msg) => {
570                            call_python_with_data(&callback, |py| {
571                                msg.into_py_any(py).map(|obj| obj.into_bound(py))
572                            });
573                        }
574                        BybitWebSocketMessage::Reconnected => {}
575                        BybitWebSocketMessage::Pong => {}
576                        BybitWebSocketMessage::Response(msg) => {
577                            tracing::debug!("Received response message: {:?}", msg);
578                        }
579                        BybitWebSocketMessage::Auth(msg) => {
580                            tracing::debug!("Received auth message: {:?}", msg);
581                        }
582                        BybitWebSocketMessage::Subscription(msg) => {
583                            tracing::debug!("Received subscription message: {:?}", msg);
584                        }
585                        BybitWebSocketMessage::Raw(value) => {
586                            tracing::debug!("Received raw/unhandled message, skipping: {value}");
587                        }
588                    }
589                }
590            });
591
592            Ok(())
593        })
594    }
595
596    #[pyo3(name = "close")]
597    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
598        let mut client = self.clone();
599
600        pyo3_async_runtimes::tokio::future_into_py(py, async move {
601            if let Err(e) = client.close().await {
602                tracing::error!("Error on close: {e}");
603            }
604            Ok(())
605        })
606    }
607
608    #[pyo3(name = "subscribe")]
609    fn py_subscribe<'py>(
610        &self,
611        py: Python<'py>,
612        topics: Vec<String>,
613    ) -> PyResult<Bound<'py, PyAny>> {
614        let client = self.clone();
615
616        pyo3_async_runtimes::tokio::future_into_py(py, async move {
617            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
618            Ok(())
619        })
620    }
621
622    #[pyo3(name = "unsubscribe")]
623    fn py_unsubscribe<'py>(
624        &self,
625        py: Python<'py>,
626        topics: Vec<String>,
627    ) -> PyResult<Bound<'py, PyAny>> {
628        let client = self.clone();
629
630        pyo3_async_runtimes::tokio::future_into_py(py, async move {
631            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
632            Ok(())
633        })
634    }
635
636    #[pyo3(name = "subscribe_orderbook")]
637    fn py_subscribe_orderbook<'py>(
638        &self,
639        py: Python<'py>,
640        instrument_id: InstrumentId,
641        depth: u32,
642    ) -> PyResult<Bound<'py, PyAny>> {
643        let client = self.clone();
644
645        pyo3_async_runtimes::tokio::future_into_py(py, async move {
646            client
647                .subscribe_orderbook(instrument_id, depth)
648                .await
649                .map_err(to_pyruntime_err)?;
650            Ok(())
651        })
652    }
653
654    #[pyo3(name = "unsubscribe_orderbook")]
655    fn py_unsubscribe_orderbook<'py>(
656        &self,
657        py: Python<'py>,
658        instrument_id: InstrumentId,
659        depth: u32,
660    ) -> PyResult<Bound<'py, PyAny>> {
661        let client = self.clone();
662
663        pyo3_async_runtimes::tokio::future_into_py(py, async move {
664            client
665                .unsubscribe_orderbook(instrument_id, depth)
666                .await
667                .map_err(to_pyruntime_err)?;
668            Ok(())
669        })
670    }
671
672    #[pyo3(name = "subscribe_trades")]
673    fn py_subscribe_trades<'py>(
674        &self,
675        py: Python<'py>,
676        instrument_id: InstrumentId,
677    ) -> PyResult<Bound<'py, PyAny>> {
678        let client = self.clone();
679
680        pyo3_async_runtimes::tokio::future_into_py(py, async move {
681            client
682                .subscribe_trades(instrument_id)
683                .await
684                .map_err(to_pyruntime_err)?;
685            Ok(())
686        })
687    }
688
689    #[pyo3(name = "unsubscribe_trades")]
690    fn py_unsubscribe_trades<'py>(
691        &self,
692        py: Python<'py>,
693        instrument_id: InstrumentId,
694    ) -> PyResult<Bound<'py, PyAny>> {
695        let client = self.clone();
696
697        pyo3_async_runtimes::tokio::future_into_py(py, async move {
698            client
699                .unsubscribe_trades(instrument_id)
700                .await
701                .map_err(to_pyruntime_err)?;
702            Ok(())
703        })
704    }
705
706    #[pyo3(name = "subscribe_ticker")]
707    fn py_subscribe_ticker<'py>(
708        &self,
709        py: Python<'py>,
710        instrument_id: InstrumentId,
711    ) -> PyResult<Bound<'py, PyAny>> {
712        let client = self.clone();
713
714        pyo3_async_runtimes::tokio::future_into_py(py, async move {
715            client
716                .subscribe_ticker(instrument_id)
717                .await
718                .map_err(to_pyruntime_err)?;
719            Ok(())
720        })
721    }
722
723    #[pyo3(name = "unsubscribe_ticker")]
724    fn py_unsubscribe_ticker<'py>(
725        &self,
726        py: Python<'py>,
727        instrument_id: InstrumentId,
728    ) -> PyResult<Bound<'py, PyAny>> {
729        let client = self.clone();
730
731        pyo3_async_runtimes::tokio::future_into_py(py, async move {
732            client
733                .unsubscribe_ticker(instrument_id)
734                .await
735                .map_err(to_pyruntime_err)?;
736            Ok(())
737        })
738    }
739
740    #[pyo3(name = "subscribe_klines")]
741    fn py_subscribe_klines<'py>(
742        &self,
743        py: Python<'py>,
744        instrument_id: InstrumentId,
745        interval: String,
746    ) -> PyResult<Bound<'py, PyAny>> {
747        let client = self.clone();
748
749        pyo3_async_runtimes::tokio::future_into_py(py, async move {
750            client
751                .subscribe_klines(instrument_id, interval)
752                .await
753                .map_err(to_pyruntime_err)?;
754            Ok(())
755        })
756    }
757
758    #[pyo3(name = "unsubscribe_klines")]
759    fn py_unsubscribe_klines<'py>(
760        &self,
761        py: Python<'py>,
762        instrument_id: InstrumentId,
763        interval: String,
764    ) -> PyResult<Bound<'py, PyAny>> {
765        let client = self.clone();
766
767        pyo3_async_runtimes::tokio::future_into_py(py, async move {
768            client
769                .unsubscribe_klines(instrument_id, interval)
770                .await
771                .map_err(to_pyruntime_err)?;
772            Ok(())
773        })
774    }
775
776    #[pyo3(name = "subscribe_orders")]
777    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
778        let client = self.clone();
779
780        pyo3_async_runtimes::tokio::future_into_py(py, async move {
781            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
782            Ok(())
783        })
784    }
785
786    #[pyo3(name = "unsubscribe_orders")]
787    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
788        let client = self.clone();
789
790        pyo3_async_runtimes::tokio::future_into_py(py, async move {
791            client
792                .unsubscribe_orders()
793                .await
794                .map_err(to_pyruntime_err)?;
795            Ok(())
796        })
797    }
798
799    #[pyo3(name = "subscribe_executions")]
800    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
801        let client = self.clone();
802
803        pyo3_async_runtimes::tokio::future_into_py(py, async move {
804            client
805                .subscribe_executions()
806                .await
807                .map_err(to_pyruntime_err)?;
808            Ok(())
809        })
810    }
811
812    #[pyo3(name = "unsubscribe_executions")]
813    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
814        let client = self.clone();
815
816        pyo3_async_runtimes::tokio::future_into_py(py, async move {
817            client
818                .unsubscribe_executions()
819                .await
820                .map_err(to_pyruntime_err)?;
821            Ok(())
822        })
823    }
824
825    #[pyo3(name = "subscribe_positions")]
826    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
827        let client = self.clone();
828
829        pyo3_async_runtimes::tokio::future_into_py(py, async move {
830            client
831                .subscribe_positions()
832                .await
833                .map_err(to_pyruntime_err)?;
834            Ok(())
835        })
836    }
837
838    #[pyo3(name = "unsubscribe_positions")]
839    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
840        let client = self.clone();
841
842        pyo3_async_runtimes::tokio::future_into_py(py, async move {
843            client
844                .unsubscribe_positions()
845                .await
846                .map_err(to_pyruntime_err)?;
847            Ok(())
848        })
849    }
850
851    #[pyo3(name = "subscribe_wallet")]
852    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
853        let client = self.clone();
854
855        pyo3_async_runtimes::tokio::future_into_py(py, async move {
856            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
857            Ok(())
858        })
859    }
860
861    #[pyo3(name = "unsubscribe_wallet")]
862    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
863        let client = self.clone();
864
865        pyo3_async_runtimes::tokio::future_into_py(py, async move {
866            client
867                .unsubscribe_wallet()
868                .await
869                .map_err(to_pyruntime_err)?;
870            Ok(())
871        })
872    }
873
874    #[pyo3(name = "wait_until_active")]
875    fn py_wait_until_active<'py>(
876        &self,
877        py: Python<'py>,
878        timeout_secs: f64,
879    ) -> PyResult<Bound<'py, PyAny>> {
880        let client = self.clone();
881
882        pyo3_async_runtimes::tokio::future_into_py(py, async move {
883            client
884                .wait_until_active(timeout_secs)
885                .await
886                .map_err(to_pyruntime_err)?;
887            Ok(())
888        })
889    }
890
891    #[pyo3(name = "submit_order")]
892    #[pyo3(signature = (
893        product_type,
894        instrument_id,
895        client_order_id,
896        order_side,
897        order_type,
898        quantity,
899        time_in_force=None,
900        price=None,
901        trigger_price=None,
902        post_only=None,
903        reduce_only=None,
904    ))]
905    #[allow(clippy::too_many_arguments)]
906    fn py_submit_order<'py>(
907        &self,
908        py: Python<'py>,
909        product_type: crate::common::enums::BybitProductType,
910        instrument_id: nautilus_model::identifiers::InstrumentId,
911        client_order_id: nautilus_model::identifiers::ClientOrderId,
912        order_side: nautilus_model::enums::OrderSide,
913        order_type: nautilus_model::enums::OrderType,
914        quantity: nautilus_model::types::Quantity,
915        time_in_force: Option<nautilus_model::enums::TimeInForce>,
916        price: Option<nautilus_model::types::Price>,
917        trigger_price: Option<nautilus_model::types::Price>,
918        post_only: Option<bool>,
919        reduce_only: Option<bool>,
920    ) -> PyResult<Bound<'py, PyAny>> {
921        let client = self.clone();
922
923        pyo3_async_runtimes::tokio::future_into_py(py, async move {
924            client
925                .submit_order(
926                    product_type,
927                    instrument_id,
928                    client_order_id,
929                    order_side,
930                    order_type,
931                    quantity,
932                    time_in_force,
933                    price,
934                    trigger_price,
935                    post_only,
936                    reduce_only,
937                )
938                .await
939                .map_err(to_pyruntime_err)?;
940            Ok(())
941        })
942    }
943
944    #[pyo3(name = "modify_order")]
945    #[pyo3(signature = (
946        product_type,
947        instrument_id,
948        venue_order_id=None,
949        client_order_id=None,
950        quantity=None,
951        price=None,
952    ))]
953    #[allow(clippy::too_many_arguments)]
954    fn py_modify_order<'py>(
955        &self,
956        py: Python<'py>,
957        product_type: crate::common::enums::BybitProductType,
958        instrument_id: nautilus_model::identifiers::InstrumentId,
959        venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
960        client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
961        quantity: Option<nautilus_model::types::Quantity>,
962        price: Option<nautilus_model::types::Price>,
963    ) -> PyResult<Bound<'py, PyAny>> {
964        let client = self.clone();
965
966        pyo3_async_runtimes::tokio::future_into_py(py, async move {
967            client
968                .modify_order(
969                    product_type,
970                    instrument_id,
971                    venue_order_id,
972                    client_order_id,
973                    quantity,
974                    price,
975                )
976                .await
977                .map_err(to_pyruntime_err)?;
978            Ok(())
979        })
980    }
981
982    #[pyo3(name = "cancel_order")]
983    #[pyo3(signature = (
984        product_type,
985        instrument_id,
986        venue_order_id=None,
987        client_order_id=None,
988    ))]
989    fn py_cancel_order<'py>(
990        &self,
991        py: Python<'py>,
992        product_type: crate::common::enums::BybitProductType,
993        instrument_id: nautilus_model::identifiers::InstrumentId,
994        venue_order_id: Option<nautilus_model::identifiers::VenueOrderId>,
995        client_order_id: Option<nautilus_model::identifiers::ClientOrderId>,
996    ) -> PyResult<Bound<'py, PyAny>> {
997        let client = self.clone();
998
999        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1000            client
1001                .cancel_order_by_id(product_type, instrument_id, venue_order_id, client_order_id)
1002                .await
1003                .map_err(to_pyruntime_err)?;
1004            Ok(())
1005        })
1006    }
1007}
1008
1009fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1010    if let Err(e) = callback.call1(py, (py_obj,)) {
1011        tracing::error!("Error calling Python callback: {e}");
1012    }
1013}
1014
1015fn call_python_with_data<F>(callback: &Py<PyAny>, data_fn: F)
1016where
1017    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
1018{
1019    Python::attach(|py| match data_fn(py) {
1020        Ok(data) => {
1021            if let Err(e) = callback.call1(py, (data,)) {
1022                tracing::error!("Error calling Python callback: {e}");
1023            }
1024        }
1025        Err(e) => {
1026            tracing::error!("Error converting data to Python: {e}");
1027        }
1028    });
1029}