nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49    data::{BarType, Data, OrderBookDeltas_API},
50    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52    python::{
53        data::data_to_pycapsule,
54        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55    },
56    types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61    common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62    websocket::{
63        OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65    },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70    #[getter]
71    pub fn code(&self) -> &str {
72        &self.code
73    }
74
75    #[getter]
76    pub fn message(&self) -> &str {
77        &self.message
78    }
79
80    #[getter]
81    pub fn conn_id(&self) -> Option<&str> {
82        self.conn_id.as_deref()
83    }
84
85    #[getter]
86    pub fn ts_event(&self) -> u64 {
87        self.timestamp
88    }
89
90    fn __repr__(&self) -> String {
91        format!(
92            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93            self.code, self.message, self.conn_id, self.timestamp
94        )
95    }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100    #[new]
101    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102    fn py_new(
103        url: Option<String>,
104        api_key: Option<String>,
105        api_secret: Option<String>,
106        api_passphrase: Option<String>,
107        account_id: Option<AccountId>,
108        heartbeat: Option<u64>,
109    ) -> PyResult<Self> {
110        Self::new(
111            url,
112            api_key,
113            api_secret,
114            api_passphrase,
115            account_id,
116            heartbeat,
117        )
118        .map_err(to_pyvalue_err)
119    }
120
121    #[staticmethod]
122    #[pyo3(name = "with_credentials")]
123    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124    fn py_with_credentials(
125        url: Option<String>,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        api_passphrase: Option<String>,
129        account_id: Option<AccountId>,
130        heartbeat: Option<u64>,
131    ) -> PyResult<Self> {
132        Self::with_credentials(
133            url,
134            api_key,
135            api_secret,
136            api_passphrase,
137            account_id,
138            heartbeat,
139        )
140        .map_err(to_pyvalue_err)
141    }
142
143    #[staticmethod]
144    #[pyo3(name = "from_env")]
145    fn py_from_env() -> PyResult<Self> {
146        Self::from_env().map_err(to_pyvalue_err)
147    }
148
149    #[getter]
150    #[pyo3(name = "url")]
151    #[must_use]
152    pub fn py_url(&self) -> &str {
153        self.url()
154    }
155
156    #[getter]
157    #[pyo3(name = "api_key")]
158    #[must_use]
159    pub fn py_api_key(&self) -> Option<&str> {
160        self.api_key()
161    }
162
163    #[pyo3(name = "is_active")]
164    fn py_is_active(&mut self) -> bool {
165        self.is_active()
166    }
167
168    #[pyo3(name = "is_closed")]
169    fn py_is_closed(&mut self) -> bool {
170        self.is_closed()
171    }
172
173    #[pyo3(name = "cancel_all_requests")]
174    pub fn py_cancel_all_requests(&self) {
175        self.cancel_all_requests();
176    }
177
178    #[pyo3(name = "get_subscriptions")]
179    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
180        let channels = self.get_subscriptions(instrument_id);
181
182        // Convert to OKX channel names
183        channels
184            .iter()
185            .map(|c| {
186                serde_json::to_value(c)
187                    .ok()
188                    .and_then(|v| v.as_str().map(String::from))
189                    .unwrap_or_else(|| c.to_string())
190            })
191            .collect()
192    }
193
194    /// Sets the VIP level for this client.
195    ///
196    /// The VIP level determines which WebSocket channels are available.
197    #[pyo3(name = "set_vip_level")]
198    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
199        self.set_vip_level(vip_level);
200    }
201
202    /// Gets the current VIP level.
203    #[pyo3(name = "vip_level")]
204    #[getter]
205    fn py_vip_level(&self) -> OKXVipLevel {
206        self.vip_level()
207    }
208
209    #[pyo3(name = "connect")]
210    fn py_connect<'py>(
211        &mut self,
212        py: Python<'py>,
213        instruments: Vec<Py<PyAny>>,
214        callback: Py<PyAny>,
215    ) -> PyResult<Bound<'py, PyAny>> {
216        let mut instruments_any = Vec::new();
217        for inst in instruments {
218            let inst_any = pyobject_to_instrument_any(py, inst)?;
219            instruments_any.push(inst_any);
220        }
221
222        self.initialize_instruments_cache(instruments_any);
223
224        let mut client = self.clone();
225
226        pyo3_async_runtimes::tokio::future_into_py(py, async move {
227            client.connect().await.map_err(to_pyruntime_err)?;
228
229            let stream = client.stream();
230
231            tokio::spawn(async move {
232                tokio::pin!(stream);
233
234                while let Some(msg) = stream.next().await {
235                    match msg {
236                        NautilusWsMessage::Instrument(msg) => {
237                            call_python_with_data(&callback, |py| {
238                                instrument_any_to_pyobject(py, *msg)
239                            });
240                        }
241                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
242                            for data in msg {
243                                let py_obj = data_to_pycapsule(py, data);
244                                call_python(py, &callback, py_obj);
245                            }
246                        }),
247                        NautilusWsMessage::FundingRates(msg) => {
248                            for data in msg {
249                                call_python_with_data(&callback, |py| data.into_py_any(py));
250                            }
251                        }
252                        NautilusWsMessage::OrderRejected(msg) => {
253                            call_python_with_data(&callback, |py| msg.into_py_any(py))
254                        }
255                        NautilusWsMessage::OrderCancelRejected(msg) => {
256                            call_python_with_data(&callback, |py| msg.into_py_any(py))
257                        }
258                        NautilusWsMessage::OrderModifyRejected(msg) => {
259                            call_python_with_data(&callback, |py| msg.into_py_any(py))
260                        }
261                        NautilusWsMessage::ExecutionReports(msg) => {
262                            for report in msg {
263                                match report {
264                                    ExecutionReport::Order(report) => {
265                                        call_python_with_data(&callback, |py| {
266                                            report.into_py_any(py)
267                                        })
268                                    }
269                                    ExecutionReport::Fill(report) => {
270                                        call_python_with_data(&callback, |py| {
271                                            report.into_py_any(py)
272                                        })
273                                    }
274                                };
275                            }
276                        }
277                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
278                            let py_obj =
279                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
280                            call_python(py, &callback, py_obj);
281                        }),
282                        NautilusWsMessage::AccountUpdate(msg) => {
283                            call_python_with_data(&callback, |py| msg.into_py_any(py));
284                        }
285                        NautilusWsMessage::Reconnected => {} // Nothing to handle
286                        NautilusWsMessage::Error(msg) => {
287                            call_python_with_data(&callback, |py| msg.into_py_any(py));
288                        }
289                        NautilusWsMessage::Raw(msg) => {
290                            tracing::debug!("Received raw message, skipping: {msg}");
291                        }
292                    }
293                }
294            });
295
296            Ok(())
297        })
298    }
299
300    #[pyo3(name = "wait_until_active")]
301    fn py_wait_until_active<'py>(
302        &self,
303        py: Python<'py>,
304        timeout_secs: f64,
305    ) -> PyResult<Bound<'py, PyAny>> {
306        let client = self.clone();
307
308        pyo3_async_runtimes::tokio::future_into_py(py, async move {
309            client
310                .wait_until_active(timeout_secs)
311                .await
312                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
313            Ok(())
314        })
315    }
316
317    #[pyo3(name = "close")]
318    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
319        let mut client = self.clone();
320
321        pyo3_async_runtimes::tokio::future_into_py(py, async move {
322            if let Err(e) = client.close().await {
323                log::error!("Error on close: {e}");
324            }
325            Ok(())
326        })
327    }
328
329    #[pyo3(name = "subscribe_instruments")]
330    fn py_subscribe_instruments<'py>(
331        &self,
332        py: Python<'py>,
333        instrument_type: OKXInstrumentType,
334    ) -> PyResult<Bound<'py, PyAny>> {
335        let client = self.clone();
336
337        pyo3_async_runtimes::tokio::future_into_py(py, async move {
338            if let Err(e) = client.subscribe_instruments(instrument_type).await {
339                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
340            }
341            Ok(())
342        })
343    }
344
345    #[pyo3(name = "subscribe_instrument")]
346    fn py_subscribe_instrument<'py>(
347        &self,
348        py: Python<'py>,
349        instrument_id: InstrumentId,
350    ) -> PyResult<Bound<'py, PyAny>> {
351        let client = self.clone();
352
353        pyo3_async_runtimes::tokio::future_into_py(py, async move {
354            if let Err(e) = client.subscribe_instrument(instrument_id).await {
355                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
356            }
357            Ok(())
358        })
359    }
360
361    #[pyo3(name = "subscribe_book")]
362    fn py_subscribe_book<'py>(
363        &self,
364        py: Python<'py>,
365        instrument_id: InstrumentId,
366    ) -> PyResult<Bound<'py, PyAny>> {
367        let client = self.clone();
368
369        pyo3_async_runtimes::tokio::future_into_py(py, async move {
370            client
371                .subscribe_book(instrument_id)
372                .await
373                .map_err(to_pyvalue_err)
374        })
375    }
376
377    #[pyo3(name = "subscribe_book50_l2_tbt")]
378    fn py_subscribe_book50_l2_tbt<'py>(
379        &self,
380        py: Python<'py>,
381        instrument_id: InstrumentId,
382    ) -> PyResult<Bound<'py, PyAny>> {
383        let client = self.clone();
384
385        pyo3_async_runtimes::tokio::future_into_py(py, async move {
386            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
387                log::error!("Failed to subscribe to book50_tbt: {e}");
388            }
389            Ok(())
390        })
391    }
392
393    #[pyo3(name = "subscribe_book_l2_tbt")]
394    fn py_subscribe_book_l2_tbt<'py>(
395        &self,
396        py: Python<'py>,
397        instrument_id: InstrumentId,
398    ) -> PyResult<Bound<'py, PyAny>> {
399        let client = self.clone();
400
401        pyo3_async_runtimes::tokio::future_into_py(py, async move {
402            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
403                log::error!("Failed to subscribe to books_l2_tbt: {e}");
404            }
405            Ok(())
406        })
407    }
408
409    #[pyo3(name = "subscribe_book_with_depth")]
410    fn py_subscribe_book_with_depth<'py>(
411        &self,
412        py: Python<'py>,
413        instrument_id: InstrumentId,
414        depth: u16,
415    ) -> PyResult<Bound<'py, PyAny>> {
416        let client = self.clone();
417
418        pyo3_async_runtimes::tokio::future_into_py(py, async move {
419            client
420                .subscribe_book_with_depth(instrument_id, depth)
421                .await
422                .map_err(to_pyvalue_err)
423        })
424    }
425
426    #[pyo3(name = "subscribe_book_depth5")]
427    fn py_subscribe_book_depth5<'py>(
428        &self,
429        py: Python<'py>,
430        instrument_id: InstrumentId,
431    ) -> PyResult<Bound<'py, PyAny>> {
432        let client = self.clone();
433
434        pyo3_async_runtimes::tokio::future_into_py(py, async move {
435            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
436                log::error!("Failed to subscribe to books5: {e}");
437            }
438            Ok(())
439        })
440    }
441
442    #[pyo3(name = "subscribe_quotes")]
443    fn py_subscribe_quotes<'py>(
444        &self,
445        py: Python<'py>,
446        instrument_id: InstrumentId,
447    ) -> PyResult<Bound<'py, PyAny>> {
448        let client = self.clone();
449
450        pyo3_async_runtimes::tokio::future_into_py(py, async move {
451            if let Err(e) = client.subscribe_quotes(instrument_id).await {
452                log::error!("Failed to subscribe to quotes: {e}");
453            }
454            Ok(())
455        })
456    }
457
458    #[pyo3(name = "subscribe_trades")]
459    fn py_subscribe_trades<'py>(
460        &self,
461        py: Python<'py>,
462        instrument_id: InstrumentId,
463        aggregated: bool,
464    ) -> PyResult<Bound<'py, PyAny>> {
465        let client = self.clone();
466
467        pyo3_async_runtimes::tokio::future_into_py(py, async move {
468            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
469                log::error!("Failed to subscribe to trades: {e}");
470            }
471            Ok(())
472        })
473    }
474
475    #[pyo3(name = "subscribe_bars")]
476    fn py_subscribe_bars<'py>(
477        &self,
478        py: Python<'py>,
479        bar_type: BarType,
480    ) -> PyResult<Bound<'py, PyAny>> {
481        let client = self.clone();
482
483        pyo3_async_runtimes::tokio::future_into_py(py, async move {
484            if let Err(e) = client.subscribe_bars(bar_type).await {
485                log::error!("Failed to subscribe to bars: {e}");
486            }
487            Ok(())
488        })
489    }
490
491    #[pyo3(name = "unsubscribe_book")]
492    fn py_unsubscribe_book<'py>(
493        &self,
494        py: Python<'py>,
495        instrument_id: InstrumentId,
496    ) -> PyResult<Bound<'py, PyAny>> {
497        let client = self.clone();
498
499        pyo3_async_runtimes::tokio::future_into_py(py, async move {
500            if let Err(e) = client.unsubscribe_book(instrument_id).await {
501                log::error!("Failed to unsubscribe from order book: {e}");
502            }
503            Ok(())
504        })
505    }
506
507    #[pyo3(name = "unsubscribe_book_depth5")]
508    fn py_unsubscribe_book_depth5<'py>(
509        &self,
510        py: Python<'py>,
511        instrument_id: InstrumentId,
512    ) -> PyResult<Bound<'py, PyAny>> {
513        let client = self.clone();
514
515        pyo3_async_runtimes::tokio::future_into_py(py, async move {
516            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
517                log::error!("Failed to unsubscribe from books5: {e}");
518            }
519            Ok(())
520        })
521    }
522
523    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
524    fn py_unsubscribe_book50_l2_tbt<'py>(
525        &self,
526        py: Python<'py>,
527        instrument_id: InstrumentId,
528    ) -> PyResult<Bound<'py, PyAny>> {
529        let client = self.clone();
530
531        pyo3_async_runtimes::tokio::future_into_py(py, async move {
532            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
533                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
534            }
535            Ok(())
536        })
537    }
538
539    #[pyo3(name = "unsubscribe_book_l2_tbt")]
540    fn py_unsubscribe_book_l2_tbt<'py>(
541        &self,
542        py: Python<'py>,
543        instrument_id: InstrumentId,
544    ) -> PyResult<Bound<'py, PyAny>> {
545        let client = self.clone();
546
547        pyo3_async_runtimes::tokio::future_into_py(py, async move {
548            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
549                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
550            }
551            Ok(())
552        })
553    }
554
555    #[pyo3(name = "unsubscribe_quotes")]
556    fn py_unsubscribe_quotes<'py>(
557        &self,
558        py: Python<'py>,
559        instrument_id: InstrumentId,
560    ) -> PyResult<Bound<'py, PyAny>> {
561        let client = self.clone();
562
563        pyo3_async_runtimes::tokio::future_into_py(py, async move {
564            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
565                log::error!("Failed to unsubscribe from quotes: {e}");
566            }
567            Ok(())
568        })
569    }
570
571    #[pyo3(name = "unsubscribe_trades")]
572    fn py_unsubscribe_trades<'py>(
573        &self,
574        py: Python<'py>,
575        instrument_id: InstrumentId,
576        aggregated: bool,
577    ) -> PyResult<Bound<'py, PyAny>> {
578        let client = self.clone();
579
580        pyo3_async_runtimes::tokio::future_into_py(py, async move {
581            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
582                log::error!("Failed to unsubscribe from trades: {e}");
583            }
584            Ok(())
585        })
586    }
587
588    #[pyo3(name = "unsubscribe_bars")]
589    fn py_unsubscribe_bars<'py>(
590        &self,
591        py: Python<'py>,
592        bar_type: BarType,
593    ) -> PyResult<Bound<'py, PyAny>> {
594        let client = self.clone();
595
596        pyo3_async_runtimes::tokio::future_into_py(py, async move {
597            if let Err(e) = client.unsubscribe_bars(bar_type).await {
598                log::error!("Failed to unsubscribe from bars: {e}");
599            }
600            Ok(())
601        })
602    }
603
604    #[pyo3(name = "subscribe_ticker")]
605    fn py_subscribe_ticker<'py>(
606        &self,
607        py: Python<'py>,
608        instrument_id: InstrumentId,
609    ) -> PyResult<Bound<'py, PyAny>> {
610        let client = self.clone();
611
612        pyo3_async_runtimes::tokio::future_into_py(py, async move {
613            if let Err(e) = client.subscribe_ticker(instrument_id).await {
614                log::error!("Failed to subscribe to ticker: {e}");
615            }
616            Ok(())
617        })
618    }
619
620    #[pyo3(name = "unsubscribe_ticker")]
621    fn py_unsubscribe_ticker<'py>(
622        &self,
623        py: Python<'py>,
624        instrument_id: InstrumentId,
625    ) -> PyResult<Bound<'py, PyAny>> {
626        let client = self.clone();
627
628        pyo3_async_runtimes::tokio::future_into_py(py, async move {
629            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
630                log::error!("Failed to unsubscribe from ticker: {e}");
631            }
632            Ok(())
633        })
634    }
635
636    #[pyo3(name = "subscribe_mark_prices")]
637    fn py_subscribe_mark_prices<'py>(
638        &self,
639        py: Python<'py>,
640        instrument_id: InstrumentId,
641    ) -> PyResult<Bound<'py, PyAny>> {
642        let client = self.clone();
643
644        pyo3_async_runtimes::tokio::future_into_py(py, async move {
645            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
646                log::error!("Failed to subscribe to mark prices: {e}");
647            }
648            Ok(())
649        })
650    }
651
652    #[pyo3(name = "unsubscribe_mark_prices")]
653    fn py_unsubscribe_mark_prices<'py>(
654        &self,
655        py: Python<'py>,
656        instrument_id: InstrumentId,
657    ) -> PyResult<Bound<'py, PyAny>> {
658        let client = self.clone();
659
660        pyo3_async_runtimes::tokio::future_into_py(py, async move {
661            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
662                log::error!("Failed to unsubscribe from mark prices: {e}");
663            }
664            Ok(())
665        })
666    }
667
668    #[pyo3(name = "subscribe_index_prices")]
669    fn py_subscribe_index_prices<'py>(
670        &self,
671        py: Python<'py>,
672        instrument_id: InstrumentId,
673    ) -> PyResult<Bound<'py, PyAny>> {
674        let client = self.clone();
675
676        pyo3_async_runtimes::tokio::future_into_py(py, async move {
677            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
678                log::error!("Failed to subscribe to index prices: {e}");
679            }
680            Ok(())
681        })
682    }
683
684    #[pyo3(name = "unsubscribe_index_prices")]
685    fn py_unsubscribe_index_prices<'py>(
686        &self,
687        py: Python<'py>,
688        instrument_id: InstrumentId,
689    ) -> PyResult<Bound<'py, PyAny>> {
690        let client = self.clone();
691
692        pyo3_async_runtimes::tokio::future_into_py(py, async move {
693            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
694                log::error!("Failed to unsubscribe from index prices: {e}");
695            }
696            Ok(())
697        })
698    }
699
700    #[pyo3(name = "subscribe_funding_rates")]
701    fn py_subscribe_funding_rates<'py>(
702        &self,
703        py: Python<'py>,
704        instrument_id: InstrumentId,
705    ) -> PyResult<Bound<'py, PyAny>> {
706        let client = self.clone();
707
708        pyo3_async_runtimes::tokio::future_into_py(py, async move {
709            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
710                log::error!("Failed to subscribe to funding rates: {e}");
711            }
712            Ok(())
713        })
714    }
715
716    #[pyo3(name = "unsubscribe_funding_rates")]
717    fn py_unsubscribe_funding_rates<'py>(
718        &self,
719        py: Python<'py>,
720        instrument_id: InstrumentId,
721    ) -> PyResult<Bound<'py, PyAny>> {
722        let client = self.clone();
723
724        pyo3_async_runtimes::tokio::future_into_py(py, async move {
725            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
726                log::error!("Failed to unsubscribe from funding rates: {e}");
727            }
728            Ok(())
729        })
730    }
731
732    #[pyo3(name = "subscribe_orders")]
733    fn py_subscribe_orders<'py>(
734        &self,
735        py: Python<'py>,
736        instrument_type: OKXInstrumentType,
737    ) -> PyResult<Bound<'py, PyAny>> {
738        let client = self.clone();
739
740        pyo3_async_runtimes::tokio::future_into_py(py, async move {
741            if let Err(e) = client.subscribe_orders(instrument_type).await {
742                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
743            }
744            Ok(())
745        })
746    }
747
748    #[pyo3(name = "unsubscribe_orders")]
749    fn py_unsubscribe_orders<'py>(
750        &self,
751        py: Python<'py>,
752        instrument_type: OKXInstrumentType,
753    ) -> PyResult<Bound<'py, PyAny>> {
754        let client = self.clone();
755
756        pyo3_async_runtimes::tokio::future_into_py(py, async move {
757            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
758                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
759            }
760            Ok(())
761        })
762    }
763
764    #[pyo3(name = "subscribe_orders_algo")]
765    fn py_subscribe_orders_algo<'py>(
766        &self,
767        py: Python<'py>,
768        instrument_type: OKXInstrumentType,
769    ) -> PyResult<Bound<'py, PyAny>> {
770        let client = self.clone();
771
772        pyo3_async_runtimes::tokio::future_into_py(py, async move {
773            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
774                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
775            }
776            Ok(())
777        })
778    }
779
780    #[pyo3(name = "unsubscribe_orders_algo")]
781    fn py_unsubscribe_orders_algo<'py>(
782        &self,
783        py: Python<'py>,
784        instrument_type: OKXInstrumentType,
785    ) -> PyResult<Bound<'py, PyAny>> {
786        let client = self.clone();
787
788        pyo3_async_runtimes::tokio::future_into_py(py, async move {
789            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
790                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
791            }
792            Ok(())
793        })
794    }
795
796    #[pyo3(name = "subscribe_fills")]
797    fn py_subscribe_fills<'py>(
798        &self,
799        py: Python<'py>,
800        instrument_type: OKXInstrumentType,
801    ) -> PyResult<Bound<'py, PyAny>> {
802        let client = self.clone();
803
804        pyo3_async_runtimes::tokio::future_into_py(py, async move {
805            if let Err(e) = client.subscribe_fills(instrument_type).await {
806                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
807            }
808            Ok(())
809        })
810    }
811
812    #[pyo3(name = "unsubscribe_fills")]
813    fn py_unsubscribe_fills<'py>(
814        &self,
815        py: Python<'py>,
816        instrument_type: OKXInstrumentType,
817    ) -> PyResult<Bound<'py, PyAny>> {
818        let client = self.clone();
819
820        pyo3_async_runtimes::tokio::future_into_py(py, async move {
821            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
822                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
823            }
824            Ok(())
825        })
826    }
827
828    #[pyo3(name = "subscribe_account")]
829    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
830        let client = self.clone();
831
832        pyo3_async_runtimes::tokio::future_into_py(py, async move {
833            if let Err(e) = client.subscribe_account().await {
834                log::error!("Failed to subscribe to account: {e}");
835            }
836            Ok(())
837        })
838    }
839
840    #[pyo3(name = "unsubscribe_account")]
841    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
842        let client = self.clone();
843
844        pyo3_async_runtimes::tokio::future_into_py(py, async move {
845            if let Err(e) = client.unsubscribe_account().await {
846                log::error!("Failed to unsubscribe from account: {e}");
847            }
848            Ok(())
849        })
850    }
851
852    #[pyo3(name = "submit_order")]
853    #[pyo3(signature = (
854        trader_id,
855        strategy_id,
856        instrument_id,
857        td_mode,
858        client_order_id,
859        order_side,
860        order_type,
861        quantity,
862        time_in_force=None,
863        price=None,
864        trigger_price=None,
865        post_only=None,
866        reduce_only=None,
867        quote_quantity=None,
868        position_side=None,
869    ))]
870    #[allow(clippy::too_many_arguments)]
871    fn py_submit_order<'py>(
872        &self,
873        py: Python<'py>,
874        trader_id: TraderId,
875        strategy_id: StrategyId,
876        instrument_id: InstrumentId,
877        td_mode: OKXTradeMode,
878        client_order_id: ClientOrderId,
879        order_side: OrderSide,
880        order_type: OrderType,
881        quantity: Quantity,
882        time_in_force: Option<TimeInForce>,
883        price: Option<Price>,
884        trigger_price: Option<Price>,
885        post_only: Option<bool>,
886        reduce_only: Option<bool>,
887        quote_quantity: Option<bool>,
888        position_side: Option<PositionSide>,
889    ) -> PyResult<Bound<'py, PyAny>> {
890        let client = self.clone();
891
892        pyo3_async_runtimes::tokio::future_into_py(py, async move {
893            client
894                .submit_order(
895                    trader_id,
896                    strategy_id,
897                    instrument_id,
898                    td_mode,
899                    client_order_id,
900                    order_side,
901                    order_type,
902                    quantity,
903                    time_in_force,
904                    price,
905                    trigger_price,
906                    post_only,
907                    reduce_only,
908                    quote_quantity,
909                    position_side,
910                )
911                .await
912                .map_err(to_pyvalue_err)
913        })
914    }
915
916    #[pyo3(name = "cancel_order", signature = (
917        trader_id,
918        strategy_id,
919        instrument_id,
920        client_order_id=None,
921        venue_order_id=None,
922    ))]
923    #[allow(clippy::too_many_arguments)]
924    fn py_cancel_order<'py>(
925        &self,
926        py: Python<'py>,
927        trader_id: TraderId,
928        strategy_id: StrategyId,
929        instrument_id: InstrumentId,
930        client_order_id: Option<ClientOrderId>,
931        venue_order_id: Option<VenueOrderId>,
932    ) -> PyResult<Bound<'py, PyAny>> {
933        let client = self.clone();
934
935        pyo3_async_runtimes::tokio::future_into_py(py, async move {
936            client
937                .cancel_order(
938                    trader_id,
939                    strategy_id,
940                    instrument_id,
941                    client_order_id,
942                    venue_order_id,
943                )
944                .await
945                .map_err(to_pyvalue_err)
946        })
947    }
948
949    #[pyo3(name = "modify_order")]
950    #[pyo3(signature = (
951        trader_id,
952        strategy_id,
953        instrument_id,
954        client_order_id=None,
955        venue_order_id=None,
956        price=None,
957        quantity=None,
958    ))]
959    #[allow(clippy::too_many_arguments)]
960    fn py_modify_order<'py>(
961        &self,
962        py: Python<'py>,
963        trader_id: TraderId,
964        strategy_id: StrategyId,
965        instrument_id: InstrumentId,
966        client_order_id: Option<ClientOrderId>,
967        venue_order_id: Option<VenueOrderId>,
968        price: Option<Price>,
969        quantity: Option<Quantity>,
970    ) -> PyResult<Bound<'py, PyAny>> {
971        let client = self.clone();
972
973        pyo3_async_runtimes::tokio::future_into_py(py, async move {
974            client
975                .modify_order(
976                    trader_id,
977                    strategy_id,
978                    instrument_id,
979                    client_order_id,
980                    price,
981                    quantity,
982                    venue_order_id,
983                )
984                .await
985                .map_err(to_pyvalue_err)
986        })
987    }
988
989    #[allow(clippy::type_complexity)]
990    #[pyo3(name = "batch_submit_orders")]
991    fn py_batch_submit_orders<'py>(
992        &self,
993        py: Python<'py>,
994        orders: Vec<Py<PyAny>>,
995    ) -> PyResult<Bound<'py, PyAny>> {
996        let mut domain_orders = Vec::with_capacity(orders.len());
997
998        for obj in orders {
999            let (
1000                instrument_type,
1001                instrument_id,
1002                td_mode,
1003                client_order_id,
1004                order_side,
1005                order_type,
1006                quantity,
1007                position_side,
1008                price,
1009                trigger_price,
1010                post_only,
1011                reduce_only,
1012            ): (
1013                String,
1014                InstrumentId,
1015                String,
1016                ClientOrderId,
1017                OrderSide,
1018                OrderType,
1019                Quantity,
1020                Option<PositionSide>,
1021                Option<Price>,
1022                Option<Price>,
1023                Option<bool>,
1024                Option<bool>,
1025            ) = obj
1026                .extract(py)
1027                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1028
1029            let inst_type =
1030                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1031            let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
1032
1033            domain_orders.push((
1034                inst_type,
1035                instrument_id,
1036                trade_mode,
1037                client_order_id,
1038                order_side,
1039                position_side,
1040                order_type,
1041                quantity,
1042                price,
1043                trigger_price,
1044                post_only,
1045                reduce_only,
1046            ));
1047        }
1048
1049        let client = self.clone();
1050
1051        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1052            client
1053                .batch_submit_orders(domain_orders)
1054                .await
1055                .map_err(to_pyvalue_err)
1056        })
1057    }
1058
1059    /// Cancels multiple orders via WebSocket.
1060    #[pyo3(name = "batch_cancel_orders")]
1061    fn py_batch_cancel_orders<'py>(
1062        &self,
1063        py: Python<'py>,
1064        orders: Vec<Py<PyAny>>,
1065    ) -> PyResult<Bound<'py, PyAny>> {
1066        let mut domain_orders = Vec::with_capacity(orders.len());
1067
1068        for obj in orders {
1069            let (instrument_type, instrument_id, client_order_id, order_id): (
1070                String,
1071                InstrumentId,
1072                Option<ClientOrderId>,
1073                Option<String>,
1074            ) = obj
1075                .extract(py)
1076                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1077            let inst_type =
1078                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1079            domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1080        }
1081
1082        let client = self.clone();
1083
1084        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1085            client
1086                .batch_cancel_orders(domain_orders)
1087                .await
1088                .map_err(to_pyvalue_err)
1089        })
1090    }
1091
1092    #[pyo3(name = "batch_modify_orders")]
1093    fn py_batch_modify_orders<'py>(
1094        &self,
1095        py: Python<'py>,
1096        orders: Vec<Py<PyAny>>,
1097    ) -> PyResult<Bound<'py, PyAny>> {
1098        let mut domain_orders = Vec::with_capacity(orders.len());
1099
1100        for obj in orders {
1101            let (
1102                instrument_type,
1103                instrument_id,
1104                client_order_id,
1105                new_client_order_id,
1106                price,
1107                quantity,
1108            ): (
1109                String,
1110                InstrumentId,
1111                ClientOrderId,
1112                ClientOrderId,
1113                Option<Price>,
1114                Option<Quantity>,
1115            ) = obj
1116                .extract(py)
1117                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1118            let inst_type =
1119                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1120            domain_orders.push((
1121                inst_type,
1122                instrument_id,
1123                client_order_id,
1124                new_client_order_id,
1125                price,
1126                quantity,
1127            ));
1128        }
1129
1130        let client = self.clone();
1131
1132        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1133            client
1134                .batch_modify_orders(domain_orders)
1135                .await
1136                .map_err(to_pyvalue_err)
1137        })
1138    }
1139
1140    #[pyo3(name = "mass_cancel_orders")]
1141    fn py_mass_cancel_orders<'py>(
1142        &self,
1143        py: Python<'py>,
1144        instrument_id: InstrumentId,
1145    ) -> PyResult<Bound<'py, PyAny>> {
1146        let client = self.clone();
1147
1148        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1149            client
1150                .mass_cancel_orders(instrument_id)
1151                .await
1152                .map_err(to_pyvalue_err)
1153        })
1154    }
1155}
1156
1157pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1158    if let Err(e) = callback.call1(py, (py_obj,)) {
1159        tracing::error!("Error calling Python: {e}");
1160    }
1161}
1162
1163fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1164where
1165    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1166{
1167    Python::attach(|py| match data_converter(py) {
1168        Ok(py_obj) => call_python(py, callback, py_obj),
1169        Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1170    });
1171}