1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_common::live::get_runtime;
48use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
49use nautilus_model::{
50 data::{BarType, Data, OrderBookDeltas_API},
51 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
52 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
53 python::{
54 data::data_to_pycapsule,
55 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56 },
57 types::{Price, Quantity},
58};
59use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
60
61use crate::{
62 common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
63 websocket::{
64 OKXWebSocketClient,
65 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
66 },
67};
68
69#[pyo3::pymethods]
70impl OKXWebSocketError {
71 #[getter]
72 pub fn code(&self) -> &str {
73 &self.code
74 }
75
76 #[getter]
77 pub fn message(&self) -> &str {
78 &self.message
79 }
80
81 #[getter]
82 pub fn conn_id(&self) -> Option<&str> {
83 self.conn_id.as_deref()
84 }
85
86 #[getter]
87 pub fn ts_event(&self) -> u64 {
88 self.timestamp
89 }
90
91 fn __repr__(&self) -> String {
92 format!(
93 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
94 self.code, self.message, self.conn_id, self.timestamp
95 )
96 }
97}
98
99#[pymethods]
100impl OKXWebSocketClient {
101 #[new]
102 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
103 fn py_new(
104 url: Option<String>,
105 api_key: Option<String>,
106 api_secret: Option<String>,
107 api_passphrase: Option<String>,
108 account_id: Option<AccountId>,
109 heartbeat: Option<u64>,
110 ) -> PyResult<Self> {
111 Self::new(
112 url,
113 api_key,
114 api_secret,
115 api_passphrase,
116 account_id,
117 heartbeat,
118 )
119 .map_err(to_pyvalue_err)
120 }
121
122 #[staticmethod]
123 #[pyo3(name = "with_credentials")]
124 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
125 fn py_with_credentials(
126 url: Option<String>,
127 api_key: Option<String>,
128 api_secret: Option<String>,
129 api_passphrase: Option<String>,
130 account_id: Option<AccountId>,
131 heartbeat: Option<u64>,
132 ) -> PyResult<Self> {
133 Self::with_credentials(
134 url,
135 api_key,
136 api_secret,
137 api_passphrase,
138 account_id,
139 heartbeat,
140 )
141 .map_err(to_pyvalue_err)
142 }
143
144 #[staticmethod]
145 #[pyo3(name = "from_env")]
146 fn py_from_env() -> PyResult<Self> {
147 Self::from_env().map_err(to_pyvalue_err)
148 }
149
150 #[getter]
151 #[pyo3(name = "url")]
152 #[must_use]
153 pub fn py_url(&self) -> &str {
154 self.url()
155 }
156
157 #[getter]
158 #[pyo3(name = "api_key")]
159 #[must_use]
160 pub fn py_api_key(&self) -> Option<&str> {
161 self.api_key()
162 }
163
164 #[getter]
165 #[pyo3(name = "api_key_masked")]
166 #[must_use]
167 pub fn py_api_key_masked(&self) -> Option<String> {
168 self.api_key_masked()
169 }
170
171 #[pyo3(name = "is_active")]
172 fn py_is_active(&mut self) -> bool {
173 self.is_active()
174 }
175
176 #[pyo3(name = "is_closed")]
177 fn py_is_closed(&mut self) -> bool {
178 self.is_closed()
179 }
180
181 #[pyo3(name = "cancel_all_requests")]
182 pub fn py_cancel_all_requests(&self) {
183 self.cancel_all_requests();
184 }
185
186 #[pyo3(name = "get_subscriptions")]
187 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
188 let channels = self.get_subscriptions(instrument_id);
189
190 channels
192 .iter()
193 .map(|c| {
194 serde_json::to_value(c)
195 .ok()
196 .and_then(|v| v.as_str().map(String::from))
197 .unwrap_or_else(|| c.to_string())
198 })
199 .collect()
200 }
201
202 #[pyo3(name = "set_vip_level")]
206 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
207 self.set_vip_level(vip_level);
208 }
209
210 #[pyo3(name = "vip_level")]
212 #[getter]
213 fn py_vip_level(&self) -> OKXVipLevel {
214 self.vip_level()
215 }
216
217 #[pyo3(name = "connect")]
218 fn py_connect<'py>(
219 &mut self,
220 py: Python<'py>,
221 instruments: Vec<Py<PyAny>>,
222 callback: Py<PyAny>,
223 ) -> PyResult<Bound<'py, PyAny>> {
224 let mut instruments_any = Vec::new();
225 for inst in instruments {
226 let inst_any = pyobject_to_instrument_any(py, inst)?;
227 instruments_any.push(inst_any);
228 }
229
230 self.cache_instruments(instruments_any);
231
232 let mut client = self.clone();
233
234 pyo3_async_runtimes::tokio::future_into_py(py, async move {
235 client.connect().await.map_err(to_pyruntime_err)?;
236
237 let stream = client.stream();
238
239 get_runtime().spawn(async move {
241 let _client = client;
242 tokio::pin!(stream);
243
244 while let Some(msg) = stream.next().await {
245 match msg {
246 NautilusWsMessage::Instrument(msg) => {
247 call_python_with_data(&callback, |py| {
248 instrument_any_to_pyobject(py, *msg)
249 });
250 }
251 NautilusWsMessage::Data(msg) => Python::attach(|py| {
252 for data in msg {
253 let py_obj = data_to_pycapsule(py, data);
254 call_python(py, &callback, py_obj);
255 }
256 }),
257 NautilusWsMessage::FundingRates(msg) => {
258 for data in msg {
259 call_python_with_data(&callback, |py| data.into_py_any(py));
260 }
261 }
262 NautilusWsMessage::OrderAccepted(msg) => {
263 call_python_with_data(&callback, |py| msg.into_py_any(py));
264 }
265 NautilusWsMessage::OrderCanceled(msg) => {
266 call_python_with_data(&callback, |py| msg.into_py_any(py));
267 }
268 NautilusWsMessage::OrderExpired(msg) => {
269 call_python_with_data(&callback, |py| msg.into_py_any(py));
270 }
271 NautilusWsMessage::OrderRejected(msg) => {
272 call_python_with_data(&callback, |py| msg.into_py_any(py));
273 }
274 NautilusWsMessage::OrderCancelRejected(msg) => {
275 call_python_with_data(&callback, |py| msg.into_py_any(py));
276 }
277 NautilusWsMessage::OrderModifyRejected(msg) => {
278 call_python_with_data(&callback, |py| msg.into_py_any(py));
279 }
280 NautilusWsMessage::OrderTriggered(msg) => {
281 call_python_with_data(&callback, |py| msg.into_py_any(py));
282 }
283 NautilusWsMessage::OrderUpdated(msg) => {
284 call_python_with_data(&callback, |py| msg.into_py_any(py));
285 }
286 NautilusWsMessage::ExecutionReports(msg) => {
287 for report in msg {
288 match report {
289 ExecutionReport::Order(report) => {
290 call_python_with_data(&callback, |py| {
291 report.into_py_any(py)
292 });
293 }
294 ExecutionReport::Fill(report) => {
295 call_python_with_data(&callback, |py| {
296 report.into_py_any(py)
297 });
298 }
299 };
300 }
301 }
302 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
303 let py_obj =
304 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
305 call_python(py, &callback, py_obj);
306 }),
307 NautilusWsMessage::AccountUpdate(msg) => {
308 call_python_with_data(&callback, |py| msg.into_py_any(py));
309 }
310 NautilusWsMessage::PositionUpdate(msg) => {
311 call_python_with_data(&callback, |py| msg.into_py_any(py));
312 }
313 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Authenticated => {} NautilusWsMessage::Error(msg) => {
316 call_python_with_data(&callback, |py| msg.into_py_any(py));
317 }
318 NautilusWsMessage::Raw(msg) => {
319 tracing::debug!("Received raw message, skipping: {msg}");
320 }
321 }
322 }
323 });
324
325 Ok(())
326 })
327 }
328
329 #[pyo3(name = "wait_until_active")]
330 fn py_wait_until_active<'py>(
331 &self,
332 py: Python<'py>,
333 timeout_secs: f64,
334 ) -> PyResult<Bound<'py, PyAny>> {
335 let client = self.clone();
336
337 pyo3_async_runtimes::tokio::future_into_py(py, async move {
338 client
339 .wait_until_active(timeout_secs)
340 .await
341 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "close")]
347 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
348 let mut client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.close().await {
352 log::error!("Error on close: {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "subscribe_instruments")]
359 fn py_subscribe_instruments<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_type: OKXInstrumentType,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.subscribe_instruments(instrument_type).await {
368 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
369 }
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "subscribe_instrument")]
375 fn py_subscribe_instrument<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_id: InstrumentId,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 if let Err(e) = client.subscribe_instrument(instrument_id).await {
384 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
385 }
386 Ok(())
387 })
388 }
389
390 #[pyo3(name = "subscribe_book")]
391 fn py_subscribe_book<'py>(
392 &self,
393 py: Python<'py>,
394 instrument_id: InstrumentId,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 client
400 .subscribe_book(instrument_id)
401 .await
402 .map_err(to_pyvalue_err)
403 })
404 }
405
406 #[pyo3(name = "subscribe_book50_l2_tbt")]
407 fn py_subscribe_book50_l2_tbt<'py>(
408 &self,
409 py: Python<'py>,
410 instrument_id: InstrumentId,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
416 log::error!("Failed to subscribe to book50_tbt: {e}");
417 }
418 Ok(())
419 })
420 }
421
422 #[pyo3(name = "subscribe_book_l2_tbt")]
423 fn py_subscribe_book_l2_tbt<'py>(
424 &self,
425 py: Python<'py>,
426 instrument_id: InstrumentId,
427 ) -> PyResult<Bound<'py, PyAny>> {
428 let client = self.clone();
429
430 pyo3_async_runtimes::tokio::future_into_py(py, async move {
431 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
432 log::error!("Failed to subscribe to books_l2_tbt: {e}");
433 }
434 Ok(())
435 })
436 }
437
438 #[pyo3(name = "subscribe_book_with_depth")]
439 fn py_subscribe_book_with_depth<'py>(
440 &self,
441 py: Python<'py>,
442 instrument_id: InstrumentId,
443 depth: u16,
444 ) -> PyResult<Bound<'py, PyAny>> {
445 let client = self.clone();
446
447 pyo3_async_runtimes::tokio::future_into_py(py, async move {
448 client
449 .subscribe_book_with_depth(instrument_id, depth)
450 .await
451 .map_err(to_pyvalue_err)
452 })
453 }
454
455 #[pyo3(name = "subscribe_book_depth5")]
456 fn py_subscribe_book_depth5<'py>(
457 &self,
458 py: Python<'py>,
459 instrument_id: InstrumentId,
460 ) -> PyResult<Bound<'py, PyAny>> {
461 let client = self.clone();
462
463 pyo3_async_runtimes::tokio::future_into_py(py, async move {
464 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
465 log::error!("Failed to subscribe to books5: {e}");
466 }
467 Ok(())
468 })
469 }
470
471 #[pyo3(name = "subscribe_quotes")]
472 fn py_subscribe_quotes<'py>(
473 &self,
474 py: Python<'py>,
475 instrument_id: InstrumentId,
476 ) -> PyResult<Bound<'py, PyAny>> {
477 let client = self.clone();
478
479 pyo3_async_runtimes::tokio::future_into_py(py, async move {
480 if let Err(e) = client.subscribe_quotes(instrument_id).await {
481 log::error!("Failed to subscribe to quotes: {e}");
482 }
483 Ok(())
484 })
485 }
486
487 #[pyo3(name = "subscribe_trades")]
488 fn py_subscribe_trades<'py>(
489 &self,
490 py: Python<'py>,
491 instrument_id: InstrumentId,
492 aggregated: bool,
493 ) -> PyResult<Bound<'py, PyAny>> {
494 let client = self.clone();
495
496 pyo3_async_runtimes::tokio::future_into_py(py, async move {
497 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
498 log::error!("Failed to subscribe to trades: {e}");
499 }
500 Ok(())
501 })
502 }
503
504 #[pyo3(name = "subscribe_bars")]
505 fn py_subscribe_bars<'py>(
506 &self,
507 py: Python<'py>,
508 bar_type: BarType,
509 ) -> PyResult<Bound<'py, PyAny>> {
510 let client = self.clone();
511
512 pyo3_async_runtimes::tokio::future_into_py(py, async move {
513 if let Err(e) = client.subscribe_bars(bar_type).await {
514 log::error!("Failed to subscribe to bars: {e}");
515 }
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "unsubscribe_book")]
521 fn py_unsubscribe_book<'py>(
522 &self,
523 py: Python<'py>,
524 instrument_id: InstrumentId,
525 ) -> PyResult<Bound<'py, PyAny>> {
526 let client = self.clone();
527
528 pyo3_async_runtimes::tokio::future_into_py(py, async move {
529 if let Err(e) = client.unsubscribe_book(instrument_id).await {
530 log::error!("Failed to unsubscribe from order book: {e}");
531 }
532 Ok(())
533 })
534 }
535
536 #[pyo3(name = "unsubscribe_book_depth5")]
537 fn py_unsubscribe_book_depth5<'py>(
538 &self,
539 py: Python<'py>,
540 instrument_id: InstrumentId,
541 ) -> PyResult<Bound<'py, PyAny>> {
542 let client = self.clone();
543
544 pyo3_async_runtimes::tokio::future_into_py(py, async move {
545 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
546 log::error!("Failed to unsubscribe from books5: {e}");
547 }
548 Ok(())
549 })
550 }
551
552 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
553 fn py_unsubscribe_book50_l2_tbt<'py>(
554 &self,
555 py: Python<'py>,
556 instrument_id: InstrumentId,
557 ) -> PyResult<Bound<'py, PyAny>> {
558 let client = self.clone();
559
560 pyo3_async_runtimes::tokio::future_into_py(py, async move {
561 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
562 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
563 }
564 Ok(())
565 })
566 }
567
568 #[pyo3(name = "unsubscribe_book_l2_tbt")]
569 fn py_unsubscribe_book_l2_tbt<'py>(
570 &self,
571 py: Python<'py>,
572 instrument_id: InstrumentId,
573 ) -> PyResult<Bound<'py, PyAny>> {
574 let client = self.clone();
575
576 pyo3_async_runtimes::tokio::future_into_py(py, async move {
577 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
578 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
579 }
580 Ok(())
581 })
582 }
583
584 #[pyo3(name = "unsubscribe_quotes")]
585 fn py_unsubscribe_quotes<'py>(
586 &self,
587 py: Python<'py>,
588 instrument_id: InstrumentId,
589 ) -> PyResult<Bound<'py, PyAny>> {
590 let client = self.clone();
591
592 pyo3_async_runtimes::tokio::future_into_py(py, async move {
593 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
594 log::error!("Failed to unsubscribe from quotes: {e}");
595 }
596 Ok(())
597 })
598 }
599
600 #[pyo3(name = "unsubscribe_trades")]
601 fn py_unsubscribe_trades<'py>(
602 &self,
603 py: Python<'py>,
604 instrument_id: InstrumentId,
605 aggregated: bool,
606 ) -> PyResult<Bound<'py, PyAny>> {
607 let client = self.clone();
608
609 pyo3_async_runtimes::tokio::future_into_py(py, async move {
610 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
611 log::error!("Failed to unsubscribe from trades: {e}");
612 }
613 Ok(())
614 })
615 }
616
617 #[pyo3(name = "unsubscribe_bars")]
618 fn py_unsubscribe_bars<'py>(
619 &self,
620 py: Python<'py>,
621 bar_type: BarType,
622 ) -> PyResult<Bound<'py, PyAny>> {
623 let client = self.clone();
624
625 pyo3_async_runtimes::tokio::future_into_py(py, async move {
626 if let Err(e) = client.unsubscribe_bars(bar_type).await {
627 log::error!("Failed to unsubscribe from bars: {e}");
628 }
629 Ok(())
630 })
631 }
632
633 #[pyo3(name = "subscribe_ticker")]
634 fn py_subscribe_ticker<'py>(
635 &self,
636 py: Python<'py>,
637 instrument_id: InstrumentId,
638 ) -> PyResult<Bound<'py, PyAny>> {
639 let client = self.clone();
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 if let Err(e) = client.subscribe_ticker(instrument_id).await {
643 log::error!("Failed to subscribe to ticker: {e}");
644 }
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "unsubscribe_ticker")]
650 fn py_unsubscribe_ticker<'py>(
651 &self,
652 py: Python<'py>,
653 instrument_id: InstrumentId,
654 ) -> PyResult<Bound<'py, PyAny>> {
655 let client = self.clone();
656
657 pyo3_async_runtimes::tokio::future_into_py(py, async move {
658 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
659 log::error!("Failed to unsubscribe from ticker: {e}");
660 }
661 Ok(())
662 })
663 }
664
665 #[pyo3(name = "subscribe_mark_prices")]
666 fn py_subscribe_mark_prices<'py>(
667 &self,
668 py: Python<'py>,
669 instrument_id: InstrumentId,
670 ) -> PyResult<Bound<'py, PyAny>> {
671 let client = self.clone();
672
673 pyo3_async_runtimes::tokio::future_into_py(py, async move {
674 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
675 log::error!("Failed to subscribe to mark prices: {e}");
676 }
677 Ok(())
678 })
679 }
680
681 #[pyo3(name = "unsubscribe_mark_prices")]
682 fn py_unsubscribe_mark_prices<'py>(
683 &self,
684 py: Python<'py>,
685 instrument_id: InstrumentId,
686 ) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
691 log::error!("Failed to unsubscribe from mark prices: {e}");
692 }
693 Ok(())
694 })
695 }
696
697 #[pyo3(name = "subscribe_index_prices")]
698 fn py_subscribe_index_prices<'py>(
699 &self,
700 py: Python<'py>,
701 instrument_id: InstrumentId,
702 ) -> PyResult<Bound<'py, PyAny>> {
703 let client = self.clone();
704
705 pyo3_async_runtimes::tokio::future_into_py(py, async move {
706 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
707 log::error!("Failed to subscribe to index prices: {e}");
708 }
709 Ok(())
710 })
711 }
712
713 #[pyo3(name = "unsubscribe_index_prices")]
714 fn py_unsubscribe_index_prices<'py>(
715 &self,
716 py: Python<'py>,
717 instrument_id: InstrumentId,
718 ) -> PyResult<Bound<'py, PyAny>> {
719 let client = self.clone();
720
721 pyo3_async_runtimes::tokio::future_into_py(py, async move {
722 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
723 log::error!("Failed to unsubscribe from index prices: {e}");
724 }
725 Ok(())
726 })
727 }
728
729 #[pyo3(name = "subscribe_funding_rates")]
730 fn py_subscribe_funding_rates<'py>(
731 &self,
732 py: Python<'py>,
733 instrument_id: InstrumentId,
734 ) -> PyResult<Bound<'py, PyAny>> {
735 let client = self.clone();
736
737 pyo3_async_runtimes::tokio::future_into_py(py, async move {
738 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
739 log::error!("Failed to subscribe to funding rates: {e}");
740 }
741 Ok(())
742 })
743 }
744
745 #[pyo3(name = "unsubscribe_funding_rates")]
746 fn py_unsubscribe_funding_rates<'py>(
747 &self,
748 py: Python<'py>,
749 instrument_id: InstrumentId,
750 ) -> PyResult<Bound<'py, PyAny>> {
751 let client = self.clone();
752
753 pyo3_async_runtimes::tokio::future_into_py(py, async move {
754 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
755 log::error!("Failed to unsubscribe from funding rates: {e}");
756 }
757 Ok(())
758 })
759 }
760
761 #[pyo3(name = "subscribe_orders")]
762 fn py_subscribe_orders<'py>(
763 &self,
764 py: Python<'py>,
765 instrument_type: OKXInstrumentType,
766 ) -> PyResult<Bound<'py, PyAny>> {
767 let client = self.clone();
768
769 pyo3_async_runtimes::tokio::future_into_py(py, async move {
770 if let Err(e) = client.subscribe_orders(instrument_type).await {
771 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
772 }
773 Ok(())
774 })
775 }
776
777 #[pyo3(name = "unsubscribe_orders")]
778 fn py_unsubscribe_orders<'py>(
779 &self,
780 py: Python<'py>,
781 instrument_type: OKXInstrumentType,
782 ) -> PyResult<Bound<'py, PyAny>> {
783 let client = self.clone();
784
785 pyo3_async_runtimes::tokio::future_into_py(py, async move {
786 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
787 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
788 }
789 Ok(())
790 })
791 }
792
793 #[pyo3(name = "subscribe_orders_algo")]
794 fn py_subscribe_orders_algo<'py>(
795 &self,
796 py: Python<'py>,
797 instrument_type: OKXInstrumentType,
798 ) -> PyResult<Bound<'py, PyAny>> {
799 let client = self.clone();
800
801 pyo3_async_runtimes::tokio::future_into_py(py, async move {
802 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
803 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
804 }
805 Ok(())
806 })
807 }
808
809 #[pyo3(name = "unsubscribe_orders_algo")]
810 fn py_unsubscribe_orders_algo<'py>(
811 &self,
812 py: Python<'py>,
813 instrument_type: OKXInstrumentType,
814 ) -> PyResult<Bound<'py, PyAny>> {
815 let client = self.clone();
816
817 pyo3_async_runtimes::tokio::future_into_py(py, async move {
818 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
819 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
820 }
821 Ok(())
822 })
823 }
824
825 #[pyo3(name = "subscribe_fills")]
826 fn py_subscribe_fills<'py>(
827 &self,
828 py: Python<'py>,
829 instrument_type: OKXInstrumentType,
830 ) -> PyResult<Bound<'py, PyAny>> {
831 let client = self.clone();
832
833 pyo3_async_runtimes::tokio::future_into_py(py, async move {
834 if let Err(e) = client.subscribe_fills(instrument_type).await {
835 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
836 }
837 Ok(())
838 })
839 }
840
841 #[pyo3(name = "unsubscribe_fills")]
842 fn py_unsubscribe_fills<'py>(
843 &self,
844 py: Python<'py>,
845 instrument_type: OKXInstrumentType,
846 ) -> PyResult<Bound<'py, PyAny>> {
847 let client = self.clone();
848
849 pyo3_async_runtimes::tokio::future_into_py(py, async move {
850 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
851 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
852 }
853 Ok(())
854 })
855 }
856
857 #[pyo3(name = "subscribe_account")]
858 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
859 let client = self.clone();
860
861 pyo3_async_runtimes::tokio::future_into_py(py, async move {
862 if let Err(e) = client.subscribe_account().await {
863 log::error!("Failed to subscribe to account: {e}");
864 }
865 Ok(())
866 })
867 }
868
869 #[pyo3(name = "unsubscribe_account")]
870 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
871 let client = self.clone();
872
873 pyo3_async_runtimes::tokio::future_into_py(py, async move {
874 if let Err(e) = client.unsubscribe_account().await {
875 log::error!("Failed to unsubscribe from account: {e}");
876 }
877 Ok(())
878 })
879 }
880
881 #[pyo3(name = "submit_order")]
882 #[pyo3(signature = (
883 trader_id,
884 strategy_id,
885 instrument_id,
886 td_mode,
887 client_order_id,
888 order_side,
889 order_type,
890 quantity,
891 time_in_force=None,
892 price=None,
893 trigger_price=None,
894 post_only=None,
895 reduce_only=None,
896 quote_quantity=None,
897 position_side=None,
898 ))]
899 #[allow(clippy::too_many_arguments)]
900 fn py_submit_order<'py>(
901 &self,
902 py: Python<'py>,
903 trader_id: TraderId,
904 strategy_id: StrategyId,
905 instrument_id: InstrumentId,
906 td_mode: OKXTradeMode,
907 client_order_id: ClientOrderId,
908 order_side: OrderSide,
909 order_type: OrderType,
910 quantity: Quantity,
911 time_in_force: Option<TimeInForce>,
912 price: Option<Price>,
913 trigger_price: Option<Price>,
914 post_only: Option<bool>,
915 reduce_only: Option<bool>,
916 quote_quantity: Option<bool>,
917 position_side: Option<PositionSide>,
918 ) -> PyResult<Bound<'py, PyAny>> {
919 let client = self.clone();
920
921 pyo3_async_runtimes::tokio::future_into_py(py, async move {
922 client
923 .submit_order(
924 trader_id,
925 strategy_id,
926 instrument_id,
927 td_mode,
928 client_order_id,
929 order_side,
930 order_type,
931 quantity,
932 time_in_force,
933 price,
934 trigger_price,
935 post_only,
936 reduce_only,
937 quote_quantity,
938 position_side,
939 )
940 .await
941 .map_err(to_pyvalue_err)
942 })
943 }
944
945 #[pyo3(name = "cancel_order", signature = (
946 trader_id,
947 strategy_id,
948 instrument_id,
949 client_order_id=None,
950 venue_order_id=None,
951 ))]
952 #[allow(clippy::too_many_arguments)]
953 fn py_cancel_order<'py>(
954 &self,
955 py: Python<'py>,
956 trader_id: TraderId,
957 strategy_id: StrategyId,
958 instrument_id: InstrumentId,
959 client_order_id: Option<ClientOrderId>,
960 venue_order_id: Option<VenueOrderId>,
961 ) -> PyResult<Bound<'py, PyAny>> {
962 let client = self.clone();
963
964 pyo3_async_runtimes::tokio::future_into_py(py, async move {
965 client
966 .cancel_order(
967 trader_id,
968 strategy_id,
969 instrument_id,
970 client_order_id,
971 venue_order_id,
972 )
973 .await
974 .map_err(to_pyvalue_err)
975 })
976 }
977
978 #[pyo3(name = "modify_order")]
979 #[pyo3(signature = (
980 trader_id,
981 strategy_id,
982 instrument_id,
983 client_order_id=None,
984 venue_order_id=None,
985 price=None,
986 quantity=None,
987 ))]
988 #[allow(clippy::too_many_arguments)]
989 fn py_modify_order<'py>(
990 &self,
991 py: Python<'py>,
992 trader_id: TraderId,
993 strategy_id: StrategyId,
994 instrument_id: InstrumentId,
995 client_order_id: Option<ClientOrderId>,
996 venue_order_id: Option<VenueOrderId>,
997 price: Option<Price>,
998 quantity: Option<Quantity>,
999 ) -> PyResult<Bound<'py, PyAny>> {
1000 let client = self.clone();
1001
1002 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1003 client
1004 .modify_order(
1005 trader_id,
1006 strategy_id,
1007 instrument_id,
1008 client_order_id,
1009 price,
1010 quantity,
1011 venue_order_id,
1012 )
1013 .await
1014 .map_err(to_pyvalue_err)
1015 })
1016 }
1017
1018 #[allow(clippy::type_complexity)]
1019 #[pyo3(name = "batch_submit_orders")]
1020 fn py_batch_submit_orders<'py>(
1021 &self,
1022 py: Python<'py>,
1023 orders: Vec<Py<PyAny>>,
1024 ) -> PyResult<Bound<'py, PyAny>> {
1025 let mut domain_orders = Vec::with_capacity(orders.len());
1026
1027 for obj in orders {
1028 let (
1029 instrument_type,
1030 instrument_id,
1031 td_mode,
1032 client_order_id,
1033 order_side,
1034 order_type,
1035 quantity,
1036 position_side,
1037 price,
1038 trigger_price,
1039 post_only,
1040 reduce_only,
1041 ): (
1042 OKXInstrumentType,
1043 InstrumentId,
1044 OKXTradeMode,
1045 ClientOrderId,
1046 OrderSide,
1047 OrderType,
1048 Quantity,
1049 Option<PositionSide>,
1050 Option<Price>,
1051 Option<Price>,
1052 Option<bool>,
1053 Option<bool>,
1054 ) = obj
1055 .extract(py)
1056 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1057
1058 domain_orders.push((
1059 instrument_type,
1060 instrument_id,
1061 td_mode,
1062 client_order_id,
1063 order_side,
1064 position_side,
1065 order_type,
1066 quantity,
1067 price,
1068 trigger_price,
1069 post_only,
1070 reduce_only,
1071 ));
1072 }
1073
1074 let client = self.clone();
1075
1076 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1077 client
1078 .batch_submit_orders(domain_orders)
1079 .await
1080 .map_err(to_pyvalue_err)
1081 })
1082 }
1083
1084 #[pyo3(name = "batch_cancel_orders")]
1086 fn py_batch_cancel_orders<'py>(
1087 &self,
1088 py: Python<'py>,
1089 cancels: Vec<Py<PyAny>>,
1090 ) -> PyResult<Bound<'py, PyAny>> {
1091 let mut batched_cancels = Vec::with_capacity(cancels.len());
1092
1093 for obj in cancels {
1094 let (instrument_id, client_order_id, order_id): (
1095 InstrumentId,
1096 Option<ClientOrderId>,
1097 Option<VenueOrderId>,
1098 ) = obj
1099 .extract(py)
1100 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1101 batched_cancels.push((instrument_id, client_order_id, order_id));
1102 }
1103
1104 let client = self.clone();
1105
1106 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1107 client
1108 .batch_cancel_orders(batched_cancels)
1109 .await
1110 .map_err(to_pyvalue_err)
1111 })
1112 }
1113
1114 #[pyo3(name = "batch_modify_orders")]
1115 fn py_batch_modify_orders<'py>(
1116 &self,
1117 py: Python<'py>,
1118 orders: Vec<Py<PyAny>>,
1119 ) -> PyResult<Bound<'py, PyAny>> {
1120 let mut domain_orders = Vec::with_capacity(orders.len());
1121
1122 for obj in orders {
1123 let (
1124 instrument_type,
1125 instrument_id,
1126 client_order_id,
1127 new_client_order_id,
1128 price,
1129 quantity,
1130 ): (
1131 String,
1132 InstrumentId,
1133 ClientOrderId,
1134 ClientOrderId,
1135 Option<Price>,
1136 Option<Quantity>,
1137 ) = obj
1138 .extract(py)
1139 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1140 let inst_type =
1141 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1142 domain_orders.push((
1143 inst_type,
1144 instrument_id,
1145 client_order_id,
1146 new_client_order_id,
1147 price,
1148 quantity,
1149 ));
1150 }
1151
1152 let client = self.clone();
1153
1154 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1155 client
1156 .batch_modify_orders(domain_orders)
1157 .await
1158 .map_err(to_pyvalue_err)
1159 })
1160 }
1161
1162 #[pyo3(name = "mass_cancel_orders")]
1163 fn py_mass_cancel_orders<'py>(
1164 &self,
1165 py: Python<'py>,
1166 instrument_id: InstrumentId,
1167 ) -> PyResult<Bound<'py, PyAny>> {
1168 let client = self.clone();
1169
1170 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1171 client
1172 .mass_cancel_orders(instrument_id)
1173 .await
1174 .map_err(to_pyvalue_err)
1175 })
1176 }
1177
1178 #[pyo3(name = "cache_instruments")]
1179 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1180 let instruments: Result<Vec<_>, _> = instruments
1181 .into_iter()
1182 .map(|inst| pyobject_to_instrument_any(py, inst))
1183 .collect();
1184 self.cache_instruments(instruments?);
1185 Ok(())
1186 }
1187
1188 #[pyo3(name = "cache_instrument")]
1189 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1190 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1191 Ok(())
1192 }
1193}
1194
1195pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1196 if let Err(e) = callback.call1(py, (py_obj,)) {
1197 tracing::error!("Error calling Python: {e}");
1198 }
1199}
1200
1201fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1202where
1203 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1204{
1205 Python::attach(|py| match data_converter(py) {
1206 Ok(py_obj) => call_python(py, callback, py_obj),
1207 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1208 });
1209}