1use std::{
26 collections::VecDeque,
27 fmt::Debug,
28 num::NonZeroU32,
29 sync::{
30 Arc, LazyLock,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33 time::{Duration, SystemTime},
34};
35
36use ahash::{AHashMap, AHashSet};
37use dashmap::DashMap;
38use futures_util::Stream;
39use nautilus_common::runtime::get_runtime;
40use nautilus_core::{
41 UUID4,
42 consts::NAUTILUS_USER_AGENT,
43 env::{get_env_var, get_or_env_var},
44 nanos::UnixNanos,
45 time::get_atomic_clock_realtime,
46};
47use nautilus_model::{
48 data::BarType,
49 enums::{OrderSide, OrderStatus, OrderType, PositionSide, TimeInForce, TriggerType},
50 events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
51 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
52 instruments::{Instrument, InstrumentAny},
53 reports::OrderStatusReport,
54 types::{Money, Price, Quantity},
55};
56use nautilus_network::{
57 RECONNECTED,
58 ratelimiter::quota::Quota,
59 retry::{RetryManager, create_websocket_retry_manager},
60 websocket::{
61 PingHandler, TEXT_PING, TEXT_PONG, WebSocketClient, WebSocketConfig,
62 channel_message_handler,
63 },
64};
65use reqwest::header::USER_AGENT;
66use serde_json::Value;
67use tokio::sync::mpsc::UnboundedReceiver;
68use tokio_tungstenite::tungstenite::{Error, Message};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73 auth::{AUTHENTICATION_TIMEOUT_SECS, AuthTracker},
74 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
75 error::OKXWsError,
76 messages::{
77 ExecutionReport, NautilusWsMessage, OKXAuthentication, OKXAuthenticationArg,
78 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWebSocketEvent,
79 OKXWsRequest, WsAmendOrderParams, WsAmendOrderParamsBuilder, WsCancelAlgoOrderParams,
80 WsCancelAlgoOrderParamsBuilder, WsCancelOrderParams, WsCancelOrderParamsBuilder,
81 WsMassCancelParams, WsPostAlgoOrderParams, WsPostAlgoOrderParamsBuilder, WsPostOrderParams,
82 WsPostOrderParamsBuilder,
83 },
84 parse::{parse_book_msg_vec, parse_ws_message_data},
85 subscription::{SubscriptionState, topic_from_subscription_arg, topic_from_websocket_arg},
86};
87use crate::{
88 common::{
89 consts::{
90 OKX_NAUTILUS_BROKER_ID, OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE,
91 OKX_POST_ONLY_ERROR_CODE, OKX_SUPPORTED_ORDER_TYPES, OKX_SUPPORTED_TIME_IN_FORCE,
92 OKX_WS_PUBLIC_URL, should_retry_error_code,
93 },
94 credential::Credential,
95 enums::{
96 OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXPositionSide, OKXSide,
97 OKXTargetCurrency, OKXTradeMode, OKXTriggerType, OKXVipLevel,
98 conditional_order_to_algo_type, is_conditional_order,
99 },
100 parse::{
101 bar_spec_as_okx_channel, okx_instrument_type, parse_account_state,
102 parse_client_order_id, parse_millisecond_timestamp, parse_price, parse_quantity,
103 },
104 },
105 http::models::OKXAccount,
106 websocket::{
107 messages::{OKXAlgoOrderMsg, OKXOrderMsg},
108 parse::{parse_algo_order_msg, parse_order_msg},
109 },
110};
111
/// Order parameters kept alongside a pending place request (see `PlaceRequestData`).
enum PendingOrderParams {
    /// A regular order, carrying the `WsPostOrderParams` built for the request.
    Regular(WsPostOrderParams),
    /// An algo order; this variant carries no parameter payload.
    Algo(()),
}
116
/// Data stored for an in-flight place-order request: the order parameters plus the
/// client order, trader, strategy, and instrument identifiers.
type PlaceRequestData = (
    PendingOrderParams,
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
);
/// Data stored for an in-flight cancel request: the order's identifiers and,
/// when known, its venue order ID.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
/// Data stored for an in-flight amend request: the order's identifiers and,
/// when known, its venue order ID.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
/// Data stored for an in-flight mass-cancel request: the instrument it targets.
type MassCancelRequestData = InstrumentId;
139
/// Rate-limit quota of 3 per second, passed to `WebSocketClient::connect` as the
/// default quota.
pub static OKX_WS_CONNECTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(3).unwrap()));

/// Rate-limit quota of 480 per hour, registered under the `"subscription"` key.
pub static OKX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_hour(NonZeroU32::new(480).unwrap()));

