nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49    data::{BarType, Data, OrderBookDeltas_API},
50    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    python::{
53        data::data_to_pycapsule,
54        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55    },
56    types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61    common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62    websocket::{
63        OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65    },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70    #[getter]
71    pub fn code(&self) -> &str {
72        &self.code
73    }
74
75    #[getter]
76    pub fn message(&self) -> &str {
77        &self.message
78    }
79
80    #[getter]
81    pub fn conn_id(&self) -> Option<&str> {
82        self.conn_id.as_deref()
83    }
84
85    #[getter]
86    pub fn ts_event(&self) -> u64 {
87        self.timestamp
88    }
89
90    fn __repr__(&self) -> String {
91        format!(
92            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93            self.code, self.message, self.conn_id, self.timestamp
94        )
95    }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100    #[new]
101    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102    fn py_new(
103        url: Option<String>,
104        api_key: Option<String>,
105        api_secret: Option<String>,
106        api_passphrase: Option<String>,
107        account_id: Option<AccountId>,
108        heartbeat: Option<u64>,
109    ) -> PyResult<Self> {
110        Self::new(
111            url,
112            api_key,
113            api_secret,
114            api_passphrase,
115            account_id,
116            heartbeat,
117        )
118        .map_err(to_pyvalue_err)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "with_credentials")]
123    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124    fn py_with_credentials(
125        url: Option<String>,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        api_passphrase: Option<String>,
129        account_id: Option<AccountId>,
130        heartbeat: Option<u64>,
131    ) -> PyResult<Self> {
132        Self::with_credentials(
133            url,
134            api_key,
135            api_secret,
136            api_passphrase,
137            account_id,
138            heartbeat,
139        )
140        .map_err(to_pyvalue_err)
141    }
142
143    #[staticmethod]
144    #[pyo3(name = "from_env")]
145    fn py_from_env() -> PyResult<Self> {
146        Self::from_env().map_err(to_pyvalue_err)
147    }
148
149    #[getter]
150    #[pyo3(name = "url")]
151    #[must_use]
152    pub fn py_url(&self) -> &str {
153        self.url()
154    }
155
156    #[getter]
157    #[pyo3(name = "api_key")]
158    #[must_use]
159    pub fn py_api_key(&self) -> Option<&str> {
160        self.api_key()
161    }
162
163    #[getter]
164    #[pyo3(name = "api_key_masked")]
165    #[must_use]
166    pub fn py_api_key_masked(&self) -> Option<String> {
167        self.api_key_masked()
168    }
169
170    #[pyo3(name = "is_active")]
171    fn py_is_active(&mut self) -> bool {
172        self.is_active()
173    }
174
175    #[pyo3(name = "is_closed")]
176    fn py_is_closed(&mut self) -> bool {
177        self.is_closed()
178    }
179
180    #[pyo3(name = "cancel_all_requests")]
181    pub fn py_cancel_all_requests(&self) {
182        self.cancel_all_requests();
183    }
184
185    #[pyo3(name = "get_subscriptions")]
186    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
187        let channels = self.get_subscriptions(instrument_id);
188
189        // Convert to OKX channel names
190        channels
191            .iter()
192            .map(|c| {
193                serde_json::to_value(c)
194                    .ok()
195                    .and_then(|v| v.as_str().map(String::from))
196                    .unwrap_or_else(|| c.to_string())
197            })
198            .collect()
199    }
200
201    /// Sets the VIP level for this client.
202    ///
203    /// The VIP level determines which WebSocket channels are available.
204    #[pyo3(name = "set_vip_level")]
205    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
206        self.set_vip_level(vip_level);
207    }
208
209    /// Gets the current VIP level.
210    #[pyo3(name = "vip_level")]
211    #[getter]
212    fn py_vip_level(&self) -> OKXVipLevel {
213        self.vip_level()
214    }
215
216    #[pyo3(name = "connect")]
217    fn py_connect<'py>(
218        &mut self,
219        py: Python<'py>,
220        instruments: Vec<Py<PyAny>>,
221        callback: Py<PyAny>,
222    ) -> PyResult<Bound<'py, PyAny>> {
223        let mut instruments_any = Vec::new();
224        for inst in instruments {
225            let inst_any = pyobject_to_instrument_any(py, inst)?;
226            instruments_any.push(inst_any);
227        }
228
229        self.cache_instruments(instruments_any);
230
231        let mut client = self.clone();
232
233        pyo3_async_runtimes::tokio::future_into_py(py, async move {
234            client.connect().await.map_err(to_pyruntime_err)?;
235
236            let stream = client.stream();
237
238            // Keep client alive in the spawned task to prevent handler from dropping
239            tokio::spawn(async move {
240                let _client = client;
241                tokio::pin!(stream);
242
243                while let Some(msg) = stream.next().await {
244                    match msg {
245                        NautilusWsMessage::Instrument(msg) => {
246                            call_python_with_data(&callback, |py| {
247                                instrument_any_to_pyobject(py, *msg)
248                            });
249                        }
250                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
251                            for data in msg {
252                                let py_obj = data_to_pycapsule(py, data);
253                                call_python(py, &callback, py_obj);
254                            }
255                        }),
256                        NautilusWsMessage::FundingRates(msg) => {
257                            for data in msg {
258                                call_python_with_data(&callback, |py| data.into_py_any(py));
259                            }
260                        }
261                        NautilusWsMessage::OrderRejected(msg) => {
262                            call_python_with_data(&callback, |py| msg.into_py_any(py));
263                        }
264                        NautilusWsMessage::OrderCancelRejected(msg) => {
265                            call_python_with_data(&callback, |py| msg.into_py_any(py));
266                        }
267                        NautilusWsMessage::OrderModifyRejected(msg) => {
268                            call_python_with_data(&callback, |py| msg.into_py_any(py));
269                        }
270                        NautilusWsMessage::ExecutionReports(msg) => {
271                            for report in msg {
272                                match report {
273                                    ExecutionReport::Order(report) => {
274                                        call_python_with_data(&callback, |py| {
275                                            report.into_py_any(py)
276                                        });
277                                    }
278                                    ExecutionReport::Fill(report) => {
279                                        call_python_with_data(&callback, |py| {
280                                            report.into_py_any(py)
281                                        });
282                                    }
283                                };
284                            }
285                        }
286                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
287                            let py_obj =
288                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
289                            call_python(py, &callback, py_obj);
290                        }),
291                        NautilusWsMessage::AccountUpdate(msg) => {
292                            call_python_with_data(&callback, |py| msg.into_py_any(py));
293                        }
294                        NautilusWsMessage::PositionUpdate(msg) => {
295                            call_python_with_data(&callback, |py| msg.into_py_any(py));
296                        }
297                        NautilusWsMessage::Reconnected => {} // Nothing to handle
298                        NautilusWsMessage::Authenticated => {} // Nothing to handle
299                        NautilusWsMessage::Error(msg) => {
300                            call_python_with_data(&callback, |py| msg.into_py_any(py));
301                        }
302                        NautilusWsMessage::Raw(msg) => {
303                            tracing::debug!("Received raw message, skipping: {msg}");
304                        }
305                    }
306                }
307            });
308
309            Ok(())
310        })
311    }
312
313    #[pyo3(name = "wait_until_active")]
314    fn py_wait_until_active<'py>(
315        &self,
316        py: Python<'py>,
317        timeout_secs: f64,
318    ) -> PyResult<Bound<'py, PyAny>> {
319        let client = self.clone();
320
321        pyo3_async_runtimes::tokio::future_into_py(py, async move {
322            client
323                .wait_until_active(timeout_secs)
324                .await
325                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
326            Ok(())
327        })
328    }
329
330    #[pyo3(name = "close")]
331    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
332        let mut client = self.clone();
333
334        pyo3_async_runtimes::tokio::future_into_py(py, async move {
335            if let Err(e) = client.close().await {
336                log::error!("Error on close: {e}");
337            }
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "subscribe_instruments")]
343    fn py_subscribe_instruments<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_type: OKXInstrumentType,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.subscribe_instruments(instrument_type).await {
352                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "subscribe_instrument")]
359    fn py_subscribe_instrument<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_id: InstrumentId,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.subscribe_instrument(instrument_id).await {
368                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
369            }
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "subscribe_book")]
375    fn py_subscribe_book<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_id: InstrumentId,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            client
384                .subscribe_book(instrument_id)
385                .await
386                .map_err(to_pyvalue_err)
387        })
388    }
389
390    #[pyo3(name = "subscribe_book50_l2_tbt")]
391    fn py_subscribe_book50_l2_tbt<'py>(
392        &self,
393        py: Python<'py>,
394        instrument_id: InstrumentId,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
400                log::error!("Failed to subscribe to book50_tbt: {e}");
401            }
402            Ok(())
403        })
404    }
405
406    #[pyo3(name = "subscribe_book_l2_tbt")]
407    fn py_subscribe_book_l2_tbt<'py>(
408        &self,
409        py: Python<'py>,
410        instrument_id: InstrumentId,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
416                log::error!("Failed to subscribe to books_l2_tbt: {e}");
417            }
418            Ok(())
419        })
420    }
421
422    #[pyo3(name = "subscribe_book_with_depth")]
423    fn py_subscribe_book_with_depth<'py>(
424        &self,
425        py: Python<'py>,
426        instrument_id: InstrumentId,
427        depth: u16,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let client = self.clone();
430
431        pyo3_async_runtimes::tokio::future_into_py(py, async move {
432            client
433                .subscribe_book_with_depth(instrument_id, depth)
434                .await
435                .map_err(to_pyvalue_err)
436        })
437    }
438
439    #[pyo3(name = "subscribe_book_depth5")]
440    fn py_subscribe_book_depth5<'py>(
441        &self,
442        py: Python<'py>,
443        instrument_id: InstrumentId,
444    ) -> PyResult<Bound<'py, PyAny>> {
445        let client = self.clone();
446
447        pyo3_async_runtimes::tokio::future_into_py(py, async move {
448            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
449                log::error!("Failed to subscribe to books5: {e}");
450            }
451            Ok(())
452        })
453    }
454
455    #[pyo3(name = "subscribe_quotes")]
456    fn py_subscribe_quotes<'py>(
457        &self,
458        py: Python<'py>,
459        instrument_id: InstrumentId,
460    ) -> PyResult<Bound<'py, PyAny>> {
461        let client = self.clone();
462
463        pyo3_async_runtimes::tokio::future_into_py(py, async move {
464            if let Err(e) = client.subscribe_quotes(instrument_id).await {
465                log::error!("Failed to subscribe to quotes: {e}");
466            }
467            Ok(())
468        })
469    }
470
471    #[pyo3(name = "subscribe_trades")]
472    fn py_subscribe_trades<'py>(
473        &self,
474        py: Python<'py>,
475        instrument_id: InstrumentId,
476        aggregated: bool,
477    ) -> PyResult<Bound<'py, PyAny>> {
478        let client = self.clone();
479
480        pyo3_async_runtimes::tokio::future_into_py(py, async move {
481            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
482                log::error!("Failed to subscribe to trades: {e}");
483            }
484            Ok(())
485        })
486    }
487
488    #[pyo3(name = "subscribe_bars")]
489    fn py_subscribe_bars<'py>(
490        &self,
491        py: Python<'py>,
492        bar_type: BarType,
493    ) -> PyResult<Bound<'py, PyAny>> {
494        let client = self.clone();
495
496        pyo3_async_runtimes::tokio::future_into_py(py, async move {
497            if let Err(e) = client.subscribe_bars(bar_type).await {
498                log::error!("Failed to subscribe to bars: {e}");
499            }
500            Ok(())
501        })
502    }
503
504    #[pyo3(name = "unsubscribe_book")]
505    fn py_unsubscribe_book<'py>(
506        &self,
507        py: Python<'py>,
508        instrument_id: InstrumentId,
509    ) -> PyResult<Bound<'py, PyAny>> {
510        let client = self.clone();
511
512        pyo3_async_runtimes::tokio::future_into_py(py, async move {
513            if let Err(e) = client.unsubscribe_book(instrument_id).await {
514                log::error!("Failed to unsubscribe from order book: {e}");
515            }
516            Ok(())
517        })
518    }
519
520    #[pyo3(name = "unsubscribe_book_depth5")]
521    fn py_unsubscribe_book_depth5<'py>(
522        &self,
523        py: Python<'py>,
524        instrument_id: InstrumentId,
525    ) -> PyResult<Bound<'py, PyAny>> {
526        let client = self.clone();
527
528        pyo3_async_runtimes::tokio::future_into_py(py, async move {
529            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
530                log::error!("Failed to unsubscribe from books5: {e}");
531            }
532            Ok(())
533        })
534    }
535
536    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
537    fn py_unsubscribe_book50_l2_tbt<'py>(
538        &self,
539        py: Python<'py>,
540        instrument_id: InstrumentId,
541    ) -> PyResult<Bound<'py, PyAny>> {
542        let client = self.clone();
543
544        pyo3_async_runtimes::tokio::future_into_py(py, async move {
545            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
546                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
547            }
548            Ok(())
549        })
550    }
551
552    #[pyo3(name = "unsubscribe_book_l2_tbt")]
553    fn py_unsubscribe_book_l2_tbt<'py>(
554        &self,
555        py: Python<'py>,
556        instrument_id: InstrumentId,
557    ) -> PyResult<Bound<'py, PyAny>> {
558        let client = self.clone();
559
560        pyo3_async_runtimes::tokio::future_into_py(py, async move {
561            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
562                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
563            }
564            Ok(())
565        })
566    }
567
568    #[pyo3(name = "unsubscribe_quotes")]
569    fn py_unsubscribe_quotes<'py>(
570        &self,
571        py: Python<'py>,
572        instrument_id: InstrumentId,
573    ) -> PyResult<Bound<'py, PyAny>> {
574        let client = self.clone();
575
576        pyo3_async_runtimes::tokio::future_into_py(py, async move {
577            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
578                log::error!("Failed to unsubscribe from quotes: {e}");
579            }
580            Ok(())
581        })
582    }
583
584    #[pyo3(name = "unsubscribe_trades")]
585    fn py_unsubscribe_trades<'py>(
586        &self,
587        py: Python<'py>,
588        instrument_id: InstrumentId,
589        aggregated: bool,
590    ) -> PyResult<Bound<'py, PyAny>> {
591        let client = self.clone();
592
593        pyo3_async_runtimes::tokio::future_into_py(py, async move {
594            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
595                log::error!("Failed to unsubscribe from trades: {e}");
596            }
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "unsubscribe_bars")]
602    fn py_unsubscribe_bars<'py>(
603        &self,
604        py: Python<'py>,
605        bar_type: BarType,
606    ) -> PyResult<Bound<'py, PyAny>> {
607        let client = self.clone();
608
609        pyo3_async_runtimes::tokio::future_into_py(py, async move {
610            if let Err(e) = client.unsubscribe_bars(bar_type).await {
611                log::error!("Failed to unsubscribe from bars: {e}");
612            }
613            Ok(())
614        })
615    }
616
617    #[pyo3(name = "subscribe_ticker")]
618    fn py_subscribe_ticker<'py>(
619        &self,
620        py: Python<'py>,
621        instrument_id: InstrumentId,
622    ) -> PyResult<Bound<'py, PyAny>> {
623        let client = self.clone();
624
625        pyo3_async_runtimes::tokio::future_into_py(py, async move {
626            if let Err(e) = client.subscribe_ticker(instrument_id).await {
627                log::error!("Failed to subscribe to ticker: {e}");
628            }
629            Ok(())
630        })
631    }
632
633    #[pyo3(name = "unsubscribe_ticker")]
634    fn py_unsubscribe_ticker<'py>(
635        &self,
636        py: Python<'py>,
637        instrument_id: InstrumentId,
638    ) -> PyResult<Bound<'py, PyAny>> {
639        let client = self.clone();
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
643                log::error!("Failed to unsubscribe from ticker: {e}");
644            }
645            Ok(())
646        })
647    }
648
649    #[pyo3(name = "subscribe_mark_prices")]
650    fn py_subscribe_mark_prices<'py>(
651        &self,
652        py: Python<'py>,
653        instrument_id: InstrumentId,
654    ) -> PyResult<Bound<'py, PyAny>> {
655        let client = self.clone();
656
657        pyo3_async_runtimes::tokio::future_into_py(py, async move {
658            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
659                log::error!("Failed to subscribe to mark prices: {e}");
660            }
661            Ok(())
662        })
663    }
664
665    #[pyo3(name = "unsubscribe_mark_prices")]
666    fn py_unsubscribe_mark_prices<'py>(
667        &self,
668        py: Python<'py>,
669        instrument_id: InstrumentId,
670    ) -> PyResult<Bound<'py, PyAny>> {
671        let client = self.clone();
672
673        pyo3_async_runtimes::tokio::future_into_py(py, async move {
674            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
675                log::error!("Failed to unsubscribe from mark prices: {e}");
676            }
677            Ok(())
678        })
679    }
680
681    #[pyo3(name = "subscribe_index_prices")]
682    fn py_subscribe_index_prices<'py>(
683        &self,
684        py: Python<'py>,
685        instrument_id: InstrumentId,
686    ) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
691                log::error!("Failed to subscribe to index prices: {e}");
692            }
693            Ok(())
694        })
695    }
696
697    #[pyo3(name = "unsubscribe_index_prices")]
698    fn py_unsubscribe_index_prices<'py>(
699        &self,
700        py: Python<'py>,
701        instrument_id: InstrumentId,
702    ) -> PyResult<Bound<'py, PyAny>> {
703        let client = self.clone();
704
705        pyo3_async_runtimes::tokio::future_into_py(py, async move {
706            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
707                log::error!("Failed to unsubscribe from index prices: {e}");
708            }
709            Ok(())
710        })
711    }
712
713    #[pyo3(name = "subscribe_funding_rates")]
714    fn py_subscribe_funding_rates<'py>(
715        &self,
716        py: Python<'py>,
717        instrument_id: InstrumentId,
718    ) -> PyResult<Bound<'py, PyAny>> {
719        let client = self.clone();
720
721        pyo3_async_runtimes::tokio::future_into_py(py, async move {
722            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
723                log::error!("Failed to subscribe to funding rates: {e}");
724            }
725            Ok(())
726        })
727    }
728
729    #[pyo3(name = "unsubscribe_funding_rates")]
730    fn py_unsubscribe_funding_rates<'py>(
731        &self,
732        py: Python<'py>,
733        instrument_id: InstrumentId,
734    ) -> PyResult<Bound<'py, PyAny>> {
735        let client = self.clone();
736
737        pyo3_async_runtimes::tokio::future_into_py(py, async move {
738            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
739                log::error!("Failed to unsubscribe from funding rates: {e}");
740            }
741            Ok(())
742        })
743    }
744
745    #[pyo3(name = "subscribe_orders")]
746    fn py_subscribe_orders<'py>(
747        &self,
748        py: Python<'py>,
749        instrument_type: OKXInstrumentType,
750    ) -> PyResult<Bound<'py, PyAny>> {
751        let client = self.clone();
752
753        pyo3_async_runtimes::tokio::future_into_py(py, async move {
754            if let Err(e) = client.subscribe_orders(instrument_type).await {
755                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
756            }
757            Ok(())
758        })
759    }
760
761    #[pyo3(name = "unsubscribe_orders")]
762    fn py_unsubscribe_orders<'py>(
763        &self,
764        py: Python<'py>,
765        instrument_type: OKXInstrumentType,
766    ) -> PyResult<Bound<'py, PyAny>> {
767        let client = self.clone();
768
769        pyo3_async_runtimes::tokio::future_into_py(py, async move {
770            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
771                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
772            }
773            Ok(())
774        })
775    }
776
777    #[pyo3(name = "subscribe_orders_algo")]
778    fn py_subscribe_orders_algo<'py>(
779        &self,
780        py: Python<'py>,
781        instrument_type: OKXInstrumentType,
782    ) -> PyResult<Bound<'py, PyAny>> {
783        let client = self.clone();
784
785        pyo3_async_runtimes::tokio::future_into_py(py, async move {
786            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
787                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
788            }
789            Ok(())
790        })
791    }
792
793    #[pyo3(name = "unsubscribe_orders_algo")]
794    fn py_unsubscribe_orders_algo<'py>(
795        &self,
796        py: Python<'py>,
797        instrument_type: OKXInstrumentType,
798    ) -> PyResult<Bound<'py, PyAny>> {
799        let client = self.clone();
800
801        pyo3_async_runtimes::tokio::future_into_py(py, async move {
802            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
803                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
804            }
805            Ok(())
806        })
807    }
808
809    #[pyo3(name = "subscribe_fills")]
810    fn py_subscribe_fills<'py>(
811        &self,
812        py: Python<'py>,
813        instrument_type: OKXInstrumentType,
814    ) -> PyResult<Bound<'py, PyAny>> {
815        let client = self.clone();
816
817        pyo3_async_runtimes::tokio::future_into_py(py, async move {
818            if let Err(e) = client.subscribe_fills(instrument_type).await {
819                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
820            }
821            Ok(())
822        })
823    }
824
825    #[pyo3(name = "unsubscribe_fills")]
826    fn py_unsubscribe_fills<'py>(
827        &self,
828        py: Python<'py>,
829        instrument_type: OKXInstrumentType,
830    ) -> PyResult<Bound<'py, PyAny>> {
831        let client = self.clone();
832
833        pyo3_async_runtimes::tokio::future_into_py(py, async move {
834            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
835                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
836            }
837            Ok(())
838        })
839    }
840
841    #[pyo3(name = "subscribe_account")]
842    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
843        let client = self.clone();
844
845        pyo3_async_runtimes::tokio::future_into_py(py, async move {
846            if let Err(e) = client.subscribe_account().await {
847                log::error!("Failed to subscribe to account: {e}");
848            }
849            Ok(())
850        })
851    }
852
853    #[pyo3(name = "unsubscribe_account")]
854    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
855        let client = self.clone();
856
857        pyo3_async_runtimes::tokio::future_into_py(py, async move {
858            if let Err(e) = client.unsubscribe_account().await {
859                log::error!("Failed to unsubscribe from account: {e}");
860            }
861            Ok(())
862        })
863    }
864
865    #[pyo3(name = "submit_order")]
866    #[pyo3(signature = (
867        trader_id,
868        strategy_id,
869        instrument_id,
870        td_mode,
871        client_order_id,
872        order_side,
873        order_type,
874        quantity,
875        time_in_force=None,
876        price=None,
877        trigger_price=None,
878        post_only=None,
879        reduce_only=None,
880        quote_quantity=None,
881        position_side=None,
882    ))]
883    #[allow(clippy::too_many_arguments)]
884    fn py_submit_order<'py>(
885        &self,
886        py: Python<'py>,
887        trader_id: TraderId,
888        strategy_id: StrategyId,
889        instrument_id: InstrumentId,
890        td_mode: OKXTradeMode,
891        client_order_id: ClientOrderId,
892        order_side: OrderSide,
893        order_type: OrderType,
894        quantity: Quantity,
895        time_in_force: Option<TimeInForce>,
896        price: Option<Price>,
897        trigger_price: Option<Price>,
898        post_only: Option<bool>,
899        reduce_only: Option<bool>,
900        quote_quantity: Option<bool>,
901        position_side: Option<PositionSide>,
902    ) -> PyResult<Bound<'py, PyAny>> {
903        let client = self.clone();
904
905        pyo3_async_runtimes::tokio::future_into_py(py, async move {
906            client
907                .submit_order(
908                    trader_id,
909                    strategy_id,
910                    instrument_id,
911                    td_mode,
912                    client_order_id,
913                    order_side,
914                    order_type,
915                    quantity,
916                    time_in_force,
917                    price,
918                    trigger_price,
919                    post_only,
920                    reduce_only,
921                    quote_quantity,
922                    position_side,
923                )
924                .await
925                .map_err(to_pyvalue_err)
926        })
927    }
928
929    #[pyo3(name = "cancel_order", signature = (
930        trader_id,
931        strategy_id,
932        instrument_id,
933        client_order_id=None,
934        venue_order_id=None,
935    ))]
936    #[allow(clippy::too_many_arguments)]
937    fn py_cancel_order<'py>(
938        &self,
939        py: Python<'py>,
940        trader_id: TraderId,
941        strategy_id: StrategyId,
942        instrument_id: InstrumentId,
943        client_order_id: Option<ClientOrderId>,
944        venue_order_id: Option<VenueOrderId>,
945    ) -> PyResult<Bound<'py, PyAny>> {
946        let client = self.clone();
947
948        pyo3_async_runtimes::tokio::future_into_py(py, async move {
949            client
950                .cancel_order(
951                    trader_id,
952                    strategy_id,
953                    instrument_id,
954                    client_order_id,
955                    venue_order_id,
956                )
957                .await
958                .map_err(to_pyvalue_err)
959        })
960    }
961
962    #[pyo3(name = "modify_order")]
963    #[pyo3(signature = (
964        trader_id,
965        strategy_id,
966        instrument_id,
967        client_order_id=None,
968        venue_order_id=None,
969        price=None,
970        quantity=None,
971    ))]
972    #[allow(clippy::too_many_arguments)]
973    fn py_modify_order<'py>(
974        &self,
975        py: Python<'py>,
976        trader_id: TraderId,
977        strategy_id: StrategyId,
978        instrument_id: InstrumentId,
979        client_order_id: Option<ClientOrderId>,
980        venue_order_id: Option<VenueOrderId>,
981        price: Option<Price>,
982        quantity: Option<Quantity>,
983    ) -> PyResult<Bound<'py, PyAny>> {
984        let client = self.clone();
985
986        pyo3_async_runtimes::tokio::future_into_py(py, async move {
987            client
988                .modify_order(
989                    trader_id,
990                    strategy_id,
991                    instrument_id,
992                    client_order_id,
993                    price,
994                    quantity,
995                    venue_order_id,
996                )
997                .await
998                .map_err(to_pyvalue_err)
999        })
1000    }
1001
1002    #[allow(clippy::type_complexity)]
1003    #[pyo3(name = "batch_submit_orders")]
1004    fn py_batch_submit_orders<'py>(
1005        &self,
1006        py: Python<'py>,
1007        orders: Vec<Py<PyAny>>,
1008    ) -> PyResult<Bound<'py, PyAny>> {
1009        let mut domain_orders = Vec::with_capacity(orders.len());
1010
1011        for obj in orders {
1012            let (
1013                instrument_type,
1014                instrument_id,
1015                td_mode,
1016                client_order_id,
1017                order_side,
1018                order_type,
1019                quantity,
1020                position_side,
1021                price,
1022                trigger_price,
1023                post_only,
1024                reduce_only,
1025            ): (
1026                OKXInstrumentType,
1027                InstrumentId,
1028                OKXTradeMode,
1029                ClientOrderId,
1030                OrderSide,
1031                OrderType,
1032                Quantity,
1033                Option<PositionSide>,
1034                Option<Price>,
1035                Option<Price>,
1036                Option<bool>,
1037                Option<bool>,
1038            ) = obj
1039                .extract(py)
1040                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1041
1042            domain_orders.push((
1043                instrument_type,
1044                instrument_id,
1045                td_mode,
1046                client_order_id,
1047                order_side,
1048                position_side,
1049                order_type,
1050                quantity,
1051                price,
1052                trigger_price,
1053                post_only,
1054                reduce_only,
1055            ));
1056        }
1057
1058        let client = self.clone();
1059
1060        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1061            client
1062                .batch_submit_orders(domain_orders)
1063                .await
1064                .map_err(to_pyvalue_err)
1065        })
1066    }
1067
1068    /// Cancels multiple orders via WebSocket.
1069    #[pyo3(name = "batch_cancel_orders")]
1070    fn py_batch_cancel_orders<'py>(
1071        &self,
1072        py: Python<'py>,
1073        cancels: Vec<Py<PyAny>>,
1074    ) -> PyResult<Bound<'py, PyAny>> {
1075        let mut batched_cancels = Vec::with_capacity(cancels.len());
1076
1077        for obj in cancels {
1078            let (instrument_id, client_order_id, order_id): (
1079                InstrumentId,
1080                Option<ClientOrderId>,
1081                Option<VenueOrderId>,
1082            ) = obj
1083                .extract(py)
1084                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1085            batched_cancels.push((instrument_id, client_order_id, order_id));
1086        }
1087
1088        let client = self.clone();
1089
1090        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1091            client
1092                .batch_cancel_orders(batched_cancels)
1093                .await
1094                .map_err(to_pyvalue_err)
1095        })
1096    }
1097
1098    #[pyo3(name = "batch_modify_orders")]
1099    fn py_batch_modify_orders<'py>(
1100        &self,
1101        py: Python<'py>,
1102        orders: Vec<Py<PyAny>>,
1103    ) -> PyResult<Bound<'py, PyAny>> {
1104        let mut domain_orders = Vec::with_capacity(orders.len());
1105
1106        for obj in orders {
1107            let (
1108                instrument_type,
1109                instrument_id,
1110                client_order_id,
1111                new_client_order_id,
1112                price,
1113                quantity,
1114            ): (
1115                String,
1116                InstrumentId,
1117                ClientOrderId,
1118                ClientOrderId,
1119                Option<Price>,
1120                Option<Quantity>,
1121            ) = obj
1122                .extract(py)
1123                .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1124            let inst_type =
1125                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1126            domain_orders.push((
1127                inst_type,
1128                instrument_id,
1129                client_order_id,
1130                new_client_order_id,
1131                price,
1132                quantity,
1133            ));
1134        }
1135
1136        let client = self.clone();
1137
1138        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1139            client
1140                .batch_modify_orders(domain_orders)
1141                .await
1142                .map_err(to_pyvalue_err)
1143        })
1144    }
1145
1146    #[pyo3(name = "mass_cancel_orders")]
1147    fn py_mass_cancel_orders<'py>(
1148        &self,
1149        py: Python<'py>,
1150        instrument_id: InstrumentId,
1151    ) -> PyResult<Bound<'py, PyAny>> {
1152        let client = self.clone();
1153
1154        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1155            client
1156                .mass_cancel_orders(instrument_id)
1157                .await
1158                .map_err(to_pyvalue_err)
1159        })
1160    }
1161
1162    #[pyo3(name = "cache_instruments")]
1163    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1164        let instruments: Result<Vec<_>, _> = instruments
1165            .into_iter()
1166            .map(|inst| pyobject_to_instrument_any(py, inst))
1167            .collect();
1168        self.cache_instruments(instruments?);
1169        Ok(())
1170    }
1171
1172    #[pyo3(name = "cache_instrument")]
1173    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1174        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1175        Ok(())
1176    }
1177}
1178
1179pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1180    if let Err(e) = callback.call1(py, (py_obj,)) {
1181        tracing::error!("Error calling Python: {e}");
1182    }
1183}
1184
1185fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1186where
1187    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1188{
1189    Python::attach(|py| match data_converter(py) {
1190        Ok(py_obj) => call_python(py, callback, py_obj),
1191        Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1192    });
1193}