1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49 data::{BarType, Data, OrderBookDeltas_API},
50 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 python::{
53 data::data_to_pycapsule,
54 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55 },
56 types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61 common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62 websocket::{
63 OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65 },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70 #[getter]
71 pub fn code(&self) -> &str {
72 &self.code
73 }
74
75 #[getter]
76 pub fn message(&self) -> &str {
77 &self.message
78 }
79
80 #[getter]
81 pub fn conn_id(&self) -> Option<&str> {
82 self.conn_id.as_deref()
83 }
84
85 #[getter]
86 pub fn ts_event(&self) -> u64 {
87 self.timestamp
88 }
89
90 fn __repr__(&self) -> String {
91 format!(
92 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93 self.code, self.message, self.conn_id, self.timestamp
94 )
95 }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100 #[new]
101 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102 fn py_new(
103 url: Option<String>,
104 api_key: Option<String>,
105 api_secret: Option<String>,
106 api_passphrase: Option<String>,
107 account_id: Option<AccountId>,
108 heartbeat: Option<u64>,
109 ) -> PyResult<Self> {
110 Self::new(
111 url,
112 api_key,
113 api_secret,
114 api_passphrase,
115 account_id,
116 heartbeat,
117 )
118 .map_err(to_pyvalue_err)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "with_credentials")]
123 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124 fn py_with_credentials(
125 url: Option<String>,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 api_passphrase: Option<String>,
129 account_id: Option<AccountId>,
130 heartbeat: Option<u64>,
131 ) -> PyResult<Self> {
132 Self::with_credentials(
133 url,
134 api_key,
135 api_secret,
136 api_passphrase,
137 account_id,
138 heartbeat,
139 )
140 .map_err(to_pyvalue_err)
141 }
142
143 #[staticmethod]
144 #[pyo3(name = "from_env")]
145 fn py_from_env() -> PyResult<Self> {
146 Self::from_env().map_err(to_pyvalue_err)
147 }
148
149 #[getter]
150 #[pyo3(name = "url")]
151 #[must_use]
152 pub fn py_url(&self) -> &str {
153 self.url()
154 }
155
156 #[getter]
157 #[pyo3(name = "api_key")]
158 #[must_use]
159 pub fn py_api_key(&self) -> Option<&str> {
160 self.api_key()
161 }
162
163 #[getter]
164 #[pyo3(name = "api_key_masked")]
165 #[must_use]
166 pub fn py_api_key_masked(&self) -> Option<String> {
167 self.api_key_masked()
168 }
169
170 #[pyo3(name = "is_active")]
171 fn py_is_active(&mut self) -> bool {
172 self.is_active()
173 }
174
175 #[pyo3(name = "is_closed")]
176 fn py_is_closed(&mut self) -> bool {
177 self.is_closed()
178 }
179
180 #[pyo3(name = "cancel_all_requests")]
181 pub fn py_cancel_all_requests(&self) {
182 self.cancel_all_requests();
183 }
184
185 #[pyo3(name = "get_subscriptions")]
186 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
187 let channels = self.get_subscriptions(instrument_id);
188
189 channels
191 .iter()
192 .map(|c| {
193 serde_json::to_value(c)
194 .ok()
195 .and_then(|v| v.as_str().map(String::from))
196 .unwrap_or_else(|| c.to_string())
197 })
198 .collect()
199 }
200
201 #[pyo3(name = "set_vip_level")]
205 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
206 self.set_vip_level(vip_level);
207 }
208
209 #[pyo3(name = "vip_level")]
211 #[getter]
212 fn py_vip_level(&self) -> OKXVipLevel {
213 self.vip_level()
214 }
215
216 #[pyo3(name = "connect")]
217 fn py_connect<'py>(
218 &mut self,
219 py: Python<'py>,
220 instruments: Vec<Py<PyAny>>,
221 callback: Py<PyAny>,
222 ) -> PyResult<Bound<'py, PyAny>> {
223 let mut instruments_any = Vec::new();
224 for inst in instruments {
225 let inst_any = pyobject_to_instrument_any(py, inst)?;
226 instruments_any.push(inst_any);
227 }
228
229 self.cache_instruments(instruments_any);
230
231 let mut client = self.clone();
232
233 pyo3_async_runtimes::tokio::future_into_py(py, async move {
234 client.connect().await.map_err(to_pyruntime_err)?;
235
236 let stream = client.stream();
237
238 tokio::spawn(async move {
240 let _client = client;
241 tokio::pin!(stream);
242
243 while let Some(msg) = stream.next().await {
244 match msg {
245 NautilusWsMessage::Instrument(msg) => {
246 call_python_with_data(&callback, |py| {
247 instrument_any_to_pyobject(py, *msg)
248 });
249 }
250 NautilusWsMessage::Data(msg) => Python::attach(|py| {
251 for data in msg {
252 let py_obj = data_to_pycapsule(py, data);
253 call_python(py, &callback, py_obj);
254 }
255 }),
256 NautilusWsMessage::FundingRates(msg) => {
257 for data in msg {
258 call_python_with_data(&callback, |py| data.into_py_any(py));
259 }
260 }
261 NautilusWsMessage::OrderRejected(msg) => {
262 call_python_with_data(&callback, |py| msg.into_py_any(py));
263 }
264 NautilusWsMessage::OrderCancelRejected(msg) => {
265 call_python_with_data(&callback, |py| msg.into_py_any(py));
266 }
267 NautilusWsMessage::OrderModifyRejected(msg) => {
268 call_python_with_data(&callback, |py| msg.into_py_any(py));
269 }
270 NautilusWsMessage::ExecutionReports(msg) => {
271 for report in msg {
272 match report {
273 ExecutionReport::Order(report) => {
274 call_python_with_data(&callback, |py| {
275 report.into_py_any(py)
276 });
277 }
278 ExecutionReport::Fill(report) => {
279 call_python_with_data(&callback, |py| {
280 report.into_py_any(py)
281 });
282 }
283 };
284 }
285 }
286 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
287 let py_obj =
288 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
289 call_python(py, &callback, py_obj);
290 }),
291 NautilusWsMessage::AccountUpdate(msg) => {
292 call_python_with_data(&callback, |py| msg.into_py_any(py));
293 }
294 NautilusWsMessage::PositionUpdate(msg) => {
295 call_python_with_data(&callback, |py| msg.into_py_any(py));
296 }
297 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Authenticated => {} NautilusWsMessage::Error(msg) => {
300 call_python_with_data(&callback, |py| msg.into_py_any(py));
301 }
302 NautilusWsMessage::Raw(msg) => {
303 tracing::debug!("Received raw message, skipping: {msg}");
304 }
305 }
306 }
307 });
308
309 Ok(())
310 })
311 }
312
313 #[pyo3(name = "wait_until_active")]
314 fn py_wait_until_active<'py>(
315 &self,
316 py: Python<'py>,
317 timeout_secs: f64,
318 ) -> PyResult<Bound<'py, PyAny>> {
319 let client = self.clone();
320
321 pyo3_async_runtimes::tokio::future_into_py(py, async move {
322 client
323 .wait_until_active(timeout_secs)
324 .await
325 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
326 Ok(())
327 })
328 }
329
330 #[pyo3(name = "close")]
331 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
332 let mut client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 if let Err(e) = client.close().await {
336 log::error!("Error on close: {e}");
337 }
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "subscribe_instruments")]
343 fn py_subscribe_instruments<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_type: OKXInstrumentType,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.subscribe_instruments(instrument_type).await {
352 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "subscribe_instrument")]
359 fn py_subscribe_instrument<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_id: InstrumentId,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.subscribe_instrument(instrument_id).await {
368 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
369 }
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "subscribe_book")]
375 fn py_subscribe_book<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_id: InstrumentId,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 client
384 .subscribe_book(instrument_id)
385 .await
386 .map_err(to_pyvalue_err)
387 })
388 }
389
390 #[pyo3(name = "subscribe_book50_l2_tbt")]
391 fn py_subscribe_book50_l2_tbt<'py>(
392 &self,
393 py: Python<'py>,
394 instrument_id: InstrumentId,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
400 log::error!("Failed to subscribe to book50_tbt: {e}");
401 }
402 Ok(())
403 })
404 }
405
406 #[pyo3(name = "subscribe_book_l2_tbt")]
407 fn py_subscribe_book_l2_tbt<'py>(
408 &self,
409 py: Python<'py>,
410 instrument_id: InstrumentId,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
416 log::error!("Failed to subscribe to books_l2_tbt: {e}");
417 }
418 Ok(())
419 })
420 }
421
422 #[pyo3(name = "subscribe_book_with_depth")]
423 fn py_subscribe_book_with_depth<'py>(
424 &self,
425 py: Python<'py>,
426 instrument_id: InstrumentId,
427 depth: u16,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let client = self.clone();
430
431 pyo3_async_runtimes::tokio::future_into_py(py, async move {
432 client
433 .subscribe_book_with_depth(instrument_id, depth)
434 .await
435 .map_err(to_pyvalue_err)
436 })
437 }
438
439 #[pyo3(name = "subscribe_book_depth5")]
440 fn py_subscribe_book_depth5<'py>(
441 &self,
442 py: Python<'py>,
443 instrument_id: InstrumentId,
444 ) -> PyResult<Bound<'py, PyAny>> {
445 let client = self.clone();
446
447 pyo3_async_runtimes::tokio::future_into_py(py, async move {
448 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
449 log::error!("Failed to subscribe to books5: {e}");
450 }
451 Ok(())
452 })
453 }
454
455 #[pyo3(name = "subscribe_quotes")]
456 fn py_subscribe_quotes<'py>(
457 &self,
458 py: Python<'py>,
459 instrument_id: InstrumentId,
460 ) -> PyResult<Bound<'py, PyAny>> {
461 let client = self.clone();
462
463 pyo3_async_runtimes::tokio::future_into_py(py, async move {
464 if let Err(e) = client.subscribe_quotes(instrument_id).await {
465 log::error!("Failed to subscribe to quotes: {e}");
466 }
467 Ok(())
468 })
469 }
470
471 #[pyo3(name = "subscribe_trades")]
472 fn py_subscribe_trades<'py>(
473 &self,
474 py: Python<'py>,
475 instrument_id: InstrumentId,
476 aggregated: bool,
477 ) -> PyResult<Bound<'py, PyAny>> {
478 let client = self.clone();
479
480 pyo3_async_runtimes::tokio::future_into_py(py, async move {
481 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
482 log::error!("Failed to subscribe to trades: {e}");
483 }
484 Ok(())
485 })
486 }
487
488 #[pyo3(name = "subscribe_bars")]
489 fn py_subscribe_bars<'py>(
490 &self,
491 py: Python<'py>,
492 bar_type: BarType,
493 ) -> PyResult<Bound<'py, PyAny>> {
494 let client = self.clone();
495
496 pyo3_async_runtimes::tokio::future_into_py(py, async move {
497 if let Err(e) = client.subscribe_bars(bar_type).await {
498 log::error!("Failed to subscribe to bars: {e}");
499 }
500 Ok(())
501 })
502 }
503
504 #[pyo3(name = "unsubscribe_book")]
505 fn py_unsubscribe_book<'py>(
506 &self,
507 py: Python<'py>,
508 instrument_id: InstrumentId,
509 ) -> PyResult<Bound<'py, PyAny>> {
510 let client = self.clone();
511
512 pyo3_async_runtimes::tokio::future_into_py(py, async move {
513 if let Err(e) = client.unsubscribe_book(instrument_id).await {
514 log::error!("Failed to unsubscribe from order book: {e}");
515 }
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "unsubscribe_book_depth5")]
521 fn py_unsubscribe_book_depth5<'py>(
522 &self,
523 py: Python<'py>,
524 instrument_id: InstrumentId,
525 ) -> PyResult<Bound<'py, PyAny>> {
526 let client = self.clone();
527
528 pyo3_async_runtimes::tokio::future_into_py(py, async move {
529 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
530 log::error!("Failed to unsubscribe from books5: {e}");
531 }
532 Ok(())
533 })
534 }
535
536 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
537 fn py_unsubscribe_book50_l2_tbt<'py>(
538 &self,
539 py: Python<'py>,
540 instrument_id: InstrumentId,
541 ) -> PyResult<Bound<'py, PyAny>> {
542 let client = self.clone();
543
544 pyo3_async_runtimes::tokio::future_into_py(py, async move {
545 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
546 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
547 }
548 Ok(())
549 })
550 }
551
552 #[pyo3(name = "unsubscribe_book_l2_tbt")]
553 fn py_unsubscribe_book_l2_tbt<'py>(
554 &self,
555 py: Python<'py>,
556 instrument_id: InstrumentId,
557 ) -> PyResult<Bound<'py, PyAny>> {
558 let client = self.clone();
559
560 pyo3_async_runtimes::tokio::future_into_py(py, async move {
561 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
562 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
563 }
564 Ok(())
565 })
566 }
567
568 #[pyo3(name = "unsubscribe_quotes")]
569 fn py_unsubscribe_quotes<'py>(
570 &self,
571 py: Python<'py>,
572 instrument_id: InstrumentId,
573 ) -> PyResult<Bound<'py, PyAny>> {
574 let client = self.clone();
575
576 pyo3_async_runtimes::tokio::future_into_py(py, async move {
577 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
578 log::error!("Failed to unsubscribe from quotes: {e}");
579 }
580 Ok(())
581 })
582 }
583
584 #[pyo3(name = "unsubscribe_trades")]
585 fn py_unsubscribe_trades<'py>(
586 &self,
587 py: Python<'py>,
588 instrument_id: InstrumentId,
589 aggregated: bool,
590 ) -> PyResult<Bound<'py, PyAny>> {
591 let client = self.clone();
592
593 pyo3_async_runtimes::tokio::future_into_py(py, async move {
594 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
595 log::error!("Failed to unsubscribe from trades: {e}");
596 }
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "unsubscribe_bars")]
602 fn py_unsubscribe_bars<'py>(
603 &self,
604 py: Python<'py>,
605 bar_type: BarType,
606 ) -> PyResult<Bound<'py, PyAny>> {
607 let client = self.clone();
608
609 pyo3_async_runtimes::tokio::future_into_py(py, async move {
610 if let Err(e) = client.unsubscribe_bars(bar_type).await {
611 log::error!("Failed to unsubscribe from bars: {e}");
612 }
613 Ok(())
614 })
615 }
616
617 #[pyo3(name = "subscribe_ticker")]
618 fn py_subscribe_ticker<'py>(
619 &self,
620 py: Python<'py>,
621 instrument_id: InstrumentId,
622 ) -> PyResult<Bound<'py, PyAny>> {
623 let client = self.clone();
624
625 pyo3_async_runtimes::tokio::future_into_py(py, async move {
626 if let Err(e) = client.subscribe_ticker(instrument_id).await {
627 log::error!("Failed to subscribe to ticker: {e}");
628 }
629 Ok(())
630 })
631 }
632
633 #[pyo3(name = "unsubscribe_ticker")]
634 fn py_unsubscribe_ticker<'py>(
635 &self,
636 py: Python<'py>,
637 instrument_id: InstrumentId,
638 ) -> PyResult<Bound<'py, PyAny>> {
639 let client = self.clone();
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
643 log::error!("Failed to unsubscribe from ticker: {e}");
644 }
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "subscribe_mark_prices")]
650 fn py_subscribe_mark_prices<'py>(
651 &self,
652 py: Python<'py>,
653 instrument_id: InstrumentId,
654 ) -> PyResult<Bound<'py, PyAny>> {
655 let client = self.clone();
656
657 pyo3_async_runtimes::tokio::future_into_py(py, async move {
658 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
659 log::error!("Failed to subscribe to mark prices: {e}");
660 }
661 Ok(())
662 })
663 }
664
665 #[pyo3(name = "unsubscribe_mark_prices")]
666 fn py_unsubscribe_mark_prices<'py>(
667 &self,
668 py: Python<'py>,
669 instrument_id: InstrumentId,
670 ) -> PyResult<Bound<'py, PyAny>> {
671 let client = self.clone();
672
673 pyo3_async_runtimes::tokio::future_into_py(py, async move {
674 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
675 log::error!("Failed to unsubscribe from mark prices: {e}");
676 }
677 Ok(())
678 })
679 }
680
681 #[pyo3(name = "subscribe_index_prices")]
682 fn py_subscribe_index_prices<'py>(
683 &self,
684 py: Python<'py>,
685 instrument_id: InstrumentId,
686 ) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
691 log::error!("Failed to subscribe to index prices: {e}");
692 }
693 Ok(())
694 })
695 }
696
697 #[pyo3(name = "unsubscribe_index_prices")]
698 fn py_unsubscribe_index_prices<'py>(
699 &self,
700 py: Python<'py>,
701 instrument_id: InstrumentId,
702 ) -> PyResult<Bound<'py, PyAny>> {
703 let client = self.clone();
704
705 pyo3_async_runtimes::tokio::future_into_py(py, async move {
706 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
707 log::error!("Failed to unsubscribe from index prices: {e}");
708 }
709 Ok(())
710 })
711 }
712
713 #[pyo3(name = "subscribe_funding_rates")]
714 fn py_subscribe_funding_rates<'py>(
715 &self,
716 py: Python<'py>,
717 instrument_id: InstrumentId,
718 ) -> PyResult<Bound<'py, PyAny>> {
719 let client = self.clone();
720
721 pyo3_async_runtimes::tokio::future_into_py(py, async move {
722 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
723 log::error!("Failed to subscribe to funding rates: {e}");
724 }
725 Ok(())
726 })
727 }
728
729 #[pyo3(name = "unsubscribe_funding_rates")]
730 fn py_unsubscribe_funding_rates<'py>(
731 &self,
732 py: Python<'py>,
733 instrument_id: InstrumentId,
734 ) -> PyResult<Bound<'py, PyAny>> {
735 let client = self.clone();
736
737 pyo3_async_runtimes::tokio::future_into_py(py, async move {
738 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
739 log::error!("Failed to unsubscribe from funding rates: {e}");
740 }
741 Ok(())
742 })
743 }
744
745 #[pyo3(name = "subscribe_orders")]
746 fn py_subscribe_orders<'py>(
747 &self,
748 py: Python<'py>,
749 instrument_type: OKXInstrumentType,
750 ) -> PyResult<Bound<'py, PyAny>> {
751 let client = self.clone();
752
753 pyo3_async_runtimes::tokio::future_into_py(py, async move {
754 if let Err(e) = client.subscribe_orders(instrument_type).await {
755 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
756 }
757 Ok(())
758 })
759 }
760
761 #[pyo3(name = "unsubscribe_orders")]
762 fn py_unsubscribe_orders<'py>(
763 &self,
764 py: Python<'py>,
765 instrument_type: OKXInstrumentType,
766 ) -> PyResult<Bound<'py, PyAny>> {
767 let client = self.clone();
768
769 pyo3_async_runtimes::tokio::future_into_py(py, async move {
770 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
771 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
772 }
773 Ok(())
774 })
775 }
776
777 #[pyo3(name = "subscribe_orders_algo")]
778 fn py_subscribe_orders_algo<'py>(
779 &self,
780 py: Python<'py>,
781 instrument_type: OKXInstrumentType,
782 ) -> PyResult<Bound<'py, PyAny>> {
783 let client = self.clone();
784
785 pyo3_async_runtimes::tokio::future_into_py(py, async move {
786 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
787 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
788 }
789 Ok(())
790 })
791 }
792
793 #[pyo3(name = "unsubscribe_orders_algo")]
794 fn py_unsubscribe_orders_algo<'py>(
795 &self,
796 py: Python<'py>,
797 instrument_type: OKXInstrumentType,
798 ) -> PyResult<Bound<'py, PyAny>> {
799 let client = self.clone();
800
801 pyo3_async_runtimes::tokio::future_into_py(py, async move {
802 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
803 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
804 }
805 Ok(())
806 })
807 }
808
809 #[pyo3(name = "subscribe_fills")]
810 fn py_subscribe_fills<'py>(
811 &self,
812 py: Python<'py>,
813 instrument_type: OKXInstrumentType,
814 ) -> PyResult<Bound<'py, PyAny>> {
815 let client = self.clone();
816
817 pyo3_async_runtimes::tokio::future_into_py(py, async move {
818 if let Err(e) = client.subscribe_fills(instrument_type).await {
819 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
820 }
821 Ok(())
822 })
823 }
824
825 #[pyo3(name = "unsubscribe_fills")]
826 fn py_unsubscribe_fills<'py>(
827 &self,
828 py: Python<'py>,
829 instrument_type: OKXInstrumentType,
830 ) -> PyResult<Bound<'py, PyAny>> {
831 let client = self.clone();
832
833 pyo3_async_runtimes::tokio::future_into_py(py, async move {
834 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
835 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
836 }
837 Ok(())
838 })
839 }
840
841 #[pyo3(name = "subscribe_account")]
842 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
843 let client = self.clone();
844
845 pyo3_async_runtimes::tokio::future_into_py(py, async move {
846 if let Err(e) = client.subscribe_account().await {
847 log::error!("Failed to subscribe to account: {e}");
848 }
849 Ok(())
850 })
851 }
852
853 #[pyo3(name = "unsubscribe_account")]
854 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
855 let client = self.clone();
856
857 pyo3_async_runtimes::tokio::future_into_py(py, async move {
858 if let Err(e) = client.unsubscribe_account().await {
859 log::error!("Failed to unsubscribe from account: {e}");
860 }
861 Ok(())
862 })
863 }
864
865 #[pyo3(name = "submit_order")]
866 #[pyo3(signature = (
867 trader_id,
868 strategy_id,
869 instrument_id,
870 td_mode,
871 client_order_id,
872 order_side,
873 order_type,
874 quantity,
875 time_in_force=None,
876 price=None,
877 trigger_price=None,
878 post_only=None,
879 reduce_only=None,
880 quote_quantity=None,
881 position_side=None,
882 ))]
883 #[allow(clippy::too_many_arguments)]
884 fn py_submit_order<'py>(
885 &self,
886 py: Python<'py>,
887 trader_id: TraderId,
888 strategy_id: StrategyId,
889 instrument_id: InstrumentId,
890 td_mode: OKXTradeMode,
891 client_order_id: ClientOrderId,
892 order_side: OrderSide,
893 order_type: OrderType,
894 quantity: Quantity,
895 time_in_force: Option<TimeInForce>,
896 price: Option<Price>,
897 trigger_price: Option<Price>,
898 post_only: Option<bool>,
899 reduce_only: Option<bool>,
900 quote_quantity: Option<bool>,
901 position_side: Option<PositionSide>,
902 ) -> PyResult<Bound<'py, PyAny>> {
903 let client = self.clone();
904
905 pyo3_async_runtimes::tokio::future_into_py(py, async move {
906 client
907 .submit_order(
908 trader_id,
909 strategy_id,
910 instrument_id,
911 td_mode,
912 client_order_id,
913 order_side,
914 order_type,
915 quantity,
916 time_in_force,
917 price,
918 trigger_price,
919 post_only,
920 reduce_only,
921 quote_quantity,
922 position_side,
923 )
924 .await
925 .map_err(to_pyvalue_err)
926 })
927 }
928
929 #[pyo3(name = "cancel_order", signature = (
930 trader_id,
931 strategy_id,
932 instrument_id,
933 client_order_id=None,
934 venue_order_id=None,
935 ))]
936 #[allow(clippy::too_many_arguments)]
937 fn py_cancel_order<'py>(
938 &self,
939 py: Python<'py>,
940 trader_id: TraderId,
941 strategy_id: StrategyId,
942 instrument_id: InstrumentId,
943 client_order_id: Option<ClientOrderId>,
944 venue_order_id: Option<VenueOrderId>,
945 ) -> PyResult<Bound<'py, PyAny>> {
946 let client = self.clone();
947
948 pyo3_async_runtimes::tokio::future_into_py(py, async move {
949 client
950 .cancel_order(
951 trader_id,
952 strategy_id,
953 instrument_id,
954 client_order_id,
955 venue_order_id,
956 )
957 .await
958 .map_err(to_pyvalue_err)
959 })
960 }
961
962 #[pyo3(name = "modify_order")]
963 #[pyo3(signature = (
964 trader_id,
965 strategy_id,
966 instrument_id,
967 client_order_id=None,
968 venue_order_id=None,
969 price=None,
970 quantity=None,
971 ))]
972 #[allow(clippy::too_many_arguments)]
973 fn py_modify_order<'py>(
974 &self,
975 py: Python<'py>,
976 trader_id: TraderId,
977 strategy_id: StrategyId,
978 instrument_id: InstrumentId,
979 client_order_id: Option<ClientOrderId>,
980 venue_order_id: Option<VenueOrderId>,
981 price: Option<Price>,
982 quantity: Option<Quantity>,
983 ) -> PyResult<Bound<'py, PyAny>> {
984 let client = self.clone();
985
986 pyo3_async_runtimes::tokio::future_into_py(py, async move {
987 client
988 .modify_order(
989 trader_id,
990 strategy_id,
991 instrument_id,
992 client_order_id,
993 price,
994 quantity,
995 venue_order_id,
996 )
997 .await
998 .map_err(to_pyvalue_err)
999 })
1000 }
1001
1002 #[allow(clippy::type_complexity)]
1003 #[pyo3(name = "batch_submit_orders")]
1004 fn py_batch_submit_orders<'py>(
1005 &self,
1006 py: Python<'py>,
1007 orders: Vec<Py<PyAny>>,
1008 ) -> PyResult<Bound<'py, PyAny>> {
1009 let mut domain_orders = Vec::with_capacity(orders.len());
1010
1011 for obj in orders {
1012 let (
1013 instrument_type,
1014 instrument_id,
1015 td_mode,
1016 client_order_id,
1017 order_side,
1018 order_type,
1019 quantity,
1020 position_side,
1021 price,
1022 trigger_price,
1023 post_only,
1024 reduce_only,
1025 ): (
1026 OKXInstrumentType,
1027 InstrumentId,
1028 OKXTradeMode,
1029 ClientOrderId,
1030 OrderSide,
1031 OrderType,
1032 Quantity,
1033 Option<PositionSide>,
1034 Option<Price>,
1035 Option<Price>,
1036 Option<bool>,
1037 Option<bool>,
1038 ) = obj
1039 .extract(py)
1040 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1041
1042 domain_orders.push((
1043 instrument_type,
1044 instrument_id,
1045 td_mode,
1046 client_order_id,
1047 order_side,
1048 position_side,
1049 order_type,
1050 quantity,
1051 price,
1052 trigger_price,
1053 post_only,
1054 reduce_only,
1055 ));
1056 }
1057
1058 let client = self.clone();
1059
1060 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1061 client
1062 .batch_submit_orders(domain_orders)
1063 .await
1064 .map_err(to_pyvalue_err)
1065 })
1066 }
1067
1068 #[pyo3(name = "batch_cancel_orders")]
1070 fn py_batch_cancel_orders<'py>(
1071 &self,
1072 py: Python<'py>,
1073 cancels: Vec<Py<PyAny>>,
1074 ) -> PyResult<Bound<'py, PyAny>> {
1075 let mut batched_cancels = Vec::with_capacity(cancels.len());
1076
1077 for obj in cancels {
1078 let (instrument_id, client_order_id, order_id): (
1079 InstrumentId,
1080 Option<ClientOrderId>,
1081 Option<VenueOrderId>,
1082 ) = obj
1083 .extract(py)
1084 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1085 batched_cancels.push((instrument_id, client_order_id, order_id));
1086 }
1087
1088 let client = self.clone();
1089
1090 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1091 client
1092 .batch_cancel_orders(batched_cancels)
1093 .await
1094 .map_err(to_pyvalue_err)
1095 })
1096 }
1097
1098 #[pyo3(name = "batch_modify_orders")]
1099 fn py_batch_modify_orders<'py>(
1100 &self,
1101 py: Python<'py>,
1102 orders: Vec<Py<PyAny>>,
1103 ) -> PyResult<Bound<'py, PyAny>> {
1104 let mut domain_orders = Vec::with_capacity(orders.len());
1105
1106 for obj in orders {
1107 let (
1108 instrument_type,
1109 instrument_id,
1110 client_order_id,
1111 new_client_order_id,
1112 price,
1113 quantity,
1114 ): (
1115 String,
1116 InstrumentId,
1117 ClientOrderId,
1118 ClientOrderId,
1119 Option<Price>,
1120 Option<Quantity>,
1121 ) = obj
1122 .extract(py)
1123 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1124 let inst_type =
1125 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1126 domain_orders.push((
1127 inst_type,
1128 instrument_id,
1129 client_order_id,
1130 new_client_order_id,
1131 price,
1132 quantity,
1133 ));
1134 }
1135
1136 let client = self.clone();
1137
1138 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1139 client
1140 .batch_modify_orders(domain_orders)
1141 .await
1142 .map_err(to_pyvalue_err)
1143 })
1144 }
1145
1146 #[pyo3(name = "mass_cancel_orders")]
1147 fn py_mass_cancel_orders<'py>(
1148 &self,
1149 py: Python<'py>,
1150 instrument_id: InstrumentId,
1151 ) -> PyResult<Bound<'py, PyAny>> {
1152 let client = self.clone();
1153
1154 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1155 client
1156 .mass_cancel_orders(instrument_id)
1157 .await
1158 .map_err(to_pyvalue_err)
1159 })
1160 }
1161
1162 #[pyo3(name = "cache_instruments")]
1163 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1164 let instruments: Result<Vec<_>, _> = instruments
1165 .into_iter()
1166 .map(|inst| pyobject_to_instrument_any(py, inst))
1167 .collect();
1168 self.cache_instruments(instruments?);
1169 Ok(())
1170 }
1171
1172 #[pyo3(name = "cache_instrument")]
1173 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1174 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1175 Ok(())
1176 }
1177}
1178
1179pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1180 if let Err(e) = callback.call1(py, (py_obj,)) {
1181 tracing::error!("Error calling Python: {e}");
1182 }
1183}
1184
1185fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1186where
1187 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1188{
1189 Python::attach(|py| match data_converter(py) {
1190 Ok(py_obj) => call_python(py, callback, py_obj),
1191 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1192 });
1193}