/// Rate-limit quota of 250 per second, registered under the `"order"`, `"cancel"`
/// and `"amend"` keys.
pub static OKX_WS_ORDER_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
159
160fn should_retry_okx_error(error: &OKXWsError) -> bool {
162 match error {
163 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
164 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
166 let msg_lower = msg.to_lowercase();
168 msg_lower.contains("timeout")
169 || msg_lower.contains("timed out")
170 || msg_lower.contains("connection")
171 || msg_lower.contains("network")
172 }
173 OKXWsError::AuthenticationError(_)
174 | OKXWsError::JsonError(_)
175 | OKXWsError::ParsingError(_) => {
176 false
178 }
179 }
180}
181
182fn create_okx_timeout_error(msg: String) -> OKXWsError {
184 OKXWsError::ClientError(msg)
185}
186
187fn channel_requires_auth(channel: &OKXWsChannel) -> bool {
188 matches!(
189 channel,
190 OKXWsChannel::Account
191 | OKXWsChannel::Orders
192 | OKXWsChannel::Fills
193 | OKXWsChannel::OrdersAlgo
194 )
195}
196
/// WebSocket client for OKX.
///
/// Most state is held behind `Arc`s, so clones of the client share the same
/// underlying connection handle, subscription bookkeeping and pending-request maps.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// Account ID handed to the message handler.
    account_id: AccountId,
    /// Current VIP level, stored as the `u8` value of an `OKXVipLevel`.
    vip_level: Arc<AtomicU8>,
    /// API credential; when present, `connect` performs a login.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded to the underlying `WebSocketConfig`.
    heartbeat: Option<u64>,
    /// Underlying WebSocket client; `None` until `connect` succeeds.
    inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
    /// Tracks the outcome of login attempts.
    auth_tracker: AuthTracker,
    /// Receiving end of the parsed-message channel; taken by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag; set to `true` by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the spawned message-processing task.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscriptions keyed by channel and scoped by instrument type.
    subscriptions_inst_type: Arc<DashMap<OKXWsChannel, AHashSet<OKXInstrumentType>>>,
    /// Subscriptions keyed by channel and scoped by instrument family.
    subscriptions_inst_family: Arc<DashMap<OKXWsChannel, AHashSet<Ustr>>>,
    /// Subscriptions keyed by channel and scoped by instrument ID.
    subscriptions_inst_id: Arc<DashMap<OKXWsChannel, AHashSet<Ustr>>>,
    /// Subscriptions to channels that take no instrument scoping at all.
    subscriptions_bare: Arc<DashMap<OKXWsChannel, bool>>,
    /// Per-topic subscribe/unsubscribe state.
    subscriptions_state: SubscriptionState,
    /// Monotonic counter used to generate request IDs (starts at 1).
    request_id_counter: Arc<AtomicU64>,
    /// In-flight place requests (presumably keyed by request ID — confirm against callers).
    pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
    /// In-flight cancel requests (presumably keyed by request ID — confirm against callers).
    pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
    /// In-flight amend requests (presumably keyed by request ID — confirm against callers).
    pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
    /// In-flight mass-cancel requests (presumably keyed by request ID — confirm against callers).
    pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
    /// Identifiers associated with each tracked client order ID.
    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
    /// Set of venue order IDs, shared with the message handler.
    emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
    /// Mapping between client order IDs, shared with the message handler.
    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
    /// Instruments keyed by symbol.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Retry manager created by `create_websocket_retry_manager`.
    retry_manager: Arc<RetryManager<OKXWsError>>,
    /// Token cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
}
231
232impl Default for OKXWebSocketClient {
233 fn default() -> Self {
234 Self::new(None, None, None, None, None, None).unwrap()
235 }
236}
237
238impl Debug for OKXWebSocketClient {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct(stringify!(OKXWebSocketClient))
241 .field("url", &self.url)
242 .field(
243 "credential",
244 &self.credential.as_ref().map(|_| "<redacted>"),
245 )
246 .field("heartbeat", &self.heartbeat)
247 .finish_non_exhaustive()
248 }
249}
250
251impl OKXWebSocketClient {
252 pub fn new(
258 url: Option<String>,
259 api_key: Option<String>,
260 api_secret: Option<String>,
261 api_passphrase: Option<String>,
262 account_id: Option<AccountId>,
263 heartbeat: Option<u64>,
264 ) -> anyhow::Result<Self> {
265 let url = url.unwrap_or(OKX_WS_PUBLIC_URL.to_string());
266 let account_id = account_id.unwrap_or(AccountId::from("OKX-master"));
267
268 let credential = match (api_key, api_secret, api_passphrase) {
269 (Some(key), Some(secret), Some(passphrase)) => {
270 Some(Credential::new(key, secret, passphrase))
271 }
272 (None, None, None) => None,
273 _ => anyhow::bail!(
274 "`api_key`, `api_secret`, `api_passphrase` credentials must be provided together"
275 ),
276 };
277
278 let signal = Arc::new(AtomicBool::new(false));
279 let subscriptions_inst_type = Arc::new(DashMap::new());
280 let subscriptions_inst_family = Arc::new(DashMap::new());
281 let subscriptions_inst_id = Arc::new(DashMap::new());
282 let subscriptions_bare = Arc::new(DashMap::new());
283 let subscriptions_state = SubscriptionState::new();
284
285 Ok(Self {
286 url,
287 account_id,
288 vip_level: Arc::new(AtomicU8::new(0)), credential,
290 heartbeat,
291 inner: Arc::new(tokio::sync::RwLock::new(None)),
292 auth_tracker: AuthTracker::new(),
293 rx: None,
294 signal,
295 task_handle: None,
296 subscriptions_inst_type,
297 subscriptions_inst_family,
298 subscriptions_inst_id,
299 subscriptions_bare,
300 subscriptions_state,
301 request_id_counter: Arc::new(AtomicU64::new(1)),
302 pending_place_requests: Arc::new(DashMap::new()),
303 pending_cancel_requests: Arc::new(DashMap::new()),
304 pending_amend_requests: Arc::new(DashMap::new()),
305 pending_mass_cancel_requests: Arc::new(DashMap::new()),
306 active_client_orders: Arc::new(DashMap::new()),
307 emitted_order_accepted: Arc::new(DashMap::new()),
308 client_id_aliases: Arc::new(DashMap::new()),
309 instruments_cache: Arc::new(AHashMap::new()),
310 retry_manager: Arc::new(create_websocket_retry_manager()?),
311 cancellation_token: CancellationToken::new(),
312 })
313 }
314
315 pub fn with_credentials(
322 url: Option<String>,
323 api_key: Option<String>,
324 api_secret: Option<String>,
325 api_passphrase: Option<String>,
326 account_id: Option<AccountId>,
327 heartbeat: Option<u64>,
328 ) -> anyhow::Result<Self> {
329 let url = url.unwrap_or(OKX_WS_PUBLIC_URL.to_string());
330 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
331 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
332 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
333
334 Self::new(
335 Some(url),
336 Some(api_key),
337 Some(api_secret),
338 Some(api_passphrase),
339 account_id,
340 heartbeat,
341 )
342 }
343
344 pub fn from_env() -> anyhow::Result<Self> {
351 let url = get_env_var("OKX_WS_URL")?;
352 let api_key = get_env_var("OKX_API_KEY")?;
353 let api_secret = get_env_var("OKX_API_SECRET")?;
354 let api_passphrase = get_env_var("OKX_API_PASSPHRASE")?;
355
356 Self::new(
357 Some(url),
358 Some(api_key),
359 Some(api_secret),
360 Some(api_passphrase),
361 None,
362 None,
363 )
364 }
365
366 pub fn cancel_all_requests(&self) {
368 self.cancellation_token.cancel();
369 }
370
371 pub fn cancellation_token(&self) -> &CancellationToken {
373 &self.cancellation_token
374 }
375
376 pub fn url(&self) -> &str {
378 self.url.as_str()
379 }
380
381 pub fn api_key(&self) -> Option<&str> {
383 self.credential.clone().map(|c| c.api_key.as_str())
384 }
385
386 pub fn is_active(&self) -> bool {
389 match self.inner.try_read() {
391 Ok(guard) => match &*guard {
392 Some(inner) => inner.is_active(),
393 None => false,
394 },
395 Err(_) => false, }
397 }
398
399 pub fn is_closed(&self) -> bool {
401 match self.inner.try_read() {
403 Ok(guard) => match &*guard {
404 Some(inner) => inner.is_closed(),
405 None => true,
406 },
407 Err(_) => true, }
409 }
410
411 pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
413 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
414 for inst in instruments {
415 instruments_cache.insert(inst.symbol().inner(), inst.clone());
416 }
417
418 self.instruments_cache = Arc::new(instruments_cache);
419 }
420
421 pub fn set_vip_level(&self, vip_level: OKXVipLevel) {
425 self.vip_level.store(vip_level as u8, Ordering::Relaxed);
426 }
427
428 pub fn vip_level(&self) -> OKXVipLevel {
430 let level = self.vip_level.load(Ordering::Relaxed);
431 OKXVipLevel::from(level)
432 }
433
434 pub async fn connect(&mut self) -> anyhow::Result<()> {
444 let (message_handler, reader) = channel_message_handler();
445
446 let inner_for_ping = self.inner.clone();
447 let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
448 let inner = inner_for_ping.clone();
449
450 get_runtime().spawn(async move {
451 let len = payload.len();
452 let guard = inner.read().await;
453
454 if let Some(client) = guard.as_ref() {
455 if let Err(e) = client.send_pong(payload).await {
456 tracing::warn!(error = %e, "Failed to send pong frame");
457 } else {
458 tracing::trace!("Sent pong frame ({len} bytes)");
459 }
460 } else {
461 tracing::debug!("Ping received with no active websocket client");
462 }
463 });
464 });
465
466 let config = WebSocketConfig {
467 url: self.url.clone(),
468 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
469 heartbeat: self.heartbeat,
470 heartbeat_msg: Some(TEXT_PING.to_string()),
471 message_handler: Some(message_handler),
472 ping_handler: Some(ping_handler),
473 reconnect_timeout_ms: Some(5_000),
474 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
479
480 let keyed_quotas = vec![
482 ("subscription".to_string(), *OKX_WS_SUBSCRIPTION_QUOTA),
483 ("order".to_string(), *OKX_WS_ORDER_QUOTA),
484 ("cancel".to_string(), *OKX_WS_ORDER_QUOTA),
485 ("amend".to_string(), *OKX_WS_ORDER_QUOTA),
486 ];
487
488 let client = WebSocketClient::connect(
489 config,
490 None, keyed_quotas,
492 Some(*OKX_WS_CONNECTION_QUOTA), )
494 .await?;
495
496 {
498 let mut inner_guard = self.inner.write().await;
499 *inner_guard = Some(client);
500 }
501
502 let account_id = self.account_id;
503 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
504
505 self.rx = Some(Arc::new(rx));
506 let signal = self.signal.clone();
507 let pending_place_requests = self.pending_place_requests.clone();
508 let pending_cancel_requests = self.pending_cancel_requests.clone();
509 let pending_amend_requests = self.pending_amend_requests.clone();
510 let pending_mass_cancel_requests = self.pending_mass_cancel_requests.clone();
511 let active_client_orders = self.active_client_orders.clone();
512 let emitted_order_accepted = self.emitted_order_accepted.clone();
513 let auth_tracker = self.auth_tracker.clone();
514
515 let instruments_cache = self.instruments_cache.clone();
516 let inner_client = self.inner.clone();
517 let credential_clone = self.credential.clone();
518 let subscriptions_inst_type = self.subscriptions_inst_type.clone();
519 let subscriptions_inst_family = self.subscriptions_inst_family.clone();
520 let subscriptions_inst_id = self.subscriptions_inst_id.clone();
521 let subscriptions_bare = self.subscriptions_bare.clone();
522 let subscriptions_state = self.subscriptions_state.clone();
523 let client_id_aliases = self.client_id_aliases.clone();
524
525 let stream_handle = get_runtime().spawn({
526 let auth_tracker = auth_tracker.clone();
527 let signal = signal.clone();
528 async move {
529 let mut handler = OKXWsMessageHandler::new(
530 account_id,
531 instruments_cache,
532 reader,
533 signal.clone(),
534 inner_client.clone(),
535 tx,
536 pending_place_requests,
537 pending_cancel_requests,
538 pending_amend_requests,
539 pending_mass_cancel_requests,
540 active_client_orders,
541 client_id_aliases,
542 emitted_order_accepted,
543 auth_tracker.clone(),
544 subscriptions_state.clone(),
545 );
546
547 loop {
549 match handler.next().await {
550 Some(NautilusWsMessage::Reconnected) => {
551 if signal.load(Ordering::Relaxed) {
552 tracing::debug!("Skipping reconnection resubscription due to stop signal");
553 continue;
554 }
555
556 tracing::debug!("Handling WebSocket reconnection");
557
558 let auth_tracker_for_task = auth_tracker.clone();
559 let inner_client_for_task = inner_client.clone();
560 let subscriptions_inst_type_for_task = subscriptions_inst_type.clone();
561 let subscriptions_inst_family_for_task = subscriptions_inst_family.clone();
562 let subscriptions_inst_id_for_task = subscriptions_inst_id.clone();
563 let subscriptions_bare_for_task = subscriptions_bare.clone();
564 let subscriptions_state_for_task = subscriptions_state.clone();
565
566 let auth_wait = if let Some(cred) = &credential_clone {
567 let rx = auth_tracker.begin();
568 let inner_guard = inner_client.read().await;
569
570 if let Some(client) = &*inner_guard {
571 let timestamp = SystemTime::now()
572 .duration_since(SystemTime::UNIX_EPOCH)
573 .expect("System time should be after UNIX epoch")
574 .as_secs()
575 .to_string();
576 let signature =
577 cred.sign(×tamp, "GET", "/users/self/verify", "");
578
579 let auth_message = OKXAuthentication {
580 op: "login",
581 args: vec![OKXAuthenticationArg {
582 api_key: cred.api_key.to_string(),
583 passphrase: cred.api_passphrase.clone(),
584 timestamp,
585 sign: signature,
586 }],
587 };
588
589 if let Err(e) = client
590 .send_text(serde_json::to_string(&auth_message).unwrap(), None)
591 .await
592 {
593 tracing::error!(
594 "Failed to send re-authentication request: {e}",
595 );
596 auth_tracker.fail(e.to_string());
597 } else {
598 tracing::debug!(
599 "Sent re-authentication request, waiting for response before resubscribing",
600 );
601 }
602 } else {
603 auth_tracker
604 .fail("Cannot authenticate: not connected".to_string());
605 }
606
607 drop(inner_guard);
608
609 Some(rx)
610 } else {
611 None
612 };
613
614 get_runtime().spawn(async move {
615 let auth_succeeded = match auth_wait {
616 Some(rx) => match auth_tracker_for_task
617 .wait_for_result(
618 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
619 rx,
620 )
621 .await
622 {
623 Ok(()) => {
624 tracing::debug!(
625 "Authentication successful after reconnect, proceeding with resubscription",
626 );
627 true
628 }
629 Err(e) => {
630 tracing::error!(
631 "Authentication after reconnect failed: {e}",
632 );
633 false
634 }
635 },
636 None => true,
637 };
638
639 let confirmed_topic_count = subscriptions_state_for_task.len();
640 if confirmed_topic_count == 0 {
641 tracing::debug!(
642 "No confirmed subscriptions recorded before reconnect; resubscribe will rely on pending topics"
643 );
644 } else {
645 tracing::debug!(confirmed_topic_count, "Confirmed subscriptions recorded before reconnect");
646 }
647 let confirmed_topics = subscriptions_state_for_task.confirmed();
648 if confirmed_topic_count <= 10 {
649 let topics: Vec<_> = confirmed_topics
650 .iter()
651 .map(|entry| entry.key().clone())
652 .collect();
653 if !topics.is_empty() {
654 tracing::trace!(topics = ?topics, "Confirmed topics before reconnect");
655 }
656 }
657 drop(confirmed_topics);
658
659 let pending_topics = subscriptions_state_for_task.pending();
660 let pending_topic_count = pending_topics.len();
661 if pending_topic_count > 0 {
662 tracing::debug!(pending_topic_count, "Pending subscriptions awaiting replay after reconnect");
663 }
664 drop(pending_topics);
665
666 let inner_guard = inner_client_for_task.read().await;
667 if let Some(client) = &*inner_guard {
668 let should_resubscribe = |channel: &OKXWsChannel| {
669 if channel_requires_auth(channel) && !auth_succeeded {
670 tracing::warn!(
671 ?channel,
672 "Skipping private channel resubscription due to missing authentication",
673 );
674 return false;
675 }
676 true
677 };
678
679 let mut inst_type_args = Vec::new();
680 for entry in subscriptions_inst_type_for_task.iter() {
681 let (channel, inst_types) = entry.pair();
682 if !should_resubscribe(channel) {
683 continue;
684 }
685 for inst_type in inst_types.iter() {
686 let arg = OKXSubscriptionArg {
687 channel: channel.clone(),
688 inst_type: Some(*inst_type),
689 inst_family: None,
690 inst_id: None,
691 };
692 let topic = topic_from_subscription_arg(&arg);
693 subscriptions_state_for_task.mark_subscribe(&topic);
694 inst_type_args.push(arg);
695 }
696 }
697 if !inst_type_args.is_empty() {
698 let sub_request = OKXSubscription {
699 op: OKXWsOperation::Subscribe,
700 args: inst_type_args,
701 };
702 if let Err(e) = client
703 .send_text(
704 serde_json::to_string(&sub_request).unwrap(),
705 None,
706 )
707 .await
708 {
709 tracing::error!(
710 "Failed to re-subscribe inst_type channels: {e}",
711 );
712 }
713 }
714
715 let mut inst_family_args = Vec::new();
716 for entry in subscriptions_inst_family_for_task.iter() {
717 let (channel, inst_families) = entry.pair();
718 if !should_resubscribe(channel) {
719 continue;
720 }
721 for inst_family in inst_families.iter() {
722 let arg = OKXSubscriptionArg {
723 channel: channel.clone(),
724 inst_type: None,
725 inst_family: Some(*inst_family),
726 inst_id: None,
727 };
728 let topic = topic_from_subscription_arg(&arg);
729 subscriptions_state_for_task.mark_subscribe(&topic);
730 inst_family_args.push(arg);
731 }
732 }
733 if !inst_family_args.is_empty() {
734 let sub_request = OKXSubscription {
735 op: OKXWsOperation::Subscribe,
736 args: inst_family_args,
737 };
738 if let Err(e) = client
739 .send_text(
740 serde_json::to_string(&sub_request).unwrap(),
741 None,
742 )
743 .await
744 {
745 tracing::error!(
746 "Failed to re-subscribe inst_family channels: {e}",
747 );
748 }
749 }
750
751 let mut inst_id_args = Vec::new();
752 for entry in subscriptions_inst_id_for_task.iter() {
753 let (channel, inst_ids) = entry.pair();
754 if !should_resubscribe(channel) {
755 continue;
756 }
757 for inst_id in inst_ids.iter() {
758 let arg = OKXSubscriptionArg {
759 channel: channel.clone(),
760 inst_type: None,
761 inst_family: None,
762 inst_id: Some(*inst_id),
763 };
764 let topic = topic_from_subscription_arg(&arg);
765 subscriptions_state_for_task.mark_subscribe(&topic);
766 inst_id_args.push(arg);
767 }
768 }
769 if !inst_id_args.is_empty() {
770 let sub_request = OKXSubscription {
771 op: OKXWsOperation::Subscribe,
772 args: inst_id_args,
773 };
774 if let Err(e) = client
775 .send_text(
776 serde_json::to_string(&sub_request).unwrap(),
777 None,
778 )
779 .await
780 {
781 tracing::error!(
782 "Failed to re-subscribe inst_id channels: {e}",
783 );
784 }
785 }
786
787 let mut bare_args = Vec::new();
788 for entry in subscriptions_bare_for_task.iter() {
789 let channel = entry.key();
790 if !should_resubscribe(channel) {
791 continue;
792 }
793 let arg = OKXSubscriptionArg {
794 channel: channel.clone(),
795 inst_type: None,
796 inst_family: None,
797 inst_id: None,
798 };
799 let topic = topic_from_subscription_arg(&arg);
800 subscriptions_state_for_task.mark_subscribe(&topic);
801 bare_args.push(arg);
802 }
803 if !bare_args.is_empty() {
804 let sub_request = OKXSubscription {
805 op: OKXWsOperation::Subscribe,
806 args: bare_args,
807 };
808 if let Err(e) = client
809 .send_text(
810 serde_json::to_string(&sub_request).unwrap(),
811 None,
812 )
813 .await
814 {
815 tracing::error!(
816 "Failed to re-subscribe bare channels: {e}",
817 );
818 }
819 }
820
821 tracing::debug!("Completed re-subscription after reconnect");
822 } else {
823 tracing::warn!(
824 "Skipping resubscription after reconnect: websocket client unavailable",
825 );
826 }
827 });
828
829 continue;
830 }
831 Some(msg) => {
832 if handler.tx.send(msg).is_err() {
833 tracing::error!(
834 "Failed to send message through channel: receiver dropped",
835 );
836 break;
837 }
838 }
839 None => {
840 if handler.is_stopped() {
841 tracing::debug!(
842 "Stop signal received, ending message processing",
843 );
844 break;
845 }
846 tracing::warn!("WebSocket stream ended unexpectedly");
847 break;
848 }
849 }
850 }
851 }
852 });
853
854 self.task_handle = Some(Arc::new(stream_handle));
855
856 if self.credential.is_some() {
857 self.authenticate().await?;
858 }
859
860 Ok(())
861 }
862
863 async fn authenticate(&self) -> Result<(), Error> {
865 let credential = self.credential.as_ref().ok_or_else(|| {
866 Error::Io(std::io::Error::other(
867 "API credentials not available to authenticate",
868 ))
869 })?;
870
871 let rx = self.auth_tracker.begin();
872
873 let timestamp = SystemTime::now()
874 .duration_since(SystemTime::UNIX_EPOCH)
875 .expect("System time should be after UNIX epoch")
876 .as_secs()
877 .to_string();
878 let signature = credential.sign(×tamp, "GET", "/users/self/verify", "");
879
880 let auth_message = OKXAuthentication {
881 op: "login",
882 args: vec![OKXAuthenticationArg {
883 api_key: credential.api_key.to_string(),
884 passphrase: credential.api_passphrase.clone(),
885 timestamp,
886 sign: signature,
887 }],
888 };
889
890 {
891 let inner_guard = self.inner.read().await;
892 if let Some(inner) = &*inner_guard {
893 if let Err(e) = inner
894 .send_text(serde_json::to_string(&auth_message).unwrap(), None)
895 .await
896 {
897 tracing::error!("Error sending auth message: {e:?}");
898 self.auth_tracker.fail(e.to_string());
899 return Err(Error::Io(std::io::Error::other(e.to_string())));
900 }
901 } else {
902 log::error!("Cannot authenticate: not connected");
903 self.auth_tracker
904 .fail("Cannot authenticate: not connected".to_string());
905 return Err(Error::ConnectionClosed);
906 }
907 }
908
909 match self
910 .auth_tracker
911 .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
912 .await
913 {
914 Ok(()) => {
915 tracing::info!("Authentication confirmed by client");
916 Ok(())
917 }
918 Err(e) => {
919 tracing::error!("Authentication failed: {e}");
920 Err(Error::Io(std::io::Error::other(e.to_string())))
921 }
922 }
923 }
924
925 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + 'static {
933 let rx = self
934 .rx
935 .take()
936 .expect("Data stream receiver already taken or not connected");
937 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
938 async_stream::stream! {
939 while let Some(data) = rx.recv().await {
940 yield data;
941 }
942 }
943 }
944
945 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), OKXWsError> {
951 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
952
953 tokio::time::timeout(timeout, async {
954 while !self.is_active() {
955 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
956 }
957 })
958 .await
959 .map_err(|_| {
960 OKXWsError::ClientError(format!(
961 "WebSocket connection timeout after {timeout_secs} seconds"
962 ))
963 })?;
964
965 Ok(())
966 }
967
968 pub async fn close(&mut self) -> Result<(), Error> {
975 log::debug!("Starting close process");
976
977 self.signal.store(true, Ordering::Relaxed);
978
979 {
980 let inner_guard = self.inner.read().await;
981 if let Some(inner) = &*inner_guard {
982 log::debug!("Disconnecting websocket");
983
984 match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
985 Ok(()) => log::debug!("Websocket disconnected successfully"),
986 Err(_) => {
987 log::warn!(
988 "Timeout waiting for websocket disconnect, continuing with cleanup"
989 );
990 }
991 }
992 } else {
993 log::debug!("No active connection to disconnect");
994 }
995 }
996
997 if let Some(stream_handle) = self.task_handle.take() {
999 match Arc::try_unwrap(stream_handle) {
1000 Ok(handle) => {
1001 log::debug!("Waiting for stream handle to complete");
1002 match tokio::time::timeout(Duration::from_secs(2), handle).await {
1003 Ok(Ok(())) => log::debug!("Stream handle completed successfully"),
1004 Ok(Err(e)) => log::error!("Stream handle encountered an error: {e:?}"),
1005 Err(_) => {
1006 log::warn!(
1007 "Timeout waiting for stream handle, task may still be running"
1008 );
1009 }
1011 }
1012 }
1013 Err(arc_handle) => {
1014 log::debug!(
1015 "Cannot take ownership of stream handle - other references exist, aborting task"
1016 );
1017 arc_handle.abort();
1018 }
1019 }
1020 } else {
1021 log::debug!("No stream handle to await");
1022 }
1023
1024 log::debug!("Close process completed");
1025
1026 Ok(())
1027 }
1028
1029 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<OKXWsChannel> {
1031 let symbol = instrument_id.symbol.inner();
1032 let mut channels = Vec::new();
1033
1034 for entry in self.subscriptions_inst_id.iter() {
1035 let (channel, instruments) = entry.pair();
1036 if instruments.contains(&symbol) {
1037 channels.push(channel.clone());
1038 }
1039 }
1040
1041 channels
1042 }
1043
1044 fn generate_unique_request_id(&self) -> String {
1045 self.request_id_counter
1046 .fetch_add(1, Ordering::SeqCst)
1047 .to_string()
1048 }
1049
1050 #[allow(
1051 clippy::result_large_err,
1052 reason = "OKXWsError contains large tungstenite::Error variant"
1053 )]
1054 fn get_instrument_type_and_family(
1055 &self,
1056 symbol: Ustr,
1057 ) -> Result<(OKXInstrumentType, String), OKXWsError> {
1058 let instrument = self.instruments_cache.get(&symbol).ok_or_else(|| {
1060 OKXWsError::ClientError(format!("Instrument not found in cache: {symbol}"))
1061 })?;
1062
1063 let inst_type =
1064 okx_instrument_type(instrument).map_err(|e| OKXWsError::ClientError(e.to_string()))?;
1065
1066 let inst_family = match instrument {
1068 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1069 InstrumentAny::CryptoPerpetual(_) => {
1070 symbol
1072 .as_str()
1073 .strip_suffix("-SWAP")
1074 .unwrap_or(symbol.as_str())
1075 .to_string()
1076 }
1077 InstrumentAny::CryptoFuture(_) => {
1078 let parts: Vec<&str> = symbol.as_str().split('-').collect();
1080 if parts.len() >= 2 {
1081 format!("{}-{}", parts[0], parts[1])
1082 } else {
1083 return Err(OKXWsError::ClientError(format!(
1084 "Unable to parse futures instrument family from symbol: {symbol}",
1085 )));
1086 }
1087 }
1088 InstrumentAny::CryptoOption(_) => {
1089 let parts: Vec<&str> = symbol.as_str().split('-').collect();
1091 if parts.len() >= 2 {
1092 format!("{}-{}", parts[0], parts[1])
1093 } else {
1094 return Err(OKXWsError::ClientError(format!(
1095 "Unable to parse option instrument family from symbol: {symbol}",
1096 )));
1097 }
1098 }
1099 _ => {
1100 return Err(OKXWsError::ClientError(format!(
1101 "Unsupported instrument type: {instrument:?}",
1102 )));
1103 }
1104 };
1105
1106 Ok((inst_type, inst_family))
1107 }
1108
1109 async fn subscribe(&self, args: Vec<OKXSubscriptionArg>) -> Result<(), OKXWsError> {
1110 for arg in &args {
1111 let topic = topic_from_subscription_arg(arg);
1112 self.subscriptions_state.mark_subscribe(&topic);
1113
1114 if arg.inst_type.is_none() && arg.inst_family.is_none() && arg.inst_id.is_none() {
1116 self.subscriptions_bare.insert(arg.channel.clone(), true);
1118 } else {
1119 if let Some(inst_type) = &arg.inst_type {
1121 self.subscriptions_inst_type
1122 .entry(arg.channel.clone())
1123 .or_default()
1124 .insert(*inst_type);
1125 }
1126
1127 if let Some(inst_family) = &arg.inst_family {
1129 self.subscriptions_inst_family
1130 .entry(arg.channel.clone())
1131 .or_default()
1132 .insert(*inst_family);
1133 }
1134
1135 if let Some(inst_id) = &arg.inst_id {
1137 self.subscriptions_inst_id
1138 .entry(arg.channel.clone())
1139 .or_default()
1140 .insert(*inst_id);
1141 }
1142 }
1143 }
1144
1145 let message = OKXSubscription {
1146 op: OKXWsOperation::Subscribe,
1147 args,
1148 };
1149
1150 let json_txt =
1151 serde_json::to_string(&message).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
1152
1153 {
1154 let inner_guard = self.inner.read().await;
1155 if let Some(inner) = &*inner_guard {
1156 if let Err(e) = inner
1157 .send_text(json_txt, Some(vec!["subscription".to_string()]))
1158 .await
1159 {
1160 tracing::error!("Error sending message: {e:?}");
1161 }
1162 } else {
1163 return Err(OKXWsError::ClientError(
1164 "Cannot send message: not connected".to_string(),
1165 ));
1166 }
1167 }
1168
1169 Ok(())
1170 }
1171
1172 #[allow(clippy::collapsible_if, reason = "Clearer uncollapsed")]
1173 async fn unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> Result<(), OKXWsError> {
1174 for arg in &args {
1175 let topic = topic_from_subscription_arg(arg);
1176 self.subscriptions_state.mark_unsubscribe(&topic);
1177
1178 if arg.inst_type.is_none() && arg.inst_family.is_none() && arg.inst_id.is_none() {
1180 self.subscriptions_bare.remove(&arg.channel);
1182 } else {
1183 if let Some(inst_type) = &arg.inst_type {
1185 if let Some(mut entry) = self.subscriptions_inst_type.get_mut(&arg.channel) {
1186 entry.remove(inst_type);
1187 if entry.is_empty() {
1188 drop(entry);
1189 self.subscriptions_inst_type.remove(&arg.channel);
1190 }
1191 }
1192 }
1193
1194 if let Some(inst_family) = &arg.inst_family {
1196 if let Some(mut entry) = self.subscriptions_inst_family.get_mut(&arg.channel) {
1197 entry.remove(inst_family);
1198 if entry.is_empty() {
1199 drop(entry);
1200 self.subscriptions_inst_family.remove(&arg.channel);
1201 }
1202 }
1203 }
1204
1205 if let Some(inst_id) = &arg.inst_id {
1207 if let Some(mut entry) = self.subscriptions_inst_id.get_mut(&arg.channel) {
1208 entry.remove(inst_id);
1209 if entry.is_empty() {
1210 drop(entry);
1211 self.subscriptions_inst_id.remove(&arg.channel);
1212 }
1213 }
1214 }
1215 }
1216 }
1217
1218 let message = OKXSubscription {
1219 op: OKXWsOperation::Unsubscribe,
1220 args,
1221 };
1222
1223 let json_txt = serde_json::to_string(&message).expect("Must be valid JSON");
1224
1225 {
1226 let inner_guard = self.inner.read().await;
1227 if let Some(inner) = &*inner_guard {
1228 if let Err(e) = inner
1229 .send_text(json_txt, Some(vec!["subscription".to_string()]))
1230 .await
1231 {
1232 tracing::error!("Error sending message: {e:?}");
1233 }
1234 } else {
1235 log::error!("Cannot send message: not connected");
1236 }
1237 }
1238
1239 Ok(())
1240 }
1241
1242 pub async fn unsubscribe_all(&self) -> Result<(), OKXWsError> {
1251 let mut all_args = Vec::new();
1252
1253 for entry in self.subscriptions_inst_type.iter() {
1254 let (channel, inst_types) = entry.pair();
1255 for inst_type in inst_types.iter() {
1256 all_args.push(OKXSubscriptionArg {
1257 channel: channel.clone(),
1258 inst_type: Some(*inst_type),
1259 inst_family: None,
1260 inst_id: None,
1261 });
1262 }
1263 }
1264
1265 for entry in self.subscriptions_inst_family.iter() {
1266 let (channel, inst_families) = entry.pair();
1267 for inst_family in inst_families.iter() {
1268 all_args.push(OKXSubscriptionArg {
1269 channel: channel.clone(),
1270 inst_type: None,
1271 inst_family: Some(*inst_family),
1272 inst_id: None,
1273 });
1274 }
1275 }
1276
1277 for entry in self.subscriptions_inst_id.iter() {
1278 let (channel, inst_ids) = entry.pair();
1279 for inst_id in inst_ids.iter() {
1280 all_args.push(OKXSubscriptionArg {
1281 channel: channel.clone(),
1282 inst_type: None,
1283 inst_family: None,
1284 inst_id: Some(*inst_id),
1285 });
1286 }
1287 }
1288
1289 for entry in self.subscriptions_bare.iter() {
1290 let channel = entry.key();
1291 all_args.push(OKXSubscriptionArg {
1292 channel: channel.clone(),
1293 inst_type: None,
1294 inst_family: None,
1295 inst_id: None,
1296 });
1297 }
1298
1299 if all_args.is_empty() {
1300 tracing::debug!("No active subscriptions to unsubscribe from");
1301 return Ok(());
1302 }
1303
1304 tracing::debug!("Batched unsubscribe from {} channels", all_args.len());
1305
1306 const BATCH_SIZE: usize = 256;
1307
1308 for chunk in all_args.chunks(BATCH_SIZE) {
1309 self.unsubscribe(chunk.to_vec()).await?;
1310 }
1311
1312 Ok(())
1313 }
1314
1315 #[allow(dead_code)]
1316 async fn resubscribe_all(&self) {
1317 let mut subs_bare = Vec::new();
1319 for entry in self.subscriptions_bare.iter() {
1320 let channel = entry.key();
1321 subs_bare.push(channel.clone());
1322 }
1323
1324 let mut subs_inst_type = Vec::new();
1325 for entry in self.subscriptions_inst_type.iter() {
1326 let (channel, inst_types) = entry.pair();
1327 if !inst_types.is_empty() {
1328 subs_inst_type.push((channel.clone(), inst_types.clone()));
1329 }
1330 }
1331
1332 let mut subs_inst_family = Vec::new();
1333 for entry in self.subscriptions_inst_family.iter() {
1334 let (channel, inst_families) = entry.pair();
1335 if !inst_families.is_empty() {
1336 subs_inst_family.push((channel.clone(), inst_families.clone()));
1337 }
1338 }
1339
1340 let mut subs_inst_id = Vec::new();
1341 for entry in self.subscriptions_inst_id.iter() {
1342 let (channel, inst_ids) = entry.pair();
1343 if !inst_ids.is_empty() {
1344 subs_inst_id.push((channel.clone(), inst_ids.clone()));
1345 }
1346 }
1347
1348 for (channel, inst_types) in subs_inst_type {
1350 if inst_types.is_empty() {
1351 continue;
1352 }
1353
1354 tracing::debug!("Resubscribing: channel={channel}, instrument_types={inst_types:?}");
1355
1356 for inst_type in inst_types {
1357 let arg = OKXSubscriptionArg {
1358 channel: channel.clone(),
1359 inst_type: Some(inst_type),
1360 inst_family: None,
1361 inst_id: None,
1362 };
1363
1364 if let Err(e) = self.subscribe(vec![arg]).await {
1365 tracing::error!(
1366 "Failed to resubscribe to channel {channel} with instrument type: {e}"
1367 );
1368 }
1369 }
1370 }
1371
1372 for (channel, inst_families) in subs_inst_family {
1374 if inst_families.is_empty() {
1375 continue;
1376 }
1377
1378 tracing::debug!(
1379 "Resubscribing: channel={channel}, instrument_families={inst_families:?}"
1380 );
1381
1382 for inst_family in inst_families {
1383 let arg = OKXSubscriptionArg {
1384 channel: channel.clone(),
1385 inst_type: None,
1386 inst_family: Some(inst_family),
1387 inst_id: None,
1388 };
1389
1390 if let Err(e) = self.subscribe(vec![arg]).await {
1391 tracing::error!(
1392 "Failed to resubscribe to channel {channel} with instrument family: {e}"
1393 );
1394 }
1395 }
1396 }
1397
1398 for (channel, inst_ids) in subs_inst_id {
1400 if inst_ids.is_empty() {
1401 continue;
1402 }
1403
1404 tracing::debug!("Resubscribing: channel={channel}, instrument_ids={inst_ids:?}");
1405
1406 for inst_id in inst_ids {
1407 let arg = OKXSubscriptionArg {
1408 channel: channel.clone(),
1409 inst_type: None,
1410 inst_family: None,
1411 inst_id: Some(inst_id),
1412 };
1413
1414 if let Err(e) = self.subscribe(vec![arg]).await {
1415 tracing::error!(
1416 "Failed to resubscribe to channel {channel} with instrument ID: {e}"
1417 );
1418 }
1419 }
1420 }
1421
1422 for channel in subs_bare {
1424 tracing::debug!("Resubscribing to bare channel: {channel}");
1425
1426 let arg = OKXSubscriptionArg {
1427 channel,
1428 inst_type: None,
1429 inst_family: None,
1430 inst_id: None,
1431 };
1432
1433 if let Err(e) = self.subscribe(vec![arg]).await {
1434 tracing::error!("Failed to resubscribe to bare channel: {e}");
1435 }
1436 }
1437 }
1438
1439 pub async fn subscribe_instruments(
1451 &self,
1452 instrument_type: OKXInstrumentType,
1453 ) -> Result<(), OKXWsError> {
1454 let arg = OKXSubscriptionArg {
1455 channel: OKXWsChannel::Instruments,
1456 inst_type: Some(instrument_type),
1457 inst_family: None,
1458 inst_id: None,
1459 };
1460 self.subscribe(vec![arg]).await
1461 }
1462
1463 pub async fn subscribe_instrument(
1475 &self,
1476 instrument_id: InstrumentId,
1477 ) -> Result<(), OKXWsError> {
1478 let arg = OKXSubscriptionArg {
1479 channel: OKXWsChannel::Instruments,
1480 inst_type: None,
1481 inst_family: None,
1482 inst_id: Some(instrument_id.symbol.inner()),
1483 };
1484 self.subscribe(vec![arg]).await
1485 }
1486
1487 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1496 self.subscribe_book_with_depth(instrument_id, 0).await
1497 }
1498
1499 pub(crate) async fn subscribe_books_channel(
1501 &self,
1502 instrument_id: InstrumentId,
1503 ) -> Result<(), OKXWsError> {
1504 let arg = OKXSubscriptionArg {
1505 channel: OKXWsChannel::Books,
1506 inst_type: None,
1507 inst_family: None,
1508 inst_id: Some(instrument_id.symbol.inner()),
1509 };
1510 self.subscribe(vec![arg]).await
1511 }
1512
1513 pub async fn subscribe_book_depth5(
1525 &self,
1526 instrument_id: InstrumentId,
1527 ) -> Result<(), OKXWsError> {
1528 let arg = OKXSubscriptionArg {
1529 channel: OKXWsChannel::Books5,
1530 inst_type: None,
1531 inst_family: None,
1532 inst_id: Some(instrument_id.symbol.inner()),
1533 };
1534 self.subscribe(vec![arg]).await
1535 }
1536
1537 pub async fn subscribe_book50_l2_tbt(
1549 &self,
1550 instrument_id: InstrumentId,
1551 ) -> Result<(), OKXWsError> {
1552 let arg = OKXSubscriptionArg {
1553 channel: OKXWsChannel::Books50Tbt,
1554 inst_type: None,
1555 inst_family: None,
1556 inst_id: Some(instrument_id.symbol.inner()),
1557 };
1558 self.subscribe(vec![arg]).await
1559 }
1560
1561 pub async fn subscribe_book_l2_tbt(
1573 &self,
1574 instrument_id: InstrumentId,
1575 ) -> Result<(), OKXWsError> {
1576 let arg = OKXSubscriptionArg {
1577 channel: OKXWsChannel::BooksTbt,
1578 inst_type: None,
1579 inst_family: None,
1580 inst_id: Some(instrument_id.symbol.inner()),
1581 };
1582 self.subscribe(vec![arg]).await
1583 }
1584
1585 pub async fn subscribe_book_with_depth(
1599 &self,
1600 instrument_id: InstrumentId,
1601 depth: u16,
1602 ) -> anyhow::Result<()> {
1603 let vip = self.vip_level();
1604
1605 match depth {
1606 50 => {
1607 if vip < OKXVipLevel::Vip4 {
1608 anyhow::bail!(
1609 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
1610 );
1611 }
1612 self.subscribe_book50_l2_tbt(instrument_id)
1613 .await
1614 .map_err(|e| anyhow::anyhow!(e))
1615 }
1616 0 | 400 => {
1617 if vip >= OKXVipLevel::Vip5 {
1618 self.subscribe_book_l2_tbt(instrument_id)
1619 .await
1620 .map_err(|e| anyhow::anyhow!(e))
1621 } else {
1622 self.subscribe_books_channel(instrument_id)
1623 .await
1624 .map_err(|e| anyhow::anyhow!(e))
1625 }
1626 }
1627 _ => anyhow::bail!("Invalid depth {depth}, must be 0, 50, or 400"),
1628 }
1629 }
1630
1631 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1644 let arg = OKXSubscriptionArg {
1645 channel: OKXWsChannel::BboTbt,
1646 inst_type: None,
1647 inst_family: None,
1648 inst_id: Some(instrument_id.symbol.inner()),
1649 };
1650 self.subscribe(vec![arg]).await
1651 }
1652
1653 pub async fn subscribe_trades(
1667 &self,
1668 instrument_id: InstrumentId,
1669 aggregated: bool,
1670 ) -> Result<(), OKXWsError> {
1671 let channel = if aggregated {
1672 OKXWsChannel::TradesAll
1673 } else {
1674 OKXWsChannel::Trades
1675 };
1676
1677 let arg = OKXSubscriptionArg {
1678 channel,
1679 inst_type: None,
1680 inst_family: None,
1681 inst_id: Some(instrument_id.symbol.inner()),
1682 };
1683 self.subscribe(vec![arg]).await
1684 }
1685
1686 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1698 let arg = OKXSubscriptionArg {
1699 channel: OKXWsChannel::Tickers,
1700 inst_type: None,
1701 inst_family: None,
1702 inst_id: Some(instrument_id.symbol.inner()),
1703 };
1704 self.subscribe(vec![arg]).await
1705 }
1706
1707 pub async fn subscribe_mark_prices(
1719 &self,
1720 instrument_id: InstrumentId,
1721 ) -> Result<(), OKXWsError> {
1722 let arg = OKXSubscriptionArg {
1723 channel: OKXWsChannel::MarkPrice,
1724 inst_type: None,
1725 inst_family: None,
1726 inst_id: Some(instrument_id.symbol.inner()),
1727 };
1728 self.subscribe(vec![arg]).await
1729 }
1730
1731 pub async fn subscribe_index_prices(
1743 &self,
1744 instrument_id: InstrumentId,
1745 ) -> Result<(), OKXWsError> {
1746 let arg = OKXSubscriptionArg {
1747 channel: OKXWsChannel::IndexTickers,
1748 inst_type: None,
1749 inst_family: None,
1750 inst_id: Some(instrument_id.symbol.inner()),
1751 };
1752 self.subscribe(vec![arg]).await
1753 }
1754
1755 pub async fn subscribe_funding_rates(
1767 &self,
1768 instrument_id: InstrumentId,
1769 ) -> Result<(), OKXWsError> {
1770 let arg = OKXSubscriptionArg {
1771 channel: OKXWsChannel::FundingRate,
1772 inst_type: None,
1773 inst_family: None,
1774 inst_id: Some(instrument_id.symbol.inner()),
1775 };
1776 self.subscribe(vec![arg]).await
1777 }
1778
1779 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), OKXWsError> {
1791 let channel = bar_spec_as_okx_channel(bar_type.spec())
1793 .map_err(|e| OKXWsError::ClientError(e.to_string()))?;
1794
1795 let arg = OKXSubscriptionArg {
1796 channel,
1797 inst_type: None,
1798 inst_family: None,
1799 inst_id: Some(bar_type.instrument_id().symbol.inner()),
1800 };
1801 self.subscribe(vec![arg]).await
1802 }
1803
1804 pub async fn unsubscribe_instruments(
1810 &self,
1811 instrument_type: OKXInstrumentType,
1812 ) -> Result<(), OKXWsError> {
1813 let arg = OKXSubscriptionArg {
1814 channel: OKXWsChannel::Instruments,
1815 inst_type: Some(instrument_type),
1816 inst_family: None,
1817 inst_id: None,
1818 };
1819 self.unsubscribe(vec![arg]).await
1820 }
1821
1822 pub async fn unsubscribe_instrument(
1828 &self,
1829 instrument_id: InstrumentId,
1830 ) -> Result<(), OKXWsError> {
1831 let arg = OKXSubscriptionArg {
1832 channel: OKXWsChannel::Instruments,
1833 inst_type: None,
1834 inst_family: None,
1835 inst_id: Some(instrument_id.symbol.inner()),
1836 };
1837 self.unsubscribe(vec![arg]).await
1838 }
1839
1840 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1846 let arg = OKXSubscriptionArg {
1847 channel: OKXWsChannel::Books,
1848 inst_type: None,
1849 inst_family: None,
1850 inst_id: Some(instrument_id.symbol.inner()),
1851 };
1852 self.unsubscribe(vec![arg]).await
1853 }
1854
1855 pub async fn unsubscribe_book_depth5(
1861 &self,
1862 instrument_id: InstrumentId,
1863 ) -> Result<(), OKXWsError> {
1864 let arg = OKXSubscriptionArg {
1865 channel: OKXWsChannel::Books5,
1866 inst_type: None,
1867 inst_family: None,
1868 inst_id: Some(instrument_id.symbol.inner()),
1869 };
1870 self.unsubscribe(vec![arg]).await
1871 }
1872
1873 pub async fn unsubscribe_book50_l2_tbt(
1879 &self,
1880 instrument_id: InstrumentId,
1881 ) -> Result<(), OKXWsError> {
1882 let arg = OKXSubscriptionArg {
1883 channel: OKXWsChannel::Books50Tbt,
1884 inst_type: None,
1885 inst_family: None,
1886 inst_id: Some(instrument_id.symbol.inner()),
1887 };
1888 self.unsubscribe(vec![arg]).await
1889 }
1890
1891 pub async fn unsubscribe_book_l2_tbt(
1897 &self,
1898 instrument_id: InstrumentId,
1899 ) -> Result<(), OKXWsError> {
1900 let arg = OKXSubscriptionArg {
1901 channel: OKXWsChannel::BooksTbt,
1902 inst_type: None,
1903 inst_family: None,
1904 inst_id: Some(instrument_id.symbol.inner()),
1905 };
1906 self.unsubscribe(vec![arg]).await
1907 }
1908
1909 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1915 let arg = OKXSubscriptionArg {
1916 channel: OKXWsChannel::BboTbt,
1917 inst_type: None,
1918 inst_family: None,
1919 inst_id: Some(instrument_id.symbol.inner()),
1920 };
1921 self.unsubscribe(vec![arg]).await
1922 }
1923
1924 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> Result<(), OKXWsError> {
1930 let arg = OKXSubscriptionArg {
1931 channel: OKXWsChannel::Tickers,
1932 inst_type: None,
1933 inst_family: None,
1934 inst_id: Some(instrument_id.symbol.inner()),
1935 };
1936 self.unsubscribe(vec![arg]).await
1937 }
1938
1939 pub async fn unsubscribe_mark_prices(
1945 &self,
1946 instrument_id: InstrumentId,
1947 ) -> Result<(), OKXWsError> {
1948 let arg = OKXSubscriptionArg {
1949 channel: OKXWsChannel::MarkPrice,
1950 inst_type: None,
1951 inst_family: None,
1952 inst_id: Some(instrument_id.symbol.inner()),
1953 };
1954 self.unsubscribe(vec![arg]).await
1955 }
1956
1957 pub async fn unsubscribe_index_prices(
1963 &self,
1964 instrument_id: InstrumentId,
1965 ) -> Result<(), OKXWsError> {
1966 let arg = OKXSubscriptionArg {
1967 channel: OKXWsChannel::IndexTickers,
1968 inst_type: None,
1969 inst_family: None,
1970 inst_id: Some(instrument_id.symbol.inner()),
1971 };
1972 self.unsubscribe(vec![arg]).await
1973 }
1974
1975 pub async fn unsubscribe_funding_rates(
1981 &self,
1982 instrument_id: InstrumentId,
1983 ) -> Result<(), OKXWsError> {
1984 let arg = OKXSubscriptionArg {
1985 channel: OKXWsChannel::FundingRate,
1986 inst_type: None,
1987 inst_family: None,
1988 inst_id: Some(instrument_id.symbol.inner()),
1989 };
1990 self.unsubscribe(vec![arg]).await
1991 }
1992
1993 pub async fn unsubscribe_trades(
1999 &self,
2000 instrument_id: InstrumentId,
2001 aggregated: bool,
2002 ) -> Result<(), OKXWsError> {
2003 let channel = if aggregated {
2004 OKXWsChannel::TradesAll
2005 } else {
2006 OKXWsChannel::Trades
2007 };
2008
2009 let arg = OKXSubscriptionArg {
2010 channel,
2011 inst_type: None,
2012 inst_family: None,
2013 inst_id: Some(instrument_id.symbol.inner()),
2014 };
2015 self.unsubscribe(vec![arg]).await
2016 }
2017
2018 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), OKXWsError> {
2024 let channel = bar_spec_as_okx_channel(bar_type.spec())
2026 .map_err(|e| OKXWsError::ClientError(e.to_string()))?;
2027
2028 let arg = OKXSubscriptionArg {
2029 channel,
2030 inst_type: None,
2031 inst_family: None,
2032 inst_id: Some(bar_type.instrument_id().symbol.inner()),
2033 };
2034 self.unsubscribe(vec![arg]).await
2035 }
2036
2037 pub async fn subscribe_orders(
2043 &self,
2044 instrument_type: OKXInstrumentType,
2045 ) -> Result<(), OKXWsError> {
2046 let arg = OKXSubscriptionArg {
2047 channel: OKXWsChannel::Orders,
2048 inst_type: Some(instrument_type),
2049 inst_family: None,
2050 inst_id: None,
2051 };
2052 self.subscribe(vec![arg]).await
2053 }
2054
2055 pub async fn unsubscribe_orders(
2061 &self,
2062 instrument_type: OKXInstrumentType,
2063 ) -> Result<(), OKXWsError> {
2064 let arg = OKXSubscriptionArg {
2065 channel: OKXWsChannel::Orders,
2066 inst_type: Some(instrument_type),
2067 inst_family: None,
2068 inst_id: None,
2069 };
2070 self.unsubscribe(vec![arg]).await
2071 }
2072
2073 pub async fn subscribe_orders_algo(
2079 &self,
2080 instrument_type: OKXInstrumentType,
2081 ) -> Result<(), OKXWsError> {
2082 let arg = OKXSubscriptionArg {
2083 channel: OKXWsChannel::OrdersAlgo,
2084 inst_type: Some(instrument_type),
2085 inst_family: None,
2086 inst_id: None,
2087 };
2088 self.subscribe(vec![arg]).await
2089 }
2090
2091 pub async fn unsubscribe_orders_algo(
2097 &self,
2098 instrument_type: OKXInstrumentType,
2099 ) -> Result<(), OKXWsError> {
2100 let arg = OKXSubscriptionArg {
2101 channel: OKXWsChannel::OrdersAlgo,
2102 inst_type: Some(instrument_type),
2103 inst_family: None,
2104 inst_id: None,
2105 };
2106 self.unsubscribe(vec![arg]).await
2107 }
2108
2109 pub async fn subscribe_fills(
2115 &self,
2116 instrument_type: OKXInstrumentType,
2117 ) -> Result<(), OKXWsError> {
2118 let arg = OKXSubscriptionArg {
2119 channel: OKXWsChannel::Fills,
2120 inst_type: Some(instrument_type),
2121 inst_family: None,
2122 inst_id: None,
2123 };
2124 self.subscribe(vec![arg]).await
2125 }
2126
2127 pub async fn unsubscribe_fills(
2133 &self,
2134 instrument_type: OKXInstrumentType,
2135 ) -> Result<(), OKXWsError> {
2136 let arg = OKXSubscriptionArg {
2137 channel: OKXWsChannel::Fills,
2138 inst_type: Some(instrument_type),
2139 inst_family: None,
2140 inst_id: None,
2141 };
2142 self.unsubscribe(vec![arg]).await
2143 }
2144
2145 pub async fn subscribe_account(&self) -> Result<(), OKXWsError> {
2151 let arg = OKXSubscriptionArg {
2152 channel: OKXWsChannel::Account,
2153 inst_type: None,
2154 inst_family: None,
2155 inst_id: None,
2156 };
2157 self.subscribe(vec![arg]).await
2158 }
2159
2160 pub async fn unsubscribe_account(&self) -> Result<(), OKXWsError> {
2166 let arg = OKXSubscriptionArg {
2167 channel: OKXWsChannel::Account,
2168 inst_type: None,
2169 inst_family: None,
2170 inst_id: None,
2171 };
2172 self.unsubscribe(vec![arg]).await
2173 }
2174
2175 async fn ws_cancel_order(
2181 &self,
2182 params: WsCancelOrderParams,
2183 request_id: Option<String>,
2184 ) -> Result<(), OKXWsError> {
2185 let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2186
2187 let req = OKXWsRequest {
2188 id: Some(request_id),
2189 op: OKXWsOperation::CancelOrder,
2190 args: vec![params],
2191 exp_time: None,
2192 };
2193
2194 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2195
2196 {
2197 let inner_guard = self.inner.read().await;
2198 if let Some(inner) = &*inner_guard {
2199 if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2200 tracing::error!("Error sending message: {e:?}");
2201 }
2202 Ok(())
2203 } else {
2204 Err(OKXWsError::ClientError("Not connected".to_string()))
2205 }
2206 }
2207 }
2208
2209 async fn ws_mass_cancel_with_id(
2215 &self,
2216 args: Vec<Value>,
2217 request_id: String,
2218 ) -> Result<(), OKXWsError> {
2219 let req = OKXWsRequest {
2220 id: Some(request_id),
2221 op: OKXWsOperation::MassCancel,
2222 args,
2223 exp_time: None,
2224 };
2225
2226 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2227
2228 {
2229 let inner_guard = self.inner.read().await;
2230 if let Some(inner) = &*inner_guard {
2231 if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2232 tracing::error!("Error sending message: {e:?}");
2233 }
2234 Ok(())
2235 } else {
2236 Err(OKXWsError::ClientError("Not connected".to_string()))
2237 }
2238 }
2239 }
2240
2241 async fn ws_amend_order(
2247 &self,
2248 params: WsAmendOrderParams,
2249 request_id: Option<String>,
2250 ) -> Result<(), OKXWsError> {
2251 let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2252
2253 let req = OKXWsRequest {
2254 id: Some(request_id),
2255 op: OKXWsOperation::AmendOrder,
2256 args: vec![params],
2257 exp_time: None,
2258 };
2259
2260 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2261
2262 {
2263 let inner_guard = self.inner.read().await;
2264 if let Some(inner) = &*inner_guard {
2265 if let Err(e) = inner.send_text(txt, Some(vec!["amend".to_string()])).await {
2266 tracing::error!("Error sending message: {e:?}");
2267 }
2268 Ok(())
2269 } else {
2270 Err(OKXWsError::ClientError("Not connected".to_string()))
2271 }
2272 }
2273 }
2274
2275 async fn ws_batch_place_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2281 let request_id = self.generate_unique_request_id();
2282
2283 let req = OKXWsRequest {
2284 id: Some(request_id),
2285 op: OKXWsOperation::BatchOrders,
2286 args,
2287 exp_time: None,
2288 };
2289
2290 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2291
2292 {
2293 let inner_guard = self.inner.read().await;
2294 if let Some(inner) = &*inner_guard {
2295 if let Err(e) = inner.send_text(txt, Some(vec!["order".to_string()])).await {
2296 tracing::error!("Error sending message: {e:?}");
2297 }
2298 Ok(())
2299 } else {
2300 Err(OKXWsError::ClientError("Not connected".to_string()))
2301 }
2302 }
2303 }
2304
2305 async fn ws_batch_cancel_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2311 let request_id = self.generate_unique_request_id();
2312
2313 let req = OKXWsRequest {
2314 id: Some(request_id),
2315 op: OKXWsOperation::BatchCancelOrders,
2316 args,
2317 exp_time: None,
2318 };
2319
2320 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2321
2322 {
2323 let inner_guard = self.inner.read().await;
2324 if let Some(inner) = &*inner_guard {
2325 if let Err(e) = inner.send_text(txt, Some(vec!["cancel".to_string()])).await {
2326 tracing::error!("Error sending message: {e:?}");
2327 }
2328 Ok(())
2329 } else {
2330 Err(OKXWsError::ClientError("Not connected".to_string()))
2331 }
2332 }
2333 }
2334
2335 async fn ws_batch_amend_orders(&self, args: Vec<Value>) -> Result<(), OKXWsError> {
2341 let request_id = self.generate_unique_request_id();
2342
2343 let req = OKXWsRequest {
2344 id: Some(request_id),
2345 op: OKXWsOperation::BatchAmendOrders,
2346 args,
2347 exp_time: None,
2348 };
2349
2350 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2351
2352 {
2353 let inner_guard = self.inner.read().await;
2354 if let Some(inner) = &*inner_guard {
2355 if let Err(e) = inner.send_text(txt, Some(vec!["amend".to_string()])).await {
2356 tracing::error!("Error sending message: {e:?}");
2357 }
2358 Ok(())
2359 } else {
2360 Err(OKXWsError::ClientError("Not connected".to_string()))
2361 }
2362 }
2363 }
2364
2365 #[allow(clippy::too_many_arguments)]
2377 pub async fn submit_order(
2378 &self,
2379 trader_id: TraderId,
2380 strategy_id: StrategyId,
2381 instrument_id: InstrumentId,
2382 td_mode: OKXTradeMode,
2383 client_order_id: ClientOrderId,
2384 order_side: OrderSide,
2385 order_type: OrderType,
2386 quantity: Quantity,
2387 time_in_force: Option<TimeInForce>,
2388 price: Option<Price>,
2389 trigger_price: Option<Price>,
2390 post_only: Option<bool>,
2391 reduce_only: Option<bool>,
2392 quote_quantity: Option<bool>,
2393 position_side: Option<PositionSide>,
2394 ) -> Result<(), OKXWsError> {
2395 if !OKX_SUPPORTED_ORDER_TYPES.contains(&order_type) {
2396 return Err(OKXWsError::ClientError(format!(
2397 "Unsupported order type: {order_type:?}",
2398 )));
2399 }
2400
2401 if let Some(tif) = time_in_force
2402 && !OKX_SUPPORTED_TIME_IN_FORCE.contains(&tif)
2403 {
2404 return Err(OKXWsError::ClientError(format!(
2405 "Unsupported time in force: {tif:?}",
2406 )));
2407 }
2408
2409 let mut builder = WsPostOrderParamsBuilder::default();
2410
2411 builder.inst_id(instrument_id.symbol.as_str());
2412 builder.td_mode(td_mode);
2413 builder.cl_ord_id(client_order_id.as_str());
2414
2415 let instrument = self
2416 .instruments_cache
2417 .get(&instrument_id.symbol.inner())
2418 .ok_or_else(|| {
2419 OKXWsError::ClientError(format!("Unknown instrument {instrument_id}"))
2420 })?;
2421
2422 let instrument_type =
2423 okx_instrument_type(instrument).map_err(|e| OKXWsError::ClientError(e.to_string()))?;
2424 let quote_currency = instrument.quote_currency();
2425
2426 match instrument_type {
2427 OKXInstrumentType::Spot => {
2428 builder.ccy(quote_currency.to_string());
2430 }
2431 OKXInstrumentType::Margin => {
2432 builder.ccy(quote_currency.to_string());
2433
2434 if let Some(ro) = reduce_only
2435 && ro
2436 {
2437 builder.reduce_only(ro);
2438 }
2439 }
2440 OKXInstrumentType::Swap | OKXInstrumentType::Futures => {
2441 builder.ccy(quote_currency.to_string());
2443
2444 if position_side.is_none() {
2447 builder.pos_side(OKXPositionSide::Net);
2448 }
2449 }
2450 _ => {
2451 builder.ccy(quote_currency.to_string());
2452
2453 if position_side.is_none() {
2455 builder.pos_side(OKXPositionSide::Net);
2456 }
2457
2458 if let Some(ro) = reduce_only
2459 && ro
2460 {
2461 builder.reduce_only(ro);
2462 }
2463 }
2464 };
2465
2466 if instrument_type == OKXInstrumentType::Spot
2473 && order_type == OrderType::Market
2474 && td_mode == OKXTradeMode::Cash
2475 {
2476 match quote_quantity {
2477 Some(true) => {
2478 builder.tgt_ccy(OKXTargetCurrency::QuoteCcy);
2480 }
2481 Some(false) => {
2482 if order_side == OrderSide::Buy {
2483 builder.tgt_ccy(OKXTargetCurrency::BaseCcy);
2485 }
2486 }
2488 None => {
2489 }
2491 }
2492 }
2493
2494 builder.side(order_side);
2495
2496 if let Some(pos_side) = position_side {
2497 builder.pos_side(pos_side);
2498 };
2499
2500 let (okx_ord_type, price) = if post_only.unwrap_or(false) {
2501 (OKXOrderType::PostOnly, price)
2502 } else {
2503 (OKXOrderType::from(order_type), price)
2504 };
2505
2506 log::debug!(
2507 "Order type mapping: order_type={:?}, time_in_force={:?}, post_only={:?} -> okx_ord_type={:?}",
2508 order_type,
2509 time_in_force,
2510 post_only,
2511 okx_ord_type
2512 );
2513
2514 builder.ord_type(okx_ord_type);
2515 builder.sz(quantity.to_string());
2516
2517 if let Some(tp) = trigger_price {
2518 builder.px(tp.to_string());
2519 } else if let Some(p) = price {
2520 builder.px(p.to_string());
2521 }
2522
2523 builder.tag(OKX_NAUTILUS_BROKER_ID);
2524
2525 let params = builder
2526 .build()
2527 .map_err(|e| OKXWsError::ClientError(format!("Build order params error: {e}")))?;
2528
2529 let request_id = self.generate_unique_request_id();
2530
2531 self.pending_place_requests.insert(
2532 request_id.clone(),
2533 (
2534 PendingOrderParams::Regular(params.clone()),
2535 client_order_id,
2536 trader_id,
2537 strategy_id,
2538 instrument_id,
2539 ),
2540 );
2541
2542 self.active_client_orders
2543 .insert(client_order_id, (trader_id, strategy_id, instrument_id));
2544
2545 self.retry_manager
2546 .execute_with_retry_with_cancel(
2547 "submit_order",
2548 || {
2549 let params = params.clone();
2550 let request_id = request_id.clone();
2551 async move { self.ws_place_order(params, Some(request_id)).await }
2552 },
2553 should_retry_okx_error,
2554 create_okx_timeout_error,
2555 &self.cancellation_token,
2556 )
2557 .await
2558 }
2559
2560 #[allow(clippy::too_many_arguments)]
2571 pub async fn cancel_order(
2572 &self,
2573 trader_id: TraderId,
2574 strategy_id: StrategyId,
2575 instrument_id: InstrumentId,
2576 client_order_id: Option<ClientOrderId>,
2577 venue_order_id: Option<VenueOrderId>,
2578 ) -> Result<(), OKXWsError> {
2579 let mut builder = WsCancelOrderParamsBuilder::default();
2580 builder.inst_id(instrument_id.symbol.as_str());
2583
2584 if let Some(venue_order_id) = venue_order_id {
2585 builder.ord_id(venue_order_id.as_str());
2586 }
2587
2588 if let Some(client_order_id) = client_order_id {
2590 builder.cl_ord_id(client_order_id.as_str());
2591 }
2592
2593 let params = builder
2594 .build()
2595 .map_err(|e| OKXWsError::ClientError(format!("Build cancel params error: {e}")))?;
2596
2597 let request_id = self.generate_unique_request_id();
2598
2599 if let Some(client_order_id) = client_order_id {
2602 self.pending_cancel_requests.insert(
2603 request_id.clone(),
2604 (
2605 client_order_id,
2606 trader_id,
2607 strategy_id,
2608 instrument_id,
2609 venue_order_id,
2610 ),
2611 );
2612 }
2613
2614 self.retry_manager
2615 .execute_with_retry_with_cancel(
2616 "cancel_order",
2617 || {
2618 let params = params.clone();
2619 let request_id = request_id.clone();
2620 async move { self.ws_cancel_order(params, Some(request_id)).await }
2621 },
2622 should_retry_okx_error,
2623 create_okx_timeout_error,
2624 &self.cancellation_token,
2625 )
2626 .await
2627 }
2628
2629 async fn ws_place_order(
2635 &self,
2636 params: WsPostOrderParams,
2637 request_id: Option<String>,
2638 ) -> Result<(), OKXWsError> {
2639 let request_id = request_id.unwrap_or(self.generate_unique_request_id());
2640
2641 let req = OKXWsRequest {
2642 id: Some(request_id),
2643 op: OKXWsOperation::Order,
2644 exp_time: None,
2645 args: vec![params],
2646 };
2647
2648 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2649
2650 {
2651 let inner_guard = self.inner.read().await;
2652 if let Some(inner) = &*inner_guard {
2653 if let Err(e) = inner.send_text(txt, Some(vec!["order".to_string()])).await {
2654 tracing::error!("Error sending message: {e:?}");
2655 }
2656 Ok(())
2657 } else {
2658 Err(OKXWsError::ClientError("Not connected".to_string()))
2659 }
2660 }
2661 }
2662
2663 #[allow(clippy::too_many_arguments)]
2674 pub async fn modify_order(
2675 &self,
2676 trader_id: TraderId,
2677 strategy_id: StrategyId,
2678 instrument_id: InstrumentId,
2679 client_order_id: Option<ClientOrderId>,
2680 price: Option<Price>,
2681 quantity: Option<Quantity>,
2682 venue_order_id: Option<VenueOrderId>,
2683 ) -> Result<(), OKXWsError> {
2684 let mut builder = WsAmendOrderParamsBuilder::default();
2685
2686 builder.inst_id(instrument_id.symbol.as_str());
2687
2688 if let Some(venue_order_id) = venue_order_id {
2689 builder.ord_id(venue_order_id.as_str());
2690 }
2691
2692 if let Some(client_order_id) = client_order_id {
2693 builder.cl_ord_id(client_order_id.as_str());
2694 }
2695
2696 if let Some(price) = price {
2697 builder.new_px(price.to_string());
2698 }
2699
2700 if let Some(quantity) = quantity {
2701 builder.new_sz(quantity.to_string());
2702 }
2703
2704 let params = builder
2705 .build()
2706 .map_err(|e| OKXWsError::ClientError(format!("Build amend params error: {e}")))?;
2707
2708 let request_id = self
2710 .request_id_counter
2711 .fetch_add(1, Ordering::SeqCst)
2712 .to_string();
2713
2714 if let Some(client_order_id) = client_order_id {
2717 self.pending_amend_requests.insert(
2718 request_id.clone(),
2719 (
2720 client_order_id,
2721 trader_id,
2722 strategy_id,
2723 instrument_id,
2724 venue_order_id,
2725 ),
2726 );
2727 }
2728
2729 self.retry_manager
2730 .execute_with_retry_with_cancel(
2731 "modify_order",
2732 || {
2733 let params = params.clone();
2734 let request_id = request_id.clone();
2735 async move { self.ws_amend_order(params, Some(request_id)).await }
2736 },
2737 should_retry_okx_error,
2738 create_okx_timeout_error,
2739 &self.cancellation_token,
2740 )
2741 .await
2742 }
2743
2744 #[allow(clippy::type_complexity)]
2751 #[allow(clippy::too_many_arguments)]
2752 pub async fn batch_submit_orders(
2753 &self,
2754 orders: Vec<(
2755 OKXInstrumentType,
2756 InstrumentId,
2757 OKXTradeMode,
2758 ClientOrderId,
2759 OrderSide,
2760 Option<PositionSide>,
2761 OrderType,
2762 Quantity,
2763 Option<Price>,
2764 Option<Price>,
2765 Option<bool>,
2766 Option<bool>,
2767 )>,
2768 ) -> Result<(), OKXWsError> {
2769 let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2770 for (
2771 inst_type,
2772 inst_id,
2773 td_mode,
2774 cl_ord_id,
2775 ord_side,
2776 pos_side,
2777 ord_type,
2778 qty,
2779 pr,
2780 tp,
2781 post_only,
2782 reduce_only,
2783 ) in orders
2784 {
2785 let mut builder = WsPostOrderParamsBuilder::default();
2786 builder.inst_type(inst_type);
2787 builder.inst_id(inst_id.symbol.inner());
2788 builder.td_mode(td_mode);
2789 builder.cl_ord_id(cl_ord_id.as_str());
2790 builder.side(ord_side);
2791
2792 if let Some(ps) = pos_side {
2793 builder.pos_side(OKXPositionSide::from(ps));
2794 }
2795
2796 let okx_ord_type = if post_only.unwrap_or(false) {
2797 OKXOrderType::PostOnly
2798 } else {
2799 OKXOrderType::from(ord_type)
2800 };
2801
2802 builder.ord_type(okx_ord_type);
2803 builder.sz(qty.to_string());
2804
2805 if let Some(p) = pr {
2806 builder.px(p.to_string());
2807 } else if let Some(p) = tp {
2808 builder.px(p.to_string());
2809 }
2810
2811 if let Some(ro) = reduce_only {
2812 builder.reduce_only(ro);
2813 }
2814
2815 builder.tag(OKX_NAUTILUS_BROKER_ID);
2816
2817 let params = builder
2818 .build()
2819 .map_err(|e| OKXWsError::ClientError(format!("Build order params error: {e}")))?;
2820 let val =
2821 serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2822 args.push(val);
2823 }
2824
2825 self.ws_batch_place_orders(args).await
2826 }
2827
2828 #[allow(clippy::type_complexity)]
2841 pub async fn batch_cancel_orders(
2842 &self,
2843 orders: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>,
2844 ) -> Result<(), OKXWsError> {
2845 let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2846 for (inst_id, cl_ord_id, ord_id) in orders {
2847 let mut builder = WsCancelOrderParamsBuilder::default();
2848 builder.inst_id(inst_id.symbol.inner());
2850
2851 if let Some(c) = cl_ord_id {
2852 builder.cl_ord_id(c.as_str());
2853 }
2854
2855 if let Some(o) = ord_id {
2856 builder.ord_id(o.as_str());
2857 }
2858
2859 let params = builder.build().map_err(|e| {
2860 OKXWsError::ClientError(format!("Build cancel batch params error: {e}"))
2861 })?;
2862 let val =
2863 serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2864 args.push(val);
2865 }
2866
2867 self.ws_batch_cancel_orders(args).await
2868 }
2869
2870 pub async fn mass_cancel_orders(&self, inst_id: InstrumentId) -> Result<(), OKXWsError> {
2884 let (inst_type, inst_family) =
2885 self.get_instrument_type_and_family(inst_id.symbol.inner())?;
2886
2887 let params = WsMassCancelParams {
2888 inst_type,
2889 inst_family: Ustr::from(&inst_family),
2890 };
2891
2892 let args =
2893 vec![serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?];
2894
2895 let request_id = self.generate_unique_request_id();
2896
2897 self.pending_mass_cancel_requests
2898 .insert(request_id.clone(), inst_id);
2899
2900 self.retry_manager
2901 .execute_with_retry_with_cancel(
2902 "mass_cancel_orders",
2903 || {
2904 let args = args.clone();
2905 let request_id = request_id.clone();
2906 async move { self.ws_mass_cancel_with_id(args, request_id).await }
2907 },
2908 should_retry_okx_error,
2909 create_okx_timeout_error,
2910 &self.cancellation_token,
2911 )
2912 .await
2913 }
2914
2915 #[allow(clippy::type_complexity)]
2922 #[allow(clippy::too_many_arguments)]
2923 pub async fn batch_modify_orders(
2924 &self,
2925 orders: Vec<(
2926 OKXInstrumentType,
2927 InstrumentId,
2928 ClientOrderId,
2929 ClientOrderId,
2930 Option<Price>,
2931 Option<Quantity>,
2932 )>,
2933 ) -> Result<(), OKXWsError> {
2934 let mut args: Vec<Value> = Vec::with_capacity(orders.len());
2935 for (_inst_type, inst_id, cl_ord_id, new_cl_ord_id, pr, sz) in orders {
2936 let mut builder = WsAmendOrderParamsBuilder::default();
2937 builder.inst_id(inst_id.symbol.inner());
2939 builder.cl_ord_id(cl_ord_id.as_str());
2940 builder.new_cl_ord_id(new_cl_ord_id.as_str());
2941
2942 if let Some(p) = pr {
2943 builder.new_px(p.to_string());
2944 }
2945
2946 if let Some(q) = sz {
2947 builder.new_sz(q.to_string());
2948 }
2949
2950 let params = builder.build().map_err(|e| {
2951 OKXWsError::ClientError(format!("Build amend batch params error: {e}"))
2952 })?;
2953 let val =
2954 serde_json::to_value(params).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
2955 args.push(val);
2956 }
2957
2958 self.ws_batch_amend_orders(args).await
2959 }
2960
2961 #[allow(clippy::too_many_arguments)]
2972 pub async fn submit_algo_order(
2973 &self,
2974 trader_id: TraderId,
2975 strategy_id: StrategyId,
2976 instrument_id: InstrumentId,
2977 td_mode: OKXTradeMode,
2978 client_order_id: ClientOrderId,
2979 order_side: OrderSide,
2980 order_type: OrderType,
2981 quantity: Quantity,
2982 trigger_price: Price,
2983 trigger_type: Option<TriggerType>,
2984 limit_price: Option<Price>,
2985 reduce_only: Option<bool>,
2986 ) -> Result<(), OKXWsError> {
2987 if !is_conditional_order(order_type) {
2988 return Err(OKXWsError::ClientError(format!(
2989 "Order type {order_type:?} is not a conditional order"
2990 )));
2991 }
2992
2993 let mut builder = WsPostAlgoOrderParamsBuilder::default();
2994 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2995 return Err(OKXWsError::ClientError(
2996 "Invalid order side for OKX".to_string(),
2997 ));
2998 }
2999
3000 builder.inst_id(instrument_id.symbol.inner());
3001 builder.td_mode(td_mode);
3002 builder.cl_ord_id(client_order_id.as_str());
3003 builder.side(order_side);
3004 builder.ord_type(
3005 conditional_order_to_algo_type(order_type)
3006 .map_err(|e| OKXWsError::ClientError(e.to_string()))?,
3007 );
3008 builder.sz(quantity.to_string());
3009 builder.trigger_px(trigger_price.to_string());
3010
3011 let okx_trigger_type = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3013 builder.trigger_px_type(okx_trigger_type);
3014
3015 if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched)
3017 && let Some(price) = limit_price
3018 {
3019 builder.order_px(price.to_string());
3020 }
3021
3022 if let Some(reduce) = reduce_only {
3023 builder.reduce_only(reduce);
3024 }
3025
3026 builder.tag(OKX_NAUTILUS_BROKER_ID);
3027
3028 let params = builder
3029 .build()
3030 .map_err(|e| OKXWsError::ClientError(format!("Build algo order params error: {e}")))?;
3031
3032 let request_id = self.generate_unique_request_id();
3033
3034 self.pending_place_requests.insert(
3035 request_id.clone(),
3036 (
3037 PendingOrderParams::Algo(()),
3038 client_order_id,
3039 trader_id,
3040 strategy_id,
3041 instrument_id,
3042 ),
3043 );
3044
3045 self.retry_manager
3046 .execute_with_retry_with_cancel(
3047 "submit_algo_order",
3048 || {
3049 let params = params.clone();
3050 let request_id = request_id.clone();
3051 async move { self.ws_place_algo_order(params, Some(request_id)).await }
3052 },
3053 should_retry_okx_error,
3054 create_okx_timeout_error,
3055 &self.cancellation_token,
3056 )
3057 .await
3058 }
3059
3060 pub async fn cancel_algo_order(
3071 &self,
3072 trader_id: TraderId,
3073 strategy_id: StrategyId,
3074 instrument_id: InstrumentId,
3075 client_order_id: Option<ClientOrderId>,
3076 algo_order_id: Option<String>,
3077 ) -> Result<(), OKXWsError> {
3078 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
3079 builder.inst_id(instrument_id.symbol.inner());
3080
3081 if let Some(client_order_id) = client_order_id {
3082 builder.algo_cl_ord_id(client_order_id.as_str());
3083 }
3084
3085 if let Some(algo_id) = algo_order_id {
3086 builder.algo_id(algo_id);
3087 }
3088
3089 let params = builder
3090 .build()
3091 .map_err(|e| OKXWsError::ClientError(format!("Build cancel algo params error: {e}")))?;
3092
3093 let request_id = self.generate_unique_request_id();
3094
3095 if let Some(client_order_id) = client_order_id {
3097 self.pending_cancel_requests.insert(
3098 request_id.clone(),
3099 (client_order_id, trader_id, strategy_id, instrument_id, None),
3100 );
3101 }
3102
3103 self.retry_manager
3104 .execute_with_retry_with_cancel(
3105 "cancel_algo_order",
3106 || {
3107 let params = params.clone();
3108 let request_id = request_id.clone();
3109 async move { self.ws_cancel_algo_order(params, Some(request_id)).await }
3110 },
3111 should_retry_okx_error,
3112 create_okx_timeout_error,
3113 &self.cancellation_token,
3114 )
3115 .await
3116 }
3117
3118 async fn ws_place_algo_order(
3120 &self,
3121 params: WsPostAlgoOrderParams,
3122 request_id: Option<String>,
3123 ) -> Result<(), OKXWsError> {
3124 let request_id = request_id.unwrap_or(self.generate_unique_request_id());
3125
3126 let req = OKXWsRequest {
3127 id: Some(request_id),
3128 op: OKXWsOperation::OrderAlgo,
3129 exp_time: None,
3130 args: vec![params],
3131 };
3132
3133 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
3134
3135 {
3136 let inner_guard = self.inner.read().await;
3137 if let Some(inner) = &*inner_guard {
3138 if let Err(e) = inner
3139 .send_text(txt, Some(vec!["orders-algo".to_string()]))
3140 .await
3141 {
3142 tracing::error!("Error sending algo order message: {e:?}");
3143 }
3144 Ok(())
3145 } else {
3146 Err(OKXWsError::ClientError("Not connected".to_string()))
3147 }
3148 }
3149 }
3150
3151 async fn ws_cancel_algo_order(
3153 &self,
3154 params: WsCancelAlgoOrderParams,
3155 request_id: Option<String>,
3156 ) -> Result<(), OKXWsError> {
3157 let request_id = request_id.unwrap_or(self.generate_unique_request_id());
3158
3159 let req = OKXWsRequest {
3160 id: Some(request_id),
3161 op: OKXWsOperation::CancelAlgos,
3162 exp_time: None,
3163 args: vec![params],
3164 };
3165
3166 let txt = serde_json::to_string(&req).map_err(|e| OKXWsError::JsonError(e.to_string()))?;
3167
3168 {
3169 let inner_guard = self.inner.read().await;
3170 if let Some(inner) = &*inner_guard {
3171 if let Err(e) = inner
3172 .send_text(txt, Some(vec!["cancel-algos".to_string()]))
3173 .await
3174 {
3175 tracing::error!("Error sending cancel algo message: {e:?}");
3176 }
3177 Ok(())
3178 } else {
3179 Err(OKXWsError::ClientError("Not connected".to_string()))
3180 }
3181 }
3182 }
3183}
3184
3185struct OKXFeedHandler {
3186 receiver: UnboundedReceiver<Message>,
3187 signal: Arc<AtomicBool>,
3188}
3189
3190impl OKXFeedHandler {
3191 pub fn new(receiver: UnboundedReceiver<Message>, signal: Arc<AtomicBool>) -> Self {
3193 Self { receiver, signal }
3194 }
3195
3196 async fn next(&mut self) -> Option<OKXWebSocketEvent> {
3198 loop {
3199 tokio::select! {
3200 msg = self.receiver.recv() => match msg {
3201 Some(msg) => match msg {
3202 Message::Text(text) => {
3203 if text == TEXT_PONG {
3205 tracing::trace!("Received pong from OKX");
3206 continue;
3207 }
3208 if text == TEXT_PING {
3209 tracing::trace!("Received ping from OKX (text)");
3210 return Some(OKXWebSocketEvent::Ping);
3211 }
3212
3213 if text == RECONNECTED {
3215 tracing::debug!("Received WebSocket reconnection signal");
3216 return Some(OKXWebSocketEvent::Reconnected);
3217 }
3218 tracing::trace!("Received WebSocket message: {text}");
3219
3220 match serde_json::from_str(&text) {
3221 Ok(ws_event) => match &ws_event {
3222 OKXWebSocketEvent::Error { code, msg } => {
3223 tracing::error!("WebSocket error: {code} - {msg}");
3224 return Some(ws_event);
3225 }
3226 OKXWebSocketEvent::Login {
3227 event,
3228 code,
3229 msg,
3230 conn_id,
3231 } => {
3232 if code == "0" {
3233 tracing::info!(
3234 "Successfully authenticated with OKX WebSocket, conn_id={conn_id}"
3235 );
3236 } else {
3237 tracing::error!(
3238 "Authentication failed: {event} {code} - {msg}"
3239 );
3240 }
3241 return Some(ws_event);
3242 }
3243 OKXWebSocketEvent::Subscription {
3244 event,
3245 arg,
3246 conn_id, .. } => {
3247 let channel_str = serde_json::to_string(&arg.channel)
3248 .expect("Invalid OKX websocket channel")
3249 .trim_matches('"')
3250 .to_string();
3251 tracing::debug!(
3252 "{event}d: channel={channel_str}, conn_id={conn_id}"
3253 );
3254 continue;
3255 }
3256 OKXWebSocketEvent::ChannelConnCount {
3257 event: _,
3258 channel,
3259 conn_count,
3260 conn_id,
3261 } => {
3262 let channel_str = serde_json::to_string(&channel)
3263 .expect("Invalid OKX websocket channel")
3264 .trim_matches('"')
3265 .to_string();
3266 tracing::debug!(
3267 "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
3268 );
3269 continue;
3270 }
3271 OKXWebSocketEvent::Ping => {
3272 tracing::trace!("Ignoring ping event parsed from text payload");
3273 continue;
3274 }
3275 OKXWebSocketEvent::Data { .. } => return Some(ws_event),
3276 OKXWebSocketEvent::BookData { .. } => return Some(ws_event),
3277 OKXWebSocketEvent::OrderResponse {
3278 id,
3279 op,
3280 code,
3281 msg: _,
3282 data,
3283 } => {
3284 if code == "0" {
3285 tracing::debug!(
3286 "Order operation successful: id={:?}, op={op}, code={code}",
3287 id
3288 );
3289
3290 if let Some(order_data) = data.first() {
3292 let success_msg = order_data
3293 .get("sMsg")
3294 .and_then(|s| s.as_str())
3295 .unwrap_or("Order operation successful");
3296 tracing::debug!("Order success details: {success_msg}");
3297 }
3298 }
3299 return Some(ws_event);
3300 }
3301 OKXWebSocketEvent::Reconnected => {
3302 tracing::warn!("Unexpected Reconnected event from deserialization");
3304 continue;
3305 }
3306 },
3307 Err(e) => {
3308 tracing::error!("Failed to parse message: {e}: {text}");
3309 return None;
3310 }
3311 }
3312 }
3313 Message::Ping(payload) => {
3314 tracing::trace!("Received ping frame from OKX ({} bytes)", payload.len());
3315 continue;
3316 }
3317 Message::Pong(payload) => {
3318 tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
3319 continue;
3320 }
3321 Message::Binary(msg) => {
3322 tracing::debug!("Raw binary: {msg:?}");
3323 }
3324 Message::Close(_) => {
3325 tracing::debug!("Received close message");
3326 return None;
3327 }
3328 msg => {
3329 tracing::warn!("Unexpected message: {msg}");
3330 }
3331 }
3332 None => {
3333 tracing::info!("WebSocket stream closed");
3334 return None;
3335 }
3336 },
3337 _ = tokio::time::sleep(Duration::from_millis(1)) => {
3338 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
3339 tracing::debug!("Stop signal received");
3340 return None;
3341 }
3342 }
3343 }
3344 }
3345 }
3346}
3347
3348struct OKXWsMessageHandler {
3349 account_id: AccountId,
3350 inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
3351 handler: OKXFeedHandler,
3352 #[allow(dead_code)]
3353 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
3354 pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
3355 pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
3356 pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
3357 pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
3358 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
3359 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
3360 emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
3361 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
3362 last_account_state: Option<AccountState>,
3363 fee_cache: AHashMap<Ustr, Money>, filled_qty_cache: AHashMap<Ustr, Quantity>, funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, auth_tracker: AuthTracker,
3367 pending_messages: VecDeque<NautilusWsMessage>,
3368 subscriptions_state: SubscriptionState,
3369}
3370
3371impl OKXWsMessageHandler {
3372 fn schedule_text_pong(&self) {
3373 let inner = self.inner.clone();
3374 get_runtime().spawn(async move {
3375 let guard = inner.read().await;
3376
3377 if let Some(client) = guard.as_ref() {
3378 if let Err(e) = client.send_text(TEXT_PONG.to_string(), None).await {
3379 tracing::warn!(error = %e, "Failed to send pong response to OKX text ping");
3380 } else {
3381 tracing::trace!("Sent pong response to OKX text ping");
3382 }
3383 } else {
3384 tracing::debug!("Received text ping with no active websocket client");
3385 }
3386 });
3387 }
3388
3389 fn try_handle_post_only_auto_cancel(
3390 &mut self,
3391 msg: &OKXOrderMsg,
3392 ts_init: UnixNanos,
3393 exec_reports: &mut Vec<ExecutionReport>,
3394 ) -> bool {
3395 if !Self::is_post_only_auto_cancel(msg) {
3396 return false;
3397 }
3398
3399 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
3400 return false;
3401 };
3402
3403 let Some((_, (trader_id, strategy_id, instrument_id))) =
3404 self.active_client_orders.remove(&client_order_id)
3405 else {
3406 return false;
3407 };
3408
3409 self.client_id_aliases.remove(&client_order_id);
3410
3411 if !exec_reports.is_empty() {
3412 let reports = std::mem::take(exec_reports);
3413 self.pending_messages
3414 .push_back(NautilusWsMessage::ExecutionReports(reports));
3415 }
3416
3417 let reason = msg
3418 .cancel_source_reason
3419 .as_ref()
3420 .filter(|reason| !reason.is_empty())
3421 .map_or_else(
3422 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
3423 |reason| Ustr::from(reason.as_str()),
3424 );
3425
3426 let ts_event = parse_millisecond_timestamp(msg.u_time);
3427 let rejected = OrderRejected::new(
3428 trader_id,
3429 strategy_id,
3430 instrument_id,
3431 client_order_id,
3432 self.account_id,
3433 reason,
3434 UUID4::new(),
3435 ts_event,
3436 ts_init,
3437 false,
3438 true,
3439 );
3440
3441 self.pending_messages
3442 .push_back(NautilusWsMessage::OrderRejected(rejected));
3443
3444 true
3445 }
3446
3447 fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
3448 if msg.state != OKXOrderStatus::Canceled {
3449 return false;
3450 }
3451
3452 let cancel_source_matches = matches!(
3453 msg.cancel_source.as_deref(),
3454 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
3455 );
3456
3457 let reason_matches = matches!(
3458 msg.cancel_source_reason.as_deref(),
3459 Some(reason) if reason.contains("POST_ONLY")
3460 );
3461
3462 if !(cancel_source_matches || reason_matches) {
3463 return false;
3464 }
3465
3466 msg.acc_fill_sz
3467 .as_ref()
3468 .is_none_or(|filled| filled == "0" || filled.is_empty())
3469 }
3470
3471 fn register_client_order_aliases(
3472 &self,
3473 raw_child: &Option<ClientOrderId>,
3474 parent_from_msg: &Option<ClientOrderId>,
3475 ) -> Option<ClientOrderId> {
3476 if let Some(parent) = parent_from_msg {
3477 self.client_id_aliases.insert(*parent, *parent);
3478 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
3479 self.client_id_aliases.insert(*child, *parent);
3480 }
3481 Some(*parent)
3482 } else if let Some(child) = raw_child.as_ref() {
3483 if let Some(mapped) = self.client_id_aliases.get(child) {
3484 Some(*mapped.value())
3485 } else {
3486 self.client_id_aliases.insert(*child, *child);
3487 Some(*child)
3488 }
3489 } else {
3490 None
3491 }
3492 }
3493
3494 fn adjust_execution_report(
3495 &self,
3496 report: ExecutionReport,
3497 effective_client_id: &Option<ClientOrderId>,
3498 raw_child: &Option<ClientOrderId>,
3499 ) -> ExecutionReport {
3500 match report {
3501 ExecutionReport::Order(status_report) => {
3502 let mut adjusted = status_report;
3503 let mut final_id = *effective_client_id;
3504
3505 if final_id.is_none() {
3506 final_id = adjusted.client_order_id;
3507 }
3508
3509 if final_id.is_none()
3510 && let Some(child) = raw_child.as_ref()
3511 && let Some(mapped) = self.client_id_aliases.get(child)
3512 {
3513 final_id = Some(*mapped.value());
3514 }
3515
3516 if let Some(final_id_value) = final_id {
3517 if adjusted.client_order_id != Some(final_id_value) {
3518 adjusted = adjusted.with_client_order_id(final_id_value);
3519 }
3520 self.client_id_aliases
3521 .insert(final_id_value, final_id_value);
3522
3523 if let Some(child) =
3524 raw_child.as_ref().filter(|child| **child != final_id_value)
3525 {
3526 adjusted = adjusted.with_linked_order_ids(vec![*child]);
3527 }
3528 }
3529
3530 ExecutionReport::Order(adjusted)
3531 }
3532 ExecutionReport::Fill(mut fill_report) => {
3533 let mut final_id = *effective_client_id;
3534 if final_id.is_none() {
3535 final_id = fill_report.client_order_id;
3536 }
3537 if final_id.is_none()
3538 && let Some(child) = raw_child.as_ref()
3539 && let Some(mapped) = self.client_id_aliases.get(child)
3540 {
3541 final_id = Some(*mapped.value());
3542 }
3543
3544 if let Some(final_id_value) = final_id {
3545 fill_report.client_order_id = Some(final_id_value);
3546 self.client_id_aliases
3547 .insert(final_id_value, final_id_value);
3548 }
3549
3550 ExecutionReport::Fill(fill_report)
3551 }
3552 }
3553 }
3554
3555 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
3556 match report {
3557 ExecutionReport::Fill(fill_report) => {
3558 let order_id = fill_report.venue_order_id.inner();
3559 let current_fee = self
3560 .fee_cache
3561 .get(&order_id)
3562 .copied()
3563 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
3564 let total_fee = current_fee + fill_report.commission;
3565 self.fee_cache.insert(order_id, total_fee);
3566
3567 let current_filled_qty = self
3568 .filled_qty_cache
3569 .get(&order_id)
3570 .copied()
3571 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
3572 let total_filled_qty = current_filled_qty + fill_report.last_qty;
3573 self.filled_qty_cache.insert(order_id, total_filled_qty);
3574 }
3575 ExecutionReport::Order(status_report) => {
3576 if matches!(status_report.order_status, OrderStatus::Filled) {
3577 self.fee_cache.remove(&status_report.venue_order_id.inner());
3578 self.filled_qty_cache
3579 .remove(&status_report.venue_order_id.inner());
3580 }
3581
3582 if matches!(
3583 status_report.order_status,
3584 OrderStatus::Canceled
3585 | OrderStatus::Expired
3586 | OrderStatus::Filled
3587 | OrderStatus::Rejected,
3588 ) {
3589 if let Some(client_order_id) = status_report.client_order_id {
3590 self.active_client_orders.remove(&client_order_id);
3591 self.client_id_aliases.remove(&client_order_id);
3592 }
3593 if let Some(linked) = &status_report.linked_order_ids {
3594 for child in linked {
3595 self.client_id_aliases.remove(child);
3596 }
3597 }
3598 }
3599 }
3600 }
3601 }
3602
3603 #[allow(clippy::too_many_arguments)]
3605 pub fn new(
3606 account_id: AccountId,
3607 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
3608 reader: UnboundedReceiver<Message>,
3609 signal: Arc<AtomicBool>,
3610 inner: Arc<tokio::sync::RwLock<Option<WebSocketClient>>>,
3611 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
3612 pending_place_requests: Arc<DashMap<String, PlaceRequestData>>,
3613 pending_cancel_requests: Arc<DashMap<String, CancelRequestData>>,
3614 pending_amend_requests: Arc<DashMap<String, AmendRequestData>>,
3615 pending_mass_cancel_requests: Arc<DashMap<String, MassCancelRequestData>>,
3616 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
3617 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
3618 emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
3619 auth_tracker: AuthTracker,
3620 subscriptions_state: SubscriptionState,
3621 ) -> Self {
3622 Self {
3623 account_id,
3624 inner,
3625 handler: OKXFeedHandler::new(reader, signal),
3626 tx,
3627 pending_place_requests,
3628 pending_cancel_requests,
3629 pending_amend_requests,
3630 pending_mass_cancel_requests,
3631 active_client_orders,
3632 client_id_aliases,
3633 emitted_order_accepted,
3634 instruments_cache,
3635 last_account_state: None,
3636 fee_cache: AHashMap::new(),
3637 filled_qty_cache: AHashMap::new(),
3638 funding_rate_cache: AHashMap::new(),
3639 auth_tracker,
3640 pending_messages: VecDeque::new(),
3641 subscriptions_state,
3642 }
3643 }
3644
3645 fn is_stopped(&self) -> bool {
3646 self.handler
3647 .signal
3648 .load(std::sync::atomic::Ordering::Relaxed)
3649 }
3650
3651 #[allow(dead_code)]
3652 async fn run(&mut self) {
3653 while let Some(data) = self.next().await {
3654 if let Err(e) = self.tx.send(data) {
3655 tracing::error!("Error sending data: {e}");
3656 break; }
3658 }
3659 }
3660
3661 async fn next(&mut self) -> Option<NautilusWsMessage> {
3662 if let Some(message) = self.pending_messages.pop_front() {
3663 return Some(message);
3664 }
3665
3666 let clock = get_atomic_clock_realtime();
3667
3668 while let Some(event) = self.handler.next().await {
3669 let ts_init = clock.get_time_ns();
3670
3671 match event {
3672 OKXWebSocketEvent::Ping => {
3673 self.schedule_text_pong();
3674 continue;
3675 }
3676 OKXWebSocketEvent::Login {
3677 code, msg, conn_id, ..
3678 } => {
3679 if code == "0" {
3680 self.auth_tracker.succeed();
3681 continue;
3682 }
3683
3684 tracing::error!("Authentication failed: {msg}");
3685 self.auth_tracker.fail(msg.clone());
3686
3687 let error = OKXWebSocketError {
3688 code,
3689 message: msg,
3690 conn_id: Some(conn_id),
3691 timestamp: clock.get_time_ns().as_u64(),
3692 };
3693 self.pending_messages
3694 .push_back(NautilusWsMessage::Error(error));
3695 continue;
3696 }
3697 OKXWebSocketEvent::BookData { arg, action, data } => {
3698 let Some(inst_id) = arg.inst_id else {
3699 tracing::error!("Instrument ID missing for book data event");
3700 continue;
3701 };
3702
3703 let Some(inst) = self.instruments_cache.get(&inst_id) else {
3704 continue;
3705 };
3706
3707 let instrument_id = inst.id();
3708 let price_precision = inst.price_precision();
3709 let size_precision = inst.size_precision();
3710
3711 match parse_book_msg_vec(
3712 data,
3713 &instrument_id,
3714 price_precision,
3715 size_precision,
3716 action,
3717 ts_init,
3718 ) {
3719 Ok(payloads) => return Some(NautilusWsMessage::Data(payloads)),
3720 Err(e) => {
3721 tracing::error!("Failed to parse book message: {e}");
3722 continue;
3723 }
3724 }
3725 }
3726 OKXWebSocketEvent::OrderResponse {
3727 id,
3728 op,
3729 code,
3730 msg,
3731 data,
3732 } => {
3733 if code == "0" {
3734 tracing::debug!(
3735 "Order operation successful: id={id:?} op={op} code={code}"
3736 );
3737
3738 if op == OKXWsOperation::MassCancel
3739 && let Some(request_id) = &id
3740 && let Some((_, instrument_id)) =
3741 self.pending_mass_cancel_requests.remove(request_id)
3742 {
3743 tracing::info!(
3744 "Mass cancel operation successful for instrument: {}",
3745 instrument_id
3746 );
3747 } else if op == OKXWsOperation::Order
3748 && let Some(request_id) = &id
3749 && let Some((
3750 _,
3751 (params, client_order_id, _trader_id, _strategy_id, instrument_id),
3752 )) = self.pending_place_requests.remove(request_id)
3753 {
3754 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
3755 let ord_id = first
3756 .get("ordId")
3757 .and_then(|v| v.as_str())
3758 .filter(|s| !s.is_empty())
3759 .map(VenueOrderId::new);
3760
3761 let ts = first
3762 .get("ts")
3763 .and_then(|v| v.as_str())
3764 .and_then(|s| s.parse::<u64>().ok())
3765 .map_or_else(
3766 || clock.get_time_ns(),
3767 |ms| UnixNanos::from(ms * 1_000_000),
3768 );
3769
3770 (ord_id, ts)
3771 } else {
3772 (None, clock.get_time_ns())
3773 };
3774
3775 if let Some(instrument) = self
3776 .instruments_cache
3777 .get(&Ustr::from(instrument_id.symbol.as_str()))
3778 {
3779 match params {
3780 PendingOrderParams::Regular(order_params) => {
3781 let is_explicit_quote_sized = order_params
3783 .tgt_ccy
3784 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
3785
3786 let is_implicit_quote_sized =
3789 order_params.tgt_ccy.is_none()
3790 && order_params.side == OKXSide::Buy
3791 && matches!(
3792 order_params.ord_type,
3793 OKXOrderType::Market
3794 )
3795 && order_params.td_mode == OKXTradeMode::Cash
3796 && instrument.instrument_class().as_ref() == "SPOT";
3797
3798 if is_explicit_quote_sized || is_implicit_quote_sized {
3799 tracing::info!(
3804 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={:?}",
3805 if is_explicit_quote_sized {
3806 "explicit"
3807 } else {
3808 "implicit"
3809 },
3810 venue_order_id
3811 );
3812 continue;
3813 }
3814
3815 let order_side = order_params.side.into();
3816 let order_type = order_params.ord_type.into();
3817 let time_in_force = match order_params.ord_type {
3818 OKXOrderType::Fok => TimeInForce::Fok,
3819 OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
3820 TimeInForce::Ioc
3821 }
3822 _ => TimeInForce::Gtc,
3823 };
3824
3825 let size_precision = instrument.size_precision();
3826 let quantity = match parse_quantity(
3827 &order_params.sz,
3828 size_precision,
3829 ) {
3830 Ok(q) => q,
3831 Err(e) => {
3832 tracing::error!(
3833 "Failed to parse quantity for accepted order: {e}"
3834 );
3835 continue;
3836 }
3837 };
3838
3839 let filled_qty = Quantity::zero(size_precision);
3840
3841 let mut report = OrderStatusReport::new(
3842 self.account_id,
3843 instrument_id,
3844 Some(client_order_id),
3845 venue_order_id
3846 .unwrap_or_else(|| VenueOrderId::new("PENDING")),
3847 order_side,
3848 order_type,
3849 time_in_force,
3850 OrderStatus::Accepted,
3851 quantity,
3852 filled_qty,
3853 ts_accepted,
3854 ts_accepted, ts_init,
3856 None, );
3858
3859 if let Some(px) = &order_params.px
3860 && !px.is_empty()
3861 && let Ok(price) =
3862 parse_price(px, instrument.price_precision())
3863 {
3864 report = report.with_price(price);
3865 }
3866
3867 if let Some(true) = order_params.reduce_only {
3868 report = report.with_reduce_only(true);
3869 }
3870
3871 if order_type == OrderType::Limit
3872 && order_params.ord_type == OKXOrderType::PostOnly
3873 {
3874 report = report.with_post_only(true);
3875 }
3876
3877 if let Some(ref v_order_id) = venue_order_id {
3878 self.emitted_order_accepted.insert(*v_order_id, ());
3879 }
3880
3881 tracing::debug!(
3882 "Order accepted: client_order_id={client_order_id}, venue_order_id={:?}",
3883 venue_order_id
3884 );
3885
3886 return Some(NautilusWsMessage::ExecutionReports(vec![
3887 ExecutionReport::Order(report),
3888 ]));
3889 }
3890 PendingOrderParams::Algo(_) => {
3891 tracing::info!(
3892 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
3893 venue_order_id
3894 );
3895 }
3896 }
3897 } else {
3898 tracing::error!(
3899 "Instrument not found for accepted order: {instrument_id}"
3900 );
3901 }
3902 }
3903
3904 if let Some(first) = data.first()
3905 && let Some(success_msg) =
3906 first.get("sMsg").and_then(|value| value.as_str())
3907 {
3908 tracing::debug!("Order details: {success_msg}");
3909 }
3910
3911 continue;
3912 }
3913
3914 let error_msg = data
3915 .first()
3916 .and_then(|d| d.get("sMsg"))
3917 .and_then(|s| s.as_str())
3918 .unwrap_or(&msg)
3919 .to_string();
3920
3921 if let Some(first) = data.first() {
3922 tracing::debug!(
3923 "Error data fields: {}",
3924 serde_json::to_string_pretty(first)
3925 .unwrap_or_else(|_| "unable to serialize".to_string())
3926 );
3927 }
3928
3929 tracing::warn!(
3930 "Order operation failed: id={id:?} op={op} code={code} msg={error_msg}"
3931 );
3932
3933 if let Some(request_id) = &id {
3934 match op {
3935 OKXWsOperation::Order => {
3936 if let Some((
3937 _,
3938 (
3939 _params,
3940 client_order_id,
3941 trader_id,
3942 strategy_id,
3943 instrument_id,
3944 ),
3945 )) = self.pending_place_requests.remove(request_id)
3946 {
3947 let ts_event = clock.get_time_ns();
3948 let due_post_only =
3949 is_post_only_rejection(code.as_str(), &data);
3950 let rejected = OrderRejected::new(
3951 trader_id,
3952 strategy_id,
3953 instrument_id,
3954 client_order_id,
3955 self.account_id,
3956 Ustr::from(error_msg.as_str()),
3957 UUID4::new(),
3958 ts_event,
3959 ts_init,
3960 false, due_post_only,
3962 );
3963
3964 return Some(NautilusWsMessage::OrderRejected(rejected));
3965 }
3966 }
3967 OKXWsOperation::CancelOrder => {
3968 if let Some((
3969 _,
3970 (
3971 client_order_id,
3972 trader_id,
3973 strategy_id,
3974 instrument_id,
3975 venue_order_id,
3976 ),
3977 )) = self.pending_cancel_requests.remove(request_id)
3978 {
3979 let ts_event = clock.get_time_ns();
3980 let rejected = OrderCancelRejected::new(
3981 trader_id,
3982 strategy_id,
3983 instrument_id,
3984 client_order_id,
3985 Ustr::from(error_msg.as_str()),
3986 UUID4::new(),
3987 ts_event,
3988 ts_init,
3989 false, venue_order_id,
3991 Some(self.account_id),
3992 );
3993
3994 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
3995 }
3996 }
3997 OKXWsOperation::AmendOrder => {
3998 if let Some((
3999 _,
4000 (
4001 client_order_id,
4002 trader_id,
4003 strategy_id,
4004 instrument_id,
4005 venue_order_id,
4006 ),
4007 )) = self.pending_amend_requests.remove(request_id)
4008 {
4009 let ts_event = clock.get_time_ns();
4010 let rejected = OrderModifyRejected::new(
4011 trader_id,
4012 strategy_id,
4013 instrument_id,
4014 client_order_id,
4015 Ustr::from(error_msg.as_str()),
4016 UUID4::new(),
4017 ts_event,
4018 ts_init,
4019 false, venue_order_id,
4021 Some(self.account_id),
4022 );
4023
4024 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
4025 }
4026 }
4027 OKXWsOperation::MassCancel => {
4028 if let Some((_, instrument_id)) =
4029 self.pending_mass_cancel_requests.remove(request_id)
4030 {
4031 tracing::error!(
4032 "Mass cancel operation failed for {}: code={code} msg={error_msg}",
4033 instrument_id
4034 );
4035 let error = OKXWebSocketError {
4036 code,
4037 message: format!(
4038 "Mass cancel failed for {}: {}",
4039 instrument_id, error_msg
4040 ),
4041 conn_id: None,
4042 timestamp: clock.get_time_ns().as_u64(),
4043 };
4044 return Some(NautilusWsMessage::Error(error));
4045 } else {
4046 tracing::error!(
4047 "Mass cancel operation failed: code={code} msg={error_msg}"
4048 );
4049 }
4050 }
4051 _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
4052 }
4053 }
4054
4055 let error = OKXWebSocketError {
4056 code,
4057 message: error_msg,
4058 conn_id: None,
4059 timestamp: clock.get_time_ns().as_u64(),
4060 };
4061 return Some(NautilusWsMessage::Error(error));
4062 }
4063 OKXWebSocketEvent::Data { arg, data } => {
4064 let OKXWebSocketArg {
4065 channel, inst_id, ..
4066 } = arg;
4067
4068 match channel {
4069 OKXWsChannel::Account => {
4070 match serde_json::from_value::<Vec<OKXAccount>>(data) {
4071 Ok(accounts) => {
4072 if let Some(account) = accounts.first() {
4073 match parse_account_state(account, self.account_id, ts_init)
4074 {
4075 Ok(account_state) => {
4076 if let Some(last_account_state) =
4077 &self.last_account_state
4078 && account_state.has_same_balances_and_margins(
4079 last_account_state,
4080 )
4081 {
4082 continue;
4083 }
4084 self.last_account_state =
4085 Some(account_state.clone());
4086 return Some(NautilusWsMessage::AccountUpdate(
4087 account_state,
4088 ));
4089 }
4090 Err(e) => tracing::error!(
4091 "Failed to parse account state: {e}"
4092 ),
4093 }
4094 }
4095 }
4096 Err(e) => tracing::error!("Failed to parse account data: {e}"),
4097 }
4098 continue;
4099 }
4100 OKXWsChannel::Orders => {
4101 let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
4102 Ok(orders) => orders,
4103 Err(e) => {
4104 tracing::error!(
4105 "Failed to deserialize orders channel payload: {e}"
4106 );
4107 continue;
4108 }
4109 };
4110
4111 tracing::debug!(
4112 "Received {} order message(s) from orders channel",
4113 orders.len()
4114 );
4115
4116 let mut exec_reports: Vec<ExecutionReport> =
4117 Vec::with_capacity(orders.len());
4118
4119 for msg in orders {
4120 tracing::debug!(
4121 "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
4122 msg.inst_id,
4123 msg.cl_ord_id,
4124 msg.state,
4125 msg.exec_type
4126 );
4127
4128 if self.try_handle_post_only_auto_cancel(
4129 &msg,
4130 ts_init,
4131 &mut exec_reports,
4132 ) {
4133 continue;
4134 }
4135
4136 let raw_child = parse_client_order_id(&msg.cl_ord_id);
4137 let parent_from_msg = msg
4138 .algo_cl_ord_id
4139 .as_ref()
4140 .filter(|value| !value.is_empty())
4141 .map(ClientOrderId::new);
4142 let effective_client_id = self
4143 .register_client_order_aliases(&raw_child, &parent_from_msg);
4144
4145 match parse_order_msg(
4146 &msg,
4147 self.account_id,
4148 &self.instruments_cache,
4149 &self.fee_cache,
4150 &self.filled_qty_cache,
4151 ts_init,
4152 ) {
4153 Ok(report) => {
4154 tracing::debug!(
4155 "Successfully parsed execution report: {:?}",
4156 report
4157 );
4158
4159 let is_duplicate_accepted =
4161 if let ExecutionReport::Order(ref status_report) =
4162 report
4163 {
4164 if status_report.order_status
4165 == OrderStatus::Accepted
4166 {
4167 self.emitted_order_accepted
4168 .contains_key(&status_report.venue_order_id)
4169 } else {
4170 false
4171 }
4172 } else {
4173 false
4174 };
4175
4176 if is_duplicate_accepted {
4177 tracing::debug!(
4178 "Skipping duplicate OrderAccepted for venue_order_id={}",
4179 if let ExecutionReport::Order(ref r) = report {
4180 r.venue_order_id.to_string()
4181 } else {
4182 "unknown".to_string()
4183 }
4184 );
4185 continue;
4186 }
4187
4188 if let ExecutionReport::Order(ref status_report) = report
4189 && status_report.order_status == OrderStatus::Accepted
4190 {
4191 self.emitted_order_accepted
4192 .insert(status_report.venue_order_id, ());
4193 }
4194
4195 let adjusted = self.adjust_execution_report(
4196 report,
4197 &effective_client_id,
4198 &raw_child,
4199 );
4200
4201 if let ExecutionReport::Order(ref status_report) = adjusted
4203 && matches!(
4204 status_report.order_status,
4205 OrderStatus::Filled
4206 | OrderStatus::Canceled
4207 | OrderStatus::Expired
4208 | OrderStatus::Rejected
4209 )
4210 {
4211 self.emitted_order_accepted
4212 .remove(&status_report.venue_order_id);
4213 }
4214
4215 self.update_caches_with_report(&adjusted);
4216 exec_reports.push(adjusted);
4217 }
4218 Err(e) => tracing::error!("Failed to parse order message: {e}"),
4219 }
4220 }
4221
4222 if !exec_reports.is_empty() {
4223 tracing::debug!(
4224 "Pushing {} execution report(s) to message queue",
4225 exec_reports.len()
4226 );
4227 self.pending_messages
4228 .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
4229 } else {
4230 tracing::debug!(
4231 "No execution reports generated from order messages"
4232 );
4233 }
4234
4235 if let Some(message) = self.pending_messages.pop_front() {
4236 return Some(message);
4237 }
4238
4239 continue;
4240 }
4241 OKXWsChannel::OrdersAlgo => {
4242 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
4243 Ok(orders) => orders,
4244 Err(e) => {
4245 tracing::error!(
4246 "Failed to deserialize algo orders payload: {e}"
4247 );
4248 continue;
4249 }
4250 };
4251
4252 let mut exec_reports: Vec<ExecutionReport> =
4253 Vec::with_capacity(orders.len());
4254
4255 for msg in orders {
4256 let raw_child = parse_client_order_id(&msg.cl_ord_id);
4257 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
4258 let effective_client_id = self
4259 .register_client_order_aliases(&raw_child, &parent_from_msg);
4260
4261 match parse_algo_order_msg(
4262 msg,
4263 self.account_id,
4264 &self.instruments_cache,
4265 ts_init,
4266 ) {
4267 Ok(report) => {
4268 let adjusted = self.adjust_execution_report(
4269 report,
4270 &effective_client_id,
4271 &raw_child,
4272 );
4273 self.update_caches_with_report(&adjusted);
4274 exec_reports.push(adjusted);
4275 }
4276 Err(e) => {
4277 tracing::error!("Failed to parse algo order message: {e}");
4278 }
4279 }
4280 }
4281
4282 if !exec_reports.is_empty() {
4283 return Some(NautilusWsMessage::ExecutionReports(exec_reports));
4284 }
4285
4286 continue;
4287 }
4288 _ => {
4289 let Some(inst_id) = inst_id else {
4290 tracing::error!("No instrument for channel {:?}", channel);
4291 continue;
4292 };
4293
4294 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
4295 tracing::error!(
4296 "No instrument for channel {:?}, inst_id {:?}",
4297 channel,
4298 inst_id
4299 );
4300 continue;
4301 };
4302
4303 let instrument_id = instrument.id();
4304 let price_precision = instrument.price_precision();
4305 let size_precision = instrument.size_precision();
4306
4307 match parse_ws_message_data(
4308 &channel,
4309 data,
4310 &instrument_id,
4311 price_precision,
4312 size_precision,
4313 ts_init,
4314 &mut self.funding_rate_cache,
4315 &self.instruments_cache,
4316 ) {
4317 Ok(Some(msg)) => return Some(msg),
4318 Ok(None) => continue,
4319 Err(e) => {
4320 tracing::error!(
4321 "Error parsing message for channel {:?}: {e}",
4322 channel
4323 );
4324 continue;
4325 }
4326 }
4327 }
4328 }
4329 }
4330 OKXWebSocketEvent::Error { code, msg } => {
4331 let error = OKXWebSocketError {
4332 code,
4333 message: msg,
4334 conn_id: None,
4335 timestamp: clock.get_time_ns().as_u64(),
4336 };
4337 return Some(NautilusWsMessage::Error(error));
4338 }
4339 OKXWebSocketEvent::Reconnected => {
4340 return Some(NautilusWsMessage::Reconnected);
4341 }
4342 OKXWebSocketEvent::Subscription {
4343 event,
4344 arg,
4345 code,
4346 msg,
4347 ..
4348 } => {
4349 let topic = topic_from_websocket_arg(&arg);
4350 let success = code.as_deref().is_none_or(|c| c == "0");
4351
4352 match event {
4353 OKXSubscriptionEvent::Subscribe => {
4354 if success {
4355 self.subscriptions_state.confirm(&topic);
4356 } else {
4357 tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
4358 self.subscriptions_state.mark_failure(&topic);
4359 }
4360 }
4361 OKXSubscriptionEvent::Unsubscribe => {
4362 if success {
4363 self.subscriptions_state.clear_pending(&topic);
4364 } else {
4365 tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed");
4366 self.subscriptions_state.mark_failure(&topic);
4367 }
4368 }
4369 }
4370
4371 continue;
4372 }
4373 OKXWebSocketEvent::ChannelConnCount { .. } => continue,
4374 }
4375 }
4376
4377 None
4378 }
4379}
4380
4381pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
4383 if code == OKX_POST_ONLY_ERROR_CODE {
4384 return true;
4385 }
4386
4387 for entry in data {
4388 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
4389 && s_code == OKX_POST_ONLY_ERROR_CODE
4390 {
4391 return true;
4392 }
4393
4394 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
4395 && inner_code == OKX_POST_ONLY_ERROR_CODE
4396 {
4397 return true;
4398 }
4399 }
4400
4401 false
4402}
4403
#[cfg(test)]
mod tests {
    //! Unit tests for the OKX WebSocket client, its feed handler and the
    //! post-only detection helpers.
    //!
    //! None of these tests open a real network connection.

