nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49    data::{BarType, Data, OrderBookDeltas_API},
50    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    python::{
53        data::data_to_pycapsule,
54        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55    },
56    types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61    common::enums::{OKXInstrumentType, OKXTradeMode},
62    websocket::{
63        OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65    },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70    #[getter]
71    pub fn code(&self) -> &str {
72        &self.code
73    }
74
75    #[getter]
76    pub fn message(&self) -> &str {
77        &self.message
78    }
79
80    #[getter]
81    pub fn conn_id(&self) -> Option<&str> {
82        self.conn_id.as_deref()
83    }
84
85    #[getter]
86    pub fn ts_event(&self) -> u64 {
87        self.timestamp
88    }
89
90    fn __repr__(&self) -> String {
91        format!(
92            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93            self.code, self.message, self.conn_id, self.timestamp
94        )
95    }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100    #[new]
101    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102    fn py_new(
103        url: Option<String>,
104        api_key: Option<String>,
105        api_secret: Option<String>,
106        api_passphrase: Option<String>,
107        account_id: Option<AccountId>,
108        heartbeat: Option<u64>,
109    ) -> PyResult<Self> {
110        Self::new(
111            url,
112            api_key,
113            api_secret,
114            api_passphrase,
115            account_id,
116            heartbeat,
117        )
118        .map_err(to_pyvalue_err)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "with_credentials")]
123    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124    fn py_with_credentials(
125        url: Option<String>,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        api_passphrase: Option<String>,
129        account_id: Option<AccountId>,
130        heartbeat: Option<u64>,
131    ) -> PyResult<Self> {
132        Self::with_credentials(
133            url,
134            api_key,
135            api_secret,
136            api_passphrase,
137            account_id,
138            heartbeat,
139        )
140        .map_err(to_pyvalue_err)
141    }
142
143    #[staticmethod]
144    #[pyo3(name = "from_env")]
145    fn py_from_env() -> PyResult<Self> {
146        Self::from_env().map_err(to_pyvalue_err)
147    }
148
149    #[getter]
150    #[pyo3(name = "url")]
151    #[must_use]
152    pub fn py_url(&self) -> &str {
153        self.url()
154    }
155
156    #[getter]
157    #[pyo3(name = "api_key")]
158    #[must_use]
159    pub fn py_api_key(&self) -> Option<&str> {
160        self.api_key()
161    }
162
163    #[pyo3(name = "is_active")]
164    fn py_is_active(&mut self) -> bool {
165        self.is_active()
166    }
167
168    #[pyo3(name = "is_closed")]
169    fn py_is_closed(&mut self) -> bool {
170        self.is_closed()
171    }
172
173    #[pyo3(name = "get_subscriptions")]
174    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
175        let channels = self.get_subscriptions(instrument_id);
176
177        // Convert to OKX channel names
178        channels
179            .iter()
180            .map(|c| {
181                serde_json::to_value(c)
182                    .ok()
183                    .and_then(|v| v.as_str().map(String::from))
184                    .unwrap_or_else(|| c.to_string())
185            })
186            .collect()
187    }
188
189    #[pyo3(name = "connect")]
190    fn py_connect<'py>(
191        &mut self,
192        py: Python<'py>,
193        instruments: Vec<PyObject>,
194        callback: PyObject,
195    ) -> PyResult<Bound<'py, PyAny>> {
196        let mut instruments_any = Vec::new();
197        for inst in instruments {
198            let inst_any = pyobject_to_instrument_any(py, inst)?;
199            instruments_any.push(inst_any);
200        }
201
202        self.initialize_instruments_cache(instruments_any);
203
204        let mut client = self.clone();
205
206        pyo3_async_runtimes::tokio::future_into_py(py, async move {
207            client.connect().await.map_err(to_pyruntime_err)?;
208
209            let stream = client.stream();
210
211            tokio::spawn(async move {
212                tokio::pin!(stream);
213
214                while let Some(msg) = stream.next().await {
215                    match msg {
216                        NautilusWsMessage::Instrument(msg) => {
217                            call_python_with_data(&callback, |py| {
218                                instrument_any_to_pyobject(py, *msg)
219                            });
220                        }
221                        NautilusWsMessage::Data(msg) => Python::with_gil(|py| {
222                            for data in msg {
223                                let py_obj = data_to_pycapsule(py, data);
224                                call_python(py, &callback, py_obj);
225                            }
226                        }),
227                        NautilusWsMessage::FundingRates(msg) => {
228                            for data in msg {
229                                call_python_with_data(&callback, |py| data.into_py_any(py));
230                            }
231                        }
232                        NautilusWsMessage::OrderRejected(msg) => {
233                            call_python_with_data(&callback, |py| msg.into_py_any(py))
234                        }
235                        NautilusWsMessage::OrderCancelRejected(msg) => {
236                            call_python_with_data(&callback, |py| msg.into_py_any(py))
237                        }
238                        NautilusWsMessage::OrderModifyRejected(msg) => {
239                            call_python_with_data(&callback, |py| msg.into_py_any(py))
240                        }
241                        NautilusWsMessage::ExecutionReports(msg) => {
242                            for report in msg {
243                                match report {
244                                    ExecutionReport::Order(report) => {
245                                        call_python_with_data(&callback, |py| {
246                                            report.into_py_any(py)
247                                        })
248                                    }
249                                    ExecutionReport::Fill(report) => {
250                                        call_python_with_data(&callback, |py| {
251                                            report.into_py_any(py)
252                                        })
253                                    }
254                                };
255                            }
256                        }
257                        NautilusWsMessage::Deltas(msg) => Python::with_gil(|py| {
258                            let py_obj =
259                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
260                            call_python(py, &callback, py_obj);
261                        }),
262                        NautilusWsMessage::AccountUpdate(msg) => {
263                            call_python_with_data(&callback, |py| msg.py_to_dict(py));
264                        }
265                        NautilusWsMessage::Reconnected => {} // Nothing to handle
266                        NautilusWsMessage::Error(msg) => {
267                            call_python_with_data(&callback, |py| msg.into_py_any(py));
268                        }
269                        NautilusWsMessage::Raw(msg) => {
270                            tracing::debug!("Received raw message, skipping: {msg}");
271                        }
272                    }
273                }
274            });
275
276            Ok(())
277        })
278    }
279
280    #[pyo3(name = "wait_until_active")]
281    fn py_wait_until_active<'py>(
282        &self,
283        py: Python<'py>,
284        timeout_secs: f64,
285    ) -> PyResult<Bound<'py, PyAny>> {
286        let client = self.clone();
287
288        pyo3_async_runtimes::tokio::future_into_py(py, async move {
289            client
290                .wait_until_active(timeout_secs)
291                .await
292                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
293            Ok(())
294        })
295    }
296
297    #[pyo3(name = "close")]
298    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
299        let mut client = self.clone();
300
301        pyo3_async_runtimes::tokio::future_into_py(py, async move {
302            if let Err(e) = client.close().await {
303                log::error!("Error on close: {e}");
304            }
305            Ok(())
306        })
307    }
308
309    #[pyo3(name = "subscribe_instruments")]
310    fn py_subscribe_instruments<'py>(
311        &self,
312        py: Python<'py>,
313        instrument_type: OKXInstrumentType,
314    ) -> PyResult<Bound<'py, PyAny>> {
315        let client = self.clone();
316
317        pyo3_async_runtimes::tokio::future_into_py(py, async move {
318            if let Err(e) = client.subscribe_instruments(instrument_type).await {
319                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
320            }
321            Ok(())
322        })
323    }
324
325    #[pyo3(name = "subscribe_instrument")]
326    fn py_subscribe_instrument<'py>(
327        &self,
328        py: Python<'py>,
329        instrument_id: InstrumentId,
330    ) -> PyResult<Bound<'py, PyAny>> {
331        let client = self.clone();
332
333        pyo3_async_runtimes::tokio::future_into_py(py, async move {
334            if let Err(e) = client.subscribe_instrument(instrument_id).await {
335                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
336            }
337            Ok(())
338        })
339    }
340
341    #[pyo3(name = "subscribe_book")]
342    fn py_subscribe_book<'py>(
343        &self,
344        py: Python<'py>,
345        instrument_id: InstrumentId,
346    ) -> PyResult<Bound<'py, PyAny>> {
347        let client = self.clone();
348
349        pyo3_async_runtimes::tokio::future_into_py(py, async move {
350            if let Err(e) = client.subscribe_book(instrument_id).await {
351                log::error!("Failed to subscribe to order book: {e}");
352            }
353            Ok(())
354        })
355    }
356
357    #[pyo3(name = "subscribe_book50_l2_tbt")]
358    fn py_subscribe_book50_l2_tbt<'py>(
359        &self,
360        py: Python<'py>,
361        instrument_id: InstrumentId,
362    ) -> PyResult<Bound<'py, PyAny>> {
363        let client = self.clone();
364
365        pyo3_async_runtimes::tokio::future_into_py(py, async move {
366            if let Err(e) = client.subscribe_books50_l2_tbt(instrument_id).await {
367                log::error!("Failed to subscribe to books50_tbt: {e}");
368            }
369            Ok(())
370        })
371    }
372
373    #[pyo3(name = "subscribe_book_l2_tbt")]
374    fn py_subscribe_book_l2_tbt<'py>(
375        &self,
376        py: Python<'py>,
377        instrument_id: InstrumentId,
378    ) -> PyResult<Bound<'py, PyAny>> {
379        let client = self.clone();
380
381        pyo3_async_runtimes::tokio::future_into_py(py, async move {
382            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
383                log::error!("Failed to subscribe to books_l2_tbt: {e}");
384            }
385            Ok(())
386        })
387    }
388
389    #[pyo3(name = "subscribe_book_depth5")]
390    fn py_subscribe_book_depth5<'py>(
391        &self,
392        py: Python<'py>,
393        instrument_id: InstrumentId,
394    ) -> PyResult<Bound<'py, PyAny>> {
395        let client = self.clone();
396
397        pyo3_async_runtimes::tokio::future_into_py(py, async move {
398            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
399                log::error!("Failed to subscribe to books5: {e}");
400            }
401            Ok(())
402        })
403    }
404
405    #[pyo3(name = "subscribe_quotes")]
406    fn py_subscribe_quotes<'py>(
407        &self,
408        py: Python<'py>,
409        instrument_id: InstrumentId,
410    ) -> PyResult<Bound<'py, PyAny>> {
411        let client = self.clone();
412
413        pyo3_async_runtimes::tokio::future_into_py(py, async move {
414            if let Err(e) = client.subscribe_quotes(instrument_id).await {
415                log::error!("Failed to subscribe to quotes: {e}");
416            }
417            Ok(())
418        })
419    }
420
421    #[pyo3(name = "subscribe_trades")]
422    fn py_subscribe_trades<'py>(
423        &self,
424        py: Python<'py>,
425        instrument_id: InstrumentId,
426        aggregated: bool,
427    ) -> PyResult<Bound<'py, PyAny>> {
428        let client = self.clone();
429
430        pyo3_async_runtimes::tokio::future_into_py(py, async move {
431            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
432                log::error!("Failed to subscribe to trades: {e}");
433            }
434            Ok(())
435        })
436    }
437
438    #[pyo3(name = "subscribe_bars")]
439    fn py_subscribe_bars<'py>(
440        &self,
441        py: Python<'py>,
442        bar_type: BarType,
443    ) -> PyResult<Bound<'py, PyAny>> {
444        let client = self.clone();
445
446        pyo3_async_runtimes::tokio::future_into_py(py, async move {
447            if let Err(e) = client.subscribe_bars(bar_type).await {
448                log::error!("Failed to subscribe to bars: {e}");
449            }
450            Ok(())
451        })
452    }
453
454    #[pyo3(name = "unsubscribe_book")]
455    fn py_unsubscribe_book<'py>(
456        &self,
457        py: Python<'py>,
458        instrument_id: InstrumentId,
459    ) -> PyResult<Bound<'py, PyAny>> {
460        let client = self.clone();
461
462        pyo3_async_runtimes::tokio::future_into_py(py, async move {
463            if let Err(e) = client.unsubscribe_book(instrument_id).await {
464                log::error!("Failed to unsubscribe from order book: {e}");
465            }
466            Ok(())
467        })
468    }
469
470    #[pyo3(name = "unsubscribe_book_depth5")]
471    fn py_unsubscribe_book_depth5<'py>(
472        &self,
473        py: Python<'py>,
474        instrument_id: InstrumentId,
475    ) -> PyResult<Bound<'py, PyAny>> {
476        let client = self.clone();
477
478        pyo3_async_runtimes::tokio::future_into_py(py, async move {
479            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
480                log::error!("Failed to unsubscribe from books5: {e}");
481            }
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
487    fn py_unsubscribe_book50_l2_tbt<'py>(
488        &self,
489        py: Python<'py>,
490        instrument_id: InstrumentId,
491    ) -> PyResult<Bound<'py, PyAny>> {
492        let client = self.clone();
493
494        pyo3_async_runtimes::tokio::future_into_py(py, async move {
495            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
496                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
497            }
498            Ok(())
499        })
500    }
501
502    #[pyo3(name = "unsubscribe_book_l2_tbt")]
503    fn py_unsubscribe_book_l2_tbt<'py>(
504        &self,
505        py: Python<'py>,
506        instrument_id: InstrumentId,
507    ) -> PyResult<Bound<'py, PyAny>> {
508        let client = self.clone();
509
510        pyo3_async_runtimes::tokio::future_into_py(py, async move {
511            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
512                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
513            }
514            Ok(())
515        })
516    }
517
518    #[pyo3(name = "unsubscribe_quotes")]
519    fn py_unsubscribe_quotes<'py>(
520        &self,
521        py: Python<'py>,
522        instrument_id: InstrumentId,
523    ) -> PyResult<Bound<'py, PyAny>> {
524        let client = self.clone();
525
526        pyo3_async_runtimes::tokio::future_into_py(py, async move {
527            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
528                log::error!("Failed to unsubscribe from quotes: {e}");
529            }
530            Ok(())
531        })
532    }
533
534    #[pyo3(name = "unsubscribe_trades")]
535    fn py_unsubscribe_trades<'py>(
536        &self,
537        py: Python<'py>,
538        instrument_id: InstrumentId,
539        aggregated: bool,
540    ) -> PyResult<Bound<'py, PyAny>> {
541        let client = self.clone();
542
543        pyo3_async_runtimes::tokio::future_into_py(py, async move {
544            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
545                log::error!("Failed to unsubscribe from trades: {e}");
546            }
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "unsubscribe_bars")]
552    fn py_unsubscribe_bars<'py>(
553        &self,
554        py: Python<'py>,
555        bar_type: BarType,
556    ) -> PyResult<Bound<'py, PyAny>> {
557        let client = self.clone();
558
559        pyo3_async_runtimes::tokio::future_into_py(py, async move {
560            if let Err(e) = client.unsubscribe_bars(bar_type).await {
561                log::error!("Failed to unsubscribe from bars: {e}");
562            }
563            Ok(())
564        })
565    }
566
567    #[pyo3(name = "subscribe_ticker")]
568    fn py_subscribe_ticker<'py>(
569        &self,
570        py: Python<'py>,
571        instrument_id: InstrumentId,
572    ) -> PyResult<Bound<'py, PyAny>> {
573        let client = self.clone();
574
575        pyo3_async_runtimes::tokio::future_into_py(py, async move {
576            if let Err(e) = client.subscribe_ticker(instrument_id).await {
577                log::error!("Failed to subscribe to ticker: {e}");
578            }
579            Ok(())
580        })
581    }
582
583    #[pyo3(name = "unsubscribe_ticker")]
584    fn py_unsubscribe_ticker<'py>(
585        &self,
586        py: Python<'py>,
587        instrument_id: InstrumentId,
588    ) -> PyResult<Bound<'py, PyAny>> {
589        let client = self.clone();
590
591        pyo3_async_runtimes::tokio::future_into_py(py, async move {
592            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
593                log::error!("Failed to unsubscribe from ticker: {e}");
594            }
595            Ok(())
596        })
597    }
598
599    #[pyo3(name = "subscribe_mark_prices")]
600    fn py_subscribe_mark_prices<'py>(
601        &self,
602        py: Python<'py>,
603        instrument_id: InstrumentId,
604    ) -> PyResult<Bound<'py, PyAny>> {
605        let client = self.clone();
606
607        pyo3_async_runtimes::tokio::future_into_py(py, async move {
608            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
609                log::error!("Failed to subscribe to mark prices: {e}");
610            }
611            Ok(())
612        })
613    }
614
615    #[pyo3(name = "unsubscribe_mark_prices")]
616    fn py_unsubscribe_mark_prices<'py>(
617        &self,
618        py: Python<'py>,
619        instrument_id: InstrumentId,
620    ) -> PyResult<Bound<'py, PyAny>> {
621        let client = self.clone();
622
623        pyo3_async_runtimes::tokio::future_into_py(py, async move {
624            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
625                log::error!("Failed to unsubscribe from mark prices: {e}");
626            }
627            Ok(())
628        })
629    }
630
631    #[pyo3(name = "subscribe_index_prices")]
632    fn py_subscribe_index_prices<'py>(
633        &self,
634        py: Python<'py>,
635        instrument_id: InstrumentId,
636    ) -> PyResult<Bound<'py, PyAny>> {
637        let client = self.clone();
638
639        pyo3_async_runtimes::tokio::future_into_py(py, async move {
640            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
641                log::error!("Failed to subscribe to index prices: {e}");
642            }
643            Ok(())
644        })
645    }
646
647    #[pyo3(name = "unsubscribe_index_prices")]
648    fn py_unsubscribe_index_prices<'py>(
649        &self,
650        py: Python<'py>,
651        instrument_id: InstrumentId,
652    ) -> PyResult<Bound<'py, PyAny>> {
653        let client = self.clone();
654
655        pyo3_async_runtimes::tokio::future_into_py(py, async move {
656            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
657                log::error!("Failed to unsubscribe from index prices: {e}");
658            }
659            Ok(())
660        })
661    }
662
663    #[pyo3(name = "subscribe_funding_rates")]
664    fn py_subscribe_funding_rates<'py>(
665        &self,
666        py: Python<'py>,
667        instrument_id: InstrumentId,
668    ) -> PyResult<Bound<'py, PyAny>> {
669        let client = self.clone();
670
671        pyo3_async_runtimes::tokio::future_into_py(py, async move {
672            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
673                log::error!("Failed to subscribe to funding rates: {e}");
674            }
675            Ok(())
676        })
677    }
678
679    #[pyo3(name = "unsubscribe_funding_rates")]
680    fn py_unsubscribe_funding_rates<'py>(
681        &self,
682        py: Python<'py>,
683        instrument_id: InstrumentId,
684    ) -> PyResult<Bound<'py, PyAny>> {
685        let client = self.clone();
686
687        pyo3_async_runtimes::tokio::future_into_py(py, async move {
688            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
689                log::error!("Failed to unsubscribe from funding rates: {e}");
690            }
691            Ok(())
692        })
693    }
694
695    #[pyo3(name = "subscribe_orders")]
696    fn py_subscribe_orders<'py>(
697        &self,
698        py: Python<'py>,
699        instrument_type: OKXInstrumentType,
700    ) -> PyResult<Bound<'py, PyAny>> {
701        let client = self.clone();
702
703        pyo3_async_runtimes::tokio::future_into_py(py, async move {
704            if let Err(e) = client.subscribe_orders(instrument_type).await {
705                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
706            }
707            Ok(())
708        })
709    }
710
711    #[pyo3(name = "unsubscribe_orders")]
712    fn py_unsubscribe_orders<'py>(
713        &self,
714        py: Python<'py>,
715        instrument_type: OKXInstrumentType,
716    ) -> PyResult<Bound<'py, PyAny>> {
717        let client = self.clone();
718
719        pyo3_async_runtimes::tokio::future_into_py(py, async move {
720            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
721                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
722            }
723            Ok(())
724        })
725    }
726
727    #[pyo3(name = "subscribe_fills")]
728    fn py_subscribe_fills<'py>(
729        &self,
730        py: Python<'py>,
731        instrument_type: OKXInstrumentType,
732    ) -> PyResult<Bound<'py, PyAny>> {
733        let client = self.clone();
734
735        pyo3_async_runtimes::tokio::future_into_py(py, async move {
736            if let Err(e) = client.subscribe_fills(instrument_type).await {
737                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
738            }
739            Ok(())
740        })
741    }
742
743    #[pyo3(name = "unsubscribe_fills")]
744    fn py_unsubscribe_fills<'py>(
745        &self,
746        py: Python<'py>,
747        instrument_type: OKXInstrumentType,
748    ) -> PyResult<Bound<'py, PyAny>> {
749        let client = self.clone();
750
751        pyo3_async_runtimes::tokio::future_into_py(py, async move {
752            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
753                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
754            }
755            Ok(())
756        })
757    }
758
759    #[pyo3(name = "subscribe_account")]
760    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
761        let client = self.clone();
762
763        pyo3_async_runtimes::tokio::future_into_py(py, async move {
764            if let Err(e) = client.subscribe_account().await {
765                log::error!("Failed to subscribe to account: {e}");
766            }
767            Ok(())
768        })
769    }
770
771    #[pyo3(name = "unsubscribe_account")]
772    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
773        let client = self.clone();
774
775        pyo3_async_runtimes::tokio::future_into_py(py, async move {
776            if let Err(e) = client.unsubscribe_account().await {
777                log::error!("Failed to unsubscribe from account: {e}");
778            }
779            Ok(())
780        })
781    }
782
783    /// Submits a new order via WebSocket.
784    #[pyo3(name = "submit_order")]
785    #[pyo3(signature = (
786        trader_id,
787        strategy_id,
788        instrument_id,
789        td_mode,
790        client_order_id,
791        order_side,
792        order_type,
793        quantity,
794        time_in_force=None,
795        price=None,
796        trigger_price=None,
797        post_only=None,
798        reduce_only=None,
799        quote_quantity=None,
800        position_side=None,
801    ))]
802    #[allow(clippy::too_many_arguments)]
803    fn py_submit_order<'py>(
804        &self,
805        py: Python<'py>,
806        trader_id: TraderId,
807        strategy_id: StrategyId,
808        instrument_id: InstrumentId,
809        td_mode: OKXTradeMode,
810        client_order_id: ClientOrderId,
811        order_side: OrderSide,
812        order_type: OrderType,
813        quantity: Quantity,
814        time_in_force: Option<TimeInForce>,
815        price: Option<Price>,
816        trigger_price: Option<Price>,
817        post_only: Option<bool>,
818        reduce_only: Option<bool>,
819        quote_quantity: Option<bool>,
820        position_side: Option<PositionSide>,
821    ) -> PyResult<Bound<'py, PyAny>> {
822        let client = self.clone();
823
824        pyo3_async_runtimes::tokio::future_into_py(py, async move {
825            client
826                .submit_order(
827                    trader_id,
828                    strategy_id,
829                    instrument_id,
830                    td_mode,
831                    client_order_id,
832                    order_side,
833                    order_type,
834                    quantity,
835                    time_in_force,
836                    price,
837                    trigger_price,
838                    post_only,
839                    reduce_only,
840                    quote_quantity,
841                    position_side,
842                )
843                .await
844                .map_err(to_pyvalue_err)
845        })
846    }
847
848    /// Cancels an existing order via WebSocket.
849    #[pyo3(name = "cancel_order")]
850    #[pyo3(signature = (
851        trader_id,
852        strategy_id,
853        instrument_id,
854        client_order_id=None,
855        venue_order_id=None,
856    ))]
857    #[allow(clippy::too_many_arguments)]
858    fn py_cancel_order<'py>(
859        &self,
860        py: Python<'py>,
861        trader_id: TraderId,
862        strategy_id: StrategyId,
863        instrument_id: InstrumentId,
864        client_order_id: Option<ClientOrderId>,
865        venue_order_id: Option<VenueOrderId>,
866    ) -> PyResult<Bound<'py, PyAny>> {
867        let client = self.clone();
868
869        pyo3_async_runtimes::tokio::future_into_py(py, async move {
870            client
871                .cancel_order(
872                    trader_id,
873                    strategy_id,
874                    instrument_id,
875                    client_order_id,
876                    venue_order_id,
877                )
878                .await
879                .map_err(to_pyvalue_err)
880        })
881    }
882
883    /// Modify an existing order via WebSocket.
884    #[pyo3(name = "modify_order")]
885    #[pyo3(signature = (
886        trader_id,
887        strategy_id,
888        instrument_id,
889        client_order_id=None,
890        venue_order_id=None,
891        price=None,
892        quantity=None,
893    ))]
894    #[allow(clippy::too_many_arguments)]
895    fn py_modify_order<'py>(
896        &self,
897        py: Python<'py>,
898        trader_id: TraderId,
899        strategy_id: StrategyId,
900        instrument_id: InstrumentId,
901        client_order_id: Option<ClientOrderId>,
902        venue_order_id: Option<VenueOrderId>,
903        price: Option<Price>,
904        quantity: Option<Quantity>,
905    ) -> PyResult<Bound<'py, PyAny>> {
906        let client = self.clone();
907
908        pyo3_async_runtimes::tokio::future_into_py(py, async move {
909            client
910                .modify_order(
911                    trader_id,
912                    strategy_id,
913                    instrument_id,
914                    client_order_id,
915                    price,
916                    quantity,
917                    venue_order_id,
918                )
919                .await
920                .map_err(to_pyvalue_err)
921        })
922    }
923
924    /// Submits multiple orders via WebSocket.
925    #[allow(clippy::type_complexity)]
926    #[pyo3(name = "batch_submit_orders")]
927    fn py_batch_submit_orders<'py>(
928        &self,
929        py: Python<'py>,
930        orders: Vec<PyObject>,
931    ) -> PyResult<Bound<'py, PyAny>> {
932        let mut domain_orders = Vec::with_capacity(orders.len());
933
934        for obj in orders {
935            let (
936                instrument_type,
937                instrument_id,
938                td_mode,
939                client_order_id,
940                order_side,
941                order_type,
942                quantity,
943                position_side,
944                price,
945                trigger_price,
946                post_only,
947                reduce_only,
948            ): (
949                String,
950                InstrumentId,
951                String,
952                ClientOrderId,
953                OrderSide,
954                OrderType,
955                Quantity,
956                Option<PositionSide>,
957                Option<Price>,
958                Option<Price>,
959                Option<bool>,
960                Option<bool>,
961            ) = obj
962                .extract(py)
963                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
964
965            let inst_type =
966                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
967            let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
968
969            domain_orders.push((
970                inst_type,
971                instrument_id,
972                trade_mode,
973                client_order_id,
974                order_side,
975                position_side,
976                order_type,
977                quantity,
978                price,
979                trigger_price,
980                post_only,
981                reduce_only,
982            ));
983        }
984
985        let client = self.clone();
986
987        pyo3_async_runtimes::tokio::future_into_py(py, async move {
988            client
989                .batch_submit_orders(domain_orders)
990                .await
991                .map_err(to_pyvalue_err)
992        })
993    }
994
995    /// Cancels multiple orders via WebSocket.
996    #[pyo3(name = "batch_cancel_orders")]
997    fn py_batch_cancel_orders<'py>(
998        &self,
999        py: Python<'py>,
1000        orders: Vec<PyObject>,
1001    ) -> PyResult<Bound<'py, PyAny>> {
1002        let mut domain_orders = Vec::with_capacity(orders.len());
1003
1004        for obj in orders {
1005            let (instrument_type, instrument_id, client_order_id, order_id): (
1006                String,
1007                InstrumentId,
1008                Option<ClientOrderId>,
1009                Option<String>,
1010            ) = obj
1011                .extract(py)
1012                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1013            let inst_type =
1014                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1015            domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1016        }
1017
1018        let client = self.clone();
1019
1020        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1021            client
1022                .batch_cancel_orders(domain_orders)
1023                .await
1024                .map_err(to_pyvalue_err)
1025        })
1026    }
1027
1028    /// Modifies multiple orders via WebSocket.
1029    #[pyo3(name = "batch_modify_orders")]
1030    fn py_batch_modify_orders<'py>(
1031        &self,
1032        py: Python<'py>,
1033        orders: Vec<PyObject>,
1034    ) -> PyResult<Bound<'py, PyAny>> {
1035        let mut domain_orders = Vec::with_capacity(orders.len());
1036
1037        for obj in orders {
1038            let (
1039                instrument_type,
1040                instrument_id,
1041                client_order_id,
1042                new_client_order_id,
1043                price,
1044                quantity,
1045            ): (
1046                String,
1047                InstrumentId,
1048                ClientOrderId,
1049                ClientOrderId,
1050                Option<Price>,
1051                Option<Quantity>,
1052            ) = obj
1053                .extract(py)
1054                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1055            let inst_type =
1056                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1057            domain_orders.push((
1058                inst_type,
1059                instrument_id,
1060                client_order_id,
1061                new_client_order_id,
1062                price,
1063                quantity,
1064            ));
1065        }
1066
1067        let client = self.clone();
1068
1069        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1070            client
1071                .batch_modify_orders(domain_orders)
1072                .await
1073                .map_err(to_pyvalue_err)
1074        })
1075    }
1076}
1077
1078pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
1079    if let Err(e) = callback.call1(py, (py_obj,)) {
1080        tracing::error!("Error calling Python: {e}");
1081    }
1082}
1083
1084fn call_python_with_data<F>(callback: &PyObject, data_converter: F)
1085where
1086    F: FnOnce(Python) -> PyResult<PyObject>,
1087{
1088    Python::with_gil(|py| match data_converter(py) {
1089        Ok(py_obj) => call_python(py, callback, py_obj),
1090        Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1091    });
1092}