1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_common::live::get_runtime;
48use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
49use nautilus_model::{
50 data::{BarType, Data, OrderBookDeltas_API},
51 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
52 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
53 python::{
54 data::data_to_pycapsule,
55 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56 },
57 types::{Price, Quantity},
58};
59use pyo3::{IntoPyObjectExt, prelude::*};
60use ustr::Ustr;
61
62use crate::{
63 common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
64 websocket::{
65 OKXWebSocketClient,
66 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
67 },
68};
69
70#[pyo3::pymethods]
71impl OKXWebSocketError {
72 #[getter]
73 pub fn code(&self) -> &str {
74 &self.code
75 }
76
77 #[getter]
78 pub fn message(&self) -> &str {
79 &self.message
80 }
81
82 #[getter]
83 pub fn conn_id(&self) -> Option<&str> {
84 self.conn_id.as_deref()
85 }
86
87 #[getter]
88 pub fn ts_event(&self) -> u64 {
89 self.timestamp
90 }
91
92 fn __repr__(&self) -> String {
93 format!(
94 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
95 self.code, self.message, self.conn_id, self.timestamp
96 )
97 }
98}
99
100#[pymethods]
101impl OKXWebSocketClient {
102 #[new]
103 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
104 fn py_new(
105 url: Option<String>,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 api_passphrase: Option<String>,
109 account_id: Option<AccountId>,
110 heartbeat: Option<u64>,
111 ) -> PyResult<Self> {
112 Self::new(
113 url,
114 api_key,
115 api_secret,
116 api_passphrase,
117 account_id,
118 heartbeat,
119 )
120 .map_err(to_pyvalue_err)
121 }
122
123 #[staticmethod]
124 #[pyo3(name = "with_credentials")]
125 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
126 fn py_with_credentials(
127 url: Option<String>,
128 api_key: Option<String>,
129 api_secret: Option<String>,
130 api_passphrase: Option<String>,
131 account_id: Option<AccountId>,
132 heartbeat: Option<u64>,
133 ) -> PyResult<Self> {
134 Self::with_credentials(
135 url,
136 api_key,
137 api_secret,
138 api_passphrase,
139 account_id,
140 heartbeat,
141 )
142 .map_err(to_pyvalue_err)
143 }
144
145 #[staticmethod]
146 #[pyo3(name = "from_env")]
147 fn py_from_env() -> PyResult<Self> {
148 Self::from_env().map_err(to_pyvalue_err)
149 }
150
151 #[getter]
152 #[pyo3(name = "url")]
153 #[must_use]
154 pub fn py_url(&self) -> &str {
155 self.url()
156 }
157
158 #[getter]
159 #[pyo3(name = "api_key")]
160 #[must_use]
161 pub fn py_api_key(&self) -> Option<&str> {
162 self.api_key()
163 }
164
165 #[getter]
166 #[pyo3(name = "api_key_masked")]
167 #[must_use]
168 pub fn py_api_key_masked(&self) -> Option<String> {
169 self.api_key_masked()
170 }
171
172 #[pyo3(name = "is_active")]
173 fn py_is_active(&mut self) -> bool {
174 self.is_active()
175 }
176
177 #[pyo3(name = "is_closed")]
178 fn py_is_closed(&mut self) -> bool {
179 self.is_closed()
180 }
181
182 #[pyo3(name = "cancel_all_requests")]
183 pub fn py_cancel_all_requests(&self) {
184 self.cancel_all_requests();
185 }
186
187 #[pyo3(name = "get_subscriptions")]
188 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
189 let channels = self.get_subscriptions(instrument_id);
190
191 channels
193 .iter()
194 .map(|c| {
195 serde_json::to_value(c)
196 .ok()
197 .and_then(|v| v.as_str().map(String::from))
198 .unwrap_or_else(|| c.to_string())
199 })
200 .collect()
201 }
202
203 #[pyo3(name = "set_vip_level")]
207 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
208 self.set_vip_level(vip_level);
209 }
210
211 #[pyo3(name = "vip_level")]
213 #[getter]
214 fn py_vip_level(&self) -> OKXVipLevel {
215 self.vip_level()
216 }
217
218 #[pyo3(name = "connect")]
219 fn py_connect<'py>(
220 &mut self,
221 py: Python<'py>,
222 instruments: Vec<Py<PyAny>>,
223 callback: Py<PyAny>,
224 ) -> PyResult<Bound<'py, PyAny>> {
225 let mut instruments_any = Vec::new();
226 for inst in instruments {
227 let inst_any = pyobject_to_instrument_any(py, inst)?;
228 instruments_any.push(inst_any);
229 }
230
231 self.cache_instruments(instruments_any);
232
233 let mut client = self.clone();
234
235 pyo3_async_runtimes::tokio::future_into_py(py, async move {
236 client.connect().await.map_err(to_pyruntime_err)?;
237
238 let stream = client.stream();
239
240 get_runtime().spawn(async move {
242 let _client = client;
243 tokio::pin!(stream);
244
245 while let Some(msg) = stream.next().await {
246 match msg {
247 NautilusWsMessage::Instrument(msg) => {
248 call_python_with_data(&callback, |py| {
249 instrument_any_to_pyobject(py, *msg)
250 });
251 }
252 NautilusWsMessage::Data(msg) => Python::attach(|py| {
253 for data in msg {
254 let py_obj = data_to_pycapsule(py, data);
255 call_python(py, &callback, py_obj);
256 }
257 }),
258 NautilusWsMessage::FundingRates(msg) => {
259 for data in msg {
260 call_python_with_data(&callback, |py| data.into_py_any(py));
261 }
262 }
263 NautilusWsMessage::OrderAccepted(msg) => {
264 call_python_with_data(&callback, |py| msg.into_py_any(py));
265 }
266 NautilusWsMessage::OrderCanceled(msg) => {
267 call_python_with_data(&callback, |py| msg.into_py_any(py));
268 }
269 NautilusWsMessage::OrderExpired(msg) => {
270 call_python_with_data(&callback, |py| msg.into_py_any(py));
271 }
272 NautilusWsMessage::OrderRejected(msg) => {
273 call_python_with_data(&callback, |py| msg.into_py_any(py));
274 }
275 NautilusWsMessage::OrderCancelRejected(msg) => {
276 call_python_with_data(&callback, |py| msg.into_py_any(py));
277 }
278 NautilusWsMessage::OrderModifyRejected(msg) => {
279 call_python_with_data(&callback, |py| msg.into_py_any(py));
280 }
281 NautilusWsMessage::OrderTriggered(msg) => {
282 call_python_with_data(&callback, |py| msg.into_py_any(py));
283 }
284 NautilusWsMessage::OrderUpdated(msg) => {
285 call_python_with_data(&callback, |py| msg.into_py_any(py));
286 }
287 NautilusWsMessage::ExecutionReports(msg) => {
288 for report in msg {
289 match report {
290 ExecutionReport::Order(report) => {
291 call_python_with_data(&callback, |py| {
292 report.into_py_any(py)
293 });
294 }
295 ExecutionReport::Fill(report) => {
296 call_python_with_data(&callback, |py| {
297 report.into_py_any(py)
298 });
299 }
300 };
301 }
302 }
303 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
304 let py_obj =
305 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
306 call_python(py, &callback, py_obj);
307 }),
308 NautilusWsMessage::AccountUpdate(msg) => {
309 call_python_with_data(&callback, |py| msg.into_py_any(py));
310 }
311 NautilusWsMessage::PositionUpdate(msg) => {
312 call_python_with_data(&callback, |py| msg.into_py_any(py));
313 }
314 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Authenticated => {} NautilusWsMessage::Error(msg) => {
317 call_python_with_data(&callback, |py| msg.into_py_any(py));
318 }
319 NautilusWsMessage::Raw(msg) => {
320 log::debug!("Received raw message, skipping: {msg}");
321 }
322 }
323 }
324 });
325
326 Ok(())
327 })
328 }
329
330 #[pyo3(name = "wait_until_active")]
331 fn py_wait_until_active<'py>(
332 &self,
333 py: Python<'py>,
334 timeout_secs: f64,
335 ) -> PyResult<Bound<'py, PyAny>> {
336 let client = self.clone();
337
338 pyo3_async_runtimes::tokio::future_into_py(py, async move {
339 client
340 .wait_until_active(timeout_secs)
341 .await
342 .map_err(to_pyruntime_err)?;
343 Ok(())
344 })
345 }
346
347 #[pyo3(name = "close")]
348 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
349 let mut client = self.clone();
350
351 pyo3_async_runtimes::tokio::future_into_py(py, async move {
352 if let Err(e) = client.close().await {
353 log::error!("Error on close: {e}");
354 }
355 Ok(())
356 })
357 }
358
359 #[pyo3(name = "subscribe_instruments")]
360 fn py_subscribe_instruments<'py>(
361 &self,
362 py: Python<'py>,
363 instrument_type: OKXInstrumentType,
364 ) -> PyResult<Bound<'py, PyAny>> {
365 let client = self.clone();
366
367 pyo3_async_runtimes::tokio::future_into_py(py, async move {
368 if let Err(e) = client.subscribe_instruments(instrument_type).await {
369 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
370 }
371 Ok(())
372 })
373 }
374
375 #[pyo3(name = "subscribe_instrument")]
376 fn py_subscribe_instrument<'py>(
377 &self,
378 py: Python<'py>,
379 instrument_id: InstrumentId,
380 ) -> PyResult<Bound<'py, PyAny>> {
381 let client = self.clone();
382
383 pyo3_async_runtimes::tokio::future_into_py(py, async move {
384 if let Err(e) = client.subscribe_instrument(instrument_id).await {
385 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
386 }
387 Ok(())
388 })
389 }
390
391 #[pyo3(name = "subscribe_book")]
392 fn py_subscribe_book<'py>(
393 &self,
394 py: Python<'py>,
395 instrument_id: InstrumentId,
396 ) -> PyResult<Bound<'py, PyAny>> {
397 let client = self.clone();
398
399 pyo3_async_runtimes::tokio::future_into_py(py, async move {
400 client
401 .subscribe_book(instrument_id)
402 .await
403 .map_err(to_pyvalue_err)
404 })
405 }
406
407 #[pyo3(name = "subscribe_book50_l2_tbt")]
408 fn py_subscribe_book50_l2_tbt<'py>(
409 &self,
410 py: Python<'py>,
411 instrument_id: InstrumentId,
412 ) -> PyResult<Bound<'py, PyAny>> {
413 let client = self.clone();
414
415 pyo3_async_runtimes::tokio::future_into_py(py, async move {
416 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
417 log::error!("Failed to subscribe to book50_tbt: {e}");
418 }
419 Ok(())
420 })
421 }
422
423 #[pyo3(name = "subscribe_book_l2_tbt")]
424 fn py_subscribe_book_l2_tbt<'py>(
425 &self,
426 py: Python<'py>,
427 instrument_id: InstrumentId,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let client = self.clone();
430
431 pyo3_async_runtimes::tokio::future_into_py(py, async move {
432 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
433 log::error!("Failed to subscribe to books_l2_tbt: {e}");
434 }
435 Ok(())
436 })
437 }
438
439 #[pyo3(name = "subscribe_book_with_depth")]
440 fn py_subscribe_book_with_depth<'py>(
441 &self,
442 py: Python<'py>,
443 instrument_id: InstrumentId,
444 depth: u16,
445 ) -> PyResult<Bound<'py, PyAny>> {
446 let client = self.clone();
447
448 pyo3_async_runtimes::tokio::future_into_py(py, async move {
449 client
450 .subscribe_book_with_depth(instrument_id, depth)
451 .await
452 .map_err(to_pyvalue_err)
453 })
454 }
455
456 #[pyo3(name = "subscribe_book_depth5")]
457 fn py_subscribe_book_depth5<'py>(
458 &self,
459 py: Python<'py>,
460 instrument_id: InstrumentId,
461 ) -> PyResult<Bound<'py, PyAny>> {
462 let client = self.clone();
463
464 pyo3_async_runtimes::tokio::future_into_py(py, async move {
465 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
466 log::error!("Failed to subscribe to books5: {e}");
467 }
468 Ok(())
469 })
470 }
471
472 #[pyo3(name = "subscribe_quotes")]
473 fn py_subscribe_quotes<'py>(
474 &self,
475 py: Python<'py>,
476 instrument_id: InstrumentId,
477 ) -> PyResult<Bound<'py, PyAny>> {
478 let client = self.clone();
479
480 pyo3_async_runtimes::tokio::future_into_py(py, async move {
481 if let Err(e) = client.subscribe_quotes(instrument_id).await {
482 log::error!("Failed to subscribe to quotes: {e}");
483 }
484 Ok(())
485 })
486 }
487
488 #[pyo3(name = "subscribe_trades")]
489 fn py_subscribe_trades<'py>(
490 &self,
491 py: Python<'py>,
492 instrument_id: InstrumentId,
493 aggregated: bool,
494 ) -> PyResult<Bound<'py, PyAny>> {
495 let client = self.clone();
496
497 pyo3_async_runtimes::tokio::future_into_py(py, async move {
498 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
499 log::error!("Failed to subscribe to trades: {e}");
500 }
501 Ok(())
502 })
503 }
504
505 #[pyo3(name = "subscribe_bars")]
506 fn py_subscribe_bars<'py>(
507 &self,
508 py: Python<'py>,
509 bar_type: BarType,
510 ) -> PyResult<Bound<'py, PyAny>> {
511 let client = self.clone();
512
513 pyo3_async_runtimes::tokio::future_into_py(py, async move {
514 if let Err(e) = client.subscribe_bars(bar_type).await {
515 log::error!("Failed to subscribe to bars: {e}");
516 }
517 Ok(())
518 })
519 }
520
521 #[pyo3(name = "unsubscribe_book")]
522 fn py_unsubscribe_book<'py>(
523 &self,
524 py: Python<'py>,
525 instrument_id: InstrumentId,
526 ) -> PyResult<Bound<'py, PyAny>> {
527 let client = self.clone();
528
529 pyo3_async_runtimes::tokio::future_into_py(py, async move {
530 if let Err(e) = client.unsubscribe_book(instrument_id).await {
531 log::error!("Failed to unsubscribe from order book: {e}");
532 }
533 Ok(())
534 })
535 }
536
537 #[pyo3(name = "unsubscribe_book_depth5")]
538 fn py_unsubscribe_book_depth5<'py>(
539 &self,
540 py: Python<'py>,
541 instrument_id: InstrumentId,
542 ) -> PyResult<Bound<'py, PyAny>> {
543 let client = self.clone();
544
545 pyo3_async_runtimes::tokio::future_into_py(py, async move {
546 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
547 log::error!("Failed to unsubscribe from books5: {e}");
548 }
549 Ok(())
550 })
551 }
552
553 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
554 fn py_unsubscribe_book50_l2_tbt<'py>(
555 &self,
556 py: Python<'py>,
557 instrument_id: InstrumentId,
558 ) -> PyResult<Bound<'py, PyAny>> {
559 let client = self.clone();
560
561 pyo3_async_runtimes::tokio::future_into_py(py, async move {
562 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
563 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
564 }
565 Ok(())
566 })
567 }
568
569 #[pyo3(name = "unsubscribe_book_l2_tbt")]
570 fn py_unsubscribe_book_l2_tbt<'py>(
571 &self,
572 py: Python<'py>,
573 instrument_id: InstrumentId,
574 ) -> PyResult<Bound<'py, PyAny>> {
575 let client = self.clone();
576
577 pyo3_async_runtimes::tokio::future_into_py(py, async move {
578 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
579 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
580 }
581 Ok(())
582 })
583 }
584
585 #[pyo3(name = "unsubscribe_quotes")]
586 fn py_unsubscribe_quotes<'py>(
587 &self,
588 py: Python<'py>,
589 instrument_id: InstrumentId,
590 ) -> PyResult<Bound<'py, PyAny>> {
591 let client = self.clone();
592
593 pyo3_async_runtimes::tokio::future_into_py(py, async move {
594 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
595 log::error!("Failed to unsubscribe from quotes: {e}");
596 }
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "unsubscribe_trades")]
602 fn py_unsubscribe_trades<'py>(
603 &self,
604 py: Python<'py>,
605 instrument_id: InstrumentId,
606 aggregated: bool,
607 ) -> PyResult<Bound<'py, PyAny>> {
608 let client = self.clone();
609
610 pyo3_async_runtimes::tokio::future_into_py(py, async move {
611 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
612 log::error!("Failed to unsubscribe from trades: {e}");
613 }
614 Ok(())
615 })
616 }
617
618 #[pyo3(name = "unsubscribe_bars")]
619 fn py_unsubscribe_bars<'py>(
620 &self,
621 py: Python<'py>,
622 bar_type: BarType,
623 ) -> PyResult<Bound<'py, PyAny>> {
624 let client = self.clone();
625
626 pyo3_async_runtimes::tokio::future_into_py(py, async move {
627 if let Err(e) = client.unsubscribe_bars(bar_type).await {
628 log::error!("Failed to unsubscribe from bars: {e}");
629 }
630 Ok(())
631 })
632 }
633
634 #[pyo3(name = "subscribe_ticker")]
635 fn py_subscribe_ticker<'py>(
636 &self,
637 py: Python<'py>,
638 instrument_id: InstrumentId,
639 ) -> PyResult<Bound<'py, PyAny>> {
640 let client = self.clone();
641
642 pyo3_async_runtimes::tokio::future_into_py(py, async move {
643 if let Err(e) = client.subscribe_ticker(instrument_id).await {
644 log::error!("Failed to subscribe to ticker: {e}");
645 }
646 Ok(())
647 })
648 }
649
650 #[pyo3(name = "unsubscribe_ticker")]
651 fn py_unsubscribe_ticker<'py>(
652 &self,
653 py: Python<'py>,
654 instrument_id: InstrumentId,
655 ) -> PyResult<Bound<'py, PyAny>> {
656 let client = self.clone();
657
658 pyo3_async_runtimes::tokio::future_into_py(py, async move {
659 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
660 log::error!("Failed to unsubscribe from ticker: {e}");
661 }
662 Ok(())
663 })
664 }
665
666 #[pyo3(name = "subscribe_mark_prices")]
667 fn py_subscribe_mark_prices<'py>(
668 &self,
669 py: Python<'py>,
670 instrument_id: InstrumentId,
671 ) -> PyResult<Bound<'py, PyAny>> {
672 let client = self.clone();
673
674 pyo3_async_runtimes::tokio::future_into_py(py, async move {
675 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
676 log::error!("Failed to subscribe to mark prices: {e}");
677 }
678 Ok(())
679 })
680 }
681
682 #[pyo3(name = "unsubscribe_mark_prices")]
683 fn py_unsubscribe_mark_prices<'py>(
684 &self,
685 py: Python<'py>,
686 instrument_id: InstrumentId,
687 ) -> PyResult<Bound<'py, PyAny>> {
688 let client = self.clone();
689
690 pyo3_async_runtimes::tokio::future_into_py(py, async move {
691 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
692 log::error!("Failed to unsubscribe from mark prices: {e}");
693 }
694 Ok(())
695 })
696 }
697
698 #[pyo3(name = "subscribe_index_prices")]
699 fn py_subscribe_index_prices<'py>(
700 &self,
701 py: Python<'py>,
702 instrument_id: InstrumentId,
703 ) -> PyResult<Bound<'py, PyAny>> {
704 let client = self.clone();
705
706 pyo3_async_runtimes::tokio::future_into_py(py, async move {
707 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
708 log::error!("Failed to subscribe to index prices: {e}");
709 }
710 Ok(())
711 })
712 }
713
714 #[pyo3(name = "unsubscribe_index_prices")]
715 fn py_unsubscribe_index_prices<'py>(
716 &self,
717 py: Python<'py>,
718 instrument_id: InstrumentId,
719 ) -> PyResult<Bound<'py, PyAny>> {
720 let client = self.clone();
721
722 pyo3_async_runtimes::tokio::future_into_py(py, async move {
723 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
724 log::error!("Failed to unsubscribe from index prices: {e}");
725 }
726 Ok(())
727 })
728 }
729
730 #[pyo3(name = "subscribe_funding_rates")]
731 fn py_subscribe_funding_rates<'py>(
732 &self,
733 py: Python<'py>,
734 instrument_id: InstrumentId,
735 ) -> PyResult<Bound<'py, PyAny>> {
736 let client = self.clone();
737
738 pyo3_async_runtimes::tokio::future_into_py(py, async move {
739 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
740 log::error!("Failed to subscribe to funding rates: {e}");
741 }
742 Ok(())
743 })
744 }
745
746 #[pyo3(name = "unsubscribe_funding_rates")]
747 fn py_unsubscribe_funding_rates<'py>(
748 &self,
749 py: Python<'py>,
750 instrument_id: InstrumentId,
751 ) -> PyResult<Bound<'py, PyAny>> {
752 let client = self.clone();
753
754 pyo3_async_runtimes::tokio::future_into_py(py, async move {
755 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
756 log::error!("Failed to unsubscribe from funding rates: {e}");
757 }
758 Ok(())
759 })
760 }
761
762 #[pyo3(name = "subscribe_orders")]
763 fn py_subscribe_orders<'py>(
764 &self,
765 py: Python<'py>,
766 instrument_type: OKXInstrumentType,
767 ) -> PyResult<Bound<'py, PyAny>> {
768 let client = self.clone();
769
770 pyo3_async_runtimes::tokio::future_into_py(py, async move {
771 if let Err(e) = client.subscribe_orders(instrument_type).await {
772 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
773 }
774 Ok(())
775 })
776 }
777
778 #[pyo3(name = "unsubscribe_orders")]
779 fn py_unsubscribe_orders<'py>(
780 &self,
781 py: Python<'py>,
782 instrument_type: OKXInstrumentType,
783 ) -> PyResult<Bound<'py, PyAny>> {
784 let client = self.clone();
785
786 pyo3_async_runtimes::tokio::future_into_py(py, async move {
787 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
788 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
789 }
790 Ok(())
791 })
792 }
793
794 #[pyo3(name = "subscribe_orders_algo")]
795 fn py_subscribe_orders_algo<'py>(
796 &self,
797 py: Python<'py>,
798 instrument_type: OKXInstrumentType,
799 ) -> PyResult<Bound<'py, PyAny>> {
800 let client = self.clone();
801
802 pyo3_async_runtimes::tokio::future_into_py(py, async move {
803 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
804 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
805 }
806 Ok(())
807 })
808 }
809
810 #[pyo3(name = "unsubscribe_orders_algo")]
811 fn py_unsubscribe_orders_algo<'py>(
812 &self,
813 py: Python<'py>,
814 instrument_type: OKXInstrumentType,
815 ) -> PyResult<Bound<'py, PyAny>> {
816 let client = self.clone();
817
818 pyo3_async_runtimes::tokio::future_into_py(py, async move {
819 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
820 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
821 }
822 Ok(())
823 })
824 }
825
826 #[pyo3(name = "subscribe_fills")]
827 fn py_subscribe_fills<'py>(
828 &self,
829 py: Python<'py>,
830 instrument_type: OKXInstrumentType,
831 ) -> PyResult<Bound<'py, PyAny>> {
832 let client = self.clone();
833
834 pyo3_async_runtimes::tokio::future_into_py(py, async move {
835 if let Err(e) = client.subscribe_fills(instrument_type).await {
836 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
837 }
838 Ok(())
839 })
840 }
841
842 #[pyo3(name = "unsubscribe_fills")]
843 fn py_unsubscribe_fills<'py>(
844 &self,
845 py: Python<'py>,
846 instrument_type: OKXInstrumentType,
847 ) -> PyResult<Bound<'py, PyAny>> {
848 let client = self.clone();
849
850 pyo3_async_runtimes::tokio::future_into_py(py, async move {
851 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
852 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
853 }
854 Ok(())
855 })
856 }
857
858 #[pyo3(name = "subscribe_account")]
859 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
860 let client = self.clone();
861
862 pyo3_async_runtimes::tokio::future_into_py(py, async move {
863 if let Err(e) = client.subscribe_account().await {
864 log::error!("Failed to subscribe to account: {e}");
865 }
866 Ok(())
867 })
868 }
869
870 #[pyo3(name = "unsubscribe_account")]
871 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
872 let client = self.clone();
873
874 pyo3_async_runtimes::tokio::future_into_py(py, async move {
875 if let Err(e) = client.unsubscribe_account().await {
876 log::error!("Failed to unsubscribe from account: {e}");
877 }
878 Ok(())
879 })
880 }
881
882 #[pyo3(name = "submit_order")]
883 #[pyo3(signature = (
884 trader_id,
885 strategy_id,
886 instrument_id,
887 td_mode,
888 client_order_id,
889 order_side,
890 order_type,
891 quantity,
892 time_in_force=None,
893 price=None,
894 trigger_price=None,
895 post_only=None,
896 reduce_only=None,
897 quote_quantity=None,
898 position_side=None,
899 ))]
900 #[allow(clippy::too_many_arguments)]
901 fn py_submit_order<'py>(
902 &self,
903 py: Python<'py>,
904 trader_id: TraderId,
905 strategy_id: StrategyId,
906 instrument_id: InstrumentId,
907 td_mode: OKXTradeMode,
908 client_order_id: ClientOrderId,
909 order_side: OrderSide,
910 order_type: OrderType,
911 quantity: Quantity,
912 time_in_force: Option<TimeInForce>,
913 price: Option<Price>,
914 trigger_price: Option<Price>,
915 post_only: Option<bool>,
916 reduce_only: Option<bool>,
917 quote_quantity: Option<bool>,
918 position_side: Option<PositionSide>,
919 ) -> PyResult<Bound<'py, PyAny>> {
920 let client = self.clone();
921
922 pyo3_async_runtimes::tokio::future_into_py(py, async move {
923 client
924 .submit_order(
925 trader_id,
926 strategy_id,
927 instrument_id,
928 td_mode,
929 client_order_id,
930 order_side,
931 order_type,
932 quantity,
933 time_in_force,
934 price,
935 trigger_price,
936 post_only,
937 reduce_only,
938 quote_quantity,
939 position_side,
940 )
941 .await
942 .map_err(to_pyvalue_err)
943 })
944 }
945
946 #[pyo3(name = "cancel_order", signature = (
947 trader_id,
948 strategy_id,
949 instrument_id,
950 client_order_id=None,
951 venue_order_id=None,
952 ))]
953 #[allow(clippy::too_many_arguments)]
954 fn py_cancel_order<'py>(
955 &self,
956 py: Python<'py>,
957 trader_id: TraderId,
958 strategy_id: StrategyId,
959 instrument_id: InstrumentId,
960 client_order_id: Option<ClientOrderId>,
961 venue_order_id: Option<VenueOrderId>,
962 ) -> PyResult<Bound<'py, PyAny>> {
963 let client = self.clone();
964
965 pyo3_async_runtimes::tokio::future_into_py(py, async move {
966 client
967 .cancel_order(
968 trader_id,
969 strategy_id,
970 instrument_id,
971 client_order_id,
972 venue_order_id,
973 )
974 .await
975 .map_err(to_pyvalue_err)
976 })
977 }
978
979 #[pyo3(name = "modify_order")]
980 #[pyo3(signature = (
981 trader_id,
982 strategy_id,
983 instrument_id,
984 client_order_id=None,
985 venue_order_id=None,
986 price=None,
987 quantity=None,
988 ))]
989 #[allow(clippy::too_many_arguments)]
990 fn py_modify_order<'py>(
991 &self,
992 py: Python<'py>,
993 trader_id: TraderId,
994 strategy_id: StrategyId,
995 instrument_id: InstrumentId,
996 client_order_id: Option<ClientOrderId>,
997 venue_order_id: Option<VenueOrderId>,
998 price: Option<Price>,
999 quantity: Option<Quantity>,
1000 ) -> PyResult<Bound<'py, PyAny>> {
1001 let client = self.clone();
1002
1003 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1004 client
1005 .modify_order(
1006 trader_id,
1007 strategy_id,
1008 instrument_id,
1009 client_order_id,
1010 price,
1011 quantity,
1012 venue_order_id,
1013 )
1014 .await
1015 .map_err(to_pyvalue_err)
1016 })
1017 }
1018
1019 #[allow(clippy::type_complexity)]
1020 #[pyo3(name = "batch_submit_orders")]
1021 fn py_batch_submit_orders<'py>(
1022 &self,
1023 py: Python<'py>,
1024 orders: Vec<Py<PyAny>>,
1025 ) -> PyResult<Bound<'py, PyAny>> {
1026 let mut domain_orders = Vec::with_capacity(orders.len());
1027
1028 for obj in orders {
1029 let (
1030 instrument_type,
1031 instrument_id,
1032 td_mode,
1033 client_order_id,
1034 order_side,
1035 order_type,
1036 quantity,
1037 position_side,
1038 price,
1039 trigger_price,
1040 post_only,
1041 reduce_only,
1042 ): (
1043 OKXInstrumentType,
1044 InstrumentId,
1045 OKXTradeMode,
1046 ClientOrderId,
1047 OrderSide,
1048 OrderType,
1049 Quantity,
1050 Option<PositionSide>,
1051 Option<Price>,
1052 Option<Price>,
1053 Option<bool>,
1054 Option<bool>,
1055 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1056
1057 domain_orders.push((
1058 instrument_type,
1059 instrument_id,
1060 td_mode,
1061 client_order_id,
1062 order_side,
1063 position_side,
1064 order_type,
1065 quantity,
1066 price,
1067 trigger_price,
1068 post_only,
1069 reduce_only,
1070 ));
1071 }
1072
1073 let client = self.clone();
1074
1075 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1076 client
1077 .batch_submit_orders(domain_orders)
1078 .await
1079 .map_err(to_pyvalue_err)
1080 })
1081 }
1082
1083 #[pyo3(name = "batch_cancel_orders")]
1085 fn py_batch_cancel_orders<'py>(
1086 &self,
1087 py: Python<'py>,
1088 cancels: Vec<Py<PyAny>>,
1089 ) -> PyResult<Bound<'py, PyAny>> {
1090 let mut batched_cancels = Vec::with_capacity(cancels.len());
1091
1092 for obj in cancels {
1093 let (instrument_id, client_order_id, order_id): (
1094 InstrumentId,
1095 Option<ClientOrderId>,
1096 Option<VenueOrderId>,
1097 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1098 batched_cancels.push((instrument_id, client_order_id, order_id));
1099 }
1100
1101 let client = self.clone();
1102
1103 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1104 client
1105 .batch_cancel_orders(batched_cancels)
1106 .await
1107 .map_err(to_pyvalue_err)
1108 })
1109 }
1110
1111 #[pyo3(name = "batch_modify_orders")]
1112 fn py_batch_modify_orders<'py>(
1113 &self,
1114 py: Python<'py>,
1115 orders: Vec<Py<PyAny>>,
1116 ) -> PyResult<Bound<'py, PyAny>> {
1117 let mut domain_orders = Vec::with_capacity(orders.len());
1118
1119 for obj in orders {
1120 let (
1121 instrument_type,
1122 instrument_id,
1123 client_order_id,
1124 new_client_order_id,
1125 price,
1126 quantity,
1127 ): (
1128 String,
1129 InstrumentId,
1130 ClientOrderId,
1131 ClientOrderId,
1132 Option<Price>,
1133 Option<Quantity>,
1134 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1135 let inst_type =
1136 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1137 domain_orders.push((
1138 inst_type,
1139 instrument_id,
1140 client_order_id,
1141 new_client_order_id,
1142 price,
1143 quantity,
1144 ));
1145 }
1146
1147 let client = self.clone();
1148
1149 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1150 client
1151 .batch_modify_orders(domain_orders)
1152 .await
1153 .map_err(to_pyvalue_err)
1154 })
1155 }
1156
1157 #[pyo3(name = "mass_cancel_orders")]
1158 fn py_mass_cancel_orders<'py>(
1159 &self,
1160 py: Python<'py>,
1161 instrument_id: InstrumentId,
1162 ) -> PyResult<Bound<'py, PyAny>> {
1163 let client = self.clone();
1164
1165 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1166 client
1167 .mass_cancel_orders(instrument_id)
1168 .await
1169 .map_err(to_pyvalue_err)
1170 })
1171 }
1172
1173 #[pyo3(name = "cache_instruments")]
1174 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1175 let instruments: Result<Vec<_>, _> = instruments
1176 .into_iter()
1177 .map(|inst| pyobject_to_instrument_any(py, inst))
1178 .collect();
1179 self.cache_instruments(instruments?);
1180 Ok(())
1181 }
1182
1183 #[pyo3(name = "cache_instrument")]
1184 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1185 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1186 Ok(())
1187 }
1188
1189 #[pyo3(name = "cache_inst_id_codes")]
1190 fn py_cache_inst_id_codes(&self, mappings: Vec<(String, u64)>) {
1191 let ustr_mappings = mappings
1192 .into_iter()
1193 .map(|(inst_id, code)| (Ustr::from(&inst_id), code));
1194 self.cache_inst_id_codes(ustr_mappings);
1195 }
1196}
1197
1198fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1199where
1200 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1201{
1202 Python::attach(|py| match data_converter(py) {
1203 Ok(py_obj) => call_python(py, callback, py_obj),
1204 Err(e) => log::error!("Failed to convert data to Python object: {e}"),
1205 });
1206}