    use futures_util::future::join_all;
    use rstest::rstest;

    use super::*;
    use crate::common::enums::{OKXExecType, OKXOrderCategory, OKXSide};

    /// The seconds-since-epoch string is a 10-character, all-digit, parseable integer.
    #[rstest]
    fn test_timestamp_format_for_websocket_auth() {
        let timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("System time should be after UNIX epoch")
            .as_secs()
            .to_string();

        assert!(timestamp.parse::<u64>().is_ok());
        assert_eq!(timestamp.len(), 10);
        assert!(timestamp.chars().all(|c| c.is_ascii_digit()));
    }

    /// A default client carries no credential and exposes no API key.
    #[rstest]
    fn test_new_without_credentials() {
        let client = OKXWebSocketClient::default();
        assert!(client.credential.is_none());
        assert_eq!(client.api_key(), None);
    }

    /// Supplying all three credential strings yields a client exposing the API key.
    #[rstest]
    fn test_new_with_credentials() {
        let client = OKXWebSocketClient::new(
            None,
            Some("test_key".to_string()),
            Some("test_secret".to_string()),
            Some("test_passphrase".to_string()),
            None,
            None,
        )
        .unwrap();
        assert!(client.credential.is_some());
        assert_eq!(client.api_key(), Some("test_key"));
    }

