1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49 data::{BarType, Data, OrderBookDeltas_API},
50 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 python::{
53 data::data_to_pycapsule,
54 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55 },
56 types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61 common::enums::{OKXInstrumentType, OKXTradeMode},
62 websocket::{
63 OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65 },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70 #[getter]
71 pub fn code(&self) -> &str {
72 &self.code
73 }
74
75 #[getter]
76 pub fn message(&self) -> &str {
77 &self.message
78 }
79
80 #[getter]
81 pub fn conn_id(&self) -> Option<&str> {
82 self.conn_id.as_deref()
83 }
84
85 #[getter]
86 pub fn ts_event(&self) -> u64 {
87 self.timestamp
88 }
89
90 fn __repr__(&self) -> String {
91 format!(
92 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93 self.code, self.message, self.conn_id, self.timestamp
94 )
95 }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100 #[new]
101 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102 fn py_new(
103 url: Option<String>,
104 api_key: Option<String>,
105 api_secret: Option<String>,
106 api_passphrase: Option<String>,
107 account_id: Option<AccountId>,
108 heartbeat: Option<u64>,
109 ) -> PyResult<Self> {
110 Self::new(
111 url,
112 api_key,
113 api_secret,
114 api_passphrase,
115 account_id,
116 heartbeat,
117 )
118 .map_err(to_pyvalue_err)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "with_credentials")]
123 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124 fn py_with_credentials(
125 url: Option<String>,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 api_passphrase: Option<String>,
129 account_id: Option<AccountId>,
130 heartbeat: Option<u64>,
131 ) -> PyResult<Self> {
132 Self::with_credentials(
133 url,
134 api_key,
135 api_secret,
136 api_passphrase,
137 account_id,
138 heartbeat,
139 )
140 .map_err(to_pyvalue_err)
141 }
142
143 #[staticmethod]
144 #[pyo3(name = "from_env")]
145 fn py_from_env() -> PyResult<Self> {
146 Self::from_env().map_err(to_pyvalue_err)
147 }
148
149 #[getter]
150 #[pyo3(name = "url")]
151 #[must_use]
152 pub fn py_url(&self) -> &str {
153 self.url()
154 }
155
156 #[getter]
157 #[pyo3(name = "api_key")]
158 #[must_use]
159 pub fn py_api_key(&self) -> Option<&str> {
160 self.api_key()
161 }
162
163 #[pyo3(name = "is_active")]
164 fn py_is_active(&mut self) -> bool {
165 self.is_active()
166 }
167
168 #[pyo3(name = "is_closed")]
169 fn py_is_closed(&mut self) -> bool {
170 self.is_closed()
171 }
172
173 #[pyo3(name = "cancel_all_requests")]
174 pub fn py_cancel_all_requests(&self) {
175 self.cancel_all_requests();
176 }
177
178 #[pyo3(name = "get_subscriptions")]
179 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
180 let channels = self.get_subscriptions(instrument_id);
181
182 channels
184 .iter()
185 .map(|c| {
186 serde_json::to_value(c)
187 .ok()
188 .and_then(|v| v.as_str().map(String::from))
189 .unwrap_or_else(|| c.to_string())
190 })
191 .collect()
192 }
193
194 #[pyo3(name = "connect")]
195 fn py_connect<'py>(
196 &mut self,
197 py: Python<'py>,
198 instruments: Vec<PyObject>,
199 callback: PyObject,
200 ) -> PyResult<Bound<'py, PyAny>> {
201 let mut instruments_any = Vec::new();
202 for inst in instruments {
203 let inst_any = pyobject_to_instrument_any(py, inst)?;
204 instruments_any.push(inst_any);
205 }
206
207 self.initialize_instruments_cache(instruments_any);
208
209 let mut client = self.clone();
210
211 pyo3_async_runtimes::tokio::future_into_py(py, async move {
212 client.connect().await.map_err(to_pyruntime_err)?;
213
214 let stream = client.stream();
215
216 tokio::spawn(async move {
217 tokio::pin!(stream);
218
219 while let Some(msg) = stream.next().await {
220 match msg {
221 NautilusWsMessage::Instrument(msg) => {
222 call_python_with_data(&callback, |py| {
223 instrument_any_to_pyobject(py, *msg)
224 });
225 }
226 NautilusWsMessage::Data(msg) => Python::with_gil(|py| {
227 for data in msg {
228 let py_obj = data_to_pycapsule(py, data);
229 call_python(py, &callback, py_obj);
230 }
231 }),
232 NautilusWsMessage::FundingRates(msg) => {
233 for data in msg {
234 call_python_with_data(&callback, |py| data.into_py_any(py));
235 }
236 }
237 NautilusWsMessage::OrderRejected(msg) => {
238 call_python_with_data(&callback, |py| msg.into_py_any(py))
239 }
240 NautilusWsMessage::OrderCancelRejected(msg) => {
241 call_python_with_data(&callback, |py| msg.into_py_any(py))
242 }
243 NautilusWsMessage::OrderModifyRejected(msg) => {
244 call_python_with_data(&callback, |py| msg.into_py_any(py))
245 }
246 NautilusWsMessage::ExecutionReports(msg) => {
247 for report in msg {
248 match report {
249 ExecutionReport::Order(report) => {
250 call_python_with_data(&callback, |py| {
251 report.into_py_any(py)
252 })
253 }
254 ExecutionReport::Fill(report) => {
255 call_python_with_data(&callback, |py| {
256 report.into_py_any(py)
257 })
258 }
259 };
260 }
261 }
262 NautilusWsMessage::Deltas(msg) => Python::with_gil(|py| {
263 let py_obj =
264 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
265 call_python(py, &callback, py_obj);
266 }),
267 NautilusWsMessage::AccountUpdate(msg) => {
268 call_python_with_data(&callback, |py| msg.py_to_dict(py));
269 }
270 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Error(msg) => {
272 call_python_with_data(&callback, |py| msg.into_py_any(py));
273 }
274 NautilusWsMessage::Raw(msg) => {
275 tracing::debug!("Received raw message, skipping: {msg}");
276 }
277 }
278 }
279 });
280
281 Ok(())
282 })
283 }
284
285 #[pyo3(name = "wait_until_active")]
286 fn py_wait_until_active<'py>(
287 &self,
288 py: Python<'py>,
289 timeout_secs: f64,
290 ) -> PyResult<Bound<'py, PyAny>> {
291 let client = self.clone();
292
293 pyo3_async_runtimes::tokio::future_into_py(py, async move {
294 client
295 .wait_until_active(timeout_secs)
296 .await
297 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
298 Ok(())
299 })
300 }
301
302 #[pyo3(name = "close")]
303 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304 let mut client = self.clone();
305
306 pyo3_async_runtimes::tokio::future_into_py(py, async move {
307 if let Err(e) = client.close().await {
308 log::error!("Error on close: {e}");
309 }
310 Ok(())
311 })
312 }
313
314 #[pyo3(name = "subscribe_instruments")]
315 fn py_subscribe_instruments<'py>(
316 &self,
317 py: Python<'py>,
318 instrument_type: OKXInstrumentType,
319 ) -> PyResult<Bound<'py, PyAny>> {
320 let client = self.clone();
321
322 pyo3_async_runtimes::tokio::future_into_py(py, async move {
323 if let Err(e) = client.subscribe_instruments(instrument_type).await {
324 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
325 }
326 Ok(())
327 })
328 }
329
330 #[pyo3(name = "subscribe_instrument")]
331 fn py_subscribe_instrument<'py>(
332 &self,
333 py: Python<'py>,
334 instrument_id: InstrumentId,
335 ) -> PyResult<Bound<'py, PyAny>> {
336 let client = self.clone();
337
338 pyo3_async_runtimes::tokio::future_into_py(py, async move {
339 if let Err(e) = client.subscribe_instrument(instrument_id).await {
340 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
341 }
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "subscribe_book")]
347 fn py_subscribe_book<'py>(
348 &self,
349 py: Python<'py>,
350 instrument_id: InstrumentId,
351 ) -> PyResult<Bound<'py, PyAny>> {
352 let client = self.clone();
353
354 pyo3_async_runtimes::tokio::future_into_py(py, async move {
355 if let Err(e) = client.subscribe_book(instrument_id).await {
356 log::error!("Failed to subscribe to order book: {e}");
357 }
358 Ok(())
359 })
360 }
361
362 #[pyo3(name = "subscribe_book50_l2_tbt")]
363 fn py_subscribe_book50_l2_tbt<'py>(
364 &self,
365 py: Python<'py>,
366 instrument_id: InstrumentId,
367 ) -> PyResult<Bound<'py, PyAny>> {
368 let client = self.clone();
369
370 pyo3_async_runtimes::tokio::future_into_py(py, async move {
371 if let Err(e) = client.subscribe_books50_l2_tbt(instrument_id).await {
372 log::error!("Failed to subscribe to books50_tbt: {e}");
373 }
374 Ok(())
375 })
376 }
377
378 #[pyo3(name = "subscribe_book_l2_tbt")]
379 fn py_subscribe_book_l2_tbt<'py>(
380 &self,
381 py: Python<'py>,
382 instrument_id: InstrumentId,
383 ) -> PyResult<Bound<'py, PyAny>> {
384 let client = self.clone();
385
386 pyo3_async_runtimes::tokio::future_into_py(py, async move {
387 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
388 log::error!("Failed to subscribe to books_l2_tbt: {e}");
389 }
390 Ok(())
391 })
392 }
393
394 #[pyo3(name = "subscribe_book_depth5")]
395 fn py_subscribe_book_depth5<'py>(
396 &self,
397 py: Python<'py>,
398 instrument_id: InstrumentId,
399 ) -> PyResult<Bound<'py, PyAny>> {
400 let client = self.clone();
401
402 pyo3_async_runtimes::tokio::future_into_py(py, async move {
403 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
404 log::error!("Failed to subscribe to books5: {e}");
405 }
406 Ok(())
407 })
408 }
409
410 #[pyo3(name = "subscribe_quotes")]
411 fn py_subscribe_quotes<'py>(
412 &self,
413 py: Python<'py>,
414 instrument_id: InstrumentId,
415 ) -> PyResult<Bound<'py, PyAny>> {
416 let client = self.clone();
417
418 pyo3_async_runtimes::tokio::future_into_py(py, async move {
419 if let Err(e) = client.subscribe_quotes(instrument_id).await {
420 log::error!("Failed to subscribe to quotes: {e}");
421 }
422 Ok(())
423 })
424 }
425
426 #[pyo3(name = "subscribe_trades")]
427 fn py_subscribe_trades<'py>(
428 &self,
429 py: Python<'py>,
430 instrument_id: InstrumentId,
431 aggregated: bool,
432 ) -> PyResult<Bound<'py, PyAny>> {
433 let client = self.clone();
434
435 pyo3_async_runtimes::tokio::future_into_py(py, async move {
436 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
437 log::error!("Failed to subscribe to trades: {e}");
438 }
439 Ok(())
440 })
441 }
442
443 #[pyo3(name = "subscribe_bars")]
444 fn py_subscribe_bars<'py>(
445 &self,
446 py: Python<'py>,
447 bar_type: BarType,
448 ) -> PyResult<Bound<'py, PyAny>> {
449 let client = self.clone();
450
451 pyo3_async_runtimes::tokio::future_into_py(py, async move {
452 if let Err(e) = client.subscribe_bars(bar_type).await {
453 log::error!("Failed to subscribe to bars: {e}");
454 }
455 Ok(())
456 })
457 }
458
459 #[pyo3(name = "unsubscribe_book")]
460 fn py_unsubscribe_book<'py>(
461 &self,
462 py: Python<'py>,
463 instrument_id: InstrumentId,
464 ) -> PyResult<Bound<'py, PyAny>> {
465 let client = self.clone();
466
467 pyo3_async_runtimes::tokio::future_into_py(py, async move {
468 if let Err(e) = client.unsubscribe_book(instrument_id).await {
469 log::error!("Failed to unsubscribe from order book: {e}");
470 }
471 Ok(())
472 })
473 }
474
475 #[pyo3(name = "unsubscribe_book_depth5")]
476 fn py_unsubscribe_book_depth5<'py>(
477 &self,
478 py: Python<'py>,
479 instrument_id: InstrumentId,
480 ) -> PyResult<Bound<'py, PyAny>> {
481 let client = self.clone();
482
483 pyo3_async_runtimes::tokio::future_into_py(py, async move {
484 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
485 log::error!("Failed to unsubscribe from books5: {e}");
486 }
487 Ok(())
488 })
489 }
490
491 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
492 fn py_unsubscribe_book50_l2_tbt<'py>(
493 &self,
494 py: Python<'py>,
495 instrument_id: InstrumentId,
496 ) -> PyResult<Bound<'py, PyAny>> {
497 let client = self.clone();
498
499 pyo3_async_runtimes::tokio::future_into_py(py, async move {
500 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
501 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
502 }
503 Ok(())
504 })
505 }
506
507 #[pyo3(name = "unsubscribe_book_l2_tbt")]
508 fn py_unsubscribe_book_l2_tbt<'py>(
509 &self,
510 py: Python<'py>,
511 instrument_id: InstrumentId,
512 ) -> PyResult<Bound<'py, PyAny>> {
513 let client = self.clone();
514
515 pyo3_async_runtimes::tokio::future_into_py(py, async move {
516 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
517 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
518 }
519 Ok(())
520 })
521 }
522
523 #[pyo3(name = "unsubscribe_quotes")]
524 fn py_unsubscribe_quotes<'py>(
525 &self,
526 py: Python<'py>,
527 instrument_id: InstrumentId,
528 ) -> PyResult<Bound<'py, PyAny>> {
529 let client = self.clone();
530
531 pyo3_async_runtimes::tokio::future_into_py(py, async move {
532 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
533 log::error!("Failed to unsubscribe from quotes: {e}");
534 }
535 Ok(())
536 })
537 }
538
539 #[pyo3(name = "unsubscribe_trades")]
540 fn py_unsubscribe_trades<'py>(
541 &self,
542 py: Python<'py>,
543 instrument_id: InstrumentId,
544 aggregated: bool,
545 ) -> PyResult<Bound<'py, PyAny>> {
546 let client = self.clone();
547
548 pyo3_async_runtimes::tokio::future_into_py(py, async move {
549 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
550 log::error!("Failed to unsubscribe from trades: {e}");
551 }
552 Ok(())
553 })
554 }
555
556 #[pyo3(name = "unsubscribe_bars")]
557 fn py_unsubscribe_bars<'py>(
558 &self,
559 py: Python<'py>,
560 bar_type: BarType,
561 ) -> PyResult<Bound<'py, PyAny>> {
562 let client = self.clone();
563
564 pyo3_async_runtimes::tokio::future_into_py(py, async move {
565 if let Err(e) = client.unsubscribe_bars(bar_type).await {
566 log::error!("Failed to unsubscribe from bars: {e}");
567 }
568 Ok(())
569 })
570 }
571
572 #[pyo3(name = "subscribe_ticker")]
573 fn py_subscribe_ticker<'py>(
574 &self,
575 py: Python<'py>,
576 instrument_id: InstrumentId,
577 ) -> PyResult<Bound<'py, PyAny>> {
578 let client = self.clone();
579
580 pyo3_async_runtimes::tokio::future_into_py(py, async move {
581 if let Err(e) = client.subscribe_ticker(instrument_id).await {
582 log::error!("Failed to subscribe to ticker: {e}");
583 }
584 Ok(())
585 })
586 }
587
588 #[pyo3(name = "unsubscribe_ticker")]
589 fn py_unsubscribe_ticker<'py>(
590 &self,
591 py: Python<'py>,
592 instrument_id: InstrumentId,
593 ) -> PyResult<Bound<'py, PyAny>> {
594 let client = self.clone();
595
596 pyo3_async_runtimes::tokio::future_into_py(py, async move {
597 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
598 log::error!("Failed to unsubscribe from ticker: {e}");
599 }
600 Ok(())
601 })
602 }
603
604 #[pyo3(name = "subscribe_mark_prices")]
605 fn py_subscribe_mark_prices<'py>(
606 &self,
607 py: Python<'py>,
608 instrument_id: InstrumentId,
609 ) -> PyResult<Bound<'py, PyAny>> {
610 let client = self.clone();
611
612 pyo3_async_runtimes::tokio::future_into_py(py, async move {
613 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
614 log::error!("Failed to subscribe to mark prices: {e}");
615 }
616 Ok(())
617 })
618 }
619
620 #[pyo3(name = "unsubscribe_mark_prices")]
621 fn py_unsubscribe_mark_prices<'py>(
622 &self,
623 py: Python<'py>,
624 instrument_id: InstrumentId,
625 ) -> PyResult<Bound<'py, PyAny>> {
626 let client = self.clone();
627
628 pyo3_async_runtimes::tokio::future_into_py(py, async move {
629 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
630 log::error!("Failed to unsubscribe from mark prices: {e}");
631 }
632 Ok(())
633 })
634 }
635
636 #[pyo3(name = "subscribe_index_prices")]
637 fn py_subscribe_index_prices<'py>(
638 &self,
639 py: Python<'py>,
640 instrument_id: InstrumentId,
641 ) -> PyResult<Bound<'py, PyAny>> {
642 let client = self.clone();
643
644 pyo3_async_runtimes::tokio::future_into_py(py, async move {
645 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
646 log::error!("Failed to subscribe to index prices: {e}");
647 }
648 Ok(())
649 })
650 }
651
652 #[pyo3(name = "unsubscribe_index_prices")]
653 fn py_unsubscribe_index_prices<'py>(
654 &self,
655 py: Python<'py>,
656 instrument_id: InstrumentId,
657 ) -> PyResult<Bound<'py, PyAny>> {
658 let client = self.clone();
659
660 pyo3_async_runtimes::tokio::future_into_py(py, async move {
661 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
662 log::error!("Failed to unsubscribe from index prices: {e}");
663 }
664 Ok(())
665 })
666 }
667
668 #[pyo3(name = "subscribe_funding_rates")]
669 fn py_subscribe_funding_rates<'py>(
670 &self,
671 py: Python<'py>,
672 instrument_id: InstrumentId,
673 ) -> PyResult<Bound<'py, PyAny>> {
674 let client = self.clone();
675
676 pyo3_async_runtimes::tokio::future_into_py(py, async move {
677 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
678 log::error!("Failed to subscribe to funding rates: {e}");
679 }
680 Ok(())
681 })
682 }
683
684 #[pyo3(name = "unsubscribe_funding_rates")]
685 fn py_unsubscribe_funding_rates<'py>(
686 &self,
687 py: Python<'py>,
688 instrument_id: InstrumentId,
689 ) -> PyResult<Bound<'py, PyAny>> {
690 let client = self.clone();
691
692 pyo3_async_runtimes::tokio::future_into_py(py, async move {
693 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
694 log::error!("Failed to unsubscribe from funding rates: {e}");
695 }
696 Ok(())
697 })
698 }
699
700 #[pyo3(name = "subscribe_orders")]
701 fn py_subscribe_orders<'py>(
702 &self,
703 py: Python<'py>,
704 instrument_type: OKXInstrumentType,
705 ) -> PyResult<Bound<'py, PyAny>> {
706 let client = self.clone();
707
708 pyo3_async_runtimes::tokio::future_into_py(py, async move {
709 if let Err(e) = client.subscribe_orders(instrument_type).await {
710 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
711 }
712 Ok(())
713 })
714 }
715
716 #[pyo3(name = "unsubscribe_orders")]
717 fn py_unsubscribe_orders<'py>(
718 &self,
719 py: Python<'py>,
720 instrument_type: OKXInstrumentType,
721 ) -> PyResult<Bound<'py, PyAny>> {
722 let client = self.clone();
723
724 pyo3_async_runtimes::tokio::future_into_py(py, async move {
725 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
726 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
727 }
728 Ok(())
729 })
730 }
731
732 #[pyo3(name = "subscribe_fills")]
733 fn py_subscribe_fills<'py>(
734 &self,
735 py: Python<'py>,
736 instrument_type: OKXInstrumentType,
737 ) -> PyResult<Bound<'py, PyAny>> {
738 let client = self.clone();
739
740 pyo3_async_runtimes::tokio::future_into_py(py, async move {
741 if let Err(e) = client.subscribe_fills(instrument_type).await {
742 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
743 }
744 Ok(())
745 })
746 }
747
748 #[pyo3(name = "unsubscribe_fills")]
749 fn py_unsubscribe_fills<'py>(
750 &self,
751 py: Python<'py>,
752 instrument_type: OKXInstrumentType,
753 ) -> PyResult<Bound<'py, PyAny>> {
754 let client = self.clone();
755
756 pyo3_async_runtimes::tokio::future_into_py(py, async move {
757 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
758 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
759 }
760 Ok(())
761 })
762 }
763
764 #[pyo3(name = "subscribe_account")]
765 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
766 let client = self.clone();
767
768 pyo3_async_runtimes::tokio::future_into_py(py, async move {
769 if let Err(e) = client.subscribe_account().await {
770 log::error!("Failed to subscribe to account: {e}");
771 }
772 Ok(())
773 })
774 }
775
776 #[pyo3(name = "unsubscribe_account")]
777 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
778 let client = self.clone();
779
780 pyo3_async_runtimes::tokio::future_into_py(py, async move {
781 if let Err(e) = client.unsubscribe_account().await {
782 log::error!("Failed to unsubscribe from account: {e}");
783 }
784 Ok(())
785 })
786 }
787
788 #[pyo3(name = "submit_order")]
789 #[pyo3(signature = (
790 trader_id,
791 strategy_id,
792 instrument_id,
793 td_mode,
794 client_order_id,
795 order_side,
796 order_type,
797 quantity,
798 time_in_force=None,
799 price=None,
800 trigger_price=None,
801 post_only=None,
802 reduce_only=None,
803 quote_quantity=None,
804 position_side=None,
805 ))]
806 #[allow(clippy::too_many_arguments)]
807 fn py_submit_order<'py>(
808 &self,
809 py: Python<'py>,
810 trader_id: TraderId,
811 strategy_id: StrategyId,
812 instrument_id: InstrumentId,
813 td_mode: OKXTradeMode,
814 client_order_id: ClientOrderId,
815 order_side: OrderSide,
816 order_type: OrderType,
817 quantity: Quantity,
818 time_in_force: Option<TimeInForce>,
819 price: Option<Price>,
820 trigger_price: Option<Price>,
821 post_only: Option<bool>,
822 reduce_only: Option<bool>,
823 quote_quantity: Option<bool>,
824 position_side: Option<PositionSide>,
825 ) -> PyResult<Bound<'py, PyAny>> {
826 let client = self.clone();
827
828 pyo3_async_runtimes::tokio::future_into_py(py, async move {
829 client
830 .submit_order(
831 trader_id,
832 strategy_id,
833 instrument_id,
834 td_mode,
835 client_order_id,
836 order_side,
837 order_type,
838 quantity,
839 time_in_force,
840 price,
841 trigger_price,
842 post_only,
843 reduce_only,
844 quote_quantity,
845 position_side,
846 )
847 .await
848 .map_err(to_pyvalue_err)
849 })
850 }
851
852 #[pyo3(name = "cancel_order")]
853 #[pyo3(signature = (
854 trader_id,
855 strategy_id,
856 instrument_id,
857 client_order_id=None,
858 venue_order_id=None,
859 ))]
860 #[allow(clippy::too_many_arguments)]
861 fn py_cancel_order<'py>(
862 &self,
863 py: Python<'py>,
864 trader_id: TraderId,
865 strategy_id: StrategyId,
866 instrument_id: InstrumentId,
867 client_order_id: Option<ClientOrderId>,
868 venue_order_id: Option<VenueOrderId>,
869 ) -> PyResult<Bound<'py, PyAny>> {
870 let client = self.clone();
871
872 pyo3_async_runtimes::tokio::future_into_py(py, async move {
873 client
874 .cancel_order(
875 trader_id,
876 strategy_id,
877 instrument_id,
878 client_order_id,
879 venue_order_id,
880 )
881 .await
882 .map_err(to_pyvalue_err)
883 })
884 }
885
886 #[pyo3(name = "modify_order")]
887 #[pyo3(signature = (
888 trader_id,
889 strategy_id,
890 instrument_id,
891 client_order_id=None,
892 venue_order_id=None,
893 price=None,
894 quantity=None,
895 ))]
896 #[allow(clippy::too_many_arguments)]
897 fn py_modify_order<'py>(
898 &self,
899 py: Python<'py>,
900 trader_id: TraderId,
901 strategy_id: StrategyId,
902 instrument_id: InstrumentId,
903 client_order_id: Option<ClientOrderId>,
904 venue_order_id: Option<VenueOrderId>,
905 price: Option<Price>,
906 quantity: Option<Quantity>,
907 ) -> PyResult<Bound<'py, PyAny>> {
908 let client = self.clone();
909
910 pyo3_async_runtimes::tokio::future_into_py(py, async move {
911 client
912 .modify_order(
913 trader_id,
914 strategy_id,
915 instrument_id,
916 client_order_id,
917 price,
918 quantity,
919 venue_order_id,
920 )
921 .await
922 .map_err(to_pyvalue_err)
923 })
924 }
925
926 #[allow(clippy::type_complexity)]
927 #[pyo3(name = "batch_submit_orders")]
928 fn py_batch_submit_orders<'py>(
929 &self,
930 py: Python<'py>,
931 orders: Vec<PyObject>,
932 ) -> PyResult<Bound<'py, PyAny>> {
933 let mut domain_orders = Vec::with_capacity(orders.len());
934
935 for obj in orders {
936 let (
937 instrument_type,
938 instrument_id,
939 td_mode,
940 client_order_id,
941 order_side,
942 order_type,
943 quantity,
944 position_side,
945 price,
946 trigger_price,
947 post_only,
948 reduce_only,
949 ): (
950 String,
951 InstrumentId,
952 String,
953 ClientOrderId,
954 OrderSide,
955 OrderType,
956 Quantity,
957 Option<PositionSide>,
958 Option<Price>,
959 Option<Price>,
960 Option<bool>,
961 Option<bool>,
962 ) = obj
963 .extract(py)
964 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
965
966 let inst_type =
967 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
968 let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
969
970 domain_orders.push((
971 inst_type,
972 instrument_id,
973 trade_mode,
974 client_order_id,
975 order_side,
976 position_side,
977 order_type,
978 quantity,
979 price,
980 trigger_price,
981 post_only,
982 reduce_only,
983 ));
984 }
985
986 let client = self.clone();
987
988 pyo3_async_runtimes::tokio::future_into_py(py, async move {
989 client
990 .batch_submit_orders(domain_orders)
991 .await
992 .map_err(to_pyvalue_err)
993 })
994 }
995
996 #[pyo3(name = "batch_cancel_orders")]
998 fn py_batch_cancel_orders<'py>(
999 &self,
1000 py: Python<'py>,
1001 orders: Vec<PyObject>,
1002 ) -> PyResult<Bound<'py, PyAny>> {
1003 let mut domain_orders = Vec::with_capacity(orders.len());
1004
1005 for obj in orders {
1006 let (instrument_type, instrument_id, client_order_id, order_id): (
1007 String,
1008 InstrumentId,
1009 Option<ClientOrderId>,
1010 Option<String>,
1011 ) = obj
1012 .extract(py)
1013 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1014 let inst_type =
1015 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1016 domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1017 }
1018
1019 let client = self.clone();
1020
1021 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1022 client
1023 .batch_cancel_orders(domain_orders)
1024 .await
1025 .map_err(to_pyvalue_err)
1026 })
1027 }
1028
1029 #[pyo3(name = "batch_modify_orders")]
1030 fn py_batch_modify_orders<'py>(
1031 &self,
1032 py: Python<'py>,
1033 orders: Vec<PyObject>,
1034 ) -> PyResult<Bound<'py, PyAny>> {
1035 let mut domain_orders = Vec::with_capacity(orders.len());
1036
1037 for obj in orders {
1038 let (
1039 instrument_type,
1040 instrument_id,
1041 client_order_id,
1042 new_client_order_id,
1043 price,
1044 quantity,
1045 ): (
1046 String,
1047 InstrumentId,
1048 ClientOrderId,
1049 ClientOrderId,
1050 Option<Price>,
1051 Option<Quantity>,
1052 ) = obj
1053 .extract(py)
1054 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1055 let inst_type =
1056 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1057 domain_orders.push((
1058 inst_type,
1059 instrument_id,
1060 client_order_id,
1061 new_client_order_id,
1062 price,
1063 quantity,
1064 ));
1065 }
1066
1067 let client = self.clone();
1068
1069 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1070 client
1071 .batch_modify_orders(domain_orders)
1072 .await
1073 .map_err(to_pyvalue_err)
1074 })
1075 }
1076
1077 #[pyo3(name = "mass_cancel_orders")]
1078 fn py_mass_cancel_orders<'py>(
1079 &self,
1080 py: Python<'py>,
1081 instrument_id: InstrumentId,
1082 ) -> PyResult<Bound<'py, PyAny>> {
1083 let client = self.clone();
1084
1085 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1086 client
1087 .mass_cancel_orders(instrument_id)
1088 .await
1089 .map_err(to_pyvalue_err)
1090 })
1091 }
1092}
1093
1094pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
1095 if let Err(e) = callback.call1(py, (py_obj,)) {
1096 tracing::error!("Error calling Python: {e}");
1097 }
1098}
1099
1100fn call_python_with_data<F>(callback: &PyObject, data_converter: F)
1101where
1102 F: FnOnce(Python) -> PyResult<PyObject>,
1103{
1104 Python::with_gil(|py| match data_converter(py) {
1105 Ok(py_obj) => call_python(py, callback, py_obj),
1106 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1107 });
1108}