1use std::str::FromStr;
45
46use futures_util::StreamExt;
47use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
48use nautilus_model::{
49 data::{BarType, Data, OrderBookDeltas_API},
50 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 python::{
53 data::data_to_pycapsule,
54 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
55 },
56 types::{Price, Quantity},
57};
58use pyo3::{IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
59
60use crate::{
61 common::enums::{OKXInstrumentType, OKXTradeMode, OKXVipLevel},
62 websocket::{
63 OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage, OKXWebSocketError},
65 },
66};
67
68#[pyo3::pymethods]
69impl OKXWebSocketError {
70 #[getter]
71 pub fn code(&self) -> &str {
72 &self.code
73 }
74
75 #[getter]
76 pub fn message(&self) -> &str {
77 &self.message
78 }
79
80 #[getter]
81 pub fn conn_id(&self) -> Option<&str> {
82 self.conn_id.as_deref()
83 }
84
85 #[getter]
86 pub fn ts_event(&self) -> u64 {
87 self.timestamp
88 }
89
90 fn __repr__(&self) -> String {
91 format!(
92 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
93 self.code, self.message, self.conn_id, self.timestamp
94 )
95 }
96}
97
98#[pymethods]
99impl OKXWebSocketClient {
100 #[new]
101 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
102 fn py_new(
103 url: Option<String>,
104 api_key: Option<String>,
105 api_secret: Option<String>,
106 api_passphrase: Option<String>,
107 account_id: Option<AccountId>,
108 heartbeat: Option<u64>,
109 ) -> PyResult<Self> {
110 Self::new(
111 url,
112 api_key,
113 api_secret,
114 api_passphrase,
115 account_id,
116 heartbeat,
117 )
118 .map_err(to_pyvalue_err)
119 }
120
121 #[staticmethod]
122 #[pyo3(name = "with_credentials")]
123 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None))]
124 fn py_with_credentials(
125 url: Option<String>,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 api_passphrase: Option<String>,
129 account_id: Option<AccountId>,
130 heartbeat: Option<u64>,
131 ) -> PyResult<Self> {
132 Self::with_credentials(
133 url,
134 api_key,
135 api_secret,
136 api_passphrase,
137 account_id,
138 heartbeat,
139 )
140 .map_err(to_pyvalue_err)
141 }
142
143 #[staticmethod]
144 #[pyo3(name = "from_env")]
145 fn py_from_env() -> PyResult<Self> {
146 Self::from_env().map_err(to_pyvalue_err)
147 }
148
149 #[getter]
150 #[pyo3(name = "url")]
151 #[must_use]
152 pub fn py_url(&self) -> &str {
153 self.url()
154 }
155
156 #[getter]
157 #[pyo3(name = "api_key")]
158 #[must_use]
159 pub fn py_api_key(&self) -> Option<&str> {
160 self.api_key()
161 }
162
163 #[getter]
164 #[pyo3(name = "api_key_masked")]
165 #[must_use]
166 pub fn py_api_key_masked(&self) -> Option<String> {
167 self.api_key_masked()
168 }
169
170 #[pyo3(name = "is_active")]
171 fn py_is_active(&mut self) -> bool {
172 self.is_active()
173 }
174
175 #[pyo3(name = "is_closed")]
176 fn py_is_closed(&mut self) -> bool {
177 self.is_closed()
178 }
179
180 #[pyo3(name = "cancel_all_requests")]
181 pub fn py_cancel_all_requests(&self) {
182 self.cancel_all_requests();
183 }
184
185 #[pyo3(name = "get_subscriptions")]
186 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
187 let channels = self.get_subscriptions(instrument_id);
188
189 channels
191 .iter()
192 .map(|c| {
193 serde_json::to_value(c)
194 .ok()
195 .and_then(|v| v.as_str().map(String::from))
196 .unwrap_or_else(|| c.to_string())
197 })
198 .collect()
199 }
200
201 #[pyo3(name = "set_vip_level")]
205 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
206 self.set_vip_level(vip_level);
207 }
208
209 #[pyo3(name = "vip_level")]
211 #[getter]
212 fn py_vip_level(&self) -> OKXVipLevel {
213 self.vip_level()
214 }
215
216 #[pyo3(name = "connect")]
217 fn py_connect<'py>(
218 &mut self,
219 py: Python<'py>,
220 instruments: Vec<Py<PyAny>>,
221 callback: Py<PyAny>,
222 ) -> PyResult<Bound<'py, PyAny>> {
223 let mut instruments_any = Vec::new();
224 for inst in instruments {
225 let inst_any = pyobject_to_instrument_any(py, inst)?;
226 instruments_any.push(inst_any);
227 }
228
229 self.cache_instruments(instruments_any);
230
231 let mut client = self.clone();
232
233 pyo3_async_runtimes::tokio::future_into_py(py, async move {
234 client.connect().await.map_err(to_pyruntime_err)?;
235
236 let stream = client.stream();
237
238 tokio::spawn(async move {
240 let _client = client;
241 tokio::pin!(stream);
242
243 while let Some(msg) = stream.next().await {
244 match msg {
245 NautilusWsMessage::Instrument(msg) => {
246 call_python_with_data(&callback, |py| {
247 instrument_any_to_pyobject(py, *msg)
248 });
249 }
250 NautilusWsMessage::Data(msg) => Python::attach(|py| {
251 for data in msg {
252 let py_obj = data_to_pycapsule(py, data);
253 call_python(py, &callback, py_obj);
254 }
255 }),
256 NautilusWsMessage::FundingRates(msg) => {
257 for data in msg {
258 call_python_with_data(&callback, |py| data.into_py_any(py));
259 }
260 }
261 NautilusWsMessage::OrderAccepted(msg) => {
262 call_python_with_data(&callback, |py| msg.into_py_any(py));
263 }
264 NautilusWsMessage::OrderCanceled(msg) => {
265 call_python_with_data(&callback, |py| msg.into_py_any(py));
266 }
267 NautilusWsMessage::OrderExpired(msg) => {
268 call_python_with_data(&callback, |py| msg.into_py_any(py));
269 }
270 NautilusWsMessage::OrderRejected(msg) => {
271 call_python_with_data(&callback, |py| msg.into_py_any(py));
272 }
273 NautilusWsMessage::OrderCancelRejected(msg) => {
274 call_python_with_data(&callback, |py| msg.into_py_any(py));
275 }
276 NautilusWsMessage::OrderModifyRejected(msg) => {
277 call_python_with_data(&callback, |py| msg.into_py_any(py));
278 }
279 NautilusWsMessage::OrderTriggered(msg) => {
280 call_python_with_data(&callback, |py| msg.into_py_any(py));
281 }
282 NautilusWsMessage::OrderUpdated(msg) => {
283 call_python_with_data(&callback, |py| msg.into_py_any(py));
284 }
285 NautilusWsMessage::ExecutionReports(msg) => {
286 for report in msg {
287 match report {
288 ExecutionReport::Order(report) => {
289 call_python_with_data(&callback, |py| {
290 report.into_py_any(py)
291 });
292 }
293 ExecutionReport::Fill(report) => {
294 call_python_with_data(&callback, |py| {
295 report.into_py_any(py)
296 });
297 }
298 };
299 }
300 }
301 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
302 let py_obj =
303 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
304 call_python(py, &callback, py_obj);
305 }),
306 NautilusWsMessage::AccountUpdate(msg) => {
307 call_python_with_data(&callback, |py| msg.into_py_any(py));
308 }
309 NautilusWsMessage::PositionUpdate(msg) => {
310 call_python_with_data(&callback, |py| msg.into_py_any(py));
311 }
312 NautilusWsMessage::Reconnected => {} NautilusWsMessage::Authenticated => {} NautilusWsMessage::Error(msg) => {
315 call_python_with_data(&callback, |py| msg.into_py_any(py));
316 }
317 NautilusWsMessage::Raw(msg) => {
318 tracing::debug!("Received raw message, skipping: {msg}");
319 }
320 }
321 }
322 });
323
324 Ok(())
325 })
326 }
327
328 #[pyo3(name = "wait_until_active")]
329 fn py_wait_until_active<'py>(
330 &self,
331 py: Python<'py>,
332 timeout_secs: f64,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let client = self.clone();
335
336 pyo3_async_runtimes::tokio::future_into_py(py, async move {
337 client
338 .wait_until_active(timeout_secs)
339 .await
340 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
341 Ok(())
342 })
343 }
344
345 #[pyo3(name = "close")]
346 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
347 let mut client = self.clone();
348
349 pyo3_async_runtimes::tokio::future_into_py(py, async move {
350 if let Err(e) = client.close().await {
351 log::error!("Error on close: {e}");
352 }
353 Ok(())
354 })
355 }
356
357 #[pyo3(name = "subscribe_instruments")]
358 fn py_subscribe_instruments<'py>(
359 &self,
360 py: Python<'py>,
361 instrument_type: OKXInstrumentType,
362 ) -> PyResult<Bound<'py, PyAny>> {
363 let client = self.clone();
364
365 pyo3_async_runtimes::tokio::future_into_py(py, async move {
366 if let Err(e) = client.subscribe_instruments(instrument_type).await {
367 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
368 }
369 Ok(())
370 })
371 }
372
373 #[pyo3(name = "subscribe_instrument")]
374 fn py_subscribe_instrument<'py>(
375 &self,
376 py: Python<'py>,
377 instrument_id: InstrumentId,
378 ) -> PyResult<Bound<'py, PyAny>> {
379 let client = self.clone();
380
381 pyo3_async_runtimes::tokio::future_into_py(py, async move {
382 if let Err(e) = client.subscribe_instrument(instrument_id).await {
383 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
384 }
385 Ok(())
386 })
387 }
388
389 #[pyo3(name = "subscribe_book")]
390 fn py_subscribe_book<'py>(
391 &self,
392 py: Python<'py>,
393 instrument_id: InstrumentId,
394 ) -> PyResult<Bound<'py, PyAny>> {
395 let client = self.clone();
396
397 pyo3_async_runtimes::tokio::future_into_py(py, async move {
398 client
399 .subscribe_book(instrument_id)
400 .await
401 .map_err(to_pyvalue_err)
402 })
403 }
404
405 #[pyo3(name = "subscribe_book50_l2_tbt")]
406 fn py_subscribe_book50_l2_tbt<'py>(
407 &self,
408 py: Python<'py>,
409 instrument_id: InstrumentId,
410 ) -> PyResult<Bound<'py, PyAny>> {
411 let client = self.clone();
412
413 pyo3_async_runtimes::tokio::future_into_py(py, async move {
414 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
415 log::error!("Failed to subscribe to book50_tbt: {e}");
416 }
417 Ok(())
418 })
419 }
420
421 #[pyo3(name = "subscribe_book_l2_tbt")]
422 fn py_subscribe_book_l2_tbt<'py>(
423 &self,
424 py: Python<'py>,
425 instrument_id: InstrumentId,
426 ) -> PyResult<Bound<'py, PyAny>> {
427 let client = self.clone();
428
429 pyo3_async_runtimes::tokio::future_into_py(py, async move {
430 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
431 log::error!("Failed to subscribe to books_l2_tbt: {e}");
432 }
433 Ok(())
434 })
435 }
436
437 #[pyo3(name = "subscribe_book_with_depth")]
438 fn py_subscribe_book_with_depth<'py>(
439 &self,
440 py: Python<'py>,
441 instrument_id: InstrumentId,
442 depth: u16,
443 ) -> PyResult<Bound<'py, PyAny>> {
444 let client = self.clone();
445
446 pyo3_async_runtimes::tokio::future_into_py(py, async move {
447 client
448 .subscribe_book_with_depth(instrument_id, depth)
449 .await
450 .map_err(to_pyvalue_err)
451 })
452 }
453
454 #[pyo3(name = "subscribe_book_depth5")]
455 fn py_subscribe_book_depth5<'py>(
456 &self,
457 py: Python<'py>,
458 instrument_id: InstrumentId,
459 ) -> PyResult<Bound<'py, PyAny>> {
460 let client = self.clone();
461
462 pyo3_async_runtimes::tokio::future_into_py(py, async move {
463 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
464 log::error!("Failed to subscribe to books5: {e}");
465 }
466 Ok(())
467 })
468 }
469
470 #[pyo3(name = "subscribe_quotes")]
471 fn py_subscribe_quotes<'py>(
472 &self,
473 py: Python<'py>,
474 instrument_id: InstrumentId,
475 ) -> PyResult<Bound<'py, PyAny>> {
476 let client = self.clone();
477
478 pyo3_async_runtimes::tokio::future_into_py(py, async move {
479 if let Err(e) = client.subscribe_quotes(instrument_id).await {
480 log::error!("Failed to subscribe to quotes: {e}");
481 }
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "subscribe_trades")]
487 fn py_subscribe_trades<'py>(
488 &self,
489 py: Python<'py>,
490 instrument_id: InstrumentId,
491 aggregated: bool,
492 ) -> PyResult<Bound<'py, PyAny>> {
493 let client = self.clone();
494
495 pyo3_async_runtimes::tokio::future_into_py(py, async move {
496 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
497 log::error!("Failed to subscribe to trades: {e}");
498 }
499 Ok(())
500 })
501 }
502
503 #[pyo3(name = "subscribe_bars")]
504 fn py_subscribe_bars<'py>(
505 &self,
506 py: Python<'py>,
507 bar_type: BarType,
508 ) -> PyResult<Bound<'py, PyAny>> {
509 let client = self.clone();
510
511 pyo3_async_runtimes::tokio::future_into_py(py, async move {
512 if let Err(e) = client.subscribe_bars(bar_type).await {
513 log::error!("Failed to subscribe to bars: {e}");
514 }
515 Ok(())
516 })
517 }
518
519 #[pyo3(name = "unsubscribe_book")]
520 fn py_unsubscribe_book<'py>(
521 &self,
522 py: Python<'py>,
523 instrument_id: InstrumentId,
524 ) -> PyResult<Bound<'py, PyAny>> {
525 let client = self.clone();
526
527 pyo3_async_runtimes::tokio::future_into_py(py, async move {
528 if let Err(e) = client.unsubscribe_book(instrument_id).await {
529 log::error!("Failed to unsubscribe from order book: {e}");
530 }
531 Ok(())
532 })
533 }
534
535 #[pyo3(name = "unsubscribe_book_depth5")]
536 fn py_unsubscribe_book_depth5<'py>(
537 &self,
538 py: Python<'py>,
539 instrument_id: InstrumentId,
540 ) -> PyResult<Bound<'py, PyAny>> {
541 let client = self.clone();
542
543 pyo3_async_runtimes::tokio::future_into_py(py, async move {
544 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
545 log::error!("Failed to unsubscribe from books5: {e}");
546 }
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
552 fn py_unsubscribe_book50_l2_tbt<'py>(
553 &self,
554 py: Python<'py>,
555 instrument_id: InstrumentId,
556 ) -> PyResult<Bound<'py, PyAny>> {
557 let client = self.clone();
558
559 pyo3_async_runtimes::tokio::future_into_py(py, async move {
560 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
561 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
562 }
563 Ok(())
564 })
565 }
566
567 #[pyo3(name = "unsubscribe_book_l2_tbt")]
568 fn py_unsubscribe_book_l2_tbt<'py>(
569 &self,
570 py: Python<'py>,
571 instrument_id: InstrumentId,
572 ) -> PyResult<Bound<'py, PyAny>> {
573 let client = self.clone();
574
575 pyo3_async_runtimes::tokio::future_into_py(py, async move {
576 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
577 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
578 }
579 Ok(())
580 })
581 }
582
583 #[pyo3(name = "unsubscribe_quotes")]
584 fn py_unsubscribe_quotes<'py>(
585 &self,
586 py: Python<'py>,
587 instrument_id: InstrumentId,
588 ) -> PyResult<Bound<'py, PyAny>> {
589 let client = self.clone();
590
591 pyo3_async_runtimes::tokio::future_into_py(py, async move {
592 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
593 log::error!("Failed to unsubscribe from quotes: {e}");
594 }
595 Ok(())
596 })
597 }
598
599 #[pyo3(name = "unsubscribe_trades")]
600 fn py_unsubscribe_trades<'py>(
601 &self,
602 py: Python<'py>,
603 instrument_id: InstrumentId,
604 aggregated: bool,
605 ) -> PyResult<Bound<'py, PyAny>> {
606 let client = self.clone();
607
608 pyo3_async_runtimes::tokio::future_into_py(py, async move {
609 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
610 log::error!("Failed to unsubscribe from trades: {e}");
611 }
612 Ok(())
613 })
614 }
615
616 #[pyo3(name = "unsubscribe_bars")]
617 fn py_unsubscribe_bars<'py>(
618 &self,
619 py: Python<'py>,
620 bar_type: BarType,
621 ) -> PyResult<Bound<'py, PyAny>> {
622 let client = self.clone();
623
624 pyo3_async_runtimes::tokio::future_into_py(py, async move {
625 if let Err(e) = client.unsubscribe_bars(bar_type).await {
626 log::error!("Failed to unsubscribe from bars: {e}");
627 }
628 Ok(())
629 })
630 }
631
632 #[pyo3(name = "subscribe_ticker")]
633 fn py_subscribe_ticker<'py>(
634 &self,
635 py: Python<'py>,
636 instrument_id: InstrumentId,
637 ) -> PyResult<Bound<'py, PyAny>> {
638 let client = self.clone();
639
640 pyo3_async_runtimes::tokio::future_into_py(py, async move {
641 if let Err(e) = client.subscribe_ticker(instrument_id).await {
642 log::error!("Failed to subscribe to ticker: {e}");
643 }
644 Ok(())
645 })
646 }
647
648 #[pyo3(name = "unsubscribe_ticker")]
649 fn py_unsubscribe_ticker<'py>(
650 &self,
651 py: Python<'py>,
652 instrument_id: InstrumentId,
653 ) -> PyResult<Bound<'py, PyAny>> {
654 let client = self.clone();
655
656 pyo3_async_runtimes::tokio::future_into_py(py, async move {
657 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
658 log::error!("Failed to unsubscribe from ticker: {e}");
659 }
660 Ok(())
661 })
662 }
663
664 #[pyo3(name = "subscribe_mark_prices")]
665 fn py_subscribe_mark_prices<'py>(
666 &self,
667 py: Python<'py>,
668 instrument_id: InstrumentId,
669 ) -> PyResult<Bound<'py, PyAny>> {
670 let client = self.clone();
671
672 pyo3_async_runtimes::tokio::future_into_py(py, async move {
673 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
674 log::error!("Failed to subscribe to mark prices: {e}");
675 }
676 Ok(())
677 })
678 }
679
680 #[pyo3(name = "unsubscribe_mark_prices")]
681 fn py_unsubscribe_mark_prices<'py>(
682 &self,
683 py: Python<'py>,
684 instrument_id: InstrumentId,
685 ) -> PyResult<Bound<'py, PyAny>> {
686 let client = self.clone();
687
688 pyo3_async_runtimes::tokio::future_into_py(py, async move {
689 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
690 log::error!("Failed to unsubscribe from mark prices: {e}");
691 }
692 Ok(())
693 })
694 }
695
696 #[pyo3(name = "subscribe_index_prices")]
697 fn py_subscribe_index_prices<'py>(
698 &self,
699 py: Python<'py>,
700 instrument_id: InstrumentId,
701 ) -> PyResult<Bound<'py, PyAny>> {
702 let client = self.clone();
703
704 pyo3_async_runtimes::tokio::future_into_py(py, async move {
705 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
706 log::error!("Failed to subscribe to index prices: {e}");
707 }
708 Ok(())
709 })
710 }
711
712 #[pyo3(name = "unsubscribe_index_prices")]
713 fn py_unsubscribe_index_prices<'py>(
714 &self,
715 py: Python<'py>,
716 instrument_id: InstrumentId,
717 ) -> PyResult<Bound<'py, PyAny>> {
718 let client = self.clone();
719
720 pyo3_async_runtimes::tokio::future_into_py(py, async move {
721 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
722 log::error!("Failed to unsubscribe from index prices: {e}");
723 }
724 Ok(())
725 })
726 }
727
728 #[pyo3(name = "subscribe_funding_rates")]
729 fn py_subscribe_funding_rates<'py>(
730 &self,
731 py: Python<'py>,
732 instrument_id: InstrumentId,
733 ) -> PyResult<Bound<'py, PyAny>> {
734 let client = self.clone();
735
736 pyo3_async_runtimes::tokio::future_into_py(py, async move {
737 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
738 log::error!("Failed to subscribe to funding rates: {e}");
739 }
740 Ok(())
741 })
742 }
743
744 #[pyo3(name = "unsubscribe_funding_rates")]
745 fn py_unsubscribe_funding_rates<'py>(
746 &self,
747 py: Python<'py>,
748 instrument_id: InstrumentId,
749 ) -> PyResult<Bound<'py, PyAny>> {
750 let client = self.clone();
751
752 pyo3_async_runtimes::tokio::future_into_py(py, async move {
753 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
754 log::error!("Failed to unsubscribe from funding rates: {e}");
755 }
756 Ok(())
757 })
758 }
759
760 #[pyo3(name = "subscribe_orders")]
761 fn py_subscribe_orders<'py>(
762 &self,
763 py: Python<'py>,
764 instrument_type: OKXInstrumentType,
765 ) -> PyResult<Bound<'py, PyAny>> {
766 let client = self.clone();
767
768 pyo3_async_runtimes::tokio::future_into_py(py, async move {
769 if let Err(e) = client.subscribe_orders(instrument_type).await {
770 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
771 }
772 Ok(())
773 })
774 }
775
776 #[pyo3(name = "unsubscribe_orders")]
777 fn py_unsubscribe_orders<'py>(
778 &self,
779 py: Python<'py>,
780 instrument_type: OKXInstrumentType,
781 ) -> PyResult<Bound<'py, PyAny>> {
782 let client = self.clone();
783
784 pyo3_async_runtimes::tokio::future_into_py(py, async move {
785 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
786 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
787 }
788 Ok(())
789 })
790 }
791
792 #[pyo3(name = "subscribe_orders_algo")]
793 fn py_subscribe_orders_algo<'py>(
794 &self,
795 py: Python<'py>,
796 instrument_type: OKXInstrumentType,
797 ) -> PyResult<Bound<'py, PyAny>> {
798 let client = self.clone();
799
800 pyo3_async_runtimes::tokio::future_into_py(py, async move {
801 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
802 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
803 }
804 Ok(())
805 })
806 }
807
808 #[pyo3(name = "unsubscribe_orders_algo")]
809 fn py_unsubscribe_orders_algo<'py>(
810 &self,
811 py: Python<'py>,
812 instrument_type: OKXInstrumentType,
813 ) -> PyResult<Bound<'py, PyAny>> {
814 let client = self.clone();
815
816 pyo3_async_runtimes::tokio::future_into_py(py, async move {
817 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
818 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
819 }
820 Ok(())
821 })
822 }
823
824 #[pyo3(name = "subscribe_fills")]
825 fn py_subscribe_fills<'py>(
826 &self,
827 py: Python<'py>,
828 instrument_type: OKXInstrumentType,
829 ) -> PyResult<Bound<'py, PyAny>> {
830 let client = self.clone();
831
832 pyo3_async_runtimes::tokio::future_into_py(py, async move {
833 if let Err(e) = client.subscribe_fills(instrument_type).await {
834 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
835 }
836 Ok(())
837 })
838 }
839
840 #[pyo3(name = "unsubscribe_fills")]
841 fn py_unsubscribe_fills<'py>(
842 &self,
843 py: Python<'py>,
844 instrument_type: OKXInstrumentType,
845 ) -> PyResult<Bound<'py, PyAny>> {
846 let client = self.clone();
847
848 pyo3_async_runtimes::tokio::future_into_py(py, async move {
849 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
850 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
851 }
852 Ok(())
853 })
854 }
855
856 #[pyo3(name = "subscribe_account")]
857 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
858 let client = self.clone();
859
860 pyo3_async_runtimes::tokio::future_into_py(py, async move {
861 if let Err(e) = client.subscribe_account().await {
862 log::error!("Failed to subscribe to account: {e}");
863 }
864 Ok(())
865 })
866 }
867
868 #[pyo3(name = "unsubscribe_account")]
869 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
870 let client = self.clone();
871
872 pyo3_async_runtimes::tokio::future_into_py(py, async move {
873 if let Err(e) = client.unsubscribe_account().await {
874 log::error!("Failed to unsubscribe from account: {e}");
875 }
876 Ok(())
877 })
878 }
879
880 #[pyo3(name = "submit_order")]
881 #[pyo3(signature = (
882 trader_id,
883 strategy_id,
884 instrument_id,
885 td_mode,
886 client_order_id,
887 order_side,
888 order_type,
889 quantity,
890 time_in_force=None,
891 price=None,
892 trigger_price=None,
893 post_only=None,
894 reduce_only=None,
895 quote_quantity=None,
896 position_side=None,
897 ))]
898 #[allow(clippy::too_many_arguments)]
899 fn py_submit_order<'py>(
900 &self,
901 py: Python<'py>,
902 trader_id: TraderId,
903 strategy_id: StrategyId,
904 instrument_id: InstrumentId,
905 td_mode: OKXTradeMode,
906 client_order_id: ClientOrderId,
907 order_side: OrderSide,
908 order_type: OrderType,
909 quantity: Quantity,
910 time_in_force: Option<TimeInForce>,
911 price: Option<Price>,
912 trigger_price: Option<Price>,
913 post_only: Option<bool>,
914 reduce_only: Option<bool>,
915 quote_quantity: Option<bool>,
916 position_side: Option<PositionSide>,
917 ) -> PyResult<Bound<'py, PyAny>> {
918 let client = self.clone();
919
920 pyo3_async_runtimes::tokio::future_into_py(py, async move {
921 client
922 .submit_order(
923 trader_id,
924 strategy_id,
925 instrument_id,
926 td_mode,
927 client_order_id,
928 order_side,
929 order_type,
930 quantity,
931 time_in_force,
932 price,
933 trigger_price,
934 post_only,
935 reduce_only,
936 quote_quantity,
937 position_side,
938 )
939 .await
940 .map_err(to_pyvalue_err)
941 })
942 }
943
944 #[pyo3(name = "cancel_order", signature = (
945 trader_id,
946 strategy_id,
947 instrument_id,
948 client_order_id=None,
949 venue_order_id=None,
950 ))]
951 #[allow(clippy::too_many_arguments)]
952 fn py_cancel_order<'py>(
953 &self,
954 py: Python<'py>,
955 trader_id: TraderId,
956 strategy_id: StrategyId,
957 instrument_id: InstrumentId,
958 client_order_id: Option<ClientOrderId>,
959 venue_order_id: Option<VenueOrderId>,
960 ) -> PyResult<Bound<'py, PyAny>> {
961 let client = self.clone();
962
963 pyo3_async_runtimes::tokio::future_into_py(py, async move {
964 client
965 .cancel_order(
966 trader_id,
967 strategy_id,
968 instrument_id,
969 client_order_id,
970 venue_order_id,
971 )
972 .await
973 .map_err(to_pyvalue_err)
974 })
975 }
976
977 #[pyo3(name = "modify_order")]
978 #[pyo3(signature = (
979 trader_id,
980 strategy_id,
981 instrument_id,
982 client_order_id=None,
983 venue_order_id=None,
984 price=None,
985 quantity=None,
986 ))]
987 #[allow(clippy::too_many_arguments)]
988 fn py_modify_order<'py>(
989 &self,
990 py: Python<'py>,
991 trader_id: TraderId,
992 strategy_id: StrategyId,
993 instrument_id: InstrumentId,
994 client_order_id: Option<ClientOrderId>,
995 venue_order_id: Option<VenueOrderId>,
996 price: Option<Price>,
997 quantity: Option<Quantity>,
998 ) -> PyResult<Bound<'py, PyAny>> {
999 let client = self.clone();
1000
1001 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1002 client
1003 .modify_order(
1004 trader_id,
1005 strategy_id,
1006 instrument_id,
1007 client_order_id,
1008 price,
1009 quantity,
1010 venue_order_id,
1011 )
1012 .await
1013 .map_err(to_pyvalue_err)
1014 })
1015 }
1016
1017 #[allow(clippy::type_complexity)]
1018 #[pyo3(name = "batch_submit_orders")]
1019 fn py_batch_submit_orders<'py>(
1020 &self,
1021 py: Python<'py>,
1022 orders: Vec<Py<PyAny>>,
1023 ) -> PyResult<Bound<'py, PyAny>> {
1024 let mut domain_orders = Vec::with_capacity(orders.len());
1025
1026 for obj in orders {
1027 let (
1028 instrument_type,
1029 instrument_id,
1030 td_mode,
1031 client_order_id,
1032 order_side,
1033 order_type,
1034 quantity,
1035 position_side,
1036 price,
1037 trigger_price,
1038 post_only,
1039 reduce_only,
1040 ): (
1041 OKXInstrumentType,
1042 InstrumentId,
1043 OKXTradeMode,
1044 ClientOrderId,
1045 OrderSide,
1046 OrderType,
1047 Quantity,
1048 Option<PositionSide>,
1049 Option<Price>,
1050 Option<Price>,
1051 Option<bool>,
1052 Option<bool>,
1053 ) = obj
1054 .extract(py)
1055 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1056
1057 domain_orders.push((
1058 instrument_type,
1059 instrument_id,
1060 td_mode,
1061 client_order_id,
1062 order_side,
1063 position_side,
1064 order_type,
1065 quantity,
1066 price,
1067 trigger_price,
1068 post_only,
1069 reduce_only,
1070 ));
1071 }
1072
1073 let client = self.clone();
1074
1075 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1076 client
1077 .batch_submit_orders(domain_orders)
1078 .await
1079 .map_err(to_pyvalue_err)
1080 })
1081 }
1082
1083 #[pyo3(name = "batch_cancel_orders")]
1085 fn py_batch_cancel_orders<'py>(
1086 &self,
1087 py: Python<'py>,
1088 cancels: Vec<Py<PyAny>>,
1089 ) -> PyResult<Bound<'py, PyAny>> {
1090 let mut batched_cancels = Vec::with_capacity(cancels.len());
1091
1092 for obj in cancels {
1093 let (instrument_id, client_order_id, order_id): (
1094 InstrumentId,
1095 Option<ClientOrderId>,
1096 Option<VenueOrderId>,
1097 ) = obj
1098 .extract(py)
1099 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1100 batched_cancels.push((instrument_id, client_order_id, order_id));
1101 }
1102
1103 let client = self.clone();
1104
1105 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1106 client
1107 .batch_cancel_orders(batched_cancels)
1108 .await
1109 .map_err(to_pyvalue_err)
1110 })
1111 }
1112
1113 #[pyo3(name = "batch_modify_orders")]
1114 fn py_batch_modify_orders<'py>(
1115 &self,
1116 py: Python<'py>,
1117 orders: Vec<Py<PyAny>>,
1118 ) -> PyResult<Bound<'py, PyAny>> {
1119 let mut domain_orders = Vec::with_capacity(orders.len());
1120
1121 for obj in orders {
1122 let (
1123 instrument_type,
1124 instrument_id,
1125 client_order_id,
1126 new_client_order_id,
1127 price,
1128 quantity,
1129 ): (
1130 String,
1131 InstrumentId,
1132 ClientOrderId,
1133 ClientOrderId,
1134 Option<Price>,
1135 Option<Quantity>,
1136 ) = obj
1137 .extract(py)
1138 .map_err(|e: PyErr| PyRuntimeError::new_err(e.to_string()))?;
1139 let inst_type =
1140 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1141 domain_orders.push((
1142 inst_type,
1143 instrument_id,
1144 client_order_id,
1145 new_client_order_id,
1146 price,
1147 quantity,
1148 ));
1149 }
1150
1151 let client = self.clone();
1152
1153 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1154 client
1155 .batch_modify_orders(domain_orders)
1156 .await
1157 .map_err(to_pyvalue_err)
1158 })
1159 }
1160
1161 #[pyo3(name = "mass_cancel_orders")]
1162 fn py_mass_cancel_orders<'py>(
1163 &self,
1164 py: Python<'py>,
1165 instrument_id: InstrumentId,
1166 ) -> PyResult<Bound<'py, PyAny>> {
1167 let client = self.clone();
1168
1169 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1170 client
1171 .mass_cancel_orders(instrument_id)
1172 .await
1173 .map_err(to_pyvalue_err)
1174 })
1175 }
1176
1177 #[pyo3(name = "cache_instruments")]
1178 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1179 let instruments: Result<Vec<_>, _> = instruments
1180 .into_iter()
1181 .map(|inst| pyobject_to_instrument_any(py, inst))
1182 .collect();
1183 self.cache_instruments(instruments?);
1184 Ok(())
1185 }
1186
1187 #[pyo3(name = "cache_instrument")]
1188 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1189 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1190 Ok(())
1191 }
1192}
1193
1194pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
1195 if let Err(e) = callback.call1(py, (py_obj,)) {
1196 tracing::error!("Error calling Python: {e}");
1197 }
1198}
1199
1200fn call_python_with_data<F>(callback: &Py<PyAny>, data_converter: F)
1201where
1202 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
1203{
1204 Python::attach(|py| match data_converter(py) {
1205 Ok(py_obj) => call_python(py, callback, py_obj),
1206 Err(e) => tracing::error!("Failed to convert data to Python object: {e}"),
1207 });
1208}