    /// Omitting one of the three credential arguments makes construction fail.
    #[rstest]
    fn test_new_partial_credentials_fails() {
        let result = OKXWebSocketClient::new(
            None,
            Some("test_key".to_string()),
            None,
            Some("test_passphrase".to_string()),
            None,
            None,
        );
        assert!(result.is_err());
    }

    /// `fetch_add` on the request counter returns the previous value and advances by one.
    #[rstest]
    fn test_request_id_generation() {
        let client = OKXWebSocketClient::default();

        let initial_counter = client.request_id_counter.load(Ordering::SeqCst);

        let id1 = client.request_id_counter.fetch_add(1, Ordering::SeqCst);
        let id2 = client.request_id_counter.fetch_add(1, Ordering::SeqCst);

        assert_eq!(id1, initial_counter);
        assert_eq!(id2, initial_counter + 1);
        assert_eq!(
            client.request_id_counter.load(Ordering::SeqCst),
            initial_counter + 2
        );
    }

    /// A default client starts closed and inactive; a configured heartbeat is stored.
    #[rstest]
    fn test_client_state_management() {
        let client = OKXWebSocketClient::default();

        assert!(client.is_closed());
        assert!(!client.is_active());

        let client_with_heartbeat =
            OKXWebSocketClient::new(None, None, None, None, None, Some(30)).unwrap();

        assert_eq!(client_with_heartbeat.heartbeat, Some(30));
    }

