nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_common::live::get_runtime;
48use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
49use nautilus_model::{
50    data::{BarType, Data, OrderBookDeltas_API},
51    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
52    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
53    python::{
54        data::data_to_pycapsule,
55        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56    },
57    types::{Price, Quantity},
58};
59use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
60
61use crate::{
62    common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
63    websocket::{
64        OKXWebSocketClient,
65        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
66    },
67};
68
69#[pyo3::pymethods]
70impl OKXWebSocketError {
71    #[getter]
72    pub fn code(&self) -> &str {
73        &self.code
74    }
75
76    #[getter]
77    pub fn message(&self) -> &str {
78        &self.message
79    }
80
81    #[getter]
82    pub fn conn_id(&self) -> Option<&str> {
83        self.conn_id.as_deref()
84    }
85
86    #[getter]
87    pub fn ts_event(&self) -> u64 {
88        self.timestamp
89    }
90
91    fn __repr__(&self) -> String {
92        format!(
93            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
94            self.code, self.message, self.conn_id, self.timestamp
95        )
96    }
97}
98
99#[pymethods]
100impl OKXWebSocketClient {
101    #[new]
102    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
103    fn py_new(
104        url: Option<String>,
105        api_key: Option<String>,
106        api_secret: Option<String>,
107        api_passphrase: Option<String>,
108        account_id: Option<AccountId>,
109        heartbeat: Option<u64>,
110    ) -> PyResult<Self> {
111        Self::new(
112            url,
113            api_key,
114            api_secret,
115            api_passphrase,
116            account_id,
117            heartbeat,
118        )
119        .map_err(to_pyvalue_err)
120    }
121
122    #[staticmethod]
123    #[pyo3(name = "with_credentials")]
124    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
125    fn py_with_credentials(
126        url: Option<String>,
127        api_key: Option<String>,
128        api_secret: Option<String>,
129        api_passphrase: Option<String>,
130        account_id: Option<AccountId>,
131        heartbeat: Option<u64>,
132    ) -> PyResult<Self> {
133        Self::with_credentials(
134            url,
135            api_key,
136            api_secret,
137            api_passphrase,
138            account_id,
139            heartbeat,
140        )
141        .map_err(to_pyvalue_err)
142    }
143
144    #[staticmethod]
145    #[pyo3(name = "from_env")]
146    fn py_from_env() -> PyResult<Self> {
147        Self::from_env().map_err(to_pyvalue_err)
148    }
149
150    #[getter]
151    #[pyo3(name = "url")]
152    #[must_use]
153    pub fn py_url(&self) -> &str {
154        self.url()
155    }
156
157    #[getter]
158    #[pyo3(name = "api_key")]
159    #[must_use]
160    pub fn py_api_key(&self) -> Option<&str> {
161        self.api_key()
162    }
163
164    #[getter]
165    #[pyo3(name = "api_key_masked")]
166    #[must_use]
167    pub fn py_api_key_masked(&self) -> Option<String> {
168        self.api_key_masked()
169    }
170
171    #[pyo3(name = "is_active")]
172    fn py_is_active(&mut self) -> bool {
173        self.is_active()
174    }
175
176    #[pyo3(name = "is_closed")]
177    fn py_is_closed(&mut self) -> bool {
178        self.is_closed()
179    }
180
181    #[pyo3(name = "cancel_all_requests")]
182    pub fn py_cancel_all_requests(&self) {
183        self.cancel_all_requests();
184    }
185
186    #[pyo3(name = "get_subscriptions")]
187    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
188        let channels = self.get_subscriptions(instrument_id);
189
190        // Convert to OKX channel names
191        channels
192            .iter()
193            .map(|c| {
194                serde_json::to_value(c)
195                    .ok()
196                    .and_then(|v| v.as_str().map(String::from))
197                    .unwrap_or_else(|| c.to_string())
198            })
199            .collect()
200    }
201
202    /// Sets the VIP level for this client.
203    ///
204    /// The VIP level determines which WebSocket channels are available.
205    #[pyo3(name = "set_vip_level")]
206    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
207        self.set_vip_level(vip_level);
208    }
209
210    /// Gets the current VIP level.
211    #[pyo3(name = "vip_level")]
212    #[getter]
213    fn py_vip_level(&self) -> OKXVipLevel {
214        self.vip_level()
215    }
216
217    #[pyo3(name = "connect")]
218    fn py_connect<'py>(
219        &mut self,
220        py: Python<'py>,
221        instruments: Vec<Py<PyAny>>,
222        callback: Py<PyAny>,
223    ) -> PyResult<Bound<'py, PyAny>> {
224        let mut instruments_any = Vec::new();
225        for inst in instruments {
226            let inst_any = pyobject_to_instrument_any(py, inst)?;
227            instruments_any.push(inst_any);
228        }
229
230        self.cache_instruments(instruments_any);
231
232        let mut client = self.clone();
233
234        pyo3_async_runtimes::tokio::future_into_py(py, async move {
235            client.connect().await.map_err(to_pyruntime_err)?;
236
237            let stream = client.stream();
238
239            // Keep client alive in the spawned task to prevent handler from dropping
240            get_runtime().spawn(async move {
241                let _client = client;
242                tokio::pin!(stream);
243
244                while let Some(msg) = stream.next().await {
245                    match msg {
246                        NautilusWsMessage::Instrument(msg) => {
247                            call_python_with_data(&callback, |py| {
248                                instrument_any_to_pyobject(py, *msg)
249                            });
250                        }
251                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
252                            for data in msg {
253                                let py_obj = data_to_pycapsule(py, data);
254                                call_python(py, &callback, py_obj);
255                            }
256                        }),
257                        NautilusWsMessage::FundingRates(msg) => {
258                            for data in msg {
259                                call_python_with_data(&callback, |py| data.into_py_any(py));
260                            }
261                        }
262                        NautilusWsMessage::OrderAccepted(msg) => {
263                            call_python_with_data(&callback, |py| msg.into_py_any(py));
264                        }
265                        NautilusWsMessage::OrderCanceled(msg) => {
266                            call_python_with_data(&callback, |py| msg.into_py_any(py));
267                        }
268                        NautilusWsMessage::OrderExpired(msg) => {
269                            call_python_with_data(&callback, |py| msg.into_py_any(py));
270                        }
271                        NautilusWsMessage::OrderRejected(msg) => {
272                            call_python_with_data(&callback, |py| msg.into_py_any(py));
273                        }
274                        NautilusWsMessage::OrderCancelRejected(msg) => {
275                            call_python_with_data(&callback, |py| msg.into_py_any(py));
276                        }
277                        NautilusWsMessage::OrderModifyRejected(msg) => {
278                            call_python_with_data(&callback, |py| msg.into_py_any(py));
279                        }
280                        NautilusWsMessage::OrderTriggered(msg) => {
281                            call_python_with_data(&callback, |py| msg.into_py_any(py));
282                        }
283                        NautilusWsMessage::OrderUpdated(msg) => {
284                            call_python_with_data(&callback, |py| msg.into_py_any(py));
285                        }
286                        NautilusWsMessage::ExecutionReports(msg) => {
287                            for report in msg {
288                                match report {
289                                    ExecutionReport::Order(report) => {
290                                        call_python_with_data(&callback, |py| {
291                                            report.into_py_any(py)
292                                        });
293                                    }
294                                    ExecutionReport::Fill(report) => {
295                                        call_python_with_data(&callback, |py| {
296                                            report.into_py_any(py)
297                                        });
298                                    }
299                                };
300                            }
301                        }
302                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
303                            let py_obj =
304                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
305                            call_python(py, &callback, py_obj);
306                        }),
307                        NautilusWsMessage::AccountUpdate(msg) => {
308                            call_python_with_data(&callback, |py| msg.into_py_any(py));
309                        }
310                        NautilusWsMessage::PositionUpdate(msg) => {
311                            call_python_with_data(&callback, |py| msg.into_py_any(py));
312                        }
313                        NautilusWsMessage::Reconnected => {} // Nothing to handle
314                        NautilusWsMessage::Authenticated => {} // Nothing to handle
315                        NautilusWsMessage::Error(msg) => {
316                            call_python_with_data(&callback, |py| msg.into_py_any(py));
317                        }
318                        NautilusWsMessage::Raw(msg) => {
319                            log::debug!("Received raw message, skipping: {msg}");
320                        }
321                    }
322                }
323            });
324
325            Ok(())
326        })
327    }
328
329    #[pyo3(name = "wait_until_active")]
330    fn py_wait_until_active<'py>(
331        &self,
332        py: Python<'py>,
333        timeout_secs: f64,
334    ) -> PyResult<Bound<'py, PyAny>> {
335        let client = self.clone();
336
337        pyo3_async_runtimes::tokio::future_into_py(py, async move {
338            client
339                .wait_until_active(timeout_secs)
340                .await
341                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
342            Ok(())
343        })
344    }
345
346    #[pyo3(name = "close")]
347    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
348        let mut client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.close().await {
352                log::error!("Error on close: {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "subscribe_instruments")]
359    fn py_subscribe_instruments<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_type: OKXInstrumentType,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.subscribe_instruments(instrument_type).await {
368                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
369            }
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "subscribe_instrument")]
375    fn py_subscribe_instrument<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_id: InstrumentId,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            if let Err(e) = client.subscribe_instrument(instrument_id).await {
384                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
385            }
386            Ok(())
387        })
388    }
389
390    #[pyo3(name = "subscribe_book")]
391    fn py_subscribe_book<'py>(
392        &self,
393        py: Python<'py>,
394        instrument_id: InstrumentId,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            client
400                .subscribe_book(instrument_id)
401                .await
402                .map_err(to_pyvalue_err)
403        })
404    }
405
406    #[pyo3(name = "subscribe_book50_l2_tbt")]
407    fn py_subscribe_book50_l2_tbt<'py>(
408        &self,
409        py: Python<'py>,
410        instrument_id: InstrumentId,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
416                log::error!("Failed to subscribe to book50_tbt: {e}");
417            }
418            Ok(())
419        })
420    }
421
422    #[pyo3(name = "subscribe_book_l2_tbt")]
423    fn py_subscribe_book_l2_tbt<'py>(
424        &self,
425        py: Python<'py>,
426        instrument_id: InstrumentId,
427    ) -> PyResult<Bound<'py, PyAny>> {
428        let client = self.clone();
429
430        pyo3_async_runtimes::tokio::future_into_py(py, async move {
431            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
432                log::error!("Failed to subscribe to books_l2_tbt: {e}");
433            }
434            Ok(())
435        })
436    }
437
438    #[pyo3(name = "subscribe_book_with_depth")]
439    fn py_subscribe_book_with_depth<'py>(
440        &self,
441        py: Python<'py>,
442        instrument_id: InstrumentId,
443        depth: u16,
444    ) -> PyResult<Bound<'py, PyAny>> {
445        let client = self.clone();
446
447        pyo3_async_runtimes::tokio::future_into_py(py, async move {
448            client
449                .subscribe_book_with_depth(instrument_id, depth)
450                .await
451                .map_err(to_pyvalue_err)
452        })
453    }
454
455    #[pyo3(name = "subscribe_book_depth5")]
456    fn py_subscribe_book_depth5<'py>(
457        &self,
458        py: Python<'py>,
459        instrument_id: InstrumentId,
460    ) -> PyResult<Bound<'py, PyAny>> {
461        let client = self.clone();
462
463        pyo3_async_runtimes::tokio::future_into_py(py, async move {
464            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
465                log::error!("Failed to subscribe to books5: {e}");
466            }
467            Ok(())
468        })
469    }
470
471    #[pyo3(name = "subscribe_quotes")]
472    fn py_subscribe_quotes<'py>(
473        &self,
474        py: Python<'py>,
475        instrument_id: InstrumentId,
476    ) -> PyResult<Bound<'py, PyAny>> {
477        let client = self.clone();
478
479        pyo3_async_runtimes::tokio::future_into_py(py, async move {
480            if let Err(e) = client.subscribe_quotes(instrument_id).await {
481                log::error!("Failed to subscribe to quotes: {e}");
482            }
483            Ok(())
484        })
485    }
486
487    #[pyo3(name = "subscribe_trades")]
488    fn py_subscribe_trades<'py>(
489        &self,
490        py: Python<'py>,
491        instrument_id: InstrumentId,
492        aggregated: bool,
493    ) -> PyResult<Bound<'py, PyAny>> {
494        let client = self.clone();
495
496        pyo3_async_runtimes::tokio::future_into_py(py, async move {
497            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
498                log::error!("Failed to subscribe to trades: {e}");
499            }
500            Ok(())
501        })
502    }
503
504    #[pyo3(name = "subscribe_bars")]
505    fn py_subscribe_bars<'py>(
506        &self,
507        py: Python<'py>,
508        bar_type: BarType,
509    ) -> PyResult<Bound<'py, PyAny>> {
510        let client = self.clone();
511
512        pyo3_async_runtimes::tokio::future_into_py(py, async move {
513            if let Err(e) = client.subscribe_bars(bar_type).await {
514                log::error!("Failed to subscribe to bars: {e}");
515            }
516            Ok(())
517        })
518    }
519
520    #[pyo3(name = "unsubscribe_book")]
521    fn py_unsubscribe_book<'py>(
522        &self,
523        py: Python<'py>,
524        instrument_id: InstrumentId,
525    ) -> PyResult<Bound<'py, PyAny>> {
526        let client = self.clone();
527
528        pyo3_async_runtimes::tokio::future_into_py(py, async move {
529            if let Err(e) = client.unsubscribe_book(instrument_id).await {
530                log::error!("Failed to unsubscribe from order book: {e}");
531            }
532            Ok(())
533        })
534    }
535
536    #[pyo3(name = "unsubscribe_book_depth5")]
537    fn py_unsubscribe_book_depth5<'py>(
538        &self,
539        py: Python<'py>,
540        instrument_id: InstrumentId,
541    ) -> PyResult<Bound<'py, PyAny>> {
542        let client = self.clone();
543
544        pyo3_async_runtimes::tokio::future_into_py(py, async move {
545            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
546                log::error!("Failed to unsubscribe from books5: {e}");
547            }
548            Ok(())
549        })
550    }
551
552    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
553    fn py_unsubscribe_book50_l2_tbt<'py>(
554        &self,
555        py: Python<'py>,
556        instrument_id: InstrumentId,
557    ) -> PyResult<Bound<'py, PyAny>> {
558        let client = self.clone();
559
560        pyo3_async_runtimes::tokio::future_into_py(py, async move {
561            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
562                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
563            }
564            Ok(())
565        })
566    }
567
568    #[pyo3(name = "unsubscribe_book_l2_tbt")]
569    fn py_unsubscribe_book_l2_tbt<'py>(
570        &self,
571        py: Python<'py>,
572        instrument_id: InstrumentId,
573    ) -> PyResult<Bound<'py, PyAny>> {
574        let client = self.clone();
575
576        pyo3_async_runtimes::tokio::future_into_py(py, async move {
577            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
578                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
579            }
580            Ok(())
581        })
582    }
583
584    #[pyo3(name = "unsubscribe_quotes")]
585    fn py_unsubscribe_quotes<'py>(
586        &self,
587        py: Python<'py>,
588        instrument_id: InstrumentId,
589    ) -> PyResult<Bound<'py, PyAny>> {
590        let client = self.clone();
591
592        pyo3_async_runtimes::tokio::future_into_py(py, async move {
593            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
594                log::error!("Failed to unsubscribe from quotes: {e}");
595            }
596            Ok(())
597        })
598    }
599
600    #[pyo3(name = "unsubscribe_trades")]
601    fn py_unsubscribe_trades<'py>(
602        &self,
603        py: Python<'py>,
604        instrument_id: InstrumentId,
605        aggregated: bool,
606    ) -> PyResult<Bound<'py, PyAny>> {
607        let client = self.clone();
608
609        pyo3_async_runtimes::tokio::future_into_py(py, async move {
610            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
611                log::error!("Failed to unsubscribe from trades: {e}");
612            }
613            Ok(())
614        })
615    }
616
617    #[pyo3(name = "unsubscribe_bars")]
618    fn py_unsubscribe_bars<'py>(
619        &self,
620        py: Python<'py>,
621        bar_type: BarType,
622    ) -> PyResult<Bound<'py, PyAny>> {
623        let client = self.clone();
624
625        pyo3_async_runtimes::tokio::future_into_py(py, async move {
626            if let Err(e) = client.unsubscribe_bars(bar_type).await {
627                log::error!("Failed to unsubscribe from bars: {e}");
628            }
629            Ok(())
630        })
631    }
632
633    #[pyo3(name = "subscribe_ticker")]
634    fn py_subscribe_ticker<'py>(
635        &self,
636        py: Python<'py>,
637        instrument_id: InstrumentId,
638    ) -> PyResult<Bound<'py, PyAny>> {
639        let client = self.clone();
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            if let Err(e) = client.subscribe_ticker(instrument_id).await {
643                log::error!("Failed to subscribe to ticker: {e}");
644            }
645            Ok(())
646        })
647    }
648
649    #[pyo3(name = "unsubscribe_ticker")]
650    fn py_unsubscribe_ticker<'py>(
651        &self,
652        py: Python<'py>,
653        instrument_id: InstrumentId,
654    ) -> PyResult<Bound<'py, PyAny>> {
655        let client = self.clone();
656
657        pyo3_async_runtimes::tokio::future_into_py(py, async move {
658            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
659                log::error!("Failed to unsubscribe from ticker: {e}");
660            }
661            Ok(())
662        })
663    }
664
665    #[pyo3(name = "subscribe_mark_prices")]
666    fn py_subscribe_mark_prices<'py>(
667        &self,
668        py: Python<'py>,
669        instrument_id: InstrumentId,
670    ) -> PyResult<Bound<'py, PyAny>> {
671        let client = self.clone();
672
673        pyo3_async_runtimes::tokio::future_into_py(py, async move {
674            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
675                log::error!("Failed to subscribe to mark prices: {e}");
676            }
677            Ok(())
678        })
679    }
680
681    #[pyo3(name = "unsubscribe_mark_prices")]
682    fn py_unsubscribe_mark_prices<'py>(
683        &self,
684        py: Python<'py>,
685        instrument_id: InstrumentId,
686    ) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
691                log::error!("Failed to unsubscribe from mark prices: {e}");
692            }
693            Ok(())
694        })
695    }
696
697    #[pyo3(name = "subscribe_index_prices")]
698    fn py_subscribe_index_prices<'py>(
699        &self,
700        py: Python<'py>,
701        instrument_id: InstrumentId,
702    ) -> PyResult<Bound<'py, PyAny>> {
703        let client = self.clone();
704
705        pyo3_async_runtimes::tokio::future_into_py(py, async move {
706            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
707                log::error!("Failed to subscribe to index prices: {e}");
708            }
709            Ok(())
710        })
711    }
712
713    #[pyo3(name = "unsubscribe_index_prices")]
714    fn py_unsubscribe_index_prices<'py>(
715        &self,
716        py: Python<'py>,
717        instrument_id: InstrumentId,
718    ) -> PyResult<Bound<'py, PyAny>> {
719        let client = self.clone();
720
721        pyo3_async_runtimes::tokio::future_into_py(py, async move {
722            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
723                log::error!("Failed to unsubscribe from index prices: {e}");
724            }
725            Ok(())
726        })
727    }
728
729    #[pyo3(name = "subscribe_funding_rates")]
730    fn py_subscribe_funding_rates<'py>(
731        &self,
732        py: Python<'py>,
733        instrument_id: InstrumentId,
734    ) -> PyResult<Bound<'py, PyAny>> {
735        let client = self.clone();
736
737        pyo3_async_runtimes::tokio::future_into_py(py, async move {
738            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
739                log::error!("Failed to subscribe to funding rates: {e}");
740            }
741            Ok(())
742        })
743    }
744
745    #[pyo3(name = "unsubscribe_funding_rates")]
746    fn py_unsubscribe_funding_rates<'py>(
747        &self,
748        py: Python<'py>,
749        instrument_id: InstrumentId,
750    ) -> PyResult<Bound<'py, PyAny>> {
751        let client = self.clone();
752
753        pyo3_async_runtimes::tokio::future_into_py(py, async move {
754            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
755                log::error!("Failed to unsubscribe from funding rates: {e}");
756            }
757            Ok(())
758        })
759    }
760
761    #[pyo3(name = "subscribe_orders")]
762    fn py_subscribe_orders<'py>(
763        &self,
764        py: Python<'py>,
765        instrument_type: OKXInstrumentType,
766    ) -> PyResult<Bound<'py, PyAny>> {
767        let client = self.clone();
768
769        pyo3_async_runtimes::tokio::future_into_py(py, async move {
770            if let Err(e) = client.subscribe_orders(instrument_type).await {
771                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
772            }
773            Ok(())
774        })
775    }
776
777    #[pyo3(name = "unsubscribe_orders")]
778    fn py_unsubscribe_orders<'py>(
779        &self,
780        py: Python<'py>,
781        instrument_type: OKXInstrumentType,
782    ) -> PyResult<Bound<'py, PyAny>> {
783        let client = self.clone();
784
785        pyo3_async_runtimes::tokio::future_into_py(py, async move {
786            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
787                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
788            }
789            Ok(())
790        })
791    }
792
793    #[pyo3(name = "subscribe_orders_algo")]
794    fn py_subscribe_orders_algo<'py>(
795        &self,
796        py: Python<'py>,
797        instrument_type: OKXInstrumentType,
798    ) -> PyResult<Bound<'py, PyAny>> {
799        let client = self.clone();
800
801        pyo3_async_runtimes::tokio::future_into_py(py, async move {
802            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
803                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
804            }
805            Ok(())
806        })
807    }
808
809    #[pyo3(name = "unsubscribe_orders_algo")]
810    fn py_unsubscribe_orders_algo<'py>(
811        &self,
812        py: Python<'py>,
813        instrument_type: OKXInstrumentType,
814    ) -> PyResult<Bound<'py, PyAny>> {
815        let client = self.clone();
816
817        pyo3_async_runtimes::tokio::future_into_py(py, async move {
818            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
819                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
820            }
821            Ok(())
822        })
823    }
824
825    #[pyo3(name = "subscribe_fills")]
826    fn py_subscribe_fills<'py>(
827        &self,
828        py: Python<'py>,
829        instrument_type: OKXInstrumentType,
830    ) -> PyResult<Bound<'py, PyAny>> {
831        let client = self.clone();
832
833        pyo3_async_runtimes::tokio::future_into_py(py, async move {
834            if let Err(e) = client.subscribe_fills(instrument_type).await {
835                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
836            }
837            Ok(())
838        })
839    }
840
841    #[pyo3(name = "unsubscribe_fills")]
842    fn py_unsubscribe_fills<'py>(
843        &self,
844        py: Python<'py>,
845        instrument_type: OKXInstrumentType,
846    ) -> PyResult<Bound<'py, PyAny>> {
847        let client = self.clone();
848
849        pyo3_async_runtimes::tokio::future_into_py(py, async move {
850            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
851                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
852            }
853            Ok(())
854        })
855    }
856
857    #[pyo3(name = "subscribe_account")]
858    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
859        let client = self.clone();
860
861        pyo3_async_runtimes::tokio::future_into_py(py, async move {
862            if let Err(e) = client.subscribe_account().await {
863                log::error!("Failed to subscribe to account: {e}");
864            }
865            Ok(())
866        })
867    }
868
869    #[pyo3(name = "unsubscribe_account")]
870    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
871        let client = self.clone();
872
873        pyo3_async_runtimes::tokio::future_into_py(py, async move {
874            if let Err(e) = client.unsubscribe_account().await {
875                log::error!("Failed to unsubscribe from account: {e}");
876            }
877            Ok(())
878        })
879    }
880
881    #[pyo3(name = "submit_order")]
882    #[pyo3(signature = (
883        trader_id,
884        strategy_id,
885        instrument_id,
886        td_mode,
887        client_order_id,
888        order_side,
889        order_type,
890        quantity,
891        time_in_force=None,
892        price=None,
893        trigger_price=None,
894        post_only=None,
895        reduce_only=None,
896        quote_quantity=None,
897        position_side=None,
898    ))]
899    #[allow(clippy::too_many_arguments)]
900    fn py_submit_order<'py>(
901        &self,
902        py: Python<'py>,
903        trader_id: TraderId,
904        strategy_id: StrategyId,
905        instrument_id: InstrumentId,
906        td_mode: OKXTradeMode,
907        client_order_id: ClientOrderId,
908        order_side: OrderSide,
909        order_type: OrderType,
910        quantity: Quantity,
911        time_in_force: Option<TimeInForce>,
912        price: Option<Price>,
913        trigger_price: Option<Price>,
914        post_only: Option<bool>,
915        reduce_only: Option<bool>,
916        quote_quantity: Option<bool>,
917        position_side: Option<PositionSide>,
918    ) -> PyResult<Bound<'py, PyAny>> {
919        let client = self.clone();
920
921        pyo3_async_runtimes::tokio::future_into_py(py, async move {
922            client
923                .submit_order(
924                    trader_id,
925                    strategy_id,
926                    instrument_id,
927                    td_mode,
928                    client_order_id,
929                    order_side,
930                    order_type,
931                    quantity,
932                    time_in_force,
933                    price,
934                    trigger_price,
935                    post_only,
936                    reduce_only,
937                    quote_quantity,
938                    position_side,
939                )
940                .await
941                .map_err(to_pyvalue_err)
942        })
943    }
944
945    #[pyo3(name = "cancel_order", signature = (
946        trader_id,
947        strategy_id,
948        instrument_id,
949        client_order_id=None,
950        venue_order_id=None,
951    ))]
952    #[allow(clippy::too_many_arguments)]
953    fn py_cancel_order<'py>(
954        &self,
955        py: Python<'py>,
956        trader_id: TraderId,
957        strategy_id: StrategyId,
958        instrument_id: InstrumentId,
959        client_order_id: Option<ClientOrderId>,
960        venue_order_id: Option<VenueOrderId>,
961    ) -> PyResult<Bound<'py, PyAny>> {
962        let client = self.clone();
963
964        pyo3_async_runtimes::tokio::future_into_py(py, async move {
965            client
966                .cancel_order(
967                    trader_id,
968                    strategy_id,
969                    instrument_id,
970                    client_order_id,
971                    venue_order_id,
972                )
973                .await
974                .map_err(to_pyvalue_err)
975        })
976    }
977
978    #[pyo3(name = "modify_order")]
979    #[pyo3(signature = (
980        trader_id,
981        strategy_id,
982        instrument_id,
983        client_order_id=None,
984        venue_order_id=None,
985        price=None,
986        quantity=None,
987    ))]
988    #[allow(clippy::too_many_arguments)]
989    fn py_modify_order<'py>(
990        &self,
991        py: Python<'py>,
992        trader_id: TraderId,
993        strategy_id: StrategyId,
994        instrument_id: InstrumentId,
995        client_order_id: Option<ClientOrderId>,
996        venue_order_id: Option<VenueOrderId>,
997        price: Option<Price>,
998        quantity: Option<Quantity>,
999    ) -> PyResult<Bound<'py, PyAny>> {
1000        let client = self.clone();
1001
1002        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1003            client
1004                .modify_order(
1005                    trader_id,
1006                    strategy_id,
1007                    instrument_id,
1008                    client_order_id,
1009                    price,
1010                    quantity,
1011                    venue_order_id,
1012                )
1013                .await
1014                .map_err(to_pyvalue_err)
1015        })
1016    }
1017
1018    #[allow(clippy::type_complexity)]
1019    #[pyo3(name = "batch_submit_orders")]
1020    fn py_batch_submit_orders<'py>(
1021        &self,
1022        py: Python<'py>,
1023        orders: Vec<Py<PyAny>>,
1024    ) -> PyResult<Bound<'py, PyAny>> {
1025        let mut domain_orders = Vec::with_capacity(orders.len());
1026
1027        for obj in orders {
1028            let (
1029                instrument_type,
1030                instrument_id,
1031                td_mode,
1032                client_order_id,
1033                order_side,
1034                order_type,
1035                quantity,
1036                position_side,
1037                price,
1038                trigger_price,
1039                post_only,
1040                reduce_only,
1041            ): (
1042                OKXInstrumentType,
1043                InstrumentId,
1044                OKXTradeMode,
1045                ClientOrderId,
1046                OrderSide,
1047                OrderType,
1048                Quantity,
1049                Option<PositionSide>,
1050                Option<Price>,
1051                Option<Price>,
1052                Option<bool>,
1053                Option<bool>,
1054            ) = obj
1055                .extract(py)
1056                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1057
1058            domain_orders.push((
1059                instrument_type,
1060                instrument_id,
1061                td_mode,
1062                client_order_id,
1063                order_side,
1064                position_side,
1065                order_type,
1066                quantity,
1067                price,
1068                trigger_price,
1069                post_only,
1070                reduce_only,
1071            ));
1072        }
1073
1074        let client = self.clone();
1075
1076        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1077            client
1078                .batch_submit_orders(domain_orders)
1079                .await
1080                .map_err(to_pyvalue_err)
1081        })
1082    }
1083
1084    /// Cancels multiple orders via WebSocket.
1085    #[pyo3(name = "batch_cancel_orders")]
1086    fn py_batch_cancel_orders<'py>(
1087        &self,
1088        py: Python<'py>,
1089        cancels: Vec<Py<PyAny>>,
1090    ) -> PyResult<Bound<'py, PyAny>> {
1091        let mut batched_cancels = Vec::with_capacity(cancels.len());
1092
1093        for obj in cancels {
1094            let (instrument_id, client_order_id, order_id): (
1095                InstrumentId,
1096                Option<ClientOrderId>,
1097                Option<VenueOrderId>,
1098            ) = obj
1099                .extract(py)
1100                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1101            batched_cancels.push((instrument_id, client_order_id, order_id));
1102        }
1103
1104        let client = self.clone();
1105
1106        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1107            client
1108                .batch_cancel_orders(batched_cancels)
1109                .await
1110                .map_err(to_pyvalue_err)
1111        })
1112    }
1113
1114    #[pyo3(name = "batch_modify_orders")]
1115    fn py_batch_modify_orders<'py>(
1116        &self,
1117        py: Python<'py>,
1118        orders: Vec<Py<PyAny>>,
1119    ) -> PyResult<Bound<'py, PyAny>> {
1120        let mut domain_orders = Vec::with_capacity(orders.len());
1121
1122        for obj in orders {
1123            let (
1124                instrument_type,
1125                instrument_id,
1126                client_order_id,
1127                new_client_order_id,
1128                price,
1129                quantity,
1130            ): (
1131                String,
1132                InstrumentId,
1133                ClientOrderId,
1134                ClientOrderId,
1135                Option<Price>,
1136                Option<Quantity>,
1137            ) = obj
1138                .extract(py)
1139                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1140            let inst_type =
1141                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1142            domain_orders.push((
1143                inst_type,
1144                instrument_id,
1145                client_order_id,
1146                new_client_order_id,
1147                price,
1148                quantity,
1149            ));
1150        }
1151
1152        let client = self.clone();
1153
1154        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1155            client
1156                .batch_modify_orders(domain_orders)
1157                .await
1158                .map_err(to_pyvalue_err)
1159        })
1160    }
1161
1162    #[pyo3(name = "mass_cancel_orders")]
1163    fn py_mass_cancel_orders<'py>(
1164        &self,
1165        py: Python<'py>,
1166        instrument_id: InstrumentId,
1167    ) -> PyResult<Bound<'py, PyAny>> {
1168        let client = self.clone();
1169
1170        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1171            client
1172                .mass_cancel_orders(instrument_id)
1173                .await
1174                .map_err(to_pyvalue_err)
1175        })
1176    }
1177
1178    #[pyo3(name = "cache_instruments")]
1179    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1180        let instruments: Result<Vec<_>, _> = instruments
1181            .into_iter()
1182            .map(|inst| pyobject_to_instrument_any(py, inst))
1183            .collect();
1184        self.cache_instruments(instruments?);
1185        Ok(())
1186    }
1187
1188    #[pyo3(name = "cache_instrument")]
1189    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1190        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1191        Ok(())
1192    }
1193}
1194
1195fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1196where
1197    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1198{
1199    Python::attach(|py| match data_converter(py) {
1200        Ok(py_obj) => call_python(py, callback, py_obj),
1201        Err(e) => log::error!("Failed to convert data to Python object: {e}"),
1202    });
1203}