1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49 data::{BarType, Data, OrderBookDeltas_API},
50 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 python::{
53 data::data_to_pycapsule,
54 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55 },
56 types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61 common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62 websocket::{
63 OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65 },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70 #[getter]
71 pub fn code(&self) -> &str {
72 &self.code
73 }
74
75 #[getter]
76 pub fn message(&self) -> &str {
77 &self.message
78 }
79
80 #[getter]
81 pub fn conn_id(&self) -> Option<&str> {
82 self.conn_id.as_deref()
83 }
84
85 #[getter]
86 pub fn ts_event(&self) -> u64 {
87 self.timestamp
88 }
89
90 fn __repr__(&self) -> String {
91 format!(
92 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93 self.code, self.message, self.conn_id, self.timestamp
94 )
95 }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100 #[new]
101 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102 fn py_new(
103 url: Option<String>,
104 api_key: Option<String>,
105 api_secret: Option<String>,
106 api_passphrase: Option<String>,
107 account_id: Option<AccountId>,
108 heartbeat: Option<u64>,
109 ) -> PyResult<Self> {
110 Self::new(
111 url,
112 api_key,
113 api_secret,
114 api_passphrase,
115 account_id,
116 heartbeat,
117 )
118 .map_err(to_pyvalue_err)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "with_credentials")]
123 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124 fn py_with_credentials(
125 url: Option<String>,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 api_passphrase: Option<String>,
129 account_id: Option<AccountId>,
130 heartbeat: Option<u64>,
131 ) -> PyResult<Self> {
132 Self::with_credentials(
133 url,
134 api_key,
135 api_secret,
136 api_passphrase,
137 account_id,
138 heartbeat,
139 )
140 .map_err(to_pyvalue_err)
141 }
142
143 #[staticmethod]
144 #[pyo3(name = "from_env")]
145 fn py_from_env() -> PyResult<Self> {
146 Self::from_env().map_err(to_pyvalue_err)
147 }
148
149 #[getter]
150 #[pyo3(name = "url")]
151 #[must_use]
152 pub fn py_url(&self) -> &str {
153 self.url()
154 }
155
156 #[getter]
157 #[pyo3(name = "api_key")]
158 #[must_use]
159 pub fn py_api_key(&self) -> Option<&str> {
160 self.api_key()
161 }
162
163 #[pyo3(name = "is_active")]
164 fn py_is_active(&mut self) -> bool {
165 self.is_active()
166 }
167
168 #[pyo3(name = "is_closed")]
169 fn py_is_closed(&mut self) -> bool {
170 self.is_closed()
171 }
172
173 #[pyo3(name = "cancel_all_requests")]
174 pub fn py_cancel_all_requests(&self) {
175 self.cancel_all_requests();
176 }
177
178 #[pyo3(name = "get_subscriptions")]
179 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
180 let channels = self.get_subscriptions(instrument_id);
181
182 channels
184 .iter()
185 .map(|c| {
186 serde_json::to_value(c)
187 .ok()
188 .and_then(|v| v.as_str().map(String::from))
189 .unwrap_or_else(|| c.to_string())
190 })
191 .collect()
192 }
193
194 #[pyo3(name = "set_vip_level")]
198 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
199 self.set_vip_level(vip_level);
200 }
201
202 #[pyo3(name = "vip_level")]
204 #[getter]
205 fn py_vip_level(&self) -> OKXVipLevel {
206 self.vip_level()
207 }
208
209 #[pyo3(name = "connect")]
210 fn py_connect<'py>(
211 &mut self,
212 py: Python<'py>,
213 instruments: Vec<Py<PyAny>>,
214 callback: Py<PyAny>,
215 ) -> PyResult<Bound<'py, PyAny>> {
216 let mut instruments_any = Vec::new();
217 for inst in instruments {
218 let inst_any = pyobject_to_instrument_any(py, inst)?;
219 instruments_any.push(inst_any);
220 }
221
222 self.initialize_instruments_cache(instruments_any);
223
224 let mut client = self.clone();
225
226 pyo3_async_runtimes::tokio::future_into_py(py, async move {
227 client.connect().await.map_err(to_pyruntime_err)?;
228
229 let stream = client.stream();
230
231 tokio::spawn(async move {
232 tokio::pin!(stream);
233
234 while let Some(msg) = stream.next().await {
235 match msg {
236 NautilusWsMessage::Instrument(msg) => {
237 call_python_with_data(&callback, |py| {
238 instrument_any_to_pyobject(py, *msg)
239 });
240 }
241 NautilusWsMessage::Data(msg) => Python::attach(|py| {
242 for data in msg {
243 let py_obj = data_to_pycapsule(py, data);
244 call_python(py, &callback, py_obj);
245 }
246 }),
247 NautilusWsMessage::FundingRates(msg) => {
248 for data in msg {
249 call_python_with_data(&callback, |py| data.into_py_any(py));
250 }
251 }
252 NautilusWsMessage::OrderRejected(msg) => {
253 call_python_with_data(&callback, |py| msg.into_py_any(py))
254 }
255 NautilusWsMessage::OrderCancelRejected(msg) => {
256 call_python_with_data(&callback, |py| msg.into_py_any(py))
257 }
258 NautilusWsMessage::OrderModifyRejected(msg) => {
259 call_python_with_data(&callback, |py| msg.into_py_any(py))
260 }
261 NautilusWsMessage::ExecutionReports(msg) => {
262 for report in msg {
263 match report {
264 ExecutionReport::Order(report) => {
265 call_python_with_data(&callback, |py| {
266 report.into_py_any(py)
267 })
268 }
269 ExecutionReport::Fill(report) => {
270 call_python_with_data(&callback, |py| {
271 report.into_py_any(py)
272 })
273 }
274 };
275 }
276 }
277 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
278 let py_obj =
279 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
280 call_python(py, &callback, py_obj);
281 }),
282 NautilusWsMessage::AccountUpdate(msg) => {
283 call_python_with_data(&callback, |py| msg.into_py_any(py));
284 }
285 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Error(msg) => {
287 call_python_with_data(&callback, |py| msg.into_py_any(py));
288 }
289 NautilusWsMessage::Raw(msg) => {
290 tracing::debug!("Received raw message, skipping: {msg}");
291 }
292 }
293 }
294 });
295
296 Ok(())
297 })
298 }
299
300 #[pyo3(name = "wait_until_active")]
301 fn py_wait_until_active<'py>(
302 &self,
303 py: Python<'py>,
304 timeout_secs: f64,
305 ) -> PyResult<Bound<'py, PyAny>> {
306 let client = self.clone();
307
308 pyo3_async_runtimes::tokio::future_into_py(py, async move {
309 client
310 .wait_until_active(timeout_secs)
311 .await
312 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
313 Ok(())
314 })
315 }
316
317 #[pyo3(name = "close")]
318 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
319 let mut client = self.clone();
320
321 pyo3_async_runtimes::tokio::future_into_py(py, async move {
322 if let Err(e) = client.close().await {
323 log::error!("Error on close: {e}");
324 }
325 Ok(())
326 })
327 }
328
329 #[pyo3(name = "subscribe_instruments")]
330 fn py_subscribe_instruments<'py>(
331 &self,
332 py: Python<'py>,
333 instrument_type: OKXInstrumentType,
334 ) -> PyResult<Bound<'py, PyAny>> {
335 let client = self.clone();
336
337 pyo3_async_runtimes::tokio::future_into_py(py, async move {
338 if let Err(e) = client.subscribe_instruments(instrument_type).await {
339 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
340 }
341 Ok(())
342 })
343 }
344
345 #[pyo3(name = "subscribe_instrument")]
346 fn py_subscribe_instrument<'py>(
347 &self,
348 py: Python<'py>,
349 instrument_id: InstrumentId,
350 ) -> PyResult<Bound<'py, PyAny>> {
351 let client = self.clone();
352
353 pyo3_async_runtimes::tokio::future_into_py(py, async move {
354 if let Err(e) = client.subscribe_instrument(instrument_id).await {
355 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
356 }
357 Ok(())
358 })
359 }
360
361 #[pyo3(name = "subscribe_book")]
362 fn py_subscribe_book<'py>(
363 &self,
364 py: Python<'py>,
365 instrument_id: InstrumentId,
366 ) -> PyResult<Bound<'py, PyAny>> {
367 let client = self.clone();
368
369 pyo3_async_runtimes::tokio::future_into_py(py, async move {
370 client
371 .subscribe_book(instrument_id)
372 .await
373 .map_err(to_pyvalue_err)
374 })
375 }
376
377 #[pyo3(name = "subscribe_book50_l2_tbt")]
378 fn py_subscribe_book50_l2_tbt<'py>(
379 &self,
380 py: Python<'py>,
381 instrument_id: InstrumentId,
382 ) -> PyResult<Bound<'py, PyAny>> {
383 let client = self.clone();
384
385 pyo3_async_runtimes::tokio::future_into_py(py, async move {
386 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
387 log::error!("Failed to subscribe to book50_tbt: {e}");
388 }
389 Ok(())
390 })
391 }
392
393 #[pyo3(name = "subscribe_book_l2_tbt")]
394 fn py_subscribe_book_l2_tbt<'py>(
395 &self,
396 py: Python<'py>,
397 instrument_id: InstrumentId,
398 ) -> PyResult<Bound<'py, PyAny>> {
399 let client = self.clone();
400
401 pyo3_async_runtimes::tokio::future_into_py(py, async move {
402 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
403 log::error!("Failed to subscribe to books_l2_tbt: {e}");
404 }
405 Ok(())
406 })
407 }
408
409 #[pyo3(name = "subscribe_book_with_depth")]
410 fn py_subscribe_book_with_depth<'py>(
411 &self,
412 py: Python<'py>,
413 instrument_id: InstrumentId,
414 depth: u16,
415 ) -> PyResult<Bound<'py, PyAny>> {
416 let client = self.clone();
417
418 pyo3_async_runtimes::tokio::future_into_py(py, async move {
419 client
420 .subscribe_book_with_depth(instrument_id, depth)
421 .await
422 .map_err(to_pyvalue_err)
423 })
424 }
425
426 #[pyo3(name = "subscribe_book_depth5")]
427 fn py_subscribe_book_depth5<'py>(
428 &self,
429 py: Python<'py>,
430 instrument_id: InstrumentId,
431 ) -> PyResult<Bound<'py, PyAny>> {
432 let client = self.clone();
433
434 pyo3_async_runtimes::tokio::future_into_py(py, async move {
435 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
436 log::error!("Failed to subscribe to books5: {e}");
437 }
438 Ok(())
439 })
440 }
441
442 #[pyo3(name = "subscribe_quotes")]
443 fn py_subscribe_quotes<'py>(
444 &self,
445 py: Python<'py>,
446 instrument_id: InstrumentId,
447 ) -> PyResult<Bound<'py, PyAny>> {
448 let client = self.clone();
449
450 pyo3_async_runtimes::tokio::future_into_py(py, async move {
451 if let Err(e) = client.subscribe_quotes(instrument_id).await {
452 log::error!("Failed to subscribe to quotes: {e}");
453 }
454 Ok(())
455 })
456 }
457
458 #[pyo3(name = "subscribe_trades")]
459 fn py_subscribe_trades<'py>(
460 &self,
461 py: Python<'py>,
462 instrument_id: InstrumentId,
463 aggregated: bool,
464 ) -> PyResult<Bound<'py, PyAny>> {
465 let client = self.clone();
466
467 pyo3_async_runtimes::tokio::future_into_py(py, async move {
468 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
469 log::error!("Failed to subscribe to trades: {e}");
470 }
471 Ok(())
472 })
473 }
474
475 #[pyo3(name = "subscribe_bars")]
476 fn py_subscribe_bars<'py>(
477 &self,
478 py: Python<'py>,
479 bar_type: BarType,
480 ) -> PyResult<Bound<'py, PyAny>> {
481 let client = self.clone();
482
483 pyo3_async_runtimes::tokio::future_into_py(py, async move {
484 if let Err(e) = client.subscribe_bars(bar_type).await {
485 log::error!("Failed to subscribe to bars: {e}");
486 }
487 Ok(())
488 })
489 }
490
491 #[pyo3(name = "unsubscribe_book")]
492 fn py_unsubscribe_book<'py>(
493 &self,
494 py: Python<'py>,
495 instrument_id: InstrumentId,
496 ) -> PyResult<Bound<'py, PyAny>> {
497 let client = self.clone();
498
499 pyo3_async_runtimes::tokio::future_into_py(py, async move {
500 if let Err(e) = client.unsubscribe_book(instrument_id).await {
501 log::error!("Failed to unsubscribe from order book: {e}");
502 }
503 Ok(())
504 })
505 }
506
507 #[pyo3(name = "unsubscribe_book_depth5")]
508 fn py_unsubscribe_book_depth5<'py>(
509 &self,
510 py: Python<'py>,
511 instrument_id: InstrumentId,
512 ) -> PyResult<Bound<'py, PyAny>> {
513 let client = self.clone();
514
515 pyo3_async_runtimes::tokio::future_into_py(py, async move {
516 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
517 log::error!("Failed to unsubscribe from books5: {e}");
518 }
519 Ok(())
520 })
521 }
522
523 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
524 fn py_unsubscribe_book50_l2_tbt<'py>(
525 &self,
526 py: Python<'py>,
527 instrument_id: InstrumentId,
528 ) -> PyResult<Bound<'py, PyAny>> {
529 let client = self.clone();
530
531 pyo3_async_runtimes::tokio::future_into_py(py, async move {
532 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
533 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
534 }
535 Ok(())
536 })
537 }
538
539 #[pyo3(name = "unsubscribe_book_l2_tbt")]
540 fn py_unsubscribe_book_l2_tbt<'py>(
541 &self,
542 py: Python<'py>,
543 instrument_id: InstrumentId,
544 ) -> PyResult<Bound<'py, PyAny>> {
545 let client = self.clone();
546
547 pyo3_async_runtimes::tokio::future_into_py(py, async move {
548 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
549 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
550 }
551 Ok(())
552 })
553 }
554
555 #[pyo3(name = "unsubscribe_quotes")]
556 fn py_unsubscribe_quotes<'py>(
557 &self,
558 py: Python<'py>,
559 instrument_id: InstrumentId,
560 ) -> PyResult<Bound<'py, PyAny>> {
561 let client = self.clone();
562
563 pyo3_async_runtimes::tokio::future_into_py(py, async move {
564 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
565 log::error!("Failed to unsubscribe from quotes: {e}");
566 }
567 Ok(())
568 })
569 }
570
571 #[pyo3(name = "unsubscribe_trades")]
572 fn py_unsubscribe_trades<'py>(
573 &self,
574 py: Python<'py>,
575 instrument_id: InstrumentId,
576 aggregated: bool,
577 ) -> PyResult<Bound<'py, PyAny>> {
578 let client = self.clone();
579
580 pyo3_async_runtimes::tokio::future_into_py(py, async move {
581 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
582 log::error!("Failed to unsubscribe from trades: {e}");
583 }
584 Ok(())
585 })
586 }
587
588 #[pyo3(name = "unsubscribe_bars")]
589 fn py_unsubscribe_bars<'py>(
590 &self,
591 py: Python<'py>,
592 bar_type: BarType,
593 ) -> PyResult<Bound<'py, PyAny>> {
594 let client = self.clone();
595
596 pyo3_async_runtimes::tokio::future_into_py(py, async move {
597 if let Err(e) = client.unsubscribe_bars(bar_type).await {
598 log::error!("Failed to unsubscribe from bars: {e}");
599 }
600 Ok(())
601 })
602 }
603
604 #[pyo3(name = "subscribe_ticker")]
605 fn py_subscribe_ticker<'py>(
606 &self,
607 py: Python<'py>,
608 instrument_id: InstrumentId,
609 ) -> PyResult<Bound<'py, PyAny>> {
610 let client = self.clone();
611
612 pyo3_async_runtimes::tokio::future_into_py(py, async move {
613 if let Err(e) = client.subscribe_ticker(instrument_id).await {
614 log::error!("Failed to subscribe to ticker: {e}");
615 }
616 Ok(())
617 })
618 }
619
620 #[pyo3(name = "unsubscribe_ticker")]
621 fn py_unsubscribe_ticker<'py>(
622 &self,
623 py: Python<'py>,
624 instrument_id: InstrumentId,
625 ) -> PyResult<Bound<'py, PyAny>> {
626 let client = self.clone();
627
628 pyo3_async_runtimes::tokio::future_into_py(py, async move {
629 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
630 log::error!("Failed to unsubscribe from ticker: {e}");
631 }
632 Ok(())
633 })
634 }
635
636 #[pyo3(name = "subscribe_mark_prices")]
637 fn py_subscribe_mark_prices<'py>(
638 &self,
639 py: Python<'py>,
640 instrument_id: InstrumentId,
641 ) -> PyResult<Bound<'py, PyAny>> {
642 let client = self.clone();
643
644 pyo3_async_runtimes::tokio::future_into_py(py, async move {
645 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
646 log::error!("Failed to subscribe to mark prices: {e}");
647 }
648 Ok(())
649 })
650 }
651
652 #[pyo3(name = "unsubscribe_mark_prices")]
653 fn py_unsubscribe_mark_prices<'py>(
654 &self,
655 py: Python<'py>,
656 instrument_id: InstrumentId,
657 ) -> PyResult<Bound<'py, PyAny>> {
658 let client = self.clone();
659
660 pyo3_async_runtimes::tokio::future_into_py(py, async move {
661 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
662 log::error!("Failed to unsubscribe from mark prices: {e}");
663 }
664 Ok(())
665 })
666 }
667
668 #[pyo3(name = "subscribe_index_prices")]
669 fn py_subscribe_index_prices<'py>(
670 &self,
671 py: Python<'py>,
672 instrument_id: InstrumentId,
673 ) -> PyResult<Bound<'py, PyAny>> {
674 let client = self.clone();
675
676 pyo3_async_runtimes::tokio::future_into_py(py, async move {
677 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
678 log::error!("Failed to subscribe to index prices: {e}");
679 }
680 Ok(())
681 })
682 }
683
684 #[pyo3(name = "unsubscribe_index_prices")]
685 fn py_unsubscribe_index_prices<'py>(
686 &self,
687 py: Python<'py>,
688 instrument_id: InstrumentId,
689 ) -> PyResult<Bound<'py, PyAny>> {
690 let client = self.clone();
691
692 pyo3_async_runtimes::tokio::future_into_py(py, async move {
693 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
694 log::error!("Failed to unsubscribe from index prices: {e}");
695 }
696 Ok(())
697 })
698 }
699
700 #[pyo3(name = "subscribe_funding_rates")]
701 fn py_subscribe_funding_rates<'py>(
702 &self,
703 py: Python<'py>,
704 instrument_id: InstrumentId,
705 ) -> PyResult<Bound<'py, PyAny>> {
706 let client = self.clone();
707
708 pyo3_async_runtimes::tokio::future_into_py(py, async move {
709 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
710 log::error!("Failed to subscribe to funding rates: {e}");
711 }
712 Ok(())
713 })
714 }
715
716 #[pyo3(name = "unsubscribe_funding_rates")]
717 fn py_unsubscribe_funding_rates<'py>(
718 &self,
719 py: Python<'py>,
720 instrument_id: InstrumentId,
721 ) -> PyResult<Bound<'py, PyAny>> {
722 let client = self.clone();
723
724 pyo3_async_runtimes::tokio::future_into_py(py, async move {
725 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
726 log::error!("Failed to unsubscribe from funding rates: {e}");
727 }
728 Ok(())
729 })
730 }
731
732 #[pyo3(name = "subscribe_orders")]
733 fn py_subscribe_orders<'py>(
734 &self,
735 py: Python<'py>,
736 instrument_type: OKXInstrumentType,
737 ) -> PyResult<Bound<'py, PyAny>> {
738 let client = self.clone();
739
740 pyo3_async_runtimes::tokio::future_into_py(py, async move {
741 if let Err(e) = client.subscribe_orders(instrument_type).await {
742 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
743 }
744 Ok(())
745 })
746 }
747
748 #[pyo3(name = "unsubscribe_orders")]
749 fn py_unsubscribe_orders<'py>(
750 &self,
751 py: Python<'py>,
752 instrument_type: OKXInstrumentType,
753 ) -> PyResult<Bound<'py, PyAny>> {
754 let client = self.clone();
755
756 pyo3_async_runtimes::tokio::future_into_py(py, async move {
757 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
758 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
759 }
760 Ok(())
761 })
762 }
763
764 #[pyo3(name = "subscribe_orders_algo")]
765 fn py_subscribe_orders_algo<'py>(
766 &self,
767 py: Python<'py>,
768 instrument_type: OKXInstrumentType,
769 ) -> PyResult<Bound<'py, PyAny>> {
770 let client = self.clone();
771
772 pyo3_async_runtimes::tokio::future_into_py(py, async move {
773 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
774 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
775 }
776 Ok(())
777 })
778 }
779
780 #[pyo3(name = "unsubscribe_orders_algo")]
781 fn py_unsubscribe_orders_algo<'py>(
782 &self,
783 py: Python<'py>,
784 instrument_type: OKXInstrumentType,
785 ) -> PyResult<Bound<'py, PyAny>> {
786 let client = self.clone();
787
788 pyo3_async_runtimes::tokio::future_into_py(py, async move {
789 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
790 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
791 }
792 Ok(())
793 })
794 }
795
796 #[pyo3(name = "subscribe_fills")]
797 fn py_subscribe_fills<'py>(
798 &self,
799 py: Python<'py>,
800 instrument_type: OKXInstrumentType,
801 ) -> PyResult<Bound<'py, PyAny>> {
802 let client = self.clone();
803
804 pyo3_async_runtimes::tokio::future_into_py(py, async move {
805 if let Err(e) = client.subscribe_fills(instrument_type).await {
806 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
807 }
808 Ok(())
809 })
810 }
811
812 #[pyo3(name = "unsubscribe_fills")]
813 fn py_unsubscribe_fills<'py>(
814 &self,
815 py: Python<'py>,
816 instrument_type: OKXInstrumentType,
817 ) -> PyResult<Bound<'py, PyAny>> {
818 let client = self.clone();
819
820 pyo3_async_runtimes::tokio::future_into_py(py, async move {
821 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
822 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
823 }
824 Ok(())
825 })
826 }
827
828 #[pyo3(name = "subscribe_account")]
829 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
830 let client = self.clone();
831
832 pyo3_async_runtimes::tokio::future_into_py(py, async move {
833 if let Err(e) = client.subscribe_account().await {
834 log::error!("Failed to subscribe to account: {e}");
835 }
836 Ok(())
837 })
838 }
839
840 #[pyo3(name = "unsubscribe_account")]
841 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
842 let client = self.clone();
843
844 pyo3_async_runtimes::tokio::future_into_py(py, async move {
845 if let Err(e) = client.unsubscribe_account().await {
846 log::error!("Failed to unsubscribe from account: {e}");
847 }
848 Ok(())
849 })
850 }
851
852 #[pyo3(name = "submit_order")]
853 #[pyo3(signature = (
854 trader_id,
855 strategy_id,
856 instrument_id,
857 td_mode,
858 client_order_id,
859 order_side,
860 order_type,
861 quantity,
862 time_in_force=None,
863 price=None,
864 trigger_price=None,
865 post_only=None,
866 reduce_only=None,
867 quote_quantity=None,
868 position_side=None,
869 ))]
870 #[allow(clippy::too_many_arguments)]
871 fn py_submit_order<'py>(
872 &self,
873 py: Python<'py>,
874 trader_id: TraderId,
875 strategy_id: StrategyId,
876 instrument_id: InstrumentId,
877 td_mode: OKXTradeMode,
878 client_order_id: ClientOrderId,
879 order_side: OrderSide,
880 order_type: OrderType,
881 quantity: Quantity,
882 time_in_force: Option<TimeInForce>,
883 price: Option<Price>,
884 trigger_price: Option<Price>,
885 post_only: Option<bool>,
886 reduce_only: Option<bool>,
887 quote_quantity: Option<bool>,
888 position_side: Option<PositionSide>,
889 ) -> PyResult<Bound<'py, PyAny>> {
890 let client = self.clone();
891
892 pyo3_async_runtimes::tokio::future_into_py(py, async move {
893 client
894 .submit_order(
895 trader_id,
896 strategy_id,
897 instrument_id,
898 td_mode,
899 client_order_id,
900 order_side,
901 order_type,
902 quantity,
903 time_in_force,
904 price,
905 trigger_price,
906 post_only,
907 reduce_only,
908 quote_quantity,
909 position_side,
910 )
911 .await
912 .map_err(to_pyvalue_err)
913 })
914 }
915
916 #[pyo3(name = "cancel_order", signature = (
917 trader_id,
918 strategy_id,
919 instrument_id,
920 client_order_id=None,
921 venue_order_id=None,
922 ))]
923 #[allow(clippy::too_many_arguments)]
924 fn py_cancel_order<'py>(
925 &self,
926 py: Python<'py>,
927 trader_id: TraderId,
928 strategy_id: StrategyId,
929 instrument_id: InstrumentId,
930 client_order_id: Option<ClientOrderId>,
931 venue_order_id: Option<VenueOrderId>,
932 ) -> PyResult<Bound<'py, PyAny>> {
933 let client = self.clone();
934
935 pyo3_async_runtimes::tokio::future_into_py(py, async move {
936 client
937 .cancel_order(
938 trader_id,
939 strategy_id,
940 instrument_id,
941 client_order_id,
942 venue_order_id,
943 )
944 .await
945 .map_err(to_pyvalue_err)
946 })
947 }
948
949 #[pyo3(name = "modify_order")]
950 #[pyo3(signature = (
951 trader_id,
952 strategy_id,
953 instrument_id,
954 client_order_id=None,
955 venue_order_id=None,
956 price=None,
957 quantity=None,
958 ))]
959 #[allow(clippy::too_many_arguments)]
960 fn py_modify_order<'py>(
961 &self,
962 py: Python<'py>,
963 trader_id: TraderId,
964 strategy_id: StrategyId,
965 instrument_id: InstrumentId,
966 client_order_id: Option<ClientOrderId>,
967 venue_order_id: Option<VenueOrderId>,
968 price: Option<Price>,
969 quantity: Option<Quantity>,
970 ) -> PyResult<Bound<'py, PyAny>> {
971 let client = self.clone();
972
973 pyo3_async_runtimes::tokio::future_into_py(py, async move {
974 client
975 .modify_order(
976 trader_id,
977 strategy_id,
978 instrument_id,
979 client_order_id,
980 price,
981 quantity,
982 venue_order_id,
983 )
984 .await
985 .map_err(to_pyvalue_err)
986 })
987 }
988
989 #[allow(clippy::type_complexity)]
990 #[pyo3(name = "batch_submit_orders")]
991 fn py_batch_submit_orders<'py>(
992 &self,
993 py: Python<'py>,
994 orders: Vec<Py<PyAny>>,
995 ) -> PyResult<Bound<'py, PyAny>> {
996 let mut domain_orders = Vec::with_capacity(orders.len());
997
998 for obj in orders {
999 let (
1000 instrument_type,
1001 instrument_id,
1002 td_mode,
1003 client_order_id,
1004 order_side,
1005 order_type,
1006 quantity,
1007 position_side,
1008 price,
1009 trigger_price,
1010 post_only,
1011 reduce_only,
1012 ): (
1013 String,
1014 InstrumentId,
1015 String,
1016 ClientOrderId,
1017 OrderSide,
1018 OrderType,
1019 Quantity,
1020 Option<PositionSide>,
1021 Option<Price>,
1022 Option<Price>,
1023 Option<bool>,
1024 Option<bool>,
1025 ) = obj
1026 .extract(py)
1027 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1028
1029 let inst_type =
1030 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1031 let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
1032
1033 domain_orders.push((
1034 inst_type,
1035 instrument_id,
1036 trade_mode,
1037 client_order_id,
1038 order_side,
1039 position_side,
1040 order_type,
1041 quantity,
1042 price,
1043 trigger_price,
1044 post_only,
1045 reduce_only,
1046 ));
1047 }
1048
1049 let client = self.clone();
1050
1051 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1052 client
1053 .batch_submit_orders(domain_orders)
1054 .await
1055 .map_err(to_pyvalue_err)
1056 })
1057 }
1058
1059 #[pyo3(name = "batch_cancel_orders")]
1061 fn py_batch_cancel_orders<'py>(
1062 &self,
1063 py: Python<'py>,
1064 orders: Vec<Py<PyAny>>,
1065 ) -> PyResult<Bound<'py, PyAny>> {
1066 let mut domain_orders = Vec::with_capacity(orders.len());
1067
1068 for obj in orders {
1069 let (instrument_type, instrument_id, client_order_id, order_id): (
1070 String,
1071 InstrumentId,
1072 Option<ClientOrderId>,
1073 Option<String>,
1074 ) = obj
1075 .extract(py)
1076 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1077 let inst_type =
1078 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1079 domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1080 }
1081
1082 let client = self.clone();
1083
1084 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1085 client
1086 .batch_cancel_orders(domain_orders)
1087 .await
1088 .map_err(to_pyvalue_err)
1089 })
1090 }
1091
1092 #[pyo3(name = "batch_modify_orders")]
1093 fn py_batch_modify_orders<'py>(
1094 &self,
1095 py: Python<'py>,
1096 orders: Vec<Py<PyAny>>,
1097 ) -> PyResult<Bound<'py, PyAny>> {
1098 let mut domain_orders = Vec::with_capacity(orders.len());
1099
1100 for obj in orders {
1101 let (
1102 instrument_type,
1103 instrument_id,
1104 client_order_id,
1105 new_client_order_id,
1106 price,
1107 quantity,
1108 ): (
1109 String,
1110 InstrumentId,
1111 ClientOrderId,
1112 ClientOrderId,
1113 Option<Price>,
1114 Option<Quantity>,
1115 ) = obj
1116 .extract(py)
1117 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1118 let inst_type =
1119 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1120 domain_orders.push((
1121 inst_type,
1122 instrument_id,
1123 client_order_id,
1124 new_client_order_id,
1125 price,
1126 quantity,
1127 ));
1128 }
1129
1130 let client = self.clone();
1131
1132 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1133 client
1134 .batch_modify_orders(domain_orders)
1135 .await
1136 .map_err(to_pyvalue_err)
1137 })
1138 }
1139
1140 #[pyo3(name = "mass_cancel_orders")]
1141 fn py_mass_cancel_orders<'py>(
1142 &self,
1143 py: Python<'py>,
1144 instrument_id: InstrumentId,
1145 ) -> PyResult<Bound<'py, PyAny>> {
1146 let client = self.clone();
1147
1148 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1149 client
1150 .mass_cancel_orders(instrument_id)
1151 .await
1152 .map_err(to_pyvalue_err)
1153 })
1154 }
1155}
1156
1157pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1158 if let Err(e) = callback.call1(py, (py_obj,)) {
1159 tracing::error!("Error calling Python: {e}");
1160 }
1161}
1162
1163fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1164where
1165 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1166{
1167 Python::attach(|py| match data_converter(py) {
1168 Ok(py_obj) => call_python(py, callback, py_obj),
1169 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1170 });
1171}