    /// Pending request maps start empty and support insert / lookup / remove.
    #[rstest]
    fn test_request_cache_operations() {
        let client = OKXWebSocketClient::default();

        assert_eq!(client.pending_place_requests.len(), 0);
        assert_eq!(client.pending_cancel_requests.len(), 0);
        assert_eq!(client.pending_amend_requests.len(), 0);

        let client_order_id = ClientOrderId::from("test-order-123");
        let trader_id = TraderId::from("test-trader-001");
        let strategy_id = StrategyId::from("test-strategy-001");
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");

        let dummy_params = WsPostOrderParamsBuilder::default()
            .inst_id("BTC-USDT".to_string())
            .td_mode(OKXTradeMode::Cash)
            .side(OKXSide::Buy)
            .ord_type(OKXOrderType::Limit)
            .sz("1".to_string())
            .build()
            .unwrap();

        client.pending_place_requests.insert(
            "place-123".to_string(),
            (
                PendingOrderParams::Regular(dummy_params),
                client_order_id,
                trader_id,
                strategy_id,
                instrument_id,
            ),
        );

        assert_eq!(client.pending_place_requests.len(), 1);
        assert!(client.pending_place_requests.contains_key("place-123"));

        let removed = client.pending_place_requests.remove("place-123");
        assert!(removed.is_some());
        assert_eq!(client.pending_place_requests.len(), 0);
    }

