1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49 data::{BarType, Data, OrderBookDeltas_API},
50 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 python::{
53 data::data_to_pycapsule,
54 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55 },
56 types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61 common::enums::{OKXInstrumentType, OKXTradeMode},
62 websocket::{
63 OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65 },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70 #[getter]
71 pub fn code(&self) -> &str {
72 &self.code
73 }
74
75 #[getter]
76 pub fn message(&self) -> &str {
77 &self.message
78 }
79
80 #[getter]
81 pub fn conn_id(&self) -> Option<&str> {
82 self.conn_id.as_deref()
83 }
84
85 #[getter]
86 pub fn ts_event(&self) -> u64 {
87 self.timestamp
88 }
89
90 fn __repr__(&self) -> String {
91 format!(
92 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93 self.code, self.message, self.conn_id, self.timestamp
94 )
95 }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100 #[new]
101 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102 fn py_new(
103 url: Option<String>,
104 api_key: Option<String>,
105 api_secret: Option<String>,
106 api_passphrase: Option<String>,
107 account_id: Option<AccountId>,
108 heartbeat: Option<u64>,
109 ) -> PyResult<Self> {
110 Self::new(
111 url,
112 api_key,
113 api_secret,
114 api_passphrase,
115 account_id,
116 heartbeat,
117 )
118 .map_err(to_pyvalue_err)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "with_credentials")]
123 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124 fn py_with_credentials(
125 url: Option<String>,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 api_passphrase: Option<String>,
129 account_id: Option<AccountId>,
130 heartbeat: Option<u64>,
131 ) -> PyResult<Self> {
132 Self::with_credentials(
133 url,
134 api_key,
135 api_secret,
136 api_passphrase,
137 account_id,
138 heartbeat,
139 )
140 .map_err(to_pyvalue_err)
141 }
142
143 #[staticmethod]
144 #[pyo3(name = "from_env")]
145 fn py_from_env() -> PyResult<Self> {
146 Self::from_env().map_err(to_pyvalue_err)
147 }
148
149 #[getter]
150 #[pyo3(name = "url")]
151 #[must_use]
152 pub fn py_url(&self) -> &str {
153 self.url()
154 }
155
156 #[getter]
157 #[pyo3(name = "api_key")]
158 #[must_use]
159 pub fn py_api_key(&self) -> Option<&str> {
160 self.api_key()
161 }
162
163 #[pyo3(name = "is_active")]
164 fn py_is_active(&mut self) -> bool {
165 self.is_active()
166 }
167
168 #[pyo3(name = "is_closed")]
169 fn py_is_closed(&mut self) -> bool {
170 self.is_closed()
171 }
172
173 #[pyo3(name = "get_subscriptions")]
174 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
175 let channels = self.get_subscriptions(instrument_id);
176
177 channels
179 .iter()
180 .map(|c| {
181 serde_json::to_value(c)
182 .ok()
183 .and_then(|v| v.as_str().map(String::from))
184 .unwrap_or_else(|| c.to_string())
185 })
186 .collect()
187 }
188
189 #[pyo3(name = "connect")]
190 fn py_connect<'py>(
191 &mut self,
192 py: Python<'py>,
193 instruments: Vec<PyObject>,
194 callback: PyObject,
195 ) -> PyResult<Bound<'py, PyAny>> {
196 let mut instruments_any = Vec::new();
197 for inst in instruments {
198 let inst_any = pyobject_to_instrument_any(py, inst)?;
199 instruments_any.push(inst_any);
200 }
201
202 self.initialize_instruments_cache(instruments_any);
203
204 let mut client = self.clone();
205
206 pyo3_async_runtimes::tokio::future_into_py(py, async move {
207 client.connect().await.map_err(to_pyruntime_err)?;
208
209 let stream = client.stream();
210
211 tokio::spawn(async move {
212 tokio::pin!(stream);
213
214 while let Some(msg) = stream.next().await {
215 match msg {
216 NautilusWsMessage::Instrument(msg) => {
217 call_python_with_data(&callback, |py| {
218 instrument_any_to_pyobject(py, *msg)
219 });
220 }
221 NautilusWsMessage::Data(msg) => Python::with_gil(|py| {
222 for data in msg {
223 let py_obj = data_to_pycapsule(py, data);
224 call_python(py, &callback, py_obj);
225 }
226 }),
227 NautilusWsMessage::FundingRates(msg) => {
228 for data in msg {
229 call_python_with_data(&callback, |py| data.into_py_any(py));
230 }
231 }
232 NautilusWsMessage::OrderRejected(msg) => {
233 call_python_with_data(&callback, |py| msg.into_py_any(py))
234 }
235 NautilusWsMessage::OrderCancelRejected(msg) => {
236 call_python_with_data(&callback, |py| msg.into_py_any(py))
237 }
238 NautilusWsMessage::OrderModifyRejected(msg) => {
239 call_python_with_data(&callback, |py| msg.into_py_any(py))
240 }
241 NautilusWsMessage::ExecutionReports(msg) => {
242 for report in msg {
243 match report {
244 ExecutionReport::Order(report) => {
245 call_python_with_data(&callback, |py| {
246 report.into_py_any(py)
247 })
248 }
249 ExecutionReport::Fill(report) => {
250 call_python_with_data(&callback, |py| {
251 report.into_py_any(py)
252 })
253 }
254 };
255 }
256 }
257 NautilusWsMessage::Deltas(msg) => Python::with_gil(|py| {
258 let py_obj =
259 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
260 call_python(py, &callback, py_obj);
261 }),
262 NautilusWsMessage::AccountUpdate(msg) => {
263 call_python_with_data(&callback, |py| msg.py_to_dict(py));
264 }
265 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Error(msg) => {
267 call_python_with_data(&callback, |py| msg.into_py_any(py));
268 }
269 NautilusWsMessage::Raw(msg) => {
270 tracing::debug!("Received raw message, skipping: {msg}");
271 }
272 }
273 }
274 });
275
276 Ok(())
277 })
278 }
279
280 #[pyo3(name = "wait_until_active")]
281 fn py_wait_until_active<'py>(
282 &self,
283 py: Python<'py>,
284 timeout_secs: f64,
285 ) -> PyResult<Bound<'py, PyAny>> {
286 let client = self.clone();
287
288 pyo3_async_runtimes::tokio::future_into_py(py, async move {
289 client
290 .wait_until_active(timeout_secs)
291 .await
292 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
293 Ok(())
294 })
295 }
296
297 #[pyo3(name = "close")]
298 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
299 let mut client = self.clone();
300
301 pyo3_async_runtimes::tokio::future_into_py(py, async move {
302 if let Err(e) = client.close().await {
303 log::error!("Error on close: {e}");
304 }
305 Ok(())
306 })
307 }
308
309 #[pyo3(name = "subscribe_instruments")]
310 fn py_subscribe_instruments<'py>(
311 &self,
312 py: Python<'py>,
313 instrument_type: OKXInstrumentType,
314 ) -> PyResult<Bound<'py, PyAny>> {
315 let client = self.clone();
316
317 pyo3_async_runtimes::tokio::future_into_py(py, async move {
318 if let Err(e) = client.subscribe_instruments(instrument_type).await {
319 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
320 }
321 Ok(())
322 })
323 }
324
325 #[pyo3(name = "subscribe_instrument")]
326 fn py_subscribe_instrument<'py>(
327 &self,
328 py: Python<'py>,
329 instrument_id: InstrumentId,
330 ) -> PyResult<Bound<'py, PyAny>> {
331 let client = self.clone();
332
333 pyo3_async_runtimes::tokio::future_into_py(py, async move {
334 if let Err(e) = client.subscribe_instrument(instrument_id).await {
335 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
336 }
337 Ok(())
338 })
339 }
340
341 #[pyo3(name = "subscribe_book")]
342 fn py_subscribe_book<'py>(
343 &self,
344 py: Python<'py>,
345 instrument_id: InstrumentId,
346 ) -> PyResult<Bound<'py, PyAny>> {
347 let client = self.clone();
348
349 pyo3_async_runtimes::tokio::future_into_py(py, async move {
350 if let Err(e) = client.subscribe_book(instrument_id).await {
351 log::error!("Failed to subscribe to order book: {e}");
352 }
353 Ok(())
354 })
355 }
356
357 #[pyo3(name = "subscribe_book50_l2_tbt")]
358 fn py_subscribe_book50_l2_tbt<'py>(
359 &self,
360 py: Python<'py>,
361 instrument_id: InstrumentId,
362 ) -> PyResult<Bound<'py, PyAny>> {
363 let client = self.clone();
364
365 pyo3_async_runtimes::tokio::future_into_py(py, async move {
366 if let Err(e) = client.subscribe_books50_l2_tbt(instrument_id).await {
367 log::error!("Failed to subscribe to books50_tbt: {e}");
368 }
369 Ok(())
370 })
371 }
372
373 #[pyo3(name = "subscribe_book_l2_tbt")]
374 fn py_subscribe_book_l2_tbt<'py>(
375 &self,
376 py: Python<'py>,
377 instrument_id: InstrumentId,
378 ) -> PyResult<Bound<'py, PyAny>> {
379 let client = self.clone();
380
381 pyo3_async_runtimes::tokio::future_into_py(py, async move {
382 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
383 log::error!("Failed to subscribe to books_l2_tbt: {e}");
384 }
385 Ok(())
386 })
387 }
388
389 #[pyo3(name = "subscribe_book_depth5")]
390 fn py_subscribe_book_depth5<'py>(
391 &self,
392 py: Python<'py>,
393 instrument_id: InstrumentId,
394 ) -> PyResult<Bound<'py, PyAny>> {
395 let client = self.clone();
396
397 pyo3_async_runtimes::tokio::future_into_py(py, async move {
398 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
399 log::error!("Failed to subscribe to books5: {e}");
400 }
401 Ok(())
402 })
403 }
404
405 #[pyo3(name = "subscribe_quotes")]
406 fn py_subscribe_quotes<'py>(
407 &self,
408 py: Python<'py>,
409 instrument_id: InstrumentId,
410 ) -> PyResult<Bound<'py, PyAny>> {
411 let client = self.clone();
412
413 pyo3_async_runtimes::tokio::future_into_py(py, async move {
414 if let Err(e) = client.subscribe_quotes(instrument_id).await {
415 log::error!("Failed to subscribe to quotes: {e}");
416 }
417 Ok(())
418 })
419 }
420
421 #[pyo3(name = "subscribe_trades")]
422 fn py_subscribe_trades<'py>(
423 &self,
424 py: Python<'py>,
425 instrument_id: InstrumentId,
426 aggregated: bool,
427 ) -> PyResult<Bound<'py, PyAny>> {
428 let client = self.clone();
429
430 pyo3_async_runtimes::tokio::future_into_py(py, async move {
431 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
432 log::error!("Failed to subscribe to trades: {e}");
433 }
434 Ok(())
435 })
436 }
437
438 #[pyo3(name = "subscribe_bars")]
439 fn py_subscribe_bars<'py>(
440 &self,
441 py: Python<'py>,
442 bar_type: BarType,
443 ) -> PyResult<Bound<'py, PyAny>> {
444 let client = self.clone();
445
446 pyo3_async_runtimes::tokio::future_into_py(py, async move {
447 if let Err(e) = client.subscribe_bars(bar_type).await {
448 log::error!("Failed to subscribe to bars: {e}");
449 }
450 Ok(())
451 })
452 }
453
454 #[pyo3(name = "unsubscribe_book")]
455 fn py_unsubscribe_book<'py>(
456 &self,
457 py: Python<'py>,
458 instrument_id: InstrumentId,
459 ) -> PyResult<Bound<'py, PyAny>> {
460 let client = self.clone();
461
462 pyo3_async_runtimes::tokio::future_into_py(py, async move {
463 if let Err(e) = client.unsubscribe_book(instrument_id).await {
464 log::error!("Failed to unsubscribe from order book: {e}");
465 }
466 Ok(())
467 })
468 }
469
470 #[pyo3(name = "unsubscribe_book_depth5")]
471 fn py_unsubscribe_book_depth5<'py>(
472 &self,
473 py: Python<'py>,
474 instrument_id: InstrumentId,
475 ) -> PyResult<Bound<'py, PyAny>> {
476 let client = self.clone();
477
478 pyo3_async_runtimes::tokio::future_into_py(py, async move {
479 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
480 log::error!("Failed to unsubscribe from books5: {e}");
481 }
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
487 fn py_unsubscribe_book50_l2_tbt<'py>(
488 &self,
489 py: Python<'py>,
490 instrument_id: InstrumentId,
491 ) -> PyResult<Bound<'py, PyAny>> {
492 let client = self.clone();
493
494 pyo3_async_runtimes::tokio::future_into_py(py, async move {
495 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
496 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
497 }
498 Ok(())
499 })
500 }
501
502 #[pyo3(name = "unsubscribe_book_l2_tbt")]
503 fn py_unsubscribe_book_l2_tbt<'py>(
504 &self,
505 py: Python<'py>,
506 instrument_id: InstrumentId,
507 ) -> PyResult<Bound<'py, PyAny>> {
508 let client = self.clone();
509
510 pyo3_async_runtimes::tokio::future_into_py(py, async move {
511 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
512 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
513 }
514 Ok(())
515 })
516 }
517
518 #[pyo3(name = "unsubscribe_quotes")]
519 fn py_unsubscribe_quotes<'py>(
520 &self,
521 py: Python<'py>,
522 instrument_id: InstrumentId,
523 ) -> PyResult<Bound<'py, PyAny>> {
524 let client = self.clone();
525
526 pyo3_async_runtimes::tokio::future_into_py(py, async move {
527 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
528 log::error!("Failed to unsubscribe from quotes: {e}");
529 }
530 Ok(())
531 })
532 }
533
534 #[pyo3(name = "unsubscribe_trades")]
535 fn py_unsubscribe_trades<'py>(
536 &self,
537 py: Python<'py>,
538 instrument_id: InstrumentId,
539 aggregated: bool,
540 ) -> PyResult<Bound<'py, PyAny>> {
541 let client = self.clone();
542
543 pyo3_async_runtimes::tokio::future_into_py(py, async move {
544 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
545 log::error!("Failed to unsubscribe from trades: {e}");
546 }
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "unsubscribe_bars")]
552 fn py_unsubscribe_bars<'py>(
553 &self,
554 py: Python<'py>,
555 bar_type: BarType,
556 ) -> PyResult<Bound<'py, PyAny>> {
557 let client = self.clone();
558
559 pyo3_async_runtimes::tokio::future_into_py(py, async move {
560 if let Err(e) = client.unsubscribe_bars(bar_type).await {
561 log::error!("Failed to unsubscribe from bars: {e}");
562 }
563 Ok(())
564 })
565 }
566
567 #[pyo3(name = "subscribe_ticker")]
568 fn py_subscribe_ticker<'py>(
569 &self,
570 py: Python<'py>,
571 instrument_id: InstrumentId,
572 ) -> PyResult<Bound<'py, PyAny>> {
573 let client = self.clone();
574
575 pyo3_async_runtimes::tokio::future_into_py(py, async move {
576 if let Err(e) = client.subscribe_ticker(instrument_id).await {
577 log::error!("Failed to subscribe to ticker: {e}");
578 }
579 Ok(())
580 })
581 }
582
583 #[pyo3(name = "unsubscribe_ticker")]
584 fn py_unsubscribe_ticker<'py>(
585 &self,
586 py: Python<'py>,
587 instrument_id: InstrumentId,
588 ) -> PyResult<Bound<'py, PyAny>> {
589 let client = self.clone();
590
591 pyo3_async_runtimes::tokio::future_into_py(py, async move {
592 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
593 log::error!("Failed to unsubscribe from ticker: {e}");
594 }
595 Ok(())
596 })
597 }
598
599 #[pyo3(name = "subscribe_mark_prices")]
600 fn py_subscribe_mark_prices<'py>(
601 &self,
602 py: Python<'py>,
603 instrument_id: InstrumentId,
604 ) -> PyResult<Bound<'py, PyAny>> {
605 let client = self.clone();
606
607 pyo3_async_runtimes::tokio::future_into_py(py, async move {
608 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
609 log::error!("Failed to subscribe to mark prices: {e}");
610 }
611 Ok(())
612 })
613 }
614
615 #[pyo3(name = "unsubscribe_mark_prices")]
616 fn py_unsubscribe_mark_prices<'py>(
617 &self,
618 py: Python<'py>,
619 instrument_id: InstrumentId,
620 ) -> PyResult<Bound<'py, PyAny>> {
621 let client = self.clone();
622
623 pyo3_async_runtimes::tokio::future_into_py(py, async move {
624 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
625 log::error!("Failed to unsubscribe from mark prices: {e}");
626 }
627 Ok(())
628 })
629 }
630
631 #[pyo3(name = "subscribe_index_prices")]
632 fn py_subscribe_index_prices<'py>(
633 &self,
634 py: Python<'py>,
635 instrument_id: InstrumentId,
636 ) -> PyResult<Bound<'py, PyAny>> {
637 let client = self.clone();
638
639 pyo3_async_runtimes::tokio::future_into_py(py, async move {
640 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
641 log::error!("Failed to subscribe to index prices: {e}");
642 }
643 Ok(())
644 })
645 }
646
647 #[pyo3(name = "unsubscribe_index_prices")]
648 fn py_unsubscribe_index_prices<'py>(
649 &self,
650 py: Python<'py>,
651 instrument_id: InstrumentId,
652 ) -> PyResult<Bound<'py, PyAny>> {
653 let client = self.clone();
654
655 pyo3_async_runtimes::tokio::future_into_py(py, async move {
656 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
657 log::error!("Failed to unsubscribe from index prices: {e}");
658 }
659 Ok(())
660 })
661 }
662
663 #[pyo3(name = "subscribe_funding_rates")]
664 fn py_subscribe_funding_rates<'py>(
665 &self,
666 py: Python<'py>,
667 instrument_id: InstrumentId,
668 ) -> PyResult<Bound<'py, PyAny>> {
669 let client = self.clone();
670
671 pyo3_async_runtimes::tokio::future_into_py(py, async move {
672 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
673 log::error!("Failed to subscribe to funding rates: {e}");
674 }
675 Ok(())
676 })
677 }
678
679 #[pyo3(name = "unsubscribe_funding_rates")]
680 fn py_unsubscribe_funding_rates<'py>(
681 &self,
682 py: Python<'py>,
683 instrument_id: InstrumentId,
684 ) -> PyResult<Bound<'py, PyAny>> {
685 let client = self.clone();
686
687 pyo3_async_runtimes::tokio::future_into_py(py, async move {
688 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
689 log::error!("Failed to unsubscribe from funding rates: {e}");
690 }
691 Ok(())
692 })
693 }
694
695 #[pyo3(name = "subscribe_orders")]
696 fn py_subscribe_orders<'py>(
697 &self,
698 py: Python<'py>,
699 instrument_type: OKXInstrumentType,
700 ) -> PyResult<Bound<'py, PyAny>> {
701 let client = self.clone();
702
703 pyo3_async_runtimes::tokio::future_into_py(py, async move {
704 if let Err(e) = client.subscribe_orders(instrument_type).await {
705 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
706 }
707 Ok(())
708 })
709 }
710
711 #[pyo3(name = "unsubscribe_orders")]
712 fn py_unsubscribe_orders<'py>(
713 &self,
714 py: Python<'py>,
715 instrument_type: OKXInstrumentType,
716 ) -> PyResult<Bound<'py, PyAny>> {
717 let client = self.clone();
718
719 pyo3_async_runtimes::tokio::future_into_py(py, async move {
720 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
721 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
722 }
723 Ok(())
724 })
725 }
726
727 #[pyo3(name = "subscribe_fills")]
728 fn py_subscribe_fills<'py>(
729 &self,
730 py: Python<'py>,
731 instrument_type: OKXInstrumentType,
732 ) -> PyResult<Bound<'py, PyAny>> {
733 let client = self.clone();
734
735 pyo3_async_runtimes::tokio::future_into_py(py, async move {
736 if let Err(e) = client.subscribe_fills(instrument_type).await {
737 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
738 }
739 Ok(())
740 })
741 }
742
743 #[pyo3(name = "unsubscribe_fills")]
744 fn py_unsubscribe_fills<'py>(
745 &self,
746 py: Python<'py>,
747 instrument_type: OKXInstrumentType,
748 ) -> PyResult<Bound<'py, PyAny>> {
749 let client = self.clone();
750
751 pyo3_async_runtimes::tokio::future_into_py(py, async move {
752 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
753 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
754 }
755 Ok(())
756 })
757 }
758
759 #[pyo3(name = "subscribe_account")]
760 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
761 let client = self.clone();
762
763 pyo3_async_runtimes::tokio::future_into_py(py, async move {
764 if let Err(e) = client.subscribe_account().await {
765 log::error!("Failed to subscribe to account: {e}");
766 }
767 Ok(())
768 })
769 }
770
771 #[pyo3(name = "unsubscribe_account")]
772 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
773 let client = self.clone();
774
775 pyo3_async_runtimes::tokio::future_into_py(py, async move {
776 if let Err(e) = client.unsubscribe_account().await {
777 log::error!("Failed to unsubscribe from account: {e}");
778 }
779 Ok(())
780 })
781 }
782
783 #[pyo3(name = "submit_order")]
785 #[pyo3(signature = (
786 trader_id,
787 strategy_id,
788 instrument_id,
789 td_mode,
790 client_order_id,
791 order_side,
792 order_type,
793 quantity,
794 time_in_force=None,
795 price=None,
796 trigger_price=None,
797 post_only=None,
798 reduce_only=None,
799 quote_quantity=None,
800 position_side=None,
801 ))]
802 #[allow(clippy::too_many_arguments)]
803 fn py_submit_order<'py>(
804 &self,
805 py: Python<'py>,
806 trader_id: TraderId,
807 strategy_id: StrategyId,
808 instrument_id: InstrumentId,
809 td_mode: OKXTradeMode,
810 client_order_id: ClientOrderId,
811 order_side: OrderSide,
812 order_type: OrderType,
813 quantity: Quantity,
814 time_in_force: Option<TimeInForce>,
815 price: Option<Price>,
816 trigger_price: Option<Price>,
817 post_only: Option<bool>,
818 reduce_only: Option<bool>,
819 quote_quantity: Option<bool>,
820 position_side: Option<PositionSide>,
821 ) -> PyResult<Bound<'py, PyAny>> {
822 let client = self.clone();
823
824 pyo3_async_runtimes::tokio::future_into_py(py, async move {
825 client
826 .submit_order(
827 trader_id,
828 strategy_id,
829 instrument_id,
830 td_mode,
831 client_order_id,
832 order_side,
833 order_type,
834 quantity,
835 time_in_force,
836 price,
837 trigger_price,
838 post_only,
839 reduce_only,
840 quote_quantity,
841 position_side,
842 )
843 .await
844 .map_err(to_pyvalue_err)
845 })
846 }
847
848 #[pyo3(name = "cancel_order")]
850 #[pyo3(signature = (
851 trader_id,
852 strategy_id,
853 instrument_id,
854 client_order_id=None,
855 venue_order_id=None,
856 ))]
857 #[allow(clippy::too_many_arguments)]
858 fn py_cancel_order<'py>(
859 &self,
860 py: Python<'py>,
861 trader_id: TraderId,
862 strategy_id: StrategyId,
863 instrument_id: InstrumentId,
864 client_order_id: Option<ClientOrderId>,
865 venue_order_id: Option<VenueOrderId>,
866 ) -> PyResult<Bound<'py, PyAny>> {
867 let client = self.clone();
868
869 pyo3_async_runtimes::tokio::future_into_py(py, async move {
870 client
871 .cancel_order(
872 trader_id,
873 strategy_id,
874 instrument_id,
875 client_order_id,
876 venue_order_id,
877 )
878 .await
879 .map_err(to_pyvalue_err)
880 })
881 }
882
883 #[pyo3(name = "modify_order")]
885 #[pyo3(signature = (
886 trader_id,
887 strategy_id,
888 instrument_id,
889 client_order_id=None,
890 venue_order_id=None,
891 price=None,
892 quantity=None,
893 ))]
894 #[allow(clippy::too_many_arguments)]
895 fn py_modify_order<'py>(
896 &self,
897 py: Python<'py>,
898 trader_id: TraderId,
899 strategy_id: StrategyId,
900 instrument_id: InstrumentId,
901 client_order_id: Option<ClientOrderId>,
902 venue_order_id: Option<VenueOrderId>,
903 price: Option<Price>,
904 quantity: Option<Quantity>,
905 ) -> PyResult<Bound<'py, PyAny>> {
906 let client = self.clone();
907
908 pyo3_async_runtimes::tokio::future_into_py(py, async move {
909 client
910 .modify_order(
911 trader_id,
912 strategy_id,
913 instrument_id,
914 client_order_id,
915 price,
916 quantity,
917 venue_order_id,
918 )
919 .await
920 .map_err(to_pyvalue_err)
921 })
922 }
923
924 #[allow(clippy::type_complexity)]
926 #[pyo3(name = "batch_submit_orders")]
927 fn py_batch_submit_orders<'py>(
928 &self,
929 py: Python<'py>,
930 orders: Vec<PyObject>,
931 ) -> PyResult<Bound<'py, PyAny>> {
932 let mut domain_orders = Vec::with_capacity(orders.len());
933
934 for obj in orders {
935 let (
936 instrument_type,
937 instrument_id,
938 td_mode,
939 client_order_id,
940 order_side,
941 order_type,
942 quantity,
943 position_side,
944 price,
945 trigger_price,
946 post_only,
947 reduce_only,
948 ): (
949 String,
950 InstrumentId,
951 String,
952 ClientOrderId,
953 OrderSide,
954 OrderType,
955 Quantity,
956 Option<PositionSide>,
957 Option<Price>,
958 Option<Price>,
959 Option<bool>,
960 Option<bool>,
961 ) = obj
962 .extract(py)
963 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
964
965 let inst_type =
966 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
967 let trade_mode = OKXTradeMode::from_str(&td_mode).map_err(to_pyvalue_err)?;
968
969 domain_orders.push((
970 inst_type,
971 instrument_id,
972 trade_mode,
973 client_order_id,
974 order_side,
975 position_side,
976 order_type,
977 quantity,
978 price,
979 trigger_price,
980 post_only,
981 reduce_only,
982 ));
983 }
984
985 let client = self.clone();
986
987 pyo3_async_runtimes::tokio::future_into_py(py, async move {
988 client
989 .batch_submit_orders(domain_orders)
990 .await
991 .map_err(to_pyvalue_err)
992 })
993 }
994
995 #[pyo3(name = "batch_cancel_orders")]
997 fn py_batch_cancel_orders<'py>(
998 &self,
999 py: Python<'py>,
1000 orders: Vec<PyObject>,
1001 ) -> PyResult<Bound<'py, PyAny>> {
1002 let mut domain_orders = Vec::with_capacity(orders.len());
1003
1004 for obj in orders {
1005 let (instrument_type, instrument_id, client_order_id, order_id): (
1006 String,
1007 InstrumentId,
1008 Option<ClientOrderId>,
1009 Option<String>,
1010 ) = obj
1011 .extract(py)
1012 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1013 let inst_type =
1014 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1015 domain_orders.push((inst_type, instrument_id, client_order_id, order_id));
1016 }
1017
1018 let client = self.clone();
1019
1020 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1021 client
1022 .batch_cancel_orders(domain_orders)
1023 .await
1024 .map_err(to_pyvalue_err)
1025 })
1026 }
1027
1028 #[pyo3(name = "batch_modify_orders")]
1030 fn py_batch_modify_orders<'py>(
1031 &self,
1032 py: Python<'py>,
1033 orders: Vec<PyObject>,
1034 ) -> PyResult<Bound<'py, PyAny>> {
1035 let mut domain_orders = Vec::with_capacity(orders.len());
1036
1037 for obj in orders {
1038 let (
1039 instrument_type,
1040 instrument_id,
1041 client_order_id,
1042 new_client_order_id,
1043 price,
1044 quantity,
1045 ): (
1046 String,
1047 InstrumentId,
1048 ClientOrderId,
1049 ClientOrderId,
1050 Option<Price>,
1051 Option<Quantity>,
1052 ) = obj
1053 .extract(py)
1054 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
1055 let inst_type =
1056 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1057 domain_orders.push((
1058 inst_type,
1059 instrument_id,
1060 client_order_id,
1061 new_client_order_id,
1062 price,
1063 quantity,
1064 ));
1065 }
1066
1067 let client = self.clone();
1068
1069 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1070 client
1071 .batch_modify_orders(domain_orders)
1072 .await
1073 .map_err(to_pyvalue_err)
1074 })
1075 }
1076}
1077
1078pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
1079 if let Err(e) = callback.call1(py, (py_obj,)) {
1080 tracing::error!("Error calling Python: {e}");
1081 }
1082}
1083
1084fn call_python_with_data<F>(callback: &PyObject, data_converter: F)
1085where
1086 F: FnOnce(Python) -> PyResult<PyObject>,
1087{
1088 Python::with_gil(|py| match data_converter(py) {
1089 Ok(py_obj) => call_python(py, callback, py_obj),
1090 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1091 });
1092}