Skip to main content

nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_common::live::get_runtime;
48use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
49use nautilus_model::{
50    data::{BarType, Data, OrderBookDeltas_API},
51    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
52    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
53    python::{
54        data::data_to_pycapsule,
55        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56    },
57    types::{Price, Quantity},
58};
59use pyo3::{IntoPyObjectExt, prelude::*};
60use ustr::Ustr;
61
62use crate::{
63    common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
64    websocket::{
65        OKXWebSocketClient,
66        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
67    },
68};
69
70#[pyo3::pymethods]
71impl OKXWebSocketError {
72    #[getter]
73    pub fn code(&self) -> &str {
74        &self.code
75    }
76
77    #[getter]
78    pub fn message(&self) -> &str {
79        &self.message
80    }
81
82    #[getter]
83    pub fn conn_id(&self) -> Option<&str> {
84        self.conn_id.as_deref()
85    }
86
87    #[getter]
88    pub fn ts_event(&self) -> u64 {
89        self.timestamp
90    }
91
92    fn __repr__(&self) -> String {
93        format!(
94            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
95            self.code, self.message, self.conn_id, self.timestamp
96        )
97    }
98}
99
100#[pymethods]
101impl OKXWebSocketClient {
102    #[new]
103    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
104    fn py_new(
105        url: Option<String>,
106        api_key: Option<String>,
107        api_secret: Option<String>,
108        api_passphrase: Option<String>,
109        account_id: Option<AccountId>,
110        heartbeat: Option<u64>,
111    ) -> PyResult<Self> {
112        Self::new(
113            url,
114            api_key,
115            api_secret,
116            api_passphrase,
117            account_id,
118            heartbeat,
119        )
120        .map_err(to_pyvalue_err)
121    }
122
123    #[staticmethod]
124    #[pyo3(name = "with_credentials")]
125    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
126    fn py_with_credentials(
127        url: Option<String>,
128        api_key: Option<String>,
129        api_secret: Option<String>,
130        api_passphrase: Option<String>,
131        account_id: Option<AccountId>,
132        heartbeat: Option<u64>,
133    ) -> PyResult<Self> {
134        Self::with_credentials(
135            url,
136            api_key,
137            api_secret,
138            api_passphrase,
139            account_id,
140            heartbeat,
141        )
142        .map_err(to_pyvalue_err)
143    }
144
145    #[staticmethod]
146    #[pyo3(name = "from_env")]
147    fn py_from_env() -> PyResult<Self> {
148        Self::from_env().map_err(to_pyvalue_err)
149    }
150
151    #[getter]
152    #[pyo3(name = "url")]
153    #[must_use]
154    pub fn py_url(&self) -> &str {
155        self.url()
156    }
157
158    #[getter]
159    #[pyo3(name = "api_key")]
160    #[must_use]
161    pub fn py_api_key(&self) -> Option<&str> {
162        self.api_key()
163    }
164
165    #[getter]
166    #[pyo3(name = "api_key_masked")]
167    #[must_use]
168    pub fn py_api_key_masked(&self) -> Option<String> {
169        self.api_key_masked()
170    }
171
172    #[pyo3(name = "is_active")]
173    fn py_is_active(&mut self) -> bool {
174        self.is_active()
175    }
176
177    #[pyo3(name = "is_closed")]
178    fn py_is_closed(&mut self) -> bool {
179        self.is_closed()
180    }
181
182    #[pyo3(name = "cancel_all_requests")]
183    pub fn py_cancel_all_requests(&self) {
184        self.cancel_all_requests();
185    }
186
187    #[pyo3(name = "get_subscriptions")]
188    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
189        let channels = self.get_subscriptions(instrument_id);
190
191        // Convert to OKX channel names
192        channels
193            .iter()
194            .map(|c| {
195                serde_json::to_value(c)
196                    .ok()
197                    .and_then(|v| v.as_str().map(String::from))
198                    .unwrap_or_else(|| c.to_string())
199            })
200            .collect()
201    }
202
203    /// Sets the VIP level for this client.
204    ///
205    /// The VIP level determines which WebSocket channels are available.
206    #[pyo3(name = "set_vip_level")]
207    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
208        self.set_vip_level(vip_level);
209    }
210
211    /// Gets the current VIP level.
212    #[pyo3(name = "vip_level")]
213    #[getter]
214    fn py_vip_level(&self) -> OKXVipLevel {
215        self.vip_level()
216    }
217
218    #[pyo3(name = "connect")]
219    fn py_connect<'py>(
220        &mut self,
221        py: Python<'py>,
222        instruments: Vec<Py<PyAny>>,
223        callback: Py<PyAny>,
224    ) -> PyResult<Bound<'py, PyAny>> {
225        let mut instruments_any = Vec::new();
226        for inst in instruments {
227            let inst_any = pyobject_to_instrument_any(py, inst)?;
228            instruments_any.push(inst_any);
229        }
230
231        self.cache_instruments(instruments_any);
232
233        let mut client = self.clone();
234
235        pyo3_async_runtimes::tokio::future_into_py(py, async move {
236            client.connect().await.map_err(to_pyruntime_err)?;
237
238            let stream = client.stream();
239
240            // Keep client alive in the spawned task to prevent handler from dropping
241            get_runtime().spawn(async move {
242                let _client = client;
243                tokio::pin!(stream);
244
245                while let Some(msg) = stream.next().await {
246                    match msg {
247                        NautilusWsMessage::Instrument(msg) => {
248                            call_python_with_data(&callback, |py| {
249                                instrument_any_to_pyobject(py, *msg)
250                            });
251                        }
252                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
253                            for data in msg {
254                                let py_obj = data_to_pycapsule(py, data);
255                                call_python(py, &callback, py_obj);
256                            }
257                        }),
258                        NautilusWsMessage::FundingRates(msg) => {
259                            for data in msg {
260                                call_python_with_data(&callback, |py| data.into_py_any(py));
261                            }
262                        }
263                        NautilusWsMessage::OrderAccepted(msg) => {
264                            call_python_with_data(&callback, |py| msg.into_py_any(py));
265                        }
266                        NautilusWsMessage::OrderCanceled(msg) => {
267                            call_python_with_data(&callback, |py| msg.into_py_any(py));
268                        }
269                        NautilusWsMessage::OrderExpired(msg) => {
270                            call_python_with_data(&callback, |py| msg.into_py_any(py));
271                        }
272                        NautilusWsMessage::OrderRejected(msg) => {
273                            call_python_with_data(&callback, |py| msg.into_py_any(py));
274                        }
275                        NautilusWsMessage::OrderCancelRejected(msg) => {
276                            call_python_with_data(&callback, |py| msg.into_py_any(py));
277                        }
278                        NautilusWsMessage::OrderModifyRejected(msg) => {
279                            call_python_with_data(&callback, |py| msg.into_py_any(py));
280                        }
281                        NautilusWsMessage::OrderTriggered(msg) => {
282                            call_python_with_data(&callback, |py| msg.into_py_any(py));
283                        }
284                        NautilusWsMessage::OrderUpdated(msg) => {
285                            call_python_with_data(&callback, |py| msg.into_py_any(py));
286                        }
287                        NautilusWsMessage::ExecutionReports(msg) => {
288                            for report in msg {
289                                match report {
290                                    ExecutionReport::Order(report) => {
291                                        call_python_with_data(&callback, |py| {
292                                            report.into_py_any(py)
293                                        });
294                                    }
295                                    ExecutionReport::Fill(report) => {
296                                        call_python_with_data(&callback, |py| {
297                                            report.into_py_any(py)
298                                        });
299                                    }
300                                };
301                            }
302                        }
303                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
304                            let py_obj =
305                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
306                            call_python(py, &callback, py_obj);
307                        }),
308                        NautilusWsMessage::AccountUpdate(msg) => {
309                            call_python_with_data(&callback, |py| msg.into_py_any(py));
310                        }
311                        NautilusWsMessage::PositionUpdate(msg) => {
312                            call_python_with_data(&callback, |py| msg.into_py_any(py));
313                        }
314                        NautilusWsMessage::Reconnected => {} // Nothing to handle
315                        NautilusWsMessage::Authenticated => {} // Nothing to handle
316                        NautilusWsMessage::Error(msg) => {
317                            call_python_with_data(&callback, |py| msg.into_py_any(py));
318                        }
319                        NautilusWsMessage::Raw(msg) => {
320                            log::debug!("Received raw message, skipping: {msg}");
321                        }
322                    }
323                }
324            });
325
326            Ok(())
327        })
328    }
329
330    #[pyo3(name = "wait_until_active")]
331    fn py_wait_until_active<'py>(
332        &self,
333        py: Python<'py>,
334        timeout_secs: f64,
335    ) -> PyResult<Bound<'py, PyAny>> {
336        let client = self.clone();
337
338        pyo3_async_runtimes::tokio::future_into_py(py, async move {
339            client
340                .wait_until_active(timeout_secs)
341                .await
342                .map_err(to_pyruntime_err)?;
343            Ok(())
344        })
345    }
346
347    #[pyo3(name = "close")]
348    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
349        let mut client = self.clone();
350
351        pyo3_async_runtimes::tokio::future_into_py(py, async move {
352            if let Err(e) = client.close().await {
353                log::error!("Error on close: {e}");
354            }
355            Ok(())
356        })
357    }
358
359    #[pyo3(name = "subscribe_instruments")]
360    fn py_subscribe_instruments<'py>(
361        &self,
362        py: Python<'py>,
363        instrument_type: OKXInstrumentType,
364    ) -> PyResult<Bound<'py, PyAny>> {
365        let client = self.clone();
366
367        pyo3_async_runtimes::tokio::future_into_py(py, async move {
368            if let Err(e) = client.subscribe_instruments(instrument_type).await {
369                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
370            }
371            Ok(())
372        })
373    }
374
375    #[pyo3(name = "subscribe_instrument")]
376    fn py_subscribe_instrument<'py>(
377        &self,
378        py: Python<'py>,
379        instrument_id: InstrumentId,
380    ) -> PyResult<Bound<'py, PyAny>> {
381        let client = self.clone();
382
383        pyo3_async_runtimes::tokio::future_into_py(py, async move {
384            if let Err(e) = client.subscribe_instrument(instrument_id).await {
385                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
386            }
387            Ok(())
388        })
389    }
390
391    #[pyo3(name = "subscribe_book")]
392    fn py_subscribe_book<'py>(
393        &self,
394        py: Python<'py>,
395        instrument_id: InstrumentId,
396    ) -> PyResult<Bound<'py, PyAny>> {
397        let client = self.clone();
398
399        pyo3_async_runtimes::tokio::future_into_py(py, async move {
400            client
401                .subscribe_book(instrument_id)
402                .await
403                .map_err(to_pyvalue_err)
404        })
405    }
406
407    #[pyo3(name = "subscribe_book50_l2_tbt")]
408    fn py_subscribe_book50_l2_tbt<'py>(
409        &self,
410        py: Python<'py>,
411        instrument_id: InstrumentId,
412    ) -> PyResult<Bound<'py, PyAny>> {
413        let client = self.clone();
414
415        pyo3_async_runtimes::tokio::future_into_py(py, async move {
416            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
417                log::error!("Failed to subscribe to book50_tbt: {e}");
418            }
419            Ok(())
420        })
421    }
422
423    #[pyo3(name = "subscribe_book_l2_tbt")]
424    fn py_subscribe_book_l2_tbt<'py>(
425        &self,
426        py: Python<'py>,
427        instrument_id: InstrumentId,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let client = self.clone();
430
431        pyo3_async_runtimes::tokio::future_into_py(py, async move {
432            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
433                log::error!("Failed to subscribe to books_l2_tbt: {e}");
434            }
435            Ok(())
436        })
437    }
438
439    #[pyo3(name = "subscribe_book_with_depth")]
440    fn py_subscribe_book_with_depth<'py>(
441        &self,
442        py: Python<'py>,
443        instrument_id: InstrumentId,
444        depth: u16,
445    ) -> PyResult<Bound<'py, PyAny>> {
446        let client = self.clone();
447
448        pyo3_async_runtimes::tokio::future_into_py(py, async move {
449            client
450                .subscribe_book_with_depth(instrument_id, depth)
451                .await
452                .map_err(to_pyvalue_err)
453        })
454    }
455
456    #[pyo3(name = "subscribe_book_depth5")]
457    fn py_subscribe_book_depth5<'py>(
458        &self,
459        py: Python<'py>,
460        instrument_id: InstrumentId,
461    ) -> PyResult<Bound<'py, PyAny>> {
462        let client = self.clone();
463
464        pyo3_async_runtimes::tokio::future_into_py(py, async move {
465            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
466                log::error!("Failed to subscribe to books5: {e}");
467            }
468            Ok(())
469        })
470    }
471
472    #[pyo3(name = "subscribe_quotes")]
473    fn py_subscribe_quotes<'py>(
474        &self,
475        py: Python<'py>,
476        instrument_id: InstrumentId,
477    ) -> PyResult<Bound<'py, PyAny>> {
478        let client = self.clone();
479
480        pyo3_async_runtimes::tokio::future_into_py(py, async move {
481            if let Err(e) = client.subscribe_quotes(instrument_id).await {
482                log::error!("Failed to subscribe to quotes: {e}");
483            }
484            Ok(())
485        })
486    }
487
488    #[pyo3(name = "subscribe_trades")]
489    fn py_subscribe_trades<'py>(
490        &self,
491        py: Python<'py>,
492        instrument_id: InstrumentId,
493        aggregated: bool,
494    ) -> PyResult<Bound<'py, PyAny>> {
495        let client = self.clone();
496
497        pyo3_async_runtimes::tokio::future_into_py(py, async move {
498            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
499                log::error!("Failed to subscribe to trades: {e}");
500            }
501            Ok(())
502        })
503    }
504
505    #[pyo3(name = "subscribe_bars")]
506    fn py_subscribe_bars<'py>(
507        &self,
508        py: Python<'py>,
509        bar_type: BarType,
510    ) -> PyResult<Bound<'py, PyAny>> {
511        let client = self.clone();
512
513        pyo3_async_runtimes::tokio::future_into_py(py, async move {
514            if let Err(e) = client.subscribe_bars(bar_type).await {
515                log::error!("Failed to subscribe to bars: {e}");
516            }
517            Ok(())
518        })
519    }
520
521    #[pyo3(name = "unsubscribe_book")]
522    fn py_unsubscribe_book<'py>(
523        &self,
524        py: Python<'py>,
525        instrument_id: InstrumentId,
526    ) -> PyResult<Bound<'py, PyAny>> {
527        let client = self.clone();
528
529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
530            if let Err(e) = client.unsubscribe_book(instrument_id).await {
531                log::error!("Failed to unsubscribe from order book: {e}");
532            }
533            Ok(())
534        })
535    }
536
537    #[pyo3(name = "unsubscribe_book_depth5")]
538    fn py_unsubscribe_book_depth5<'py>(
539        &self,
540        py: Python<'py>,
541        instrument_id: InstrumentId,
542    ) -> PyResult<Bound<'py, PyAny>> {
543        let client = self.clone();
544
545        pyo3_async_runtimes::tokio::future_into_py(py, async move {
546            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
547                log::error!("Failed to unsubscribe from books5: {e}");
548            }
549            Ok(())
550        })
551    }
552
553    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
554    fn py_unsubscribe_book50_l2_tbt<'py>(
555        &self,
556        py: Python<'py>,
557        instrument_id: InstrumentId,
558    ) -> PyResult<Bound<'py, PyAny>> {
559        let client = self.clone();
560
561        pyo3_async_runtimes::tokio::future_into_py(py, async move {
562            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
563                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
564            }
565            Ok(())
566        })
567    }
568
569    #[pyo3(name = "unsubscribe_book_l2_tbt")]
570    fn py_unsubscribe_book_l2_tbt<'py>(
571        &self,
572        py: Python<'py>,
573        instrument_id: InstrumentId,
574    ) -> PyResult<Bound<'py, PyAny>> {
575        let client = self.clone();
576
577        pyo3_async_runtimes::tokio::future_into_py(py, async move {
578            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
579                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
580            }
581            Ok(())
582        })
583    }
584
585    #[pyo3(name = "unsubscribe_quotes")]
586    fn py_unsubscribe_quotes<'py>(
587        &self,
588        py: Python<'py>,
589        instrument_id: InstrumentId,
590    ) -> PyResult<Bound<'py, PyAny>> {
591        let client = self.clone();
592
593        pyo3_async_runtimes::tokio::future_into_py(py, async move {
594            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
595                log::error!("Failed to unsubscribe from quotes: {e}");
596            }
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "unsubscribe_trades")]
602    fn py_unsubscribe_trades<'py>(
603        &self,
604        py: Python<'py>,
605        instrument_id: InstrumentId,
606        aggregated: bool,
607    ) -> PyResult<Bound<'py, PyAny>> {
608        let client = self.clone();
609
610        pyo3_async_runtimes::tokio::future_into_py(py, async move {
611            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
612                log::error!("Failed to unsubscribe from trades: {e}");
613            }
614            Ok(())
615        })
616    }
617
618    #[pyo3(name = "unsubscribe_bars")]
619    fn py_unsubscribe_bars<'py>(
620        &self,
621        py: Python<'py>,
622        bar_type: BarType,
623    ) -> PyResult<Bound<'py, PyAny>> {
624        let client = self.clone();
625
626        pyo3_async_runtimes::tokio::future_into_py(py, async move {
627            if let Err(e) = client.unsubscribe_bars(bar_type).await {
628                log::error!("Failed to unsubscribe from bars: {e}");
629            }
630            Ok(())
631        })
632    }
633
634    #[pyo3(name = "subscribe_ticker")]
635    fn py_subscribe_ticker<'py>(
636        &self,
637        py: Python<'py>,
638        instrument_id: InstrumentId,
639    ) -> PyResult<Bound<'py, PyAny>> {
640        let client = self.clone();
641
642        pyo3_async_runtimes::tokio::future_into_py(py, async move {
643            if let Err(e) = client.subscribe_ticker(instrument_id).await {
644                log::error!("Failed to subscribe to ticker: {e}");
645            }
646            Ok(())
647        })
648    }
649
650    #[pyo3(name = "unsubscribe_ticker")]
651    fn py_unsubscribe_ticker<'py>(
652        &self,
653        py: Python<'py>,
654        instrument_id: InstrumentId,
655    ) -> PyResult<Bound<'py, PyAny>> {
656        let client = self.clone();
657
658        pyo3_async_runtimes::tokio::future_into_py(py, async move {
659            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
660                log::error!("Failed to unsubscribe from ticker: {e}");
661            }
662            Ok(())
663        })
664    }
665
666    #[pyo3(name = "subscribe_mark_prices")]
667    fn py_subscribe_mark_prices<'py>(
668        &self,
669        py: Python<'py>,
670        instrument_id: InstrumentId,
671    ) -> PyResult<Bound<'py, PyAny>> {
672        let client = self.clone();
673
674        pyo3_async_runtimes::tokio::future_into_py(py, async move {
675            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
676                log::error!("Failed to subscribe to mark prices: {e}");
677            }
678            Ok(())
679        })
680    }
681
682    #[pyo3(name = "unsubscribe_mark_prices")]
683    fn py_unsubscribe_mark_prices<'py>(
684        &self,
685        py: Python<'py>,
686        instrument_id: InstrumentId,
687    ) -> PyResult<Bound<'py, PyAny>> {
688        let client = self.clone();
689
690        pyo3_async_runtimes::tokio::future_into_py(py, async move {
691            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
692                log::error!("Failed to unsubscribe from mark prices: {e}");
693            }
694            Ok(())
695        })
696    }
697
698    #[pyo3(name = "subscribe_index_prices")]
699    fn py_subscribe_index_prices<'py>(
700        &self,
701        py: Python<'py>,
702        instrument_id: InstrumentId,
703    ) -> PyResult<Bound<'py, PyAny>> {
704        let client = self.clone();
705
706        pyo3_async_runtimes::tokio::future_into_py(py, async move {
707            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
708                log::error!("Failed to subscribe to index prices: {e}");
709            }
710            Ok(())
711        })
712    }
713
714    #[pyo3(name = "unsubscribe_index_prices")]
715    fn py_unsubscribe_index_prices<'py>(
716        &self,
717        py: Python<'py>,
718        instrument_id: InstrumentId,
719    ) -> PyResult<Bound<'py, PyAny>> {
720        let client = self.clone();
721
722        pyo3_async_runtimes::tokio::future_into_py(py, async move {
723            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
724                log::error!("Failed to unsubscribe from index prices: {e}");
725            }
726            Ok(())
727        })
728    }
729
730    #[pyo3(name = "subscribe_funding_rates")]
731    fn py_subscribe_funding_rates<'py>(
732        &self,
733        py: Python<'py>,
734        instrument_id: InstrumentId,
735    ) -> PyResult<Bound<'py, PyAny>> {
736        let client = self.clone();
737
738        pyo3_async_runtimes::tokio::future_into_py(py, async move {
739            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
740                log::error!("Failed to subscribe to funding rates: {e}");
741            }
742            Ok(())
743        })
744    }
745
746    #[pyo3(name = "unsubscribe_funding_rates")]
747    fn py_unsubscribe_funding_rates<'py>(
748        &self,
749        py: Python<'py>,
750        instrument_id: InstrumentId,
751    ) -> PyResult<Bound<'py, PyAny>> {
752        let client = self.clone();
753
754        pyo3_async_runtimes::tokio::future_into_py(py, async move {
755            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
756                log::error!("Failed to unsubscribe from funding rates: {e}");
757            }
758            Ok(())
759        })
760    }
761
762    #[pyo3(name = "subscribe_orders")]
763    fn py_subscribe_orders<'py>(
764        &self,
765        py: Python<'py>,
766        instrument_type: OKXInstrumentType,
767    ) -> PyResult<Bound<'py, PyAny>> {
768        let client = self.clone();
769
770        pyo3_async_runtimes::tokio::future_into_py(py, async move {
771            if let Err(e) = client.subscribe_orders(instrument_type).await {
772                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
773            }
774            Ok(())
775        })
776    }
777
778    #[pyo3(name = "unsubscribe_orders")]
779    fn py_unsubscribe_orders<'py>(
780        &self,
781        py: Python<'py>,
782        instrument_type: OKXInstrumentType,
783    ) -> PyResult<Bound<'py, PyAny>> {
784        let client = self.clone();
785
786        pyo3_async_runtimes::tokio::future_into_py(py, async move {
787            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
788                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
789            }
790            Ok(())
791        })
792    }
793
794    #[pyo3(name = "subscribe_orders_algo")]
795    fn py_subscribe_orders_algo<'py>(
796        &self,
797        py: Python<'py>,
798        instrument_type: OKXInstrumentType,
799    ) -> PyResult<Bound<'py, PyAny>> {
800        let client = self.clone();
801
802        pyo3_async_runtimes::tokio::future_into_py(py, async move {
803            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
804                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
805            }
806            Ok(())
807        })
808    }
809
810    #[pyo3(name = "unsubscribe_orders_algo")]
811    fn py_unsubscribe_orders_algo<'py>(
812        &self,
813        py: Python<'py>,
814        instrument_type: OKXInstrumentType,
815    ) -> PyResult<Bound<'py, PyAny>> {
816        let client = self.clone();
817
818        pyo3_async_runtimes::tokio::future_into_py(py, async move {
819            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
820                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
821            }
822            Ok(())
823        })
824    }
825
826    #[pyo3(name = "subscribe_fills")]
827    fn py_subscribe_fills<'py>(
828        &self,
829        py: Python<'py>,
830        instrument_type: OKXInstrumentType,
831    ) -> PyResult<Bound<'py, PyAny>> {
832        let client = self.clone();
833
834        pyo3_async_runtimes::tokio::future_into_py(py, async move {
835            if let Err(e) = client.subscribe_fills(instrument_type).await {
836                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
837            }
838            Ok(())
839        })
840    }
841
842    #[pyo3(name = "unsubscribe_fills")]
843    fn py_unsubscribe_fills<'py>(
844        &self,
845        py: Python<'py>,
846        instrument_type: OKXInstrumentType,
847    ) -> PyResult<Bound<'py, PyAny>> {
848        let client = self.clone();
849
850        pyo3_async_runtimes::tokio::future_into_py(py, async move {
851            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
852                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
853            }
854            Ok(())
855        })
856    }
857
858    #[pyo3(name = "subscribe_account")]
859    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
860        let client = self.clone();
861
862        pyo3_async_runtimes::tokio::future_into_py(py, async move {
863            if let Err(e) = client.subscribe_account().await {
864                log::error!("Failed to subscribe to account: {e}");
865            }
866            Ok(())
867        })
868    }
869
870    #[pyo3(name = "unsubscribe_account")]
871    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
872        let client = self.clone();
873
874        pyo3_async_runtimes::tokio::future_into_py(py, async move {
875            if let Err(e) = client.unsubscribe_account().await {
876                log::error!("Failed to unsubscribe from account: {e}");
877            }
878            Ok(())
879        })
880    }
881
882    #[pyo3(name = "submit_order")]
883    #[pyo3(signature = (
884        trader_id,
885        strategy_id,
886        instrument_id,
887        td_mode,
888        client_order_id,
889        order_side,
890        order_type,
891        quantity,
892        time_in_force=None,
893        price=None,
894        trigger_price=None,
895        post_only=None,
896        reduce_only=None,
897        quote_quantity=None,
898        position_side=None,
899    ))]
900    #[allow(clippy::too_many_arguments)]
901    fn py_submit_order<'py>(
902        &self,
903        py: Python<'py>,
904        trader_id: TraderId,
905        strategy_id: StrategyId,
906        instrument_id: InstrumentId,
907        td_mode: OKXTradeMode,
908        client_order_id: ClientOrderId,
909        order_side: OrderSide,
910        order_type: OrderType,
911        quantity: Quantity,
912        time_in_force: Option<TimeInForce>,
913        price: Option<Price>,
914        trigger_price: Option<Price>,
915        post_only: Option<bool>,
916        reduce_only: Option<bool>,
917        quote_quantity: Option<bool>,
918        position_side: Option<PositionSide>,
919    ) -> PyResult<Bound<'py, PyAny>> {
920        let client = self.clone();
921
922        pyo3_async_runtimes::tokio::future_into_py(py, async move {
923            client
924                .submit_order(
925                    trader_id,
926                    strategy_id,
927                    instrument_id,
928                    td_mode,
929                    client_order_id,
930                    order_side,
931                    order_type,
932                    quantity,
933                    time_in_force,
934                    price,
935                    trigger_price,
936                    post_only,
937                    reduce_only,
938                    quote_quantity,
939                    position_side,
940                )
941                .await
942                .map_err(to_pyvalue_err)
943        })
944    }
945
946    #[pyo3(name = "cancel_order", signature = (
947        trader_id,
948        strategy_id,
949        instrument_id,
950        client_order_id=None,
951        venue_order_id=None,
952    ))]
953    #[allow(clippy::too_many_arguments)]
954    fn py_cancel_order<'py>(
955        &self,
956        py: Python<'py>,
957        trader_id: TraderId,
958        strategy_id: StrategyId,
959        instrument_id: InstrumentId,
960        client_order_id: Option<ClientOrderId>,
961        venue_order_id: Option<VenueOrderId>,
962    ) -> PyResult<Bound<'py, PyAny>> {
963        let client = self.clone();
964
965        pyo3_async_runtimes::tokio::future_into_py(py, async move {
966            client
967                .cancel_order(
968                    trader_id,
969                    strategy_id,
970                    instrument_id,
971                    client_order_id,
972                    venue_order_id,
973                )
974                .await
975                .map_err(to_pyvalue_err)
976        })
977    }
978
979    #[pyo3(name = "modify_order")]
980    #[pyo3(signature = (
981        trader_id,
982        strategy_id,
983        instrument_id,
984        client_order_id=None,
985        venue_order_id=None,
986        price=None,
987        quantity=None,
988    ))]
989    #[allow(clippy::too_many_arguments)]
990    fn py_modify_order<'py>(
991        &self,
992        py: Python<'py>,
993        trader_id: TraderId,
994        strategy_id: StrategyId,
995        instrument_id: InstrumentId,
996        client_order_id: Option<ClientOrderId>,
997        venue_order_id: Option<VenueOrderId>,
998        price: Option<Price>,
999        quantity: Option<Quantity>,
1000    ) -> PyResult<Bound<'py, PyAny>> {
1001        let client = self.clone();
1002
1003        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1004            client
1005                .modify_order(
1006                    trader_id,
1007                    strategy_id,
1008                    instrument_id,
1009                    client_order_id,
1010                    price,
1011                    quantity,
1012                    venue_order_id,
1013                )
1014                .await
1015                .map_err(to_pyvalue_err)
1016        })
1017    }
1018
1019    #[allow(clippy::type_complexity)]
1020    #[pyo3(name = "batch_submit_orders")]
1021    fn py_batch_submit_orders<'py>(
1022        &self,
1023        py: Python<'py>,
1024        orders: Vec<Py<PyAny>>,
1025    ) -> PyResult<Bound<'py, PyAny>> {
1026        let mut domain_orders = Vec::with_capacity(orders.len());
1027
1028        for obj in orders {
1029            let (
1030                instrument_type,
1031                instrument_id,
1032                td_mode,
1033                client_order_id,
1034                order_side,
1035                order_type,
1036                quantity,
1037                position_side,
1038                price,
1039                trigger_price,
1040                post_only,
1041                reduce_only,
1042            ): (
1043                OKXInstrumentType,
1044                InstrumentId,
1045                OKXTradeMode,
1046                ClientOrderId,
1047                OrderSide,
1048                OrderType,
1049                Quantity,
1050                Option<PositionSide>,
1051                Option<Price>,
1052                Option<Price>,
1053                Option<bool>,
1054                Option<bool>,
1055            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1056
1057            domain_orders.push((
1058                instrument_type,
1059                instrument_id,
1060                td_mode,
1061                client_order_id,
1062                order_side,
1063                position_side,
1064                order_type,
1065                quantity,
1066                price,
1067                trigger_price,
1068                post_only,
1069                reduce_only,
1070            ));
1071        }
1072
1073        let client = self.clone();
1074
1075        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1076            client
1077                .batch_submit_orders(domain_orders)
1078                .await
1079                .map_err(to_pyvalue_err)
1080        })
1081    }
1082
1083    /// Cancels multiple orders via WebSocket.
1084    #[pyo3(name = "batch_cancel_orders")]
1085    fn py_batch_cancel_orders<'py>(
1086        &self,
1087        py: Python<'py>,
1088        cancels: Vec<Py<PyAny>>,
1089    ) -> PyResult<Bound<'py, PyAny>> {
1090        let mut batched_cancels = Vec::with_capacity(cancels.len());
1091
1092        for obj in cancels {
1093            let (instrument_id, client_order_id, order_id): (
1094                InstrumentId,
1095                Option<ClientOrderId>,
1096                Option<VenueOrderId>,
1097            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1098            batched_cancels.push((instrument_id, client_order_id, order_id));
1099        }
1100
1101        let client = self.clone();
1102
1103        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1104            client
1105                .batch_cancel_orders(batched_cancels)
1106                .await
1107                .map_err(to_pyvalue_err)
1108        })
1109    }
1110
1111    #[pyo3(name = "batch_modify_orders")]
1112    fn py_batch_modify_orders<'py>(
1113        &self,
1114        py: Python<'py>,
1115        orders: Vec<Py<PyAny>>,
1116    ) -> PyResult<Bound<'py, PyAny>> {
1117        let mut domain_orders = Vec::with_capacity(orders.len());
1118
1119        for obj in orders {
1120            let (
1121                instrument_type,
1122                instrument_id,
1123                client_order_id,
1124                new_client_order_id,
1125                price,
1126                quantity,
1127            ): (
1128                String,
1129                InstrumentId,
1130                ClientOrderId,
1131                ClientOrderId,
1132                Option<Price>,
1133                Option<Quantity>,
1134            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1135            let inst_type =
1136                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1137            domain_orders.push((
1138                inst_type,
1139                instrument_id,
1140                client_order_id,
1141                new_client_order_id,
1142                price,
1143                quantity,
1144            ));
1145        }
1146
1147        let client = self.clone();
1148
1149        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1150            client
1151                .batch_modify_orders(domain_orders)
1152                .await
1153                .map_err(to_pyvalue_err)
1154        })
1155    }
1156
1157    #[pyo3(name = "mass_cancel_orders")]
1158    fn py_mass_cancel_orders<'py>(
1159        &self,
1160        py: Python<'py>,
1161        instrument_id: InstrumentId,
1162    ) -> PyResult<Bound<'py, PyAny>> {
1163        let client = self.clone();
1164
1165        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1166            client
1167                .mass_cancel_orders(instrument_id)
1168                .await
1169                .map_err(to_pyvalue_err)
1170        })
1171    }
1172
1173    #[pyo3(name = "cache_instruments")]
1174    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1175        let instruments: Result<Vec<_>, _> = instruments
1176            .into_iter()
1177            .map(|inst| pyobject_to_instrument_any(py, inst))
1178            .collect();
1179        self.cache_instruments(instruments?);
1180        Ok(())
1181    }
1182
1183    #[pyo3(name = "cache_instrument")]
1184    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1185        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1186        Ok(())
1187    }
1188
1189    #[pyo3(name = "cache_inst_id_codes")]
1190    fn py_cache_inst_id_codes(&self, mappings: Vec<(String, u64)>) {
1191        let ustr_mappings = mappings
1192            .into_iter()
1193            .map(|(inst_id, code)| (Ustr::from(&inst_id), code));
1194        self.cache_inst_id_codes(ustr_mappings);
1195    }
1196}
1197
1198fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1199where
1200    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1201{
1202    Python::attach(|py| match data_converter(py) {
1203        Ok(py_obj) => call_python(py, callback, py_obj),
1204        Err(e) => log::error!("Failed to convert data to Python object: {e}"),
1205    });
1206}