nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49    data::{BarType, Data, OrderBookDeltas_API},
50    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    python::{
53        data::data_to_pycapsule,
54        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55    },
56    types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61    common::enums::{OKXInstrumentType, OKXTradeMode},
62    websocket::{
63        OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65    },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70    #[getter]
71    pub fn code(&self) -> &str {
72        &self.code
73    }
74
75    #[getter]
76    pub fn message(&self) -> &str {
77        &self.message
78    }
79
80    #[getter]
81    pub fn conn_id(&self) -> Option<&str> {
82        self.conn_id.as_deref()
83    }
84
85    #[getter]
86    pub fn ts_event(&self) -> u64 {
87        self.timestamp
88    }
89
90    fn __repr__(&self) -> String {
91        format!(
92            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93            self.code, self.message, self.conn_id, self.timestamp
94        )
95    }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100    #[new]
101    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102    fn py_new(
103        url: Option<String>,
104        api_key: Option<String>,
105        api_secret: Option<String>,
106        api_passphrase: Option<String>,
107        account_id: Option<AccountId>,
108        heartbeat: Option<u64>,
109    ) -> PyResult<Self> {
110        Self::new(
111            url,
112            api_key,
113            api_secret,
114            api_passphrase,
115            account_id,
116            heartbeat,
117        )
118        .map_err(to_pyvalue_err)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "with_credentials")]
123    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124    fn py_with_credentials(
125        url: Option<String>,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        api_passphrase: Option<String>,
129        account_id: Option<AccountId>,
130        heartbeat: Option<u64>,
131    ) -> PyResult<Self> {
132        Self::with_credentials(
133            url,
134            api_key,
135            api_secret,
136            api_passphrase,
137            account_id,
138            heartbeat,
139        )
140        .map_err(to_pyvalue_err)
141    }
142
143    #[staticmethod]
144    #[pyo3(name = "from_env")]
145    fn py_from_env() -> PyResult<Self> {
146        Self::from_env().map_err(to_pyvalue_err)
147    }
148
149    #[getter]
150    #[pyo3(name = "url")]
151    #[must_use]
152    pub fn py_url(&self) -> &str {
153        self.url()
154    }
155
156    #[getter]
157    #[pyo3(name = "api_key")]
158    #[must_use]
159    pub fn py_api_key(&self) -> Option<&str> {
160        self.api_key()
161    }
162
163    #[pyo3(name = "is_active")]
164    fn py_is_active(&mut self) -> bool {
165        self.is_active()
166    }
167
168    #[pyo3(name = "is_closed")]
169    fn py_is_closed(&mut self) -> bool {
170        self.is_closed()
171    }
172
173    #[pyo3(name = "cancel_all_requests")]
174    pub fn py_cancel_all_requests(&self) {
175        self.cancel_all_requests();
176    }
177
178    #[pyo3(name = "get_subscriptions")]
179    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
180        let channels = self.get_subscriptions(instrument_id);
181
182        // Convert to OKX channel names
183        channels
184            .iter()
185            .map(|c| {
186                serde_json::to_value(c)
187                    .ok()
188                    .and_then(|v| v.as_str().map(String::from))
189                    .unwrap_or_else(|| c.to_string())
190            })
191            .collect()
192    }
193
194    #[pyo3(name = "connect")]
195    fn py_connect<'py>(
196        &mut self,
197        py: Python<'py>,
198        instruments: Vec<PyObject>,
199        callback: PyObject,
200    ) -> PyResult<Bound<'py, PyAny>> {
201        let mut instruments_any = Vec::new();
202        for inst in instruments {
203            let inst_any = pyobject_to_instrument_any(py, inst)?;
204            instruments_any.push(inst_any);
205        }
206
207        self.initialize_instruments_cache(instruments_any);
208
209        let mut client = self.clone();
210
211        pyo3_async_runtimes::tokio::future_into_py(py, async move {
212            client.connect().await.map_err(to_pyruntime_err)?;
213
214            let stream = client.stream();
215
216            tokio::spawn(async move {
217                tokio::pin!(stream);
218
219                while let Some(msg) = stream.next().await {
220                    match msg {
221                        NautilusWsMessage::Instrument(msg) => {
222                            call_python_with_data(&callback, |py| {
223                                instrument_any_to_pyobject(py, *msg)
224                            });
225                        }
226                        NautilusWsMessage::Data(msg) => Python::with_gil(|py| {
227                            for data in msg {
228                                let py_obj = data_to_pycapsule(py, data);
229                                call_python(py, &callback, py_obj);
230                            }
231                        }),
232                        NautilusWsMessage::FundingRates(msg) => {
233                            for data in msg {
234                                call_python_with_data(&callback, |py| data.into_py_any(py));
235                            }
236                        }
237                        NautilusWsMessage::OrderRejected(msg) => {
238                            call_python_with_data(&callback, |py| msg.into_py_any(py))
239                        }
240                        NautilusWsMessage::OrderCancelRejected(msg) => {
241                            call_python_with_data(&callback, |py| msg.into_py_any(py))
242                        }
243                        NautilusWsMessage::OrderModifyRejected(msg) => {
244                            call_python_with_data(&callback, |py| msg.into_py_any(py))
245                        }
246                        NautilusWsMessage::ExecutionReports(msg) => {
247                            for report in msg {
248                                match report {
249                                    ExecutionReport::Order(report) => {
250                                        call_python_with_data(&callback, |py| {
251                                            report.into_py_any(py)
252                                        })
253                                    }
254                                    ExecutionReport::Fill(report) => {
255                                        call_python_with_data(&callback, |py| {
256                                            report.into_py_any(py)
257                                        })
258                                    }
259                                };
260                            }
261                        }
262                        NautilusWsMessage::Deltas(msg) => Python::with_gil(|py| {
263                            let py_obj =
264                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
265                            call_python(py, &callback, py_obj);
266                        }),
267                        NautilusWsMessage::AccountUpdate(msg) => {
268                            call_python_with_data(&callback, |py| msg.py_to_dict(py));
269                        }
270                        NautilusWsMessage::Reconnected => {} // Nothing to handle
271                        NautilusWsMessage::Error(msg) => {
272                            call_python_with_data(&callback, |py| msg.into_py_any(py));
273                        }
274                        NautilusWsMessage::Raw(msg) => {
275                            tracing::debug!("Received raw message, skipping: {msg}");
276                        }
277                    }
278                }
279            });
280
281            Ok(())
282        })
283    }
284
285    #[pyo3(name = "wait_until_active")]
286    fn py_wait_until_active<'py>(
287        &self,
288        py: Python<'py>,
289        timeout_secs: f64,
290    ) -> PyResult<Bound<'py, PyAny>> {
291        let client = self.clone();
292
293        pyo3_async_runtimes::tokio::future_into_py(py, async move {
294            client
295                .wait_until_active(timeout_secs)
296                .await
297                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
298            Ok(())
299        })
300    }
301
302    #[pyo3(name = "close")]
303    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304        let mut client = self.clone();
305
306        pyo3_async_runtimes::tokio::future_into_py(py, async move {
307            if let Err(e) = client.close().await {
308                log::error!("Error on close: {e}");
309            }
310            Ok(())
311        })
312    }
313
314    #[pyo3(name = "subscribe_instruments")]
315    fn py_subscribe_instruments<'py>(
316        &self,
317        py: Python<'py>,
318        instrument_type: OKXInstrumentType,
319    ) -> PyResult<Bound<'py, PyAny>> {
320        let client = self.clone();
321
322        pyo3_async_runtimes::tokio::future_into_py(py, async move {
323            if let Err(e) = client.subscribe_instruments(instrument_type).await {
324                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
325            }
326            Ok(())
327        })
328    }
329
330    #[pyo3(name = "subscribe_instrument")]
331    fn py_subscribe_instrument<'py>(
332        &self,
333        py: Python<'py>,
334        instrument_id: InstrumentId,
335    ) -> PyResult<Bound<'py, PyAny>> {
336        let client = self.clone();
337
338        pyo3_async_runtimes::tokio::future_into_py(py, async move {
339            if let Err(e) = client.subscribe_instrument(instrument_id).await {
340                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
341            }
342            Ok(())
343        })
344    }
345
346    #[pyo3(name = "subscribe_book")]
347    fn py_subscribe_book<'py>(
348        &self,
349        py: Python<'py>,
350        instrument_id: InstrumentId,
351    ) -> PyResult<Bound<'py, PyAny>> {
352        let client = self.clone();
353
354        pyo3_async_runtimes::tokio::future_into_py(py, async move {
355            if let Err(e) = client.subscribe_book(instrument_id).await {
356                log::error!("Failed to subscribe to order book: {e}");
357            }
358            Ok(())
359        })
360    }
361
362    #[pyo3(name = "subscribe_book50_l2_tbt")]
363    fn py_subscribe_book50_l2_tbt<'py>(
364        &self,
365        py: Python<'py>,
366        instrument_id: InstrumentId,
367    ) -> PyResult<Bound<'py, PyAny>> {
368        let client = self.clone();
369
370        pyo3_async_runtimes::tokio::future_into_py(py, async move {
371            if let Err(e) = client.subscribe_books50_l2_tbt(instrument_id).await {
372                log::error!("Failed to subscribe to books50_tbt: {e}");
373            }
374            Ok(())
375        })
376    }
377
378    #[pyo3(name = "subscribe_book_l2_tbt")]
379    fn py_subscribe_book_l2_tbt<'py>(
380        &self,
381        py: Python<'py>,
382        instrument_id: InstrumentId,
383    ) -> PyResult<Bound<'py, PyAny>> {
384        let client = self.clone();
385
386        pyo3_async_runtimes::tokio::future_into_py(py, async move {
387            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
388                log::error!("Failed to subscribe to books_l2_tbt: {e}");
389            }
390            Ok(())
391        })
392    }
393
394    #[pyo3(name = "subscribe_book_depth5")]
395    fn py_subscribe_book_depth5<'py>(
396        &self,
397        py: Python<'py>,
398        instrument_id: InstrumentId,
399    ) -> PyResult<Bound<'py, PyAny>> {
400        let client = self.clone();
401
402        pyo3_async_runtimes::tokio::future_into_py(py, async move {
403            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
404                log::error!("Failed to subscribe to books5: {e}");
405            }
406            Ok(())
407        })
408    }
409
410    #[pyo3(name = "subscribe_quotes")]
411    fn py_subscribe_quotes<'py>(
412        &self,
413        py: Python<'py>,
414        instrument_id: InstrumentId,
415    ) -> PyResult<Bound<'py, PyAny>> {
416        let client = self.clone();
417
418        pyo3_async_runtimes::tokio::future_into_py(py, async move {
419            if let Err(e) = client.subscribe_quotes(instrument_id).await {
420                log::error!("Failed to subscribe to quotes: {e}");
421            }
422            Ok(())
423        })
424    }
425
426    #[pyo3(name = "subscribe_trades")]
427    fn py_subscribe_trades<'py>(
428        &self,
429        py: Python<'py>,
430        instrument_id: InstrumentId,
431        aggregated: bool,
432    ) -> PyResult<Bound<'py, PyAny>> {
433        let client = self.clone();
434
435        pyo3_async_runtimes::tokio::future_into_py(py, async move {
436            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
437                log::error!("Failed to subscribe to trades: {e}");
438            }
439            Ok(())
440        })
441    }
442
443    #[pyo3(name = "subscribe_bars")]
444    fn py_subscribe_bars<'py>(
445        &self,
446        py: Python<'py>,
447        bar_type: BarType,
448    ) -> PyResult<Bound<'py, PyAny>> {
449        let client = self.clone();
450
451        pyo3_async_runtimes::tokio::future_into_py(py, async move {
452            if let Err(e) = client.subscribe_bars(bar_type).await {
453                log::error!("Failed to subscribe to bars: {e}");
454            }
455            Ok(())
456        })
457    }
458
459    #[pyo3(name = "unsubscribe_book")]
460    fn py_unsubscribe_book<'py>(
461        &self,
462        py: Python<'py>,
463        instrument_id: InstrumentId,
464    ) -> PyResult<Bound<'py, PyAny>> {
465        let client = self.clone();
466
467        pyo3_async_runtimes::tokio::future_into_py(py, async move {
468            if let Err(e) = client.unsubscribe_book(instrument_id).await {
469                log::error!("Failed to unsubscribe from order book: {e}");
470            }
471            Ok(())
472        })
473    }
474
475    #[pyo3(name = "unsubscribe_book_depth5")]
476    fn py_unsubscribe_book_depth5<'py>(
477        &self,
478        py: Python<'py>,
479        instrument_id: InstrumentId,
480    ) -> PyResult<Bound<'py, PyAny>> {
481        let client = self.clone();
482
483        pyo3_async_runtimes::tokio::future_into_py(py, async move {
484            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
485                log::error!("Failed to unsubscribe from books5: {e}");
486            }
487            Ok(())
488        })
489    }
490
491    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
492    fn py_unsubscribe_book50_l2_tbt<'py>(
493        &self,
494        py: Python<'py>,
495        instrument_id: InstrumentId,
496    ) -> PyResult<Bound<'py, PyAny>> {
497        let client = self.clone();
498
499        pyo3_async_runtimes::tokio::future_into_py(py, async move {
500            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
501                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
502            }
503            Ok(())
504        })
505    }
506
507    #[pyo3(name = "unsubscribe_book_l2_tbt")]
508    fn py_unsubscribe_book_l2_tbt<'py>(
509        &self,
510        py: Python<'py>,
511        instrument_id: InstrumentId,
512    ) -> PyResult<Bound<'py, PyAny>> {
513        let client = self.clone();
514
515        pyo3_async_runtimes::tokio::future_into_py(py, async move {
516            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
517                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
518            }
519            Ok(())
520        })
521    }
522
523    #[pyo3(name = "unsubscribe_quotes")]
524    fn py_unsubscribe_quotes<'py>(
525        &self,
526        py: Python<'py>,
527        instrument_id: InstrumentId,
528    ) -> PyResult<Bound<'py, PyAny>> {
529        let client = self.clone();
530
531        pyo3_async_runtimes::tokio::future_into_py(py, async move {
532            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
533                log::error!("Failed to unsubscribe from quotes: {e}");
534            }
535            Ok(())
536        })
537    }
538
539    #[pyo3(name = "unsubscribe_trades")]
540    fn py_unsubscribe_trades<'py>(
541        &self,
542        py: Python<'py>,
543        instrument_id: InstrumentId,
544        aggregated: bool,
545    ) -> PyResult<Bound<'py, PyAny>> {
546        let client = self.clone();
547
548        pyo3_async_runtimes::tokio::future_into_py(py, async move {
549            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
550                log::error!("Failed to unsubscribe from trades: {e}");
551            }
552            Ok(())
553        })
554    }
555
556    #[pyo3(name = "unsubscribe_bars")]
557    fn py_unsubscribe_bars<'py>(
558        &self,
559        py: Python<'py>,
560        bar_type: BarType,
561    ) -> PyResult<Bound<'py, PyAny>> {
562        let client = self.clone();
563
564        pyo3_async_runtimes::tokio::future_into_py(py, async move {
565            if let Err(e) = client.unsubscribe_bars(bar_type).await {
566                log::error!("Failed to unsubscribe from bars: {e}");
567            }
568            Ok(())
569        })
570    }
571
572    #[pyo3(name = "subscribe_ticker")]
573    fn py_subscribe_ticker<'py>(
574        &self,
575        py: Python<'py>,
576        instrument_id: InstrumentId,
577    ) -> PyResult<Bound<'py, PyAny>> {
578        let client = self.clone();
579
580        pyo3_async_runtimes::tokio::future_into_py(py, async move {
581            if let Err(e) = client.subscribe_ticker(instrument_id).await {
582                log::error!("Failed to subscribe to ticker: {e}");
583            }
584            Ok(())
585        })
586    }
587
588    #[pyo3(name = "unsubscribe_ticker")]
589    fn py_unsubscribe_ticker<'py>(
590        &self,
591        py: Python<'py>,
592        instrument_id: InstrumentId,
593    ) -> PyResult<Bound<'py, PyAny>> {
594        let client = self.clone();
595
596        pyo3_async_runtimes::tokio::future_into_py(py, async move {
597            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
598                log::error!("Failed to unsubscribe from ticker: {e}");
599            }
600            Ok(())
601        })
602    }
603
604    #[pyo3(name = "subscribe_mark_prices")]
605    fn py_subscribe_mark_prices<'py>(
606        &self,
607        py: Python<'py>,
608        instrument_id: InstrumentId,
609    ) -> PyResult<Bound<'py, PyAny>> {
610        let client = self.clone();
611
612        pyo3_async_runtimes::tokio::future_into_py(py, async move {
613            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
614                log::error!("Failed to subscribe to mark prices: {e}");
615            }
616            Ok(())
617        })
618    }
619
620    #[pyo3(name = "unsubscribe_mark_prices")]
621    fn py_unsubscribe_mark_prices<'py>(
622        &self,
623        py: Python<'py>,
624        instrument_id: InstrumentId,
625    ) -> PyResult<Bound<'py, PyAny>> {
626        let client = self.clone();
627
628        pyo3_async_runtimes::tokio::future_into_py(py, async move {
629            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
630                log::error!("Failed to unsubscribe from mark prices: {e}");
631            }
632            Ok(())
633        })
634    }
635
636    #[pyo3(name = "subscribe_index_prices")]
637    fn py_subscribe_index_prices<'py>(
638        &self,
639        py: Python<'py>,
640        instrument_id: InstrumentId,
641    ) -> PyResult<Bound<'py, PyAny>> {
642        let client = self.clone();
643
644        pyo3_async_runtimes::tokio::future_into_py(py, async move {
645            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
646                log::error!("Failed to subscribe to index prices: {e}");
647            }
648            Ok(())
649        })
650    }
651
652    #[pyo3(name = "unsubscribe_index_prices")]
653    fn py_unsubscribe_index_prices<'py>(
654        &self,
655        py: Python<'py>,
656        instrument_id: InstrumentId,
657    ) -> PyResult<Bound<'py, PyAny>> {
658        let client = self.clone();
659
660        pyo3_async_runtimes::tokio::future_into_py(py, async move {
661            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
662                log::error!("Failed to unsubscribe from index prices: {e}");
663            }
664            Ok(())
665        })
666    }
667
668    #[pyo3(name = "subscribe_funding_rates")]
669    fn py_subscribe_funding_rates<'py>(
670        &self,
671        py: Python<'py>,
672        instrument_id: InstrumentId,
673    ) -> PyResult<Bound<'py, PyAny>> {
674        let client = self.clone();
675
676        pyo3_async_runtimes::tokio::future_into_py(py, async move {
677            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
678                log::error!("Failed to subscribe to funding rates: {e}");
679            }
680            Ok(())
681        })
682    }
683
684    #[pyo3(name = "unsubscribe_funding_rates")]
685    fn py_unsubscribe_funding_rates<'py>(
686        &self,
687        py: Python<'py>,
688        instrument_id: InstrumentId,
689    ) -> PyResult<Bound<'py, PyAny>> {
690        let client = self.clone();
691
692        pyo3_async_runtimes::tokio::future_into_py(py, async move {
693            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
694                log::error!("Failed to unsubscribe from funding rates: {e}");
695            }
696            Ok(())
697        })
698    }
699
700    #[pyo3(name = "subscribe_orders")]
701    fn py_subscribe_orders<'py>(
702        &self,
703        py: Python<'py>,
704        instrument_type: OKXInstrumentType,
705    ) -> PyResult<Bound<'py, PyAny>> {
706        let client = self.clone();
707
708        pyo3_async_runtimes::tokio::future_into_py(py, async move {
709            if let Err(e) = client.subscribe_orders(instrument_type).await {
710                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
711            }
712            Ok(())
713        })
714    }
715
716    #[pyo3(name = "unsubscribe_orders")]
717    fn py_unsubscribe_orders<'py>(
718        &self,
719        py: Python<'py>,
720        instrument_type: OKXInstrumentType,
721    ) -> PyResult<Bound<'py, PyAny>> {
722        let client = self.clone();
723
724        pyo3_async_runtimes::tokio::future_into_py(py, async move {
725            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
726                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
727            }
728            Ok(())
729        })
730    }
731
732    #[pyo3(name = "subscribe_fills")]
733    fn py_subscribe_fills<'py>(
734        &self,
735        py: Python<'py>,
736        instrument_type: OKXInstrumentType,
737    ) -> PyResult<Bound<'py, PyAny>> {
738        let client = self.clone();
739
740        pyo3_async_runtimes::tokio::future_into_py(py, async move {
741            if let Err(e) = client.subscribe_fills(instrument_type).await {
742                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
743            }
744            Ok(())
745        })
746    }
747
748    #[pyo3(name = "unsubscribe_fills")]
749    fn py_unsubscribe_fills<'py>(
750        &self,
751        py: Python<'py>,
752        instrument_type: OKXInstrumentType,
753    ) -> PyResult<Bound<'py, PyAny>> {
754        let client = self.clone();
755
756        pyo3_async_runtimes::tokio::future_into_py(py, async move {
757            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
758                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
759            }
760            Ok(())
761        })
762    }
763
764    #[pyo3(name = "subscribe_account")]
765    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
766        let client = self.clone();
767
768        pyo3_async_runtimes::tokio::future_into_py(py, async move {
769            if let Err(e) = client.subscribe_account().await {
770                log::error!("Failed to subscribe to account: {e}");
771            }
772            Ok(())
773        })
774    }
775
776    #[pyo3(name = "unsubscribe_account")]
777    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
778        let client = self.clone();
779
780        pyo3_async_runtimes::tokio::future_into_py(py, async move {
781            if let Err(e) = client.unsubscribe_account().await {
782                log::error!("Failed to unsubscribe from account: {e}");
783            }
784            Ok(())
785        })
786    }
787
788    #[pyo3(name = "submit_order")]
789    #[pyo3(signature = (
790        trader_id,
791        strategy_id,
792        instrument_id,
793        td_mode,
794        client_order_id,
795        order_side,
796        order_type,
797        quantity,
798        time_in_force=None,
799        price=None,
800        trigger_price=None,
801        post_only=None,
802        reduce_only=None,
803        quote_quantity=None,
804        position_side=None,
805    ))]
806    #[allow(clippy::too_many_arguments)]
807    fn py_submit_order<'py>(
808        &self,
809        py: Python<'py>,
810        trader_id: TraderId,
811        strategy_id: StrategyId,
812        instrument_id: InstrumentId,
813        td_mode: OKXTradeMode,
814        client_order_id: ClientOrderId,
815        order_side: OrderSide,
816        order_type: OrderType,
817        quantity: Quantity,
818        time_in_force: Option<TimeInForce>,
819        price: Option<Price>,
820        trigger_price: Option<Price>,
821        post_only: Option<bool>,
822        reduce_only: Option<bool>,
823        quote_quantity: Option<bool>,
824        position_side: Option<PositionSide>,
825    ) -> PyResult<Bound<'py, PyAny>> {
826        let client = self.clone();
827
828        pyo3_async_runtimes::tokio::future_into_py(py, async move {
829            client
830                .submit_order(
831                    trader_id,
832                    strategy_id,
833                    instrument_id,
834                    td_mode,
835                    client_order_id,
836                    order_side,
837                    order_type,
838                    quantity,
839                    time_in_force,
840                    price,
841                    trigger_price,
842                    post_only,
843                    reduce_only,
844                    quote_quantity,
845                    position_side,
846                )
847                .await
848                .map_err(to_pyvalue_err)
849        })
850    }
851
852    #[pyo3(name = "cancel_order")]
853    #[pyo3(signature = (
854        trader_id,
855        strategy_id,
856        instrument_id,
857        client_order_id=None,
858        venue_order_id=None,
859    ))]
860    #[allow(clippy::too_many_arguments)]
861    fn py_cancel_order<'py>(
862        &self,
863        py: Python<'py>,
864        trader_id: TraderId,
865        strategy_id: StrategyId,
866        instrument_id: InstrumentId,
867        client_order_id: Option<ClientOrderId>,
868        venue_order_id: Option<VenueOrderId>,
869    ) -> PyResult<Bound<'py, PyAny>> {
870        let client = self.clone();
871
872        pyo3_async_runtimes::tokio::future_into_py(py, async move {
873            client
874                .cancel_order(
875                    trader_id,
876                    strategy_id,
877                    instrument_id,
878                    client_order_id,
879                    venue_order_id,
880                )
881                .await
882                .map_err(to_pyvalue_err)
883        })
884    }
885
886    #[pyo3(name = "modify_order")]
887    #[pyo3(signature = (
888        trader_id,
889        strategy_id,
890        instrument_id,
891        client_order_id=None,
892        venue_order_id=None,
893        price=None,
894        quantity=None,
895    ))]
896    #[allow(clippy::too_many_arguments)]
897    fn py_modify_order<'py>(
898        &self,
899        py: Python<'py>,
900        trader_id: TraderId,
901        strategy_id: StrategyId,
902        instrument_id: InstrumentId,
903        client_order_id: Option<ClientOrderId>,
904        venue_order_id: Option<VenueOrderId>,
905        price: Option<Price>,
906        quantity: Option<Quantity>,
907    ) -> PyResult<Bound<'py, PyAny>> {
908        let client = self.clone();
909
910        pyo3_async_runtimes::tokio::future_into_py(py, async move {
911            client
912                .modify_order(
913                    trader_id,
914                    strategy_id,
915                    instrument_id,
916                    client_order_id,
917                    price,
918                    quantity,
919                    venue_order_id,
920                )
921                .await
922                .map_err(to_pyvalue_err)
923        })
924    }
925
926    #[allow(clippy::type_complexity)]
927    #[pyo3(name = "batch_submit_orders")]
928    fn py_batch_submit_orders<'py>(
929        &self,
930        py: Python<'py>,
931        orders: Vec<PyObject>,
932    ) -> PyResult<Bound<'py, PyAny>> {
933        let mut domain_orders = Vec::with_capacity(orders.len());
934
935        for obj in orders {
936            let (
937                instrument_type,
938                instrument_id,
939                td_mode,
940                client_order_id,
941                order_side,
942                order_type,
943                quantity,
944                position_side,
945                price,
946                trigger_price,
947                post_only,
948                reduce_only,
949            ): (
950                String,
951                InstrumentId,
952                String,
953                ClientOrderId,
954                OrderSide,
955                OrderType,
956                Quantity,
957                Option<PositionSide>,
958                Option<Price>,
959                Option<Price>,
960                Option<bool>,
961                Option<bool>,
962            ) = obj
963                .extract(py)
964                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
965
966            let inst_type =
967                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
968            let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
969
970            domain_orders.push((
971                inst_type,
972                instrument_id,
973                trade_mode,
974                client_order_id,
975                order_side,
976                position_side,
977                order_type,
978                quantity,
979                price,
980                trigger_price,
981                post_only,
982                reduce_only,
983            ));
984        }
985
986        let client = self.clone();
987
988        pyo3_async_runtimes::tokio::future_into_py(py, async move {
989            client
990                .batch_submit_orders(domain_orders)
991                .await
992                .map_err(to_pyvalue_err)
993        })
994    }
995
996    /// Cancels multiple orders via WebSocket.
997    #[pyo3(name = "batch_cancel_orders")]
998    fn py_batch_cancel_orders<'py>(
999        &self,
1000        py: Python<'py>,
1001        orders: Vec<PyObject>,
1002    ) -> PyResult<Bound<'py, PyAny>> {
1003        let mut domain_orders = Vec::with_capacity(orders.len());
1004
1005        for obj in orders {
1006            let (instrument_type, instrument_id, client_order_id, order_id): (
1007                String,
1008                InstrumentId,
1009                Option<ClientOrderId>,
1010                Option<String>,
1011            ) = obj
1012                .extract(py)
1013                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1014            let inst_type =
1015                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1016            domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1017        }
1018
1019        let client = self.clone();
1020
1021        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1022            client
1023                .batch_cancel_orders(domain_orders)
1024                .await
1025                .map_err(to_pyvalue_err)
1026        })
1027    }
1028
1029    #[pyo3(name = "batch_modify_orders")]
1030    fn py_batch_modify_orders<'py>(
1031        &self,
1032        py: Python<'py>,
1033        orders: Vec<PyObject>,
1034    ) -> PyResult<Bound<'py, PyAny>> {
1035        let mut domain_orders = Vec::with_capacity(orders.len());
1036
1037        for obj in orders {
1038            let (
1039                instrument_type,
1040                instrument_id,
1041                client_order_id,
1042                new_client_order_id,
1043                price,
1044                quantity,
1045            ): (
1046                String,
1047                InstrumentId,
1048                ClientOrderId,
1049                ClientOrderId,
1050                Option<Price>,
1051                Option<Quantity>,
1052            ) = obj
1053                .extract(py)
1054                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1055            let inst_type =
1056                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1057            domain_orders.push((
1058                inst_type,
1059                instrument_id,
1060                client_order_id,
1061                new_client_order_id,
1062                price,
1063                quantity,
1064            ));
1065        }
1066
1067        let client = self.clone();
1068
1069        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1070            client
1071                .batch_modify_orders(domain_orders)
1072                .await
1073                .map_err(to_pyvalue_err)
1074        })
1075    }
1076
1077    #[pyo3(name = "mass_cancel_orders")]
1078    fn py_mass_cancel_orders<'py>(
1079        &self,
1080        py: Python<'py>,
1081        instrument_id: InstrumentId,
1082    ) -> PyResult<Bound<'py, PyAny>> {
1083        let client = self.clone();
1084
1085        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1086            client
1087                .mass_cancel_orders(instrument_id)
1088                .await
1089                .map_err(to_pyvalue_err)
1090        })
1091    }
1092}
1093
1094pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
1095    if let Err(e) = callback.call1(py, (py_obj,)) {
1096        tracing::error!("Error calling Python: {e}");
1097    }
1098}
1099
1100fn call_python_with_data<F>(callback: &PyObject, data_converter: F)
1101where
1102    F: FnOnce(Python) -> PyResult<PyObject>,
1103{
1104    Python::with_gil(|py| match data_converter(py) {
1105        Ok(py_obj) => call_python(py, callback, py_obj),
1106        Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1107    });
1108}