    /// An `OKXWebSocketError` keeps its fields when wrapped in `NautilusWsMessage::Error`.
    #[rstest]
    fn test_websocket_error_handling() {
        let clock = get_atomic_clock_realtime();
        let ts = clock.get_time_ns().as_u64();

        let error = OKXWebSocketError {
            code: "60012".to_string(),
            message: "Invalid request".to_string(),
            conn_id: None,
            timestamp: ts,
        };

        assert_eq!(error.code, "60012");
        assert_eq!(error.message, "Invalid request");
        assert_eq!(error.timestamp, ts);

        let nautilus_msg = NautilusWsMessage::Error(error);
        match nautilus_msg {
            NautilusWsMessage::Error(e) => {
                assert_eq!(e.code, "60012");
                assert_eq!(e.message, "Invalid request");
            }
            _ => panic!("Expected Error variant"),
        }
    }

    /// Ten consecutive `fetch_add` calls produce ten consecutive request IDs.
    #[rstest]
    fn test_request_id_generation_sequence() {
        let client = OKXWebSocketClient::default();

        let initial_counter = client.request_id_counter.load(Ordering::SeqCst);
        let ids: Vec<_> = (0..10)
            .map(|_| client.request_id_counter.fetch_add(1, Ordering::SeqCst))
            .collect();

        // Pair each ID with its expected offset without a lossy `as` cast.
        for (offset, id) in (0u64..).zip(ids) {
            assert_eq!(id, initial_counter + offset);
        }

        assert_eq!(
            client.request_id_counter.load(Ordering::SeqCst),
            initial_counter + 10
        );
    }

    /// Initial state is closed/inactive; heartbeat and account ID arguments are stored.
    #[rstest]
    fn test_client_state_transitions() {
        let client = OKXWebSocketClient::default();

        assert!(client.is_closed());
        assert!(!client.is_active());

        let client_with_heartbeat =
            OKXWebSocketClient::new(None, None, None, None, None, Some(30)).unwrap();

        assert_eq!(client_with_heartbeat.heartbeat, Some(30));

        let account_id = AccountId::from("test-account-123");
        let client_with_account =
            OKXWebSocketClient::new(None, None, None, None, Some(account_id), None).unwrap();

        assert_eq!(client_with_account.account_id, account_id);
    }

    /// Ten concurrent tasks can each allocate a request ID and insert/remove a pending
    /// place request on a shared client without losing entries or counter increments.
    #[tokio::test]
    async fn test_concurrent_request_handling() {
        let client = Arc::new(OKXWebSocketClient::default());

        let initial_counter = client.request_id_counter.load(Ordering::SeqCst);
        let mut handles = Vec::with_capacity(10);

        for i in 0..10 {
            let client_clone = Arc::clone(&client);
            let handle = tokio::spawn(async move {
                let client_order_id = ClientOrderId::from(format!("order-{i}").as_str());
                let trader_id = TraderId::from("trader-001");
                let strategy_id = StrategyId::from("strategy-001");
                let instrument_id = InstrumentId::from("BTC-USDT.OKX");

                let request_id = client_clone
                    .request_id_counter
                    .fetch_add(1, Ordering::SeqCst);
                let request_id_str = request_id.to_string();

                let dummy_params = WsPostOrderParamsBuilder::default()
                    .inst_id(instrument_id.symbol.to_string())
                    .td_mode(OKXTradeMode::Cash)
                    .side(OKXSide::Buy)
                    .ord_type(OKXOrderType::Limit)
                    .sz("1".to_string())
                    .build()
                    .unwrap();

                client_clone.pending_place_requests.insert(
                    request_id_str.clone(),
                    (
                        PendingOrderParams::Regular(dummy_params),
                        client_order_id,
                        trader_id,
                        strategy_id,
                        instrument_id,
                    ),
                );

                // Yield briefly so the tasks interleave between insert and remove.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let removed = client_clone.pending_place_requests.remove(&request_id_str);
                assert!(removed.is_some());

                request_id
            });
            handles.push(handle);
        }

        let results: Vec<_> = join_all(handles).await;

        assert_eq!(results.len(), 10);
        for result in results {
            assert!(result.is_ok());
        }

        assert_eq!(client.pending_place_requests.len(), 0);

        let final_counter = client.request_id_counter.load(Ordering::SeqCst);
        assert_eq!(final_counter, initial_counter + 10);
    }

    /// Several code/message/conn_id combinations survive the `NautilusWsMessage::Error`
    /// wrapping unchanged.
    #[rstest]
    fn test_websocket_error_scenarios() {
        let clock = get_atomic_clock_realtime();
        let ts = clock.get_time_ns().as_u64();

        let error_scenarios = vec![
            ("60012", "Invalid request", None),
            ("60009", "Invalid API key", Some("conn-123".to_string())),
            ("60014", "Too many requests", None),
            ("50001", "Order not found", None),
        ];

        for (code, message, conn_id) in error_scenarios {
            let error = OKXWebSocketError {
                code: code.to_string(),
                message: message.to_string(),
                conn_id: conn_id.clone(),
                timestamp: ts,
            };

            assert_eq!(error.code, code);
            assert_eq!(error.message, message);
            assert_eq!(error.conn_id, conn_id);
            assert_eq!(error.timestamp, ts);

            let nautilus_msg = NautilusWsMessage::Error(error);
            match nautilus_msg {
                NautilusWsMessage::Error(e) => {
                    assert_eq!(e.code, code);
                    assert_eq!(e.message, message);
                    assert_eq!(e.conn_id, conn_id);
                }
                _ => panic!("Expected Error variant"),
            }
        }
    }

    /// The `RECONNECTED` sentinel text frame is surfaced as `OKXWebSocketEvent::Reconnected`.
    #[tokio::test]
    async fn test_feed_handler_reconnection_detection() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));
        let mut handler = OKXFeedHandler::new(rx, signal.clone());

        tx.send(Message::Text(RECONNECTED.to_string().into()))
            .unwrap();

        let result = handler.next().await;
        assert!(matches!(result, Some(OKXWebSocketEvent::Reconnected)));
    }

    /// A text ping is surfaced as `OKXWebSocketEvent::Ping`; once the stop signal is set,
    /// `next` returns `None` even though a further message is still queued.
    #[tokio::test]
    async fn test_feed_handler_normal_message_processing() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));
        let mut handler = OKXFeedHandler::new(rx, signal.clone());

        let ping_msg = TEXT_PING;
        tx.send(Message::Text(ping_msg.to_string().into())).unwrap();

        let sub_msg = r#"{
            "event": "subscribe",
            "arg": {
                "channel": "tickers",
                "instType": "SPOT"
            },
            "connId": "a4d3ae55"
        }"#;

        tx.send(Message::Text(sub_msg.to_string().into())).unwrap();

        let first = handler.next().await;
        assert!(matches!(first, Some(OKXWebSocketEvent::Ping)));

        // Request shutdown before the subscribe acknowledgement is consumed.
        signal.store(true, Ordering::Relaxed);

        let result = handler.next().await;
        assert!(result.is_none());
    }

    /// With the stop signal already set, `next` returns `None` immediately.
    #[tokio::test]
    async fn test_feed_handler_stop_signal() {
        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(true));
        let mut handler = OKXFeedHandler::new(rx, signal.clone());

        let result = handler.next().await;
        assert!(result.is_none());
    }

    /// A WebSocket close frame terminates the handler stream (`next` returns `None`).
    #[tokio::test]
    async fn test_feed_handler_close_message() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));
        let mut handler = OKXFeedHandler::new(rx, signal.clone());

        tx.send(Message::Close(None)).unwrap();

        let result = handler.next().await;
        assert!(result.is_none());
    }

    /// Pins the literal value of the reconnection sentinel.
    ///
    /// Synchronous on purpose: nothing is awaited, so no async runtime is needed.
    #[rstest]
    fn test_reconnection_message_constant() {
        assert_eq!(RECONNECTED, "__RECONNECTED__");
    }

    /// Repeated reconnection sentinels each produce their own `Reconnected` event.
    #[tokio::test]
    async fn test_multiple_reconnection_signals() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));
        let mut handler = OKXFeedHandler::new(rx, signal.clone());

        for _ in 0..3 {
            tx.send(Message::Text(RECONNECTED.to_string().into()))
                .unwrap();

            let result = handler.next().await;
            assert!(matches!(result, Some(OKXWebSocketEvent::Reconnected)));
        }
    }

    /// `wait_until_active` on a never-connected client returns an error and the client
    /// stays inactive.
    ///
    /// NOTE(review): the `0.1` argument is presumably a timeout in seconds; confirm
    /// against `wait_until_active`.
    #[tokio::test]
    async fn test_wait_until_active_timeout() {
        let client = OKXWebSocketClient::new(
            None,
            Some("test_key".to_string()),
            Some("test_secret".to_string()),
            Some("test_passphrase".to_string()),
            Some(AccountId::from("test-account")),
            None,
        )
        .unwrap();

        let result = client.wait_until_active(0.1).await;

        assert!(result.is_err());
        assert!(!client.is_active());
    }

    /// Builds a canceled limit order message with no cancel-source markers set;
    /// individual tests mutate the fields they care about.
    fn sample_canceled_order_msg() -> OKXOrderMsg {
        OKXOrderMsg {
            acc_fill_sz: Some("0".to_string()),
            avg_px: "0".to_string(),
            c_time: 0,
            cancel_source: None,
            cancel_source_reason: None,
            category: OKXOrderCategory::Normal,
            ccy: Ustr::from("USDT"),
            cl_ord_id: "order-1".to_string(),
            algo_cl_ord_id: None,
            fee: None,
            fee_ccy: Ustr::from("USDT"),
            fill_px: "0".to_string(),
            fill_sz: "0".to_string(),
            fill_time: 0,
            inst_id: Ustr::from("ETH-USDT-SWAP"),
            inst_type: OKXInstrumentType::Swap,
            lever: "1".to_string(),
            ord_id: Ustr::from("123456"),
            ord_type: OKXOrderType::Limit,
            pnl: "0".to_string(),
            pos_side: OKXPositionSide::Net,
            px: "0".to_string(),
            reduce_only: "false".to_string(),
            side: OKXSide::Buy,
            state: OKXOrderStatus::Canceled,
            exec_type: OKXExecType::None,
            sz: "1".to_string(),
            td_mode: OKXTradeMode::Cross,
            tgt_ccy: None,
            trade_id: String::new(),
            u_time: 0,
        }
    }

    /// A matching `cancel_source` marks the message as a post-only auto-cancel.
    #[rstest]
    fn test_is_post_only_auto_cancel_detects_cancel_source() {
        let mut msg = sample_canceled_order_msg();
        msg.cancel_source = Some(super::OKX_POST_ONLY_CANCEL_SOURCE.to_string());

        assert!(OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
    }

    /// A `cancel_source_reason` mentioning post-only marks the message as an auto-cancel.
    #[rstest]
    fn test_is_post_only_auto_cancel_detects_reason() {
        let mut msg = sample_canceled_order_msg();
        msg.cancel_source_reason = Some("POST_ONLY would take liquidity".to_string());

        assert!(OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
    }

    /// Without any cancel-source markers the message is not a post-only auto-cancel.
    #[rstest]
    fn test_is_post_only_auto_cancel_false_without_markers() {
        let msg = sample_canceled_order_msg();

        assert!(!OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
    }

    /// A post-only order type alone (no cancel markers) is not treated as an auto-cancel.
    #[rstest]
    fn test_is_post_only_auto_cancel_false_for_order_type_only() {
        let mut msg = sample_canceled_order_msg();
        msg.ord_type = OKXOrderType::PostOnly;

        assert!(!OKXWsMessageHandler::is_post_only_auto_cancel(&msg));
    }

    /// The top-level error code alone is sufficient to detect a post-only rejection.
    #[rstest]
    fn test_is_post_only_rejection_detects_by_code() {
        assert!(super::is_post_only_rejection("51019", &[]));
    }

    /// A post-only `sCode` inside a data entry is detected under an unrelated top-level code.
    #[rstest]
    fn test_is_post_only_rejection_detects_by_inner_code() {
        let data = vec![serde_json::json!({
            "sCode": "51019"
        })];
        assert!(super::is_post_only_rejection("50000", &data));
    }

    /// A post-only `code` field inside a data entry is detected as well.
    #[rstest]
    fn test_is_post_only_rejection_detects_by_entry_code_field() {
        let data = vec![serde_json::json!({
            "code": "51019"
        })];
        assert!(super::is_post_only_rejection("50000", &data));
    }

    /// Entries without any matching code field do not count as post-only rejections.
    #[rstest]
    fn test_is_post_only_rejection_false_for_unrelated_error() {
        let data = vec![serde_json::json!({
            "sMsg": "Insufficient balance"
        })];
        assert!(!super::is_post_only_rejection("50000", &data));
    }

    /// Batch cancel with two fully-identified orders returns an error on a client that
    /// was never connected.
    ///
    /// NOTE(review): the error presumably stems from the missing connection; confirm
    /// against `batch_cancel_orders`.
    #[tokio::test]
    async fn test_batch_cancel_orders_with_multiple_orders() {
        let client = OKXWebSocketClient::new(
            Some("wss://test.okx.com".to_string()),
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create client");

        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        let orders = vec![
            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
        ];

        let result = client.batch_cancel_orders(orders).await;

        assert!(result.is_err());
    }

    /// Batch cancel identified only by client order ID returns an error on a client that
    /// was never connected.
    ///
    /// NOTE(review): the error presumably stems from the missing connection; confirm
    /// against `batch_cancel_orders`.
    #[tokio::test]
    async fn test_batch_cancel_orders_with_only_client_order_id() {
        let client = OKXWebSocketClient::new(
            Some("wss://test.okx.com".to_string()),
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create client");

        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id = ClientOrderId::new("order1");

        let orders = vec![(instrument_id, Some(client_order_id), None)];

        let result = client.batch_cancel_orders(orders).await;

        assert!(result.is_err());
    }

    /// Batch cancel identified only by venue order ID returns an error on a client that
    /// was never connected.
    ///
    /// NOTE(review): the error presumably stems from the missing connection; confirm
    /// against `batch_cancel_orders`.
    #[tokio::test]
    async fn test_batch_cancel_orders_with_only_venue_order_id() {
        let client = OKXWebSocketClient::new(
            Some("wss://test.okx.com".to_string()),
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create client");

        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let venue_order_id = VenueOrderId::new("venue1");

        let orders = vec![(instrument_id, None, Some(venue_order_id))];

        let result = client.batch_cancel_orders(orders).await;

        assert!(result.is_err());
    }

    /// Batch cancel with both IDs on a swap instrument returns an error on a client that
    /// was never connected.
    ///
    /// NOTE(review): the error presumably stems from the missing connection; confirm
    /// against `batch_cancel_orders`.
    #[tokio::test]
    async fn test_batch_cancel_orders_with_both_ids() {
        let client = OKXWebSocketClient::new(
            Some("wss://test.okx.com".to_string()),
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create client");

        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
        let client_order_id = ClientOrderId::new("order1");
        let venue_order_id = VenueOrderId::new("venue1");

        let orders = vec![(instrument_id, Some(client_order_id), Some(venue_order_id))];

        let result = client.batch_cancel_orders(orders).await;

        assert!(result.is_err());
    }
}