nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49    data::{BarType, Data, OrderBookDeltas_API},
50    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    python::{
53        data::data_to_pycapsule,
54        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55    },
56    types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61    common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62    websocket::{
63        OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65    },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70    #[getter]
71    pub fn code(&self) -> &str {
72        &self.code
73    }
74
75    #[getter]
76    pub fn message(&self) -> &str {
77        &self.message
78    }
79
80    #[getter]
81    pub fn conn_id(&self) -> Option<&str> {
82        self.conn_id.as_deref()
83    }
84
85    #[getter]
86    pub fn ts_event(&self) -> u64 {
87        self.timestamp
88    }
89
90    fn __repr__(&self) -> String {
91        format!(
92            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93            self.code, self.message, self.conn_id, self.timestamp
94        )
95    }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100    #[new]
101    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102    fn py_new(
103        url: Option<String>,
104        api_key: Option<String>,
105        api_secret: Option<String>,
106        api_passphrase: Option<String>,
107        account_id: Option<AccountId>,
108        heartbeat: Option<u64>,
109    ) -> PyResult<Self> {
110        Self::new(
111            url,
112            api_key,
113            api_secret,
114            api_passphrase,
115            account_id,
116            heartbeat,
117        )
118        .map_err(to_pyvalue_err)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "with_credentials")]
123    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124    fn py_with_credentials(
125        url: Option<String>,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        api_passphrase: Option<String>,
129        account_id: Option<AccountId>,
130        heartbeat: Option<u64>,
131    ) -> PyResult<Self> {
132        Self::with_credentials(
133            url,
134            api_key,
135            api_secret,
136            api_passphrase,
137            account_id,
138            heartbeat,
139        )
140        .map_err(to_pyvalue_err)
141    }
142
143    #[staticmethod]
144    #[pyo3(name = "from_env")]
145    fn py_from_env() -> PyResult<Self> {
146        Self::from_env().map_err(to_pyvalue_err)
147    }
148
149    #[getter]
150    #[pyo3(name = "url")]
151    #[must_use]
152    pub fn py_url(&self) -> &str {
153        self.url()
154    }
155
156    #[getter]
157    #[pyo3(name = "api_key")]
158    #[must_use]
159    pub fn py_api_key(&self) -> Option<&str> {
160        self.api_key()
161    }
162
163    #[getter]
164    #[pyo3(name = "api_key_masked")]
165    #[must_use]
166    pub fn py_api_key_masked(&self) -> Option<String> {
167        self.api_key_masked()
168    }
169
    /// Python method `is_active`: returns the result of [`Self::is_active`].
    // NOTE(review): takes `&mut self`, so PyO3 needs an exclusive borrow of the object for this
    // call — confirm whether the underlying `is_active` really requires mutable access.
    #[pyo3(name = "is_active")]
    fn py_is_active(&mut self) -> bool {
        self.is_active()
    }
174
    /// Python method `is_closed`: returns the result of [`Self::is_closed`].
    // NOTE(review): takes `&mut self`, so PyO3 needs an exclusive borrow of the object for this
    // call — confirm whether the underlying `is_closed` really requires mutable access.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&mut self) -> bool {
        self.is_closed()
    }
179
    /// Python method `cancel_all_requests`: synchronously delegates to
    /// [`Self::cancel_all_requests`] and returns nothing.
    #[pyo3(name = "cancel_all_requests")]
    pub fn py_cancel_all_requests(&self) {
        self.cancel_all_requests();
    }
184
185    #[pyo3(name = "get_subscriptions")]
186    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
187        let channels = self.get_subscriptions(instrument_id);
188
189        // Convert to OKX channel names
190        channels
191            .iter()
192            .map(|c| {
193                serde_json::to_value(c)
194                    .ok()
195                    .and_then(|v| v.as_str().map(String::from))
196                    .unwrap_or_else(|| c.to_string())
197            })
198            .collect()
199    }
200
    /// Sets the VIP level for this client.
    ///
    /// The VIP level determines which WebSocket channels are available.
    ///
    /// Python method `set_vip_level`: forwards `vip_level` to [`Self::set_vip_level`].
    #[pyo3(name = "set_vip_level")]
    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
        self.set_vip_level(vip_level);
    }
208
209    /// Gets the current VIP level.
210    #[pyo3(name = "vip_level")]
211    #[getter]
212    fn py_vip_level(&self) -> OKXVipLevel {
213        self.vip_level()
214    }
215
216    #[pyo3(name = "connect")]
217    fn py_connect<'py>(
218        &mut self,
219        py: Python<'py>,
220        instruments: Vec<Py<PyAny>>,
221        callback: Py<PyAny>,
222    ) -> PyResult<Bound<'py, PyAny>> {
223        let mut instruments_any = Vec::new();
224        for inst in instruments {
225            let inst_any = pyobject_to_instrument_any(py, inst)?;
226            instruments_any.push(inst_any);
227        }
228
229        self.cache_instruments(instruments_any);
230
231        let mut client = self.clone();
232
233        pyo3_async_runtimes::tokio::future_into_py(py, async move {
234            client.connect().await.map_err(to_pyruntime_err)?;
235
236            let stream = client.stream();
237
238            // Keep client alive in the spawned task to prevent handler from dropping
239            tokio::spawn(async move {
240                let _client = client;
241                tokio::pin!(stream);
242
243                while let Some(msg) = stream.next().await {
244                    match msg {
245                        NautilusWsMessage::Instrument(msg) => {
246                            call_python_with_data(&callback, |py| {
247                                instrument_any_to_pyobject(py, *msg)
248                            });
249                        }
250                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
251                            for data in msg {
252                                let py_obj = data_to_pycapsule(py, data);
253                                call_python(py, &callback, py_obj);
254                            }
255                        }),
256                        NautilusWsMessage::FundingRates(msg) => {
257                            for data in msg {
258                                call_python_with_data(&callback, |py| data.into_py_any(py));
259                            }
260                        }
261                        NautilusWsMessage::OrderAccepted(msg) => {
262                            call_python_with_data(&callback, |py| msg.into_py_any(py));
263                        }
264                        NautilusWsMessage::OrderCanceled(msg) => {
265                            call_python_with_data(&callback, |py| msg.into_py_any(py));
266                        }
267                        NautilusWsMessage::OrderExpired(msg) => {
268                            call_python_with_data(&callback, |py| msg.into_py_any(py));
269                        }
270                        NautilusWsMessage::OrderRejected(msg) => {
271                            call_python_with_data(&callback, |py| msg.into_py_any(py));
272                        }
273                        NautilusWsMessage::OrderCancelRejected(msg) => {
274                            call_python_with_data(&callback, |py| msg.into_py_any(py));
275                        }
276                        NautilusWsMessage::OrderModifyRejected(msg) => {
277                            call_python_with_data(&callback, |py| msg.into_py_any(py));
278                        }
279                        NautilusWsMessage::OrderTriggered(msg) => {
280                            call_python_with_data(&callback, |py| msg.into_py_any(py));
281                        }
282                        NautilusWsMessage::OrderUpdated(msg) => {
283                            call_python_with_data(&callback, |py| msg.into_py_any(py));
284                        }
285                        NautilusWsMessage::ExecutionReports(msg) => {
286                            for report in msg {
287                                match report {
288                                    ExecutionReport::Order(report) => {
289                                        call_python_with_data(&callback, |py| {
290                                            report.into_py_any(py)
291                                        });
292                                    }
293                                    ExecutionReport::Fill(report) => {
294                                        call_python_with_data(&callback, |py| {
295                                            report.into_py_any(py)
296                                        });
297                                    }
298                                };
299                            }
300                        }
301                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
302                            let py_obj =
303                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
304                            call_python(py, &callback, py_obj);
305                        }),
306                        NautilusWsMessage::AccountUpdate(msg) => {
307                            call_python_with_data(&callback, |py| msg.into_py_any(py));
308                        }
309                        NautilusWsMessage::PositionUpdate(msg) => {
310                            call_python_with_data(&callback, |py| msg.into_py_any(py));
311                        }
312                        NautilusWsMessage::Reconnected => {} // Nothing to handle
313                        NautilusWsMessage::Authenticated => {} // Nothing to handle
314                        NautilusWsMessage::Error(msg) => {
315                            call_python_with_data(&callback, |py| msg.into_py_any(py));
316                        }
317                        NautilusWsMessage::Raw(msg) => {
318                            tracing::debug!("Received raw message, skipping: {msg}");
319                        }
320                    }
321                }
322            });
323
324            Ok(())
325        })
326    }
327
328    #[pyo3(name = "wait_until_active")]
329    fn py_wait_until_active<'py>(
330        &self,
331        py: Python<'py>,
332        timeout_secs: f64,
333    ) -> PyResult<Bound<'py, PyAny>> {
334        let client = self.clone();
335
336        pyo3_async_runtimes::tokio::future_into_py(py, async move {
337            client
338                .wait_until_active(timeout_secs)
339                .await
340                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
341            Ok(())
342        })
343    }
344
345    #[pyo3(name = "close")]
346    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
347        let mut client = self.clone();
348
349        pyo3_async_runtimes::tokio::future_into_py(py, async move {
350            if let Err(e) = client.close().await {
351                log::error!("Error on close: {e}");
352            }
353            Ok(())
354        })
355    }
356
357    #[pyo3(name = "subscribe_instruments")]
358    fn py_subscribe_instruments<'py>(
359        &self,
360        py: Python<'py>,
361        instrument_type: OKXInstrumentType,
362    ) -> PyResult<Bound<'py, PyAny>> {
363        let client = self.clone();
364
365        pyo3_async_runtimes::tokio::future_into_py(py, async move {
366            if let Err(e) = client.subscribe_instruments(instrument_type).await {
367                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
368            }
369            Ok(())
370        })
371    }
372
373    #[pyo3(name = "subscribe_instrument")]
374    fn py_subscribe_instrument<'py>(
375        &self,
376        py: Python<'py>,
377        instrument_id: InstrumentId,
378    ) -> PyResult<Bound<'py, PyAny>> {
379        let client = self.clone();
380
381        pyo3_async_runtimes::tokio::future_into_py(py, async move {
382            if let Err(e) = client.subscribe_instrument(instrument_id).await {
383                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
384            }
385            Ok(())
386        })
387    }
388
389    #[pyo3(name = "subscribe_book")]
390    fn py_subscribe_book<'py>(
391        &self,
392        py: Python<'py>,
393        instrument_id: InstrumentId,
394    ) -> PyResult<Bound<'py, PyAny>> {
395        let client = self.clone();
396
397        pyo3_async_runtimes::tokio::future_into_py(py, async move {
398            client
399                .subscribe_book(instrument_id)
400                .await
401                .map_err(to_pyvalue_err)
402        })
403    }
404
405    #[pyo3(name = "subscribe_book50_l2_tbt")]
406    fn py_subscribe_book50_l2_tbt<'py>(
407        &self,
408        py: Python<'py>,
409        instrument_id: InstrumentId,
410    ) -> PyResult<Bound<'py, PyAny>> {
411        let client = self.clone();
412
413        pyo3_async_runtimes::tokio::future_into_py(py, async move {
414            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
415                log::error!("Failed to subscribe to book50_tbt: {e}");
416            }
417            Ok(())
418        })
419    }
420
421    #[pyo3(name = "subscribe_book_l2_tbt")]
422    fn py_subscribe_book_l2_tbt<'py>(
423        &self,
424        py: Python<'py>,
425        instrument_id: InstrumentId,
426    ) -> PyResult<Bound<'py, PyAny>> {
427        let client = self.clone();
428
429        pyo3_async_runtimes::tokio::future_into_py(py, async move {
430            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
431                log::error!("Failed to subscribe to books_l2_tbt: {e}");
432            }
433            Ok(())
434        })
435    }
436
437    #[pyo3(name = "subscribe_book_with_depth")]
438    fn py_subscribe_book_with_depth<'py>(
439        &self,
440        py: Python<'py>,
441        instrument_id: InstrumentId,
442        depth: u16,
443    ) -> PyResult<Bound<'py, PyAny>> {
444        let client = self.clone();
445
446        pyo3_async_runtimes::tokio::future_into_py(py, async move {
447            client
448                .subscribe_book_with_depth(instrument_id, depth)
449                .await
450                .map_err(to_pyvalue_err)
451        })
452    }
453
454    #[pyo3(name = "subscribe_book_depth5")]
455    fn py_subscribe_book_depth5<'py>(
456        &self,
457        py: Python<'py>,
458        instrument_id: InstrumentId,
459    ) -> PyResult<Bound<'py, PyAny>> {
460        let client = self.clone();
461
462        pyo3_async_runtimes::tokio::future_into_py(py, async move {
463            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
464                log::error!("Failed to subscribe to books5: {e}");
465            }
466            Ok(())
467        })
468    }
469
470    #[pyo3(name = "subscribe_quotes")]
471    fn py_subscribe_quotes<'py>(
472        &self,
473        py: Python<'py>,
474        instrument_id: InstrumentId,
475    ) -> PyResult<Bound<'py, PyAny>> {
476        let client = self.clone();
477
478        pyo3_async_runtimes::tokio::future_into_py(py, async move {
479            if let Err(e) = client.subscribe_quotes(instrument_id).await {
480                log::error!("Failed to subscribe to quotes: {e}");
481            }
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "subscribe_trades")]
487    fn py_subscribe_trades<'py>(
488        &self,
489        py: Python<'py>,
490        instrument_id: InstrumentId,
491        aggregated: bool,
492    ) -> PyResult<Bound<'py, PyAny>> {
493        let client = self.clone();
494
495        pyo3_async_runtimes::tokio::future_into_py(py, async move {
496            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
497                log::error!("Failed to subscribe to trades: {e}");
498            }
499            Ok(())
500        })
501    }
502
503    #[pyo3(name = "subscribe_bars")]
504    fn py_subscribe_bars<'py>(
505        &self,
506        py: Python<'py>,
507        bar_type: BarType,
508    ) -> PyResult<Bound<'py, PyAny>> {
509        let client = self.clone();
510
511        pyo3_async_runtimes::tokio::future_into_py(py, async move {
512            if let Err(e) = client.subscribe_bars(bar_type).await {
513                log::error!("Failed to subscribe to bars: {e}");
514            }
515            Ok(())
516        })
517    }
518
519    #[pyo3(name = "unsubscribe_book")]
520    fn py_unsubscribe_book<'py>(
521        &self,
522        py: Python<'py>,
523        instrument_id: InstrumentId,
524    ) -> PyResult<Bound<'py, PyAny>> {
525        let client = self.clone();
526
527        pyo3_async_runtimes::tokio::future_into_py(py, async move {
528            if let Err(e) = client.unsubscribe_book(instrument_id).await {
529                log::error!("Failed to unsubscribe from order book: {e}");
530            }
531            Ok(())
532        })
533    }
534
535    #[pyo3(name = "unsubscribe_book_depth5")]
536    fn py_unsubscribe_book_depth5<'py>(
537        &self,
538        py: Python<'py>,
539        instrument_id: InstrumentId,
540    ) -> PyResult<Bound<'py, PyAny>> {
541        let client = self.clone();
542
543        pyo3_async_runtimes::tokio::future_into_py(py, async move {
544            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
545                log::error!("Failed to unsubscribe from books5: {e}");
546            }
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
552    fn py_unsubscribe_book50_l2_tbt<'py>(
553        &self,
554        py: Python<'py>,
555        instrument_id: InstrumentId,
556    ) -> PyResult<Bound<'py, PyAny>> {
557        let client = self.clone();
558
559        pyo3_async_runtimes::tokio::future_into_py(py, async move {
560            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
561                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
562            }
563            Ok(())
564        })
565    }
566
567    #[pyo3(name = "unsubscribe_book_l2_tbt")]
568    fn py_unsubscribe_book_l2_tbt<'py>(
569        &self,
570        py: Python<'py>,
571        instrument_id: InstrumentId,
572    ) -> PyResult<Bound<'py, PyAny>> {
573        let client = self.clone();
574
575        pyo3_async_runtimes::tokio::future_into_py(py, async move {
576            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
577                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
578            }
579            Ok(())
580        })
581    }
582
583    #[pyo3(name = "unsubscribe_quotes")]
584    fn py_unsubscribe_quotes<'py>(
585        &self,
586        py: Python<'py>,
587        instrument_id: InstrumentId,
588    ) -> PyResult<Bound<'py, PyAny>> {
589        let client = self.clone();
590
591        pyo3_async_runtimes::tokio::future_into_py(py, async move {
592            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
593                log::error!("Failed to unsubscribe from quotes: {e}");
594            }
595            Ok(())
596        })
597    }
598
599    #[pyo3(name = "unsubscribe_trades")]
600    fn py_unsubscribe_trades<'py>(
601        &self,
602        py: Python<'py>,
603        instrument_id: InstrumentId,
604        aggregated: bool,
605    ) -> PyResult<Bound<'py, PyAny>> {
606        let client = self.clone();
607
608        pyo3_async_runtimes::tokio::future_into_py(py, async move {
609            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
610                log::error!("Failed to unsubscribe from trades: {e}");
611            }
612            Ok(())
613        })
614    }
615
616    #[pyo3(name = "unsubscribe_bars")]
617    fn py_unsubscribe_bars<'py>(
618        &self,
619        py: Python<'py>,
620        bar_type: BarType,
621    ) -> PyResult<Bound<'py, PyAny>> {
622        let client = self.clone();
623
624        pyo3_async_runtimes::tokio::future_into_py(py, async move {
625            if let Err(e) = client.unsubscribe_bars(bar_type).await {
626                log::error!("Failed to unsubscribe from bars: {e}");
627            }
628            Ok(())
629        })
630    }
631
632    #[pyo3(name = "subscribe_ticker")]
633    fn py_subscribe_ticker<'py>(
634        &self,
635        py: Python<'py>,
636        instrument_id: InstrumentId,
637    ) -> PyResult<Bound<'py, PyAny>> {
638        let client = self.clone();
639
640        pyo3_async_runtimes::tokio::future_into_py(py, async move {
641            if let Err(e) = client.subscribe_ticker(instrument_id).await {
642                log::error!("Failed to subscribe to ticker: {e}");
643            }
644            Ok(())
645        })
646    }
647
648    #[pyo3(name = "unsubscribe_ticker")]
649    fn py_unsubscribe_ticker<'py>(
650        &self,
651        py: Python<'py>,
652        instrument_id: InstrumentId,
653    ) -> PyResult<Bound<'py, PyAny>> {
654        let client = self.clone();
655
656        pyo3_async_runtimes::tokio::future_into_py(py, async move {
657            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
658                log::error!("Failed to unsubscribe from ticker: {e}");
659            }
660            Ok(())
661        })
662    }
663
664    #[pyo3(name = "subscribe_mark_prices")]
665    fn py_subscribe_mark_prices<'py>(
666        &self,
667        py: Python<'py>,
668        instrument_id: InstrumentId,
669    ) -> PyResult<Bound<'py, PyAny>> {
670        let client = self.clone();
671
672        pyo3_async_runtimes::tokio::future_into_py(py, async move {
673            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
674                log::error!("Failed to subscribe to mark prices: {e}");
675            }
676            Ok(())
677        })
678    }
679
680    #[pyo3(name = "unsubscribe_mark_prices")]
681    fn py_unsubscribe_mark_prices<'py>(
682        &self,
683        py: Python<'py>,
684        instrument_id: InstrumentId,
685    ) -> PyResult<Bound<'py, PyAny>> {
686        let client = self.clone();
687
688        pyo3_async_runtimes::tokio::future_into_py(py, async move {
689            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
690                log::error!("Failed to unsubscribe from mark prices: {e}");
691            }
692            Ok(())
693        })
694    }
695
696    #[pyo3(name = "subscribe_index_prices")]
697    fn py_subscribe_index_prices<'py>(
698        &self,
699        py: Python<'py>,
700        instrument_id: InstrumentId,
701    ) -> PyResult<Bound<'py, PyAny>> {
702        let client = self.clone();
703
704        pyo3_async_runtimes::tokio::future_into_py(py, async move {
705            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
706                log::error!("Failed to subscribe to index prices: {e}");
707            }
708            Ok(())
709        })
710    }
711
712    #[pyo3(name = "unsubscribe_index_prices")]
713    fn py_unsubscribe_index_prices<'py>(
714        &self,
715        py: Python<'py>,
716        instrument_id: InstrumentId,
717    ) -> PyResult<Bound<'py, PyAny>> {
718        let client = self.clone();
719
720        pyo3_async_runtimes::tokio::future_into_py(py, async move {
721            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
722                log::error!("Failed to unsubscribe from index prices: {e}");
723            }
724            Ok(())
725        })
726    }
727
728    #[pyo3(name = "subscribe_funding_rates")]
729    fn py_subscribe_funding_rates<'py>(
730        &self,
731        py: Python<'py>,
732        instrument_id: InstrumentId,
733    ) -> PyResult<Bound<'py, PyAny>> {
734        let client = self.clone();
735
736        pyo3_async_runtimes::tokio::future_into_py(py, async move {
737            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
738                log::error!("Failed to subscribe to funding rates: {e}");
739            }
740            Ok(())
741        })
742    }
743
744    #[pyo3(name = "unsubscribe_funding_rates")]
745    fn py_unsubscribe_funding_rates<'py>(
746        &self,
747        py: Python<'py>,
748        instrument_id: InstrumentId,
749    ) -> PyResult<Bound<'py, PyAny>> {
750        let client = self.clone();
751
752        pyo3_async_runtimes::tokio::future_into_py(py, async move {
753            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
754                log::error!("Failed to unsubscribe from funding rates: {e}");
755            }
756            Ok(())
757        })
758    }
759
760    #[pyo3(name = "subscribe_orders")]
761    fn py_subscribe_orders<'py>(
762        &self,
763        py: Python<'py>,
764        instrument_type: OKXInstrumentType,
765    ) -> PyResult<Bound<'py, PyAny>> {
766        let client = self.clone();
767
768        pyo3_async_runtimes::tokio::future_into_py(py, async move {
769            if let Err(e) = client.subscribe_orders(instrument_type).await {
770                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
771            }
772            Ok(())
773        })
774    }
775
776    #[pyo3(name = "unsubscribe_orders")]
777    fn py_unsubscribe_orders<'py>(
778        &self,
779        py: Python<'py>,
780        instrument_type: OKXInstrumentType,
781    ) -> PyResult<Bound<'py, PyAny>> {
782        let client = self.clone();
783
784        pyo3_async_runtimes::tokio::future_into_py(py, async move {
785            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
786                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
787            }
788            Ok(())
789        })
790    }
791
792    #[pyo3(name = "subscribe_orders_algo")]
793    fn py_subscribe_orders_algo<'py>(
794        &self,
795        py: Python<'py>,
796        instrument_type: OKXInstrumentType,
797    ) -> PyResult<Bound<'py, PyAny>> {
798        let client = self.clone();
799
800        pyo3_async_runtimes::tokio::future_into_py(py, async move {
801            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
802                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
803            }
804            Ok(())
805        })
806    }
807
808    #[pyo3(name = "unsubscribe_orders_algo")]
809    fn py_unsubscribe_orders_algo<'py>(
810        &self,
811        py: Python<'py>,
812        instrument_type: OKXInstrumentType,
813    ) -> PyResult<Bound<'py, PyAny>> {
814        let client = self.clone();
815
816        pyo3_async_runtimes::tokio::future_into_py(py, async move {
817            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
818                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
819            }
820            Ok(())
821        })
822    }
823
824    #[pyo3(name = "subscribe_fills")]
825    fn py_subscribe_fills<'py>(
826        &self,
827        py: Python<'py>,
828        instrument_type: OKXInstrumentType,
829    ) -> PyResult<Bound<'py, PyAny>> {
830        let client = self.clone();
831
832        pyo3_async_runtimes::tokio::future_into_py(py, async move {
833            if let Err(e) = client.subscribe_fills(instrument_type).await {
834                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
835            }
836            Ok(())
837        })
838    }
839
840    #[pyo3(name = "unsubscribe_fills")]
841    fn py_unsubscribe_fills<'py>(
842        &self,
843        py: Python<'py>,
844        instrument_type: OKXInstrumentType,
845    ) -> PyResult<Bound<'py, PyAny>> {
846        let client = self.clone();
847
848        pyo3_async_runtimes::tokio::future_into_py(py, async move {
849            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
850                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
851            }
852            Ok(())
853        })
854    }
855
856    #[pyo3(name = "subscribe_account")]
857    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
858        let client = self.clone();
859
860        pyo3_async_runtimes::tokio::future_into_py(py, async move {
861            if let Err(e) = client.subscribe_account().await {
862                log::error!("Failed to subscribe to account: {e}");
863            }
864            Ok(())
865        })
866    }
867
868    #[pyo3(name = "unsubscribe_account")]
869    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
870        let client = self.clone();
871
872        pyo3_async_runtimes::tokio::future_into_py(py, async move {
873            if let Err(e) = client.unsubscribe_account().await {
874                log::error!("Failed to unsubscribe from account: {e}");
875            }
876            Ok(())
877        })
878    }
879
880    #[pyo3(name = "submit_order")]
881    #[pyo3(signature = (
882        trader_id,
883        strategy_id,
884        instrument_id,
885        td_mode,
886        client_order_id,
887        order_side,
888        order_type,
889        quantity,
890        time_in_force=None,
891        price=None,
892        trigger_price=None,
893        post_only=None,
894        reduce_only=None,
895        quote_quantity=None,
896        position_side=None,
897    ))]
898    #[allow(clippy::too_many_arguments)]
899    fn py_submit_order<'py>(
900        &self,
901        py: Python<'py>,
902        trader_id: TraderId,
903        strategy_id: StrategyId,
904        instrument_id: InstrumentId,
905        td_mode: OKXTradeMode,
906        client_order_id: ClientOrderId,
907        order_side: OrderSide,
908        order_type: OrderType,
909        quantity: Quantity,
910        time_in_force: Option<TimeInForce>,
911        price: Option<Price>,
912        trigger_price: Option<Price>,
913        post_only: Option<bool>,
914        reduce_only: Option<bool>,
915        quote_quantity: Option<bool>,
916        position_side: Option<PositionSide>,
917    ) -> PyResult<Bound<'py, PyAny>> {
918        let client = self.clone();
919
920        pyo3_async_runtimes::tokio::future_into_py(py, async move {
921            client
922                .submit_order(
923                    trader_id,
924                    strategy_id,
925                    instrument_id,
926                    td_mode,
927                    client_order_id,
928                    order_side,
929                    order_type,
930                    quantity,
931                    time_in_force,
932                    price,
933                    trigger_price,
934                    post_only,
935                    reduce_only,
936                    quote_quantity,
937                    position_side,
938                )
939                .await
940                .map_err(to_pyvalue_err)
941        })
942    }
943
944    #[pyo3(name = "cancel_order", signature = (
945        trader_id,
946        strategy_id,
947        instrument_id,
948        client_order_id=None,
949        venue_order_id=None,
950    ))]
951    #[allow(clippy::too_many_arguments)]
952    fn py_cancel_order<'py>(
953        &self,
954        py: Python<'py>,
955        trader_id: TraderId,
956        strategy_id: StrategyId,
957        instrument_id: InstrumentId,
958        client_order_id: Option<ClientOrderId>,
959        venue_order_id: Option<VenueOrderId>,
960    ) -> PyResult<Bound<'py, PyAny>> {
961        let client = self.clone();
962
963        pyo3_async_runtimes::tokio::future_into_py(py, async move {
964            client
965                .cancel_order(
966                    trader_id,
967                    strategy_id,
968                    instrument_id,
969                    client_order_id,
970                    venue_order_id,
971                )
972                .await
973                .map_err(to_pyvalue_err)
974        })
975    }
976
977    #[pyo3(name = "modify_order")]
978    #[pyo3(signature = (
979        trader_id,
980        strategy_id,
981        instrument_id,
982        client_order_id=None,
983        venue_order_id=None,
984        price=None,
985        quantity=None,
986    ))]
987    #[allow(clippy::too_many_arguments)]
988    fn py_modify_order<'py>(
989        &self,
990        py: Python<'py>,
991        trader_id: TraderId,
992        strategy_id: StrategyId,
993        instrument_id: InstrumentId,
994        client_order_id: Option<ClientOrderId>,
995        venue_order_id: Option<VenueOrderId>,
996        price: Option<Price>,
997        quantity: Option<Quantity>,
998    ) -> PyResult<Bound<'py, PyAny>> {
999        let client = self.clone();
1000
1001        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1002            client
1003                .modify_order(
1004                    trader_id,
1005                    strategy_id,
1006                    instrument_id,
1007                    client_order_id,
1008                    price,
1009                    quantity,
1010                    venue_order_id,
1011                )
1012                .await
1013                .map_err(to_pyvalue_err)
1014        })
1015    }
1016
1017    #[allow(clippy::type_complexity)]
1018    #[pyo3(name = "batch_submit_orders")]
1019    fn py_batch_submit_orders<'py>(
1020        &self,
1021        py: Python<'py>,
1022        orders: Vec<Py<PyAny>>,
1023    ) -> PyResult<Bound<'py, PyAny>> {
1024        let mut domain_orders = Vec::with_capacity(orders.len());
1025
1026        for obj in orders {
1027            let (
1028                instrument_type,
1029                instrument_id,
1030                td_mode,
1031                client_order_id,
1032                order_side,
1033                order_type,
1034                quantity,
1035                position_side,
1036                price,
1037                trigger_price,
1038                post_only,
1039                reduce_only,
1040            ): (
1041                OKXInstrumentType,
1042                InstrumentId,
1043                OKXTradeMode,
1044                ClientOrderId,
1045                OrderSide,
1046                OrderType,
1047                Quantity,
1048                Option<PositionSide>,
1049                Option<Price>,
1050                Option<Price>,
1051                Option<bool>,
1052                Option<bool>,
1053            ) = obj
1054                .extract(py)
1055                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1056
1057            domain_orders.push((
1058                instrument_type,
1059                instrument_id,
1060                td_mode,
1061                client_order_id,
1062                order_side,
1063                position_side,
1064                order_type,
1065                quantity,
1066                price,
1067                trigger_price,
1068                post_only,
1069                reduce_only,
1070            ));
1071        }
1072
1073        let client = self.clone();
1074
1075        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1076            client
1077                .batch_submit_orders(domain_orders)
1078                .await
1079                .map_err(to_pyvalue_err)
1080        })
1081    }
1082
1083    /// Cancels multiple orders via WebSocket.
1084    #[pyo3(name = "batch_cancel_orders")]
1085    fn py_batch_cancel_orders<'py>(
1086        &self,
1087        py: Python<'py>,
1088        cancels: Vec<Py<PyAny>>,
1089    ) -> PyResult<Bound<'py, PyAny>> {
1090        let mut batched_cancels = Vec::with_capacity(cancels.len());
1091
1092        for obj in cancels {
1093            let (instrument_id, client_order_id, order_id): (
1094                InstrumentId,
1095                Option<ClientOrderId>,
1096                Option<VenueOrderId>,
1097            ) = obj
1098                .extract(py)
1099                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1100            batched_cancels.push((instrument_id, client_order_id, order_id));
1101        }
1102
1103        let client = self.clone();
1104
1105        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1106            client
1107                .batch_cancel_orders(batched_cancels)
1108                .await
1109                .map_err(to_pyvalue_err)
1110        })
1111    }
1112
1113    #[pyo3(name = "batch_modify_orders")]
1114    fn py_batch_modify_orders<'py>(
1115        &self,
1116        py: Python<'py>,
1117        orders: Vec<Py<PyAny>>,
1118    ) -> PyResult<Bound<'py, PyAny>> {
1119        let mut domain_orders = Vec::with_capacity(orders.len());
1120
1121        for obj in orders {
1122            let (
1123                instrument_type,
1124                instrument_id,
1125                client_order_id,
1126                new_client_order_id,
1127                price,
1128                quantity,
1129            ): (
1130                String,
1131                InstrumentId,
1132                ClientOrderId,
1133                ClientOrderId,
1134                Option<Price>,
1135                Option<Quantity>,
1136            ) = obj
1137                .extract(py)
1138                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1139            let inst_type =
1140                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1141            domain_orders.push((
1142                inst_type,
1143                instrument_id,
1144                client_order_id,
1145                new_client_order_id,
1146                price,
1147                quantity,
1148            ));
1149        }
1150
1151        let client = self.clone();
1152
1153        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1154            client
1155                .batch_modify_orders(domain_orders)
1156                .await
1157                .map_err(to_pyvalue_err)
1158        })
1159    }
1160
1161    #[pyo3(name = "mass_cancel_orders")]
1162    fn py_mass_cancel_orders<'py>(
1163        &self,
1164        py: Python<'py>,
1165        instrument_id: InstrumentId,
1166    ) -> PyResult<Bound<'py, PyAny>> {
1167        let client = self.clone();
1168
1169        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1170            client
1171                .mass_cancel_orders(instrument_id)
1172                .await
1173                .map_err(to_pyvalue_err)
1174        })
1175    }
1176
1177    #[pyo3(name = "cache_instruments")]
1178    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1179        let instruments: Result<Vec<_>, _> = instruments
1180            .into_iter()
1181            .map(|inst| pyobject_to_instrument_any(py, inst))
1182            .collect();
1183        self.cache_instruments(instruments?);
1184        Ok(())
1185    }
1186
1187    #[pyo3(name = "cache_instrument")]
1188    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1189        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1190        Ok(())
1191    }
1192}
1193
1194pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1195    if let Err(e) = callback.call1(py, (py_obj,)) {
1196        tracing::error!("Error calling Python: {e}");
1197    }
1198}
1199
1200fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1201where
1202    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1203{
1204    Python::attach(|py| match data_converter(py) {
1205        Ok(py_obj) => call_python(py, callback, py_obj),
1206        Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1207    });
1208}