1use std::{
32 collections::VecDeque,
33 sync::{
34 Arc,
35 atomic::{AtomicBool, AtomicU64, Ordering},
36 },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43 enums::{OrderStatus, OrderType, TimeInForce},
44 events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
45 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
46 instruments::{Instrument, InstrumentAny},
47 reports::OrderStatusReport,
48 types::{Money, Quantity},
49};
50use nautilus_network::{
51 RECONNECTED,
52 retry::{RetryManager, create_websocket_retry_manager},
53 websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
54};
55use serde_json::Value;
56use tokio_tungstenite::tungstenite::Message;
57use ustr::Ustr;
58
59use super::{
60 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
61 error::OKXWsError,
62 messages::{
63 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
64 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
65 OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
66 WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
67 },
68 parse::{parse_algo_order_msg, parse_book_msg_vec, parse_order_msg, parse_ws_message_data},
69 subscription::topic_from_websocket_arg,
70};
71use crate::{
72 common::{
73 consts::{
74 OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
75 should_retry_error_code,
76 },
77 enums::{
78 OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXSide,
79 OKXTargetCurrency, OKXTradeMode,
80 },
81 parse::{
82 determine_order_type, okx_instrument_type, parse_account_state, parse_client_order_id,
83 parse_millisecond_timestamp, parse_position_status_report, parse_price, parse_quantity,
84 },
85 },
86 http::models::{OKXAccount, OKXPosition},
87 websocket::client::{
88 OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
89 OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
90 },
91};
92
93type PlaceRequestData = (
95 PendingOrderParams,
96 ClientOrderId,
97 TraderId,
98 StrategyId,
99 InstrumentId,
100);
101
102type CancelRequestData = (
104 ClientOrderId,
105 TraderId,
106 StrategyId,
107 InstrumentId,
108 Option<VenueOrderId>,
109);
110
111type AmendRequestData = (
113 ClientOrderId,
114 TraderId,
115 StrategyId,
116 InstrumentId,
117 Option<VenueOrderId>,
118);
119
120#[derive(Debug)]
121pub enum PendingOrderParams {
122 Regular(WsPostOrderParams),
123 Algo(WsPostAlgoOrderParams),
124}
125
126#[allow(
128 clippy::large_enum_variant,
129 reason = "Commands are ephemeral and immediately consumed"
130)]
131#[allow(missing_debug_implementations)]
132pub enum HandlerCommand {
133 SetClient(WebSocketClient),
134 Disconnect,
135 Authenticate {
136 payload: String,
137 },
138 InitializeInstruments(Vec<InstrumentAny>),
139 UpdateInstrument(InstrumentAny),
140 Subscribe {
141 args: Vec<OKXSubscriptionArg>,
142 },
143 Unsubscribe {
144 args: Vec<OKXSubscriptionArg>,
145 },
146 PlaceOrder {
147 params: WsPostOrderParams,
148 client_order_id: ClientOrderId,
149 trader_id: TraderId,
150 strategy_id: StrategyId,
151 instrument_id: InstrumentId,
152 },
153 PlaceAlgoOrder {
154 params: WsPostAlgoOrderParams,
155 client_order_id: ClientOrderId,
156 trader_id: TraderId,
157 strategy_id: StrategyId,
158 instrument_id: InstrumentId,
159 },
160 AmendOrder {
161 params: WsAmendOrderParams,
162 client_order_id: ClientOrderId,
163 trader_id: TraderId,
164 strategy_id: StrategyId,
165 instrument_id: InstrumentId,
166 venue_order_id: Option<VenueOrderId>,
167 },
168 CancelOrder {
169 client_order_id: Option<ClientOrderId>,
170 venue_order_id: Option<VenueOrderId>,
171 instrument_id: InstrumentId,
172 trader_id: TraderId,
173 strategy_id: StrategyId,
174 },
175 CancelAlgoOrder {
176 client_order_id: Option<ClientOrderId>,
177 algo_order_id: Option<VenueOrderId>,
178 instrument_id: InstrumentId,
179 trader_id: TraderId,
180 strategy_id: StrategyId,
181 },
182 MassCancel {
183 instrument_id: InstrumentId,
184 },
185 BatchPlaceOrders {
186 args: Vec<Value>,
187 request_id: String,
188 },
189 BatchAmendOrders {
190 args: Vec<Value>,
191 request_id: String,
192 },
193 BatchCancelOrders {
194 args: Vec<Value>,
195 request_id: String,
196 },
197}
198
199pub(super) struct OKXWsFeedHandler {
200 clock: &'static AtomicTime,
201 account_id: AccountId,
202 signal: Arc<AtomicBool>,
203 inner: Option<WebSocketClient>,
204 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
205 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
206 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
207 auth_tracker: AuthTracker,
208 subscriptions_state: SubscriptionState,
209 retry_manager: RetryManager<OKXWsError>,
210 pending_place_requests: AHashMap<String, PlaceRequestData>,
211 pending_cancel_requests: AHashMap<String, CancelRequestData>,
212 pending_amend_requests: AHashMap<String, AmendRequestData>,
213 pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
214 pending_messages: VecDeque<NautilusWsMessage>,
215 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
216 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
217 emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
218 instruments_cache: AHashMap<Ustr, InstrumentAny>,
219 fee_cache: AHashMap<Ustr, Money>, filled_qty_cache: AHashMap<Ustr, Quantity>, funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, last_account_state: Option<AccountState>,
223 request_id_counter: AtomicU64,
224}
225
226impl OKXWsFeedHandler {
227 #[allow(clippy::too_many_arguments)]
229 pub fn new(
230 account_id: AccountId,
231 signal: Arc<AtomicBool>,
232 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
233 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
234 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
235 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
236 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
237 emitted_order_accepted: Arc<DashMap<VenueOrderId, ()>>,
238 auth_tracker: AuthTracker,
239 subscriptions_state: SubscriptionState,
240 ) -> Self {
241 Self {
242 clock: get_atomic_clock_realtime(),
243 account_id,
244 signal,
245 inner: None,
246 cmd_rx,
247 raw_rx,
248 out_tx,
249 auth_tracker,
250 subscriptions_state,
251 retry_manager: create_websocket_retry_manager(),
252 pending_place_requests: AHashMap::new(),
253 pending_cancel_requests: AHashMap::new(),
254 pending_amend_requests: AHashMap::new(),
255 pending_mass_cancel_requests: AHashMap::new(),
256 pending_messages: VecDeque::new(),
257 active_client_orders,
258 client_id_aliases,
259 emitted_order_accepted,
260 instruments_cache: AHashMap::new(),
261 fee_cache: AHashMap::new(),
262 filled_qty_cache: AHashMap::new(),
263 funding_rate_cache: AHashMap::new(),
264 last_account_state: None,
265 request_id_counter: AtomicU64::new(0),
266 }
267 }
268
269 pub(super) fn is_stopped(&self) -> bool {
270 self.signal.load(std::sync::atomic::Ordering::Relaxed)
271 }
272
273 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
274 self.out_tx.send(msg).map_err(|_| ())
275 }
276
277 async fn send_with_retry(
279 &self,
280 payload: String,
281 rate_limit_keys: Option<Vec<String>>,
282 ) -> Result<(), OKXWsError> {
283 if let Some(client) = &self.inner {
284 self.retry_manager
285 .execute_with_retry(
286 "websocket_send",
287 || {
288 let payload = payload.clone();
289 let keys = rate_limit_keys.clone();
290 async move {
291 client
292 .send_text(payload, keys)
293 .await
294 .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
295 }
296 },
297 should_retry_okx_error,
298 create_okx_timeout_error,
299 )
300 .await
301 } else {
302 Err(OKXWsError::ClientError(
303 "No active WebSocket client".to_string(),
304 ))
305 }
306 }
307
308 pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
310 match self.send_with_retry(TEXT_PONG.to_string(), None).await {
311 Ok(()) => {
312 tracing::trace!("Sent pong response to OKX text ping");
313 Ok(())
314 }
315 Err(e) => {
316 tracing::warn!(error = %e, "Failed to send pong after retries");
317 Err(anyhow::anyhow!("Failed to send pong: {e}"))
318 }
319 }
320 }
321
322 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
323 if let Some(message) = self.pending_messages.pop_front() {
324 return Some(message);
325 }
326
327 loop {
328 tokio::select! {
329 Some(cmd) = self.cmd_rx.recv() => {
330 match cmd {
331 HandlerCommand::SetClient(client) => {
332 tracing::info!("Handler received WebSocket client");
333 self.inner = Some(client);
334 }
335 HandlerCommand::Disconnect => {
336 tracing::info!("Handler disconnecting WebSocket client");
337 self.inner = None;
338 }
339 HandlerCommand::Authenticate { payload } => {
340 if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
341 tracing::error!(error = %e, "Failed to send authentication message after retries");
342 }
343 }
344 HandlerCommand::InitializeInstruments(instruments) => {
345 for inst in instruments {
346 self.instruments_cache.insert(inst.symbol().inner(), inst);
347 }
348 }
349 HandlerCommand::UpdateInstrument(inst) => {
350 self.instruments_cache.insert(inst.symbol().inner(), inst);
351 }
352 HandlerCommand::Subscribe { args } => {
353 if let Err(e) = self.handle_subscribe(args).await {
354 tracing::error!(error = %e, "Failed to handle subscribe command");
355 }
356 }
357 HandlerCommand::Unsubscribe { args } => {
358 if let Err(e) = self.handle_unsubscribe(args).await {
359 tracing::error!(error = %e, "Failed to handle unsubscribe command");
360 }
361 }
362 HandlerCommand::CancelOrder {
363 client_order_id,
364 venue_order_id,
365 instrument_id,
366 trader_id,
367 strategy_id,
368 } => {
369 if let Err(e) = self
370 .handle_cancel_order(
371 client_order_id,
372 venue_order_id,
373 instrument_id,
374 trader_id,
375 strategy_id,
376 )
377 .await
378 {
379 tracing::error!(error = %e, "Failed to handle cancel order command");
380 }
381 }
382 HandlerCommand::CancelAlgoOrder {
383 client_order_id,
384 algo_order_id,
385 instrument_id,
386 trader_id,
387 strategy_id,
388 } => {
389 if let Err(e) = self
390 .handle_cancel_algo_order(
391 client_order_id,
392 algo_order_id,
393 instrument_id,
394 trader_id,
395 strategy_id,
396 )
397 .await
398 {
399 tracing::error!(error = %e, "Failed to handle cancel algo order command");
400 }
401 }
402 HandlerCommand::PlaceOrder {
403 params,
404 client_order_id,
405 trader_id,
406 strategy_id,
407 instrument_id,
408 } => {
409 if let Err(e) = self
410 .handle_place_order(
411 params,
412 client_order_id,
413 trader_id,
414 strategy_id,
415 instrument_id,
416 )
417 .await
418 {
419 tracing::error!(error = %e, "Failed to handle place order command");
420 }
421 }
422 HandlerCommand::PlaceAlgoOrder {
423 params,
424 client_order_id,
425 trader_id,
426 strategy_id,
427 instrument_id,
428 } => {
429 if let Err(e) = self
430 .handle_place_algo_order(
431 params,
432 client_order_id,
433 trader_id,
434 strategy_id,
435 instrument_id,
436 )
437 .await
438 {
439 tracing::error!(error = %e, "Failed to handle place algo order command");
440 }
441 }
442 HandlerCommand::AmendOrder {
443 params,
444 client_order_id,
445 trader_id,
446 strategy_id,
447 instrument_id,
448 venue_order_id,
449 } => {
450 if let Err(e) = self
451 .handle_amend_order(
452 params,
453 client_order_id,
454 trader_id,
455 strategy_id,
456 instrument_id,
457 venue_order_id,
458 )
459 .await
460 {
461 tracing::error!(error = %e, "Failed to handle amend order command");
462 }
463 }
464 HandlerCommand::MassCancel { instrument_id } => {
465 if let Err(e) = self.handle_mass_cancel(instrument_id).await {
466 tracing::error!(error = %e, "Failed to handle mass cancel command");
467 }
468 }
469 HandlerCommand::BatchCancelOrders { args, request_id } => {
470 if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
471 tracing::error!(error = %e, "Failed to handle batch cancel orders command");
472 }
473 }
474 HandlerCommand::BatchPlaceOrders { args, request_id } => {
475 if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
476 tracing::error!(error = %e, "Failed to handle batch place orders command");
477 }
478 }
479 HandlerCommand::BatchAmendOrders { args, request_id } => {
480 if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
481 tracing::error!(error = %e, "Failed to handle batch amend orders command");
482 }
483 }
484 }
485 continue;
487 }
488
489 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
490 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
491 tracing::debug!("Stop signal received during idle period");
492 return None;
493 }
494 continue;
495 }
496
497 msg = self.raw_rx.recv() => {
498 let event = match msg {
499 Some(msg) => match Self::parse_raw_message(msg) {
500 Some(event) => event,
501 None => continue,
502 },
503 None => {
504 tracing::debug!("WebSocket stream closed");
505 return None;
506 }
507 };
508
509 let ts_init = self.clock.get_time_ns();
510
511 match event {
512 OKXWsMessage::Ping => {
513 if let Err(e) = self.send_pong().await {
514 tracing::warn!(error = %e, "Failed to send pong response");
515 }
516 continue;
517 }
518 OKXWsMessage::Login {
519 code, msg, conn_id, ..
520 } => {
521 if code == "0" {
522 self.auth_tracker.succeed();
523
524 return Some(NautilusWsMessage::Authenticated);
528 }
529
530 tracing::error!(error = %msg, "WebSocket authentication failed");
531 self.auth_tracker.fail(msg.clone());
532
533 let error = OKXWebSocketError {
534 code,
535 message: msg,
536 conn_id: Some(conn_id),
537 timestamp: self.clock.get_time_ns().as_u64(),
538 };
539 self.pending_messages
540 .push_back(NautilusWsMessage::Error(error));
541 continue;
542 }
543 OKXWsMessage::BookData { arg, action, data } => {
544 if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
545 return Some(msg);
546 }
547 continue;
548 }
549 OKXWsMessage::OrderResponse {
550 id,
551 op,
552 code,
553 msg,
554 data,
555 } => {
556 if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
557 return Some(msg);
558 }
559 continue;
560 }
561 OKXWsMessage::Data { arg, data } => {
562 let OKXWebSocketArg {
563 channel, inst_id, ..
564 } = arg;
565
566 match channel {
567 OKXWsChannel::Account => {
568 if let Some(msg) = self.handle_account_data(data, ts_init) {
569 return Some(msg);
570 }
571 continue;
572 }
573 OKXWsChannel::Positions => {
574 self.handle_positions_data(data, ts_init);
575 continue;
576 }
577 OKXWsChannel::Orders => {
578 if let Some(msg) = self.handle_orders_data(data, ts_init) {
579 return Some(msg);
580 }
581 continue;
582 }
583 OKXWsChannel::OrdersAlgo => {
584 if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
585 return Some(msg);
586 }
587 continue;
588 }
589 _ => {
590 if let Some(msg) =
591 self.handle_other_channel_data(channel, inst_id, data, ts_init)
592 {
593 return Some(msg);
594 }
595 continue;
596 }
597 }
598 }
599 OKXWsMessage::Error { code, msg } => {
600 let error = OKXWebSocketError {
601 code,
602 message: msg,
603 conn_id: None,
604 timestamp: self.clock.get_time_ns().as_u64(),
605 };
606 return Some(NautilusWsMessage::Error(error));
607 }
608 OKXWsMessage::Reconnected => {
609 return Some(NautilusWsMessage::Reconnected);
610 }
611 OKXWsMessage::Subscription {
612 event,
613 arg,
614 code,
615 msg,
616 ..
617 } => {
618 let topic = topic_from_websocket_arg(&arg);
619 let success = code.as_deref().is_none_or(|c| c == "0");
620
621 match event {
622 OKXSubscriptionEvent::Subscribe => {
623 if success {
624 self.subscriptions_state.confirm_subscribe(&topic);
625 } else {
626 tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
627 self.subscriptions_state.mark_failure(&topic);
628 }
629 }
630 OKXSubscriptionEvent::Unsubscribe => {
631 if success {
632 self.subscriptions_state.confirm_unsubscribe(&topic);
633 } else {
634 tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed - restoring subscription");
635 self.subscriptions_state.confirm_unsubscribe(&topic); self.subscriptions_state.mark_subscribe(&topic); self.subscriptions_state.confirm_subscribe(&topic); }
640 }
641 }
642
643 continue;
644 }
645 OKXWsMessage::ChannelConnCount { .. } => continue,
646 }
647 }
648
649 else => {
651 tracing::debug!("Handler shutting down: stream ended or command channel closed");
652 return None;
653 }
654 }
655 }
656 }
657
658 pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
659 if msg.state != OKXOrderStatus::Canceled {
660 return false;
661 }
662
663 let cancel_source_matches = matches!(
664 msg.cancel_source.as_deref(),
665 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
666 );
667
668 let reason_matches = matches!(
669 msg.cancel_source_reason.as_deref(),
670 Some(reason) if reason.contains("POST_ONLY")
671 );
672
673 if !(cancel_source_matches || reason_matches) {
674 return false;
675 }
676
677 msg.acc_fill_sz
678 .as_ref()
679 .is_none_or(|filled| filled == "0" || filled.is_empty())
680 }
681
682 fn try_handle_post_only_auto_cancel(
683 &mut self,
684 msg: &OKXOrderMsg,
685 ts_init: UnixNanos,
686 exec_reports: &mut Vec<ExecutionReport>,
687 ) -> bool {
688 if !Self::is_post_only_auto_cancel(msg) {
689 return false;
690 }
691
692 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
693 return false;
694 };
695
696 let Some((_, (trader_id, strategy_id, instrument_id))) =
697 self.active_client_orders.remove(&client_order_id)
698 else {
699 return false;
700 };
701
702 self.client_id_aliases.remove(&client_order_id);
703
704 if !exec_reports.is_empty() {
705 let reports = std::mem::take(exec_reports);
706 self.pending_messages
707 .push_back(NautilusWsMessage::ExecutionReports(reports));
708 }
709
710 let reason = msg
711 .cancel_source_reason
712 .as_ref()
713 .filter(|reason| !reason.is_empty())
714 .map_or_else(
715 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
716 |reason| Ustr::from(reason.as_str()),
717 );
718
719 let ts_event = parse_millisecond_timestamp(msg.u_time);
720 let rejected = OrderRejected::new(
721 trader_id,
722 strategy_id,
723 instrument_id,
724 client_order_id,
725 self.account_id,
726 reason,
727 UUID4::new(),
728 ts_event,
729 ts_init,
730 false,
731 true,
732 );
733
734 self.pending_messages
735 .push_back(NautilusWsMessage::OrderRejected(rejected));
736
737 true
738 }
739
740 fn register_client_order_aliases(
741 &self,
742 raw_child: &Option<ClientOrderId>,
743 parent_from_msg: &Option<ClientOrderId>,
744 ) -> Option<ClientOrderId> {
745 if let Some(parent) = parent_from_msg {
746 self.client_id_aliases.insert(*parent, *parent);
747 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
748 self.client_id_aliases.insert(*child, *parent);
749 }
750 Some(*parent)
751 } else if let Some(child) = raw_child.as_ref() {
752 if let Some(mapped) = self.client_id_aliases.get(child) {
753 Some(*mapped.value())
754 } else {
755 self.client_id_aliases.insert(*child, *child);
756 Some(*child)
757 }
758 } else {
759 None
760 }
761 }
762
763 fn adjust_execution_report(
764 &self,
765 report: ExecutionReport,
766 effective_client_id: &Option<ClientOrderId>,
767 raw_child: &Option<ClientOrderId>,
768 ) -> ExecutionReport {
769 match report {
770 ExecutionReport::Order(status_report) => {
771 let mut adjusted = status_report;
772 let mut final_id = *effective_client_id;
773
774 if final_id.is_none() {
775 final_id = adjusted.client_order_id;
776 }
777
778 if final_id.is_none()
779 && let Some(child) = raw_child.as_ref()
780 && let Some(mapped) = self.client_id_aliases.get(child)
781 {
782 final_id = Some(*mapped.value());
783 }
784
785 if let Some(final_id_value) = final_id {
786 if adjusted.client_order_id != Some(final_id_value) {
787 adjusted = adjusted.with_client_order_id(final_id_value);
788 }
789 self.client_id_aliases
790 .insert(final_id_value, final_id_value);
791
792 if let Some(child) =
793 raw_child.as_ref().filter(|child| **child != final_id_value)
794 {
795 adjusted = adjusted.with_linked_order_ids(vec![*child]);
796 }
797 }
798
799 ExecutionReport::Order(adjusted)
800 }
801 ExecutionReport::Fill(mut fill_report) => {
802 let mut final_id = *effective_client_id;
803 if final_id.is_none() {
804 final_id = fill_report.client_order_id;
805 }
806 if final_id.is_none()
807 && let Some(child) = raw_child.as_ref()
808 && let Some(mapped) = self.client_id_aliases.get(child)
809 {
810 final_id = Some(*mapped.value());
811 }
812
813 if let Some(final_id_value) = final_id {
814 fill_report.client_order_id = Some(final_id_value);
815 self.client_id_aliases
816 .insert(final_id_value, final_id_value);
817 }
818
819 ExecutionReport::Fill(fill_report)
820 }
821 }
822 }
823
824 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
825 match report {
826 ExecutionReport::Fill(fill_report) => {
827 let order_id = fill_report.venue_order_id.inner();
828 let current_fee = self
829 .fee_cache
830 .get(&order_id)
831 .copied()
832 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
833 let total_fee = current_fee + fill_report.commission;
834 self.fee_cache.insert(order_id, total_fee);
835
836 let current_filled_qty = self
837 .filled_qty_cache
838 .get(&order_id)
839 .copied()
840 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
841 let total_filled_qty = current_filled_qty + fill_report.last_qty;
842 self.filled_qty_cache.insert(order_id, total_filled_qty);
843 }
844 ExecutionReport::Order(status_report) => {
845 if matches!(status_report.order_status, OrderStatus::Filled) {
846 self.fee_cache.remove(&status_report.venue_order_id.inner());
847 self.filled_qty_cache
848 .remove(&status_report.venue_order_id.inner());
849 }
850
851 if matches!(
852 status_report.order_status,
853 OrderStatus::Canceled
854 | OrderStatus::Expired
855 | OrderStatus::Filled
856 | OrderStatus::Rejected,
857 ) {
858 if let Some(client_order_id) = status_report.client_order_id {
859 self.active_client_orders.remove(&client_order_id);
860 self.client_id_aliases.remove(&client_order_id);
861 }
862 if let Some(linked) = &status_report.linked_order_ids {
863 for child in linked {
864 self.client_id_aliases.remove(child);
865 }
866 }
867 }
868 }
869 }
870 }
871
872 #[allow(clippy::too_many_lines)]
873 fn handle_order_response(
874 &mut self,
875 id: Option<String>,
876 op: OKXWsOperation,
877 code: String,
878 msg: String,
879 data: Vec<Value>,
880 ts_init: UnixNanos,
881 ) -> Option<NautilusWsMessage> {
882 if code == "0" {
883 tracing::debug!("Order operation successful: id={id:?} op={op} code={code}");
884
885 if op == OKXWsOperation::BatchCancelOrders {
886 tracing::debug!(
887 "Batch cancel operation successful: id={id:?} cancelled_count={}",
888 data.len()
889 );
890
891 for (idx, entry) in data.iter().enumerate() {
893 if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
894 && entry_code != "0"
895 {
896 let entry_msg = entry
897 .get("sMsg")
898 .and_then(|v| v.as_str())
899 .unwrap_or("Unknown error");
900
901 if let Some(cl_ord_id_str) = entry
902 .get("clOrdId")
903 .and_then(|v| v.as_str())
904 .filter(|s| !s.is_empty())
905 {
906 tracing::error!(
907 "Batch cancel partial failure for order {}: sCode={} sMsg={}",
908 cl_ord_id_str,
909 entry_code,
910 entry_msg
911 );
912 } else {
914 tracing::error!(
915 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
916 idx,
917 entry_code,
918 entry_msg,
919 entry
920 );
921 }
922 }
923 }
924
925 return None;
926 } else if op == OKXWsOperation::MassCancel
927 && let Some(request_id) = &id
928 && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
929 {
930 tracing::info!(
931 "Mass cancel operation successful for instrument: {}",
932 instrument_id
933 );
934 } else if op == OKXWsOperation::Order
935 && let Some(request_id) = &id
936 && let Some((params, client_order_id, _trader_id, _strategy_id, instrument_id)) =
937 self.pending_place_requests.remove(request_id)
938 {
939 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
940 let ord_id = first
941 .get("ordId")
942 .and_then(|v| v.as_str())
943 .filter(|s| !s.is_empty())
944 .map(VenueOrderId::new);
945
946 let ts = first
947 .get("ts")
948 .and_then(|v| v.as_str())
949 .and_then(|s| s.parse::<u64>().ok())
950 .map_or_else(
951 || self.clock.get_time_ns(),
952 |ms| UnixNanos::from(ms * 1_000_000),
953 );
954
955 (ord_id, ts)
956 } else {
957 (None, self.clock.get_time_ns())
958 };
959
960 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
961 {
962 match params {
963 PendingOrderParams::Regular(order_params) => {
964 let order_type = determine_order_type(
965 order_params.ord_type,
966 order_params.px.as_deref().unwrap_or(""),
967 );
968
969 let is_explicit_quote_sized = order_params
970 .tgt_ccy
971 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
972
973 let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
975 && order_params.side == OKXSide::Buy
976 && order_type == OrderType::Market
977 && order_params.td_mode == OKXTradeMode::Cash
978 && instrument.instrument_class().as_ref() == "SPOT";
979
980 if is_explicit_quote_sized || is_implicit_quote_sized {
981 tracing::info!(
986 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
987 if is_explicit_quote_sized {
988 "explicit"
989 } else {
990 "implicit"
991 },
992 );
993 return None;
994 }
995
996 let order_side = order_params.side.into();
997 let time_in_force = match order_params.ord_type {
998 OKXOrderType::Fok => TimeInForce::Fok,
999 OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
1000 TimeInForce::Ioc
1001 }
1002 _ => TimeInForce::Gtc,
1003 };
1004
1005 let size_precision = instrument.size_precision();
1006 let quantity = match parse_quantity(&order_params.sz, size_precision) {
1007 Ok(q) => q,
1008 Err(e) => {
1009 tracing::error!(
1010 "Failed to parse quantity for accepted order: {e}"
1011 );
1012 return None;
1013 }
1014 };
1015
1016 let filled_qty = Quantity::zero(size_precision);
1017
1018 let mut report = OrderStatusReport::new(
1019 self.account_id,
1020 instrument_id,
1021 Some(client_order_id),
1022 venue_order_id.unwrap_or_else(|| VenueOrderId::new("PENDING")),
1023 order_side,
1024 order_type,
1025 time_in_force,
1026 OrderStatus::Accepted,
1027 quantity,
1028 filled_qty,
1029 ts_accepted,
1030 ts_accepted, ts_init,
1032 None, );
1034
1035 if let Some(px) = &order_params.px
1036 && !px.is_empty()
1037 && let Ok(price) = parse_price(px, instrument.price_precision())
1038 {
1039 report = report.with_price(price);
1040 }
1041
1042 if let Some(true) = order_params.reduce_only {
1043 report = report.with_reduce_only(true);
1044 }
1045
1046 if order_type == OrderType::Limit
1047 && order_params.ord_type == OKXOrderType::PostOnly
1048 {
1049 report = report.with_post_only(true);
1050 }
1051
1052 if let Some(ref v_order_id) = venue_order_id {
1053 self.emitted_order_accepted.insert(*v_order_id, ());
1054 }
1055
1056 tracing::debug!(
1057 "Order accepted: client_order_id={client_order_id}, venue_order_id={:?}",
1058 venue_order_id
1059 );
1060
1061 return Some(NautilusWsMessage::ExecutionReports(vec![
1062 ExecutionReport::Order(report),
1063 ]));
1064 }
1065 PendingOrderParams::Algo(_) => {
1066 tracing::info!(
1067 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
1068 venue_order_id
1069 );
1070 }
1071 }
1072 } else {
1073 tracing::error!("Instrument not found for accepted order: {instrument_id}");
1074 }
1075 }
1076
1077 if let Some(first) = data.first()
1078 && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1079 {
1080 tracing::debug!("Order details: {success_msg}");
1081 }
1082
1083 return None;
1084 }
1085
1086 let error_msg = data
1087 .first()
1088 .and_then(|d| d.get("sMsg"))
1089 .and_then(|s| s.as_str())
1090 .unwrap_or(&msg)
1091 .to_string();
1092
1093 if let Some(first) = data.first() {
1094 tracing::debug!(
1095 "Error data fields: {}",
1096 serde_json::to_string_pretty(first)
1097 .unwrap_or_else(|_| "unable to serialize".to_string())
1098 );
1099 }
1100
1101 tracing::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1102
1103 let ts_event = self.clock.get_time_ns();
1104
1105 if let Some(request_id) = &id {
1106 match op {
1107 OKXWsOperation::Order => {
1108 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1109 self.pending_place_requests.remove(request_id)
1110 {
1111 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1112 let rejected = OrderRejected::new(
1113 trader_id,
1114 strategy_id,
1115 instrument_id,
1116 client_order_id,
1117 self.account_id,
1118 Ustr::from(error_msg.as_str()),
1119 UUID4::new(),
1120 ts_event,
1121 ts_init,
1122 false, due_post_only,
1124 );
1125
1126 return Some(NautilusWsMessage::OrderRejected(rejected));
1127 }
1128 }
1129 OKXWsOperation::CancelOrder => {
1130 if let Some((
1131 client_order_id,
1132 trader_id,
1133 strategy_id,
1134 instrument_id,
1135 venue_order_id,
1136 )) = self.pending_cancel_requests.remove(request_id)
1137 {
1138 let rejected = OrderCancelRejected::new(
1139 trader_id,
1140 strategy_id,
1141 instrument_id,
1142 client_order_id,
1143 Ustr::from(error_msg.as_str()),
1144 UUID4::new(),
1145 ts_event,
1146 ts_init,
1147 false, venue_order_id,
1149 Some(self.account_id),
1150 );
1151
1152 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1153 }
1154 }
1155 OKXWsOperation::AmendOrder => {
1156 if let Some((
1157 client_order_id,
1158 trader_id,
1159 strategy_id,
1160 instrument_id,
1161 venue_order_id,
1162 )) = self.pending_amend_requests.remove(request_id)
1163 {
1164 let rejected = OrderModifyRejected::new(
1165 trader_id,
1166 strategy_id,
1167 instrument_id,
1168 client_order_id,
1169 Ustr::from(error_msg.as_str()),
1170 UUID4::new(),
1171 ts_event,
1172 ts_init,
1173 false, venue_order_id,
1175 Some(self.account_id),
1176 );
1177
1178 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1179 }
1180 }
1181 OKXWsOperation::OrderAlgo => {
1182 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1183 self.pending_place_requests.remove(request_id)
1184 {
1185 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1186 let rejected = OrderRejected::new(
1187 trader_id,
1188 strategy_id,
1189 instrument_id,
1190 client_order_id,
1191 self.account_id,
1192 Ustr::from(error_msg.as_str()),
1193 UUID4::new(),
1194 ts_event,
1195 ts_init,
1196 false, due_post_only,
1198 );
1199
1200 return Some(NautilusWsMessage::OrderRejected(rejected));
1201 }
1202 }
1203 OKXWsOperation::CancelAlgos => {
1204 if let Some((
1205 client_order_id,
1206 trader_id,
1207 strategy_id,
1208 instrument_id,
1209 venue_order_id,
1210 )) = self.pending_cancel_requests.remove(request_id)
1211 {
1212 let rejected = OrderCancelRejected::new(
1213 trader_id,
1214 strategy_id,
1215 instrument_id,
1216 client_order_id,
1217 Ustr::from(error_msg.as_str()),
1218 UUID4::new(),
1219 ts_event,
1220 ts_init,
1221 false, venue_order_id,
1223 Some(self.account_id),
1224 );
1225
1226 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1227 }
1228 }
1229 OKXWsOperation::MassCancel => {
1230 if let Some(instrument_id) =
1231 self.pending_mass_cancel_requests.remove(request_id)
1232 {
1233 tracing::error!(
1234 "Mass cancel operation failed for {}: code={code} msg={error_msg}",
1235 instrument_id
1236 );
1237 let error = OKXWebSocketError {
1238 code,
1239 message: format!(
1240 "Mass cancel failed for {}: {}",
1241 instrument_id, error_msg
1242 ),
1243 conn_id: None,
1244 timestamp: ts_event.as_u64(),
1245 };
1246 return Some(NautilusWsMessage::Error(error));
1247 } else {
1248 tracing::error!(
1249 "Mass cancel operation failed: code={code} msg={error_msg}"
1250 );
1251 }
1252 }
1253 OKXWsOperation::BatchCancelOrders => {
1254 tracing::warn!(
1255 "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1256 data.len()
1257 );
1258
1259 for (idx, entry) in data.iter().enumerate() {
1261 let entry_code =
1262 entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1263 let entry_msg = entry
1264 .get("sMsg")
1265 .and_then(|v| v.as_str())
1266 .unwrap_or(&error_msg);
1267
1268 if entry_code != "0" {
1269 if let Some(cl_ord_id_str) = entry
1271 .get("clOrdId")
1272 .and_then(|v| v.as_str())
1273 .filter(|s| !s.is_empty())
1274 {
1275 tracing::error!(
1276 "Batch cancel failed for order {}: sCode={} sMsg={}",
1277 cl_ord_id_str,
1278 entry_code,
1279 entry_msg
1280 );
1281 } else {
1284 tracing::error!(
1285 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
1286 idx,
1287 entry_code,
1288 entry_msg,
1289 entry
1290 );
1291 }
1292 }
1293 }
1294
1295 let error = OKXWebSocketError {
1297 code,
1298 message: format!("Batch cancel failed: {}", error_msg),
1299 conn_id: None,
1300 timestamp: ts_event.as_u64(),
1301 };
1302 return Some(NautilusWsMessage::Error(error));
1303 }
1304 _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
1305 }
1306 }
1307
1308 let error = OKXWebSocketError {
1309 code,
1310 message: error_msg,
1311 conn_id: None,
1312 timestamp: ts_event.as_u64(),
1313 };
1314 Some(NautilusWsMessage::Error(error))
1315 }
1316
1317 fn handle_book_data(
1318 &self,
1319 arg: OKXWebSocketArg,
1320 action: OKXBookAction,
1321 data: Vec<OKXBookMsg>,
1322 ts_init: UnixNanos,
1323 ) -> Option<NautilusWsMessage> {
1324 let Some(inst_id) = arg.inst_id else {
1325 tracing::error!("Instrument ID missing for book data event");
1326 return None;
1327 };
1328
1329 let inst = self.instruments_cache.get(&inst_id)?;
1330
1331 let instrument_id = inst.id();
1332 let price_precision = inst.price_precision();
1333 let size_precision = inst.size_precision();
1334
1335 match parse_book_msg_vec(
1336 data,
1337 &instrument_id,
1338 price_precision,
1339 size_precision,
1340 action,
1341 ts_init,
1342 ) {
1343 Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1344 Err(e) => {
1345 tracing::error!("Failed to parse book message: {e}");
1346 None
1347 }
1348 }
1349 }
1350
1351 fn handle_account_data(
1352 &mut self,
1353 data: Value,
1354 ts_init: UnixNanos,
1355 ) -> Option<NautilusWsMessage> {
1356 match serde_json::from_value::<Vec<OKXAccount>>(data) {
1357 Ok(accounts) => {
1358 if let Some(account) = accounts.first() {
1359 match parse_account_state(account, self.account_id, ts_init) {
1360 Ok(account_state) => {
1361 if let Some(last_account_state) = &self.last_account_state
1362 && account_state.has_same_balances_and_margins(last_account_state)
1363 {
1364 return None;
1365 }
1366 self.last_account_state = Some(account_state.clone());
1367 Some(NautilusWsMessage::AccountUpdate(account_state))
1368 }
1369 Err(e) => {
1370 tracing::error!("Failed to parse account state: {e}");
1371 None
1372 }
1373 }
1374 } else {
1375 None
1376 }
1377 }
1378 Err(e) => {
1379 tracing::error!("Failed to parse account data: {e}");
1380 None
1381 }
1382 }
1383 }
1384
1385 fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1386 match serde_json::from_value::<Vec<OKXPosition>>(data) {
1387 Ok(positions) => {
1388 tracing::debug!("Received {} position update(s)", positions.len());
1389
1390 for position in positions {
1391 let instrument_id =
1392 match InstrumentId::from_as_ref(format!("{}.OKX", position.inst_id)) {
1393 Ok(id) => id,
1394 Err(e) => {
1395 tracing::error!(
1396 "Failed to parse instrument ID from {}: {e}",
1397 position.inst_id
1398 );
1399 continue;
1400 }
1401 };
1402
1403 let instrument = match self.instruments_cache.get(&position.inst_id) {
1404 Some(inst) => inst,
1405 None => {
1406 tracing::warn!(
1407 "Received position update for unknown instrument {}, skipping",
1408 instrument_id
1409 );
1410 continue;
1411 }
1412 };
1413
1414 let size_precision = instrument.size_precision();
1415
1416 match parse_position_status_report(
1417 position,
1418 self.account_id,
1419 instrument_id,
1420 size_precision,
1421 ts_init,
1422 ) {
1423 Ok(position_report) => {
1424 self.pending_messages
1425 .push_back(NautilusWsMessage::PositionUpdate(position_report));
1426 }
1427 Err(e) => {
1428 tracing::error!(
1429 "Failed to parse position status report for {}: {e}",
1430 instrument_id
1431 );
1432 }
1433 }
1434 }
1435 }
1436 Err(e) => {
1437 tracing::error!("Failed to parse positions data: {e}");
1438 }
1439 }
1440 }
1441
1442 fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1443 let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1444 Ok(orders) => orders,
1445 Err(e) => {
1446 tracing::error!("Failed to deserialize orders channel payload: {e}");
1447 return None;
1448 }
1449 };
1450
1451 tracing::debug!(
1452 "Received {} order message(s) from orders channel",
1453 orders.len()
1454 );
1455
1456 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1457
1458 for msg in orders {
1459 tracing::debug!(
1460 "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1461 msg.inst_id,
1462 msg.cl_ord_id,
1463 msg.state,
1464 msg.exec_type
1465 );
1466
1467 if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1468 continue;
1469 }
1470
1471 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1472 let parent_from_msg = msg
1473 .algo_cl_ord_id
1474 .as_ref()
1475 .filter(|value| !value.is_empty())
1476 .map(ClientOrderId::new);
1477 let effective_client_id =
1478 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1479
1480 match parse_order_msg(
1481 &msg,
1482 self.account_id,
1483 &self.instruments_cache,
1484 &self.fee_cache,
1485 &self.filled_qty_cache,
1486 ts_init,
1487 ) {
1488 Ok(report) => {
1489 tracing::debug!("Successfully parsed execution report: {:?}", report);
1490
1491 let is_duplicate_accepted =
1492 if let ExecutionReport::Order(ref status_report) = report {
1493 if status_report.order_status == OrderStatus::Accepted {
1494 self.emitted_order_accepted
1495 .contains_key(&status_report.venue_order_id)
1496 } else {
1497 false
1498 }
1499 } else {
1500 false
1501 };
1502
1503 if is_duplicate_accepted {
1504 tracing::debug!(
1505 "Skipping duplicate OrderAccepted for venue_order_id={}",
1506 if let ExecutionReport::Order(ref r) = report {
1507 r.venue_order_id.to_string()
1508 } else {
1509 "unknown".to_string()
1510 }
1511 );
1512 continue;
1513 }
1514
1515 if let ExecutionReport::Order(ref status_report) = report
1516 && status_report.order_status == OrderStatus::Accepted
1517 {
1518 self.emitted_order_accepted
1519 .insert(status_report.venue_order_id, ());
1520 }
1521
1522 let adjusted =
1523 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1524
1525 if let ExecutionReport::Order(ref status_report) = adjusted
1527 && matches!(
1528 status_report.order_status,
1529 OrderStatus::Filled
1530 | OrderStatus::Canceled
1531 | OrderStatus::Expired
1532 | OrderStatus::Rejected
1533 )
1534 {
1535 self.emitted_order_accepted
1536 .remove(&status_report.venue_order_id);
1537 }
1538
1539 self.update_caches_with_report(&adjusted);
1540 exec_reports.push(adjusted);
1541 }
1542 Err(e) => tracing::error!("Failed to parse order message: {e}"),
1543 }
1544 }
1545
1546 if !exec_reports.is_empty() {
1547 tracing::debug!(
1548 "Pushing {} execution report(s) to message queue",
1549 exec_reports.len()
1550 );
1551 self.pending_messages
1552 .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1553 } else {
1554 tracing::debug!("No execution reports generated from order messages");
1555 }
1556
1557 self.pending_messages.pop_front()
1558 }
1559
1560 fn handle_algo_orders_data(
1561 &mut self,
1562 data: Value,
1563 ts_init: UnixNanos,
1564 ) -> Option<NautilusWsMessage> {
1565 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1566 Ok(orders) => orders,
1567 Err(e) => {
1568 tracing::error!("Failed to deserialize algo orders payload: {e}");
1569 return None;
1570 }
1571 };
1572
1573 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1574
1575 for msg in orders {
1576 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1577 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1578 let effective_client_id =
1579 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1580
1581 match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1582 Ok(report) => {
1583 let adjusted =
1584 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1585 self.update_caches_with_report(&adjusted);
1586 exec_reports.push(adjusted);
1587 }
1588 Err(e) => {
1589 tracing::error!("Failed to parse algo order message: {e}");
1590 }
1591 }
1592 }
1593
1594 if !exec_reports.is_empty() {
1595 Some(NautilusWsMessage::ExecutionReports(exec_reports))
1596 } else {
1597 None
1598 }
1599 }
1600
1601 fn handle_other_channel_data(
1602 &mut self,
1603 channel: OKXWsChannel,
1604 inst_id: Option<Ustr>,
1605 data: Value,
1606 ts_init: UnixNanos,
1607 ) -> Option<NautilusWsMessage> {
1608 let Some(inst_id) = inst_id else {
1609 tracing::error!("No instrument for channel {:?}", channel);
1610 return None;
1611 };
1612
1613 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1614 tracing::error!(
1615 "No instrument for channel {:?}, inst_id {:?}",
1616 channel,
1617 inst_id
1618 );
1619 return None;
1620 };
1621
1622 let instrument_id = instrument.id();
1623 let price_precision = instrument.price_precision();
1624 let size_precision = instrument.size_precision();
1625
1626 match parse_ws_message_data(
1627 &channel,
1628 data,
1629 &instrument_id,
1630 price_precision,
1631 size_precision,
1632 ts_init,
1633 &mut self.funding_rate_cache,
1634 &self.instruments_cache,
1635 ) {
1636 Ok(Some(msg)) => {
1637 if let NautilusWsMessage::Instrument(ref inst) = msg {
1638 self.instruments_cache
1639 .insert(inst.symbol().inner(), inst.as_ref().clone());
1640 }
1641 Some(msg)
1642 }
1643 Ok(None) => None,
1644 Err(e) => {
1645 tracing::error!("Error parsing message for channel {:?}: {e}", channel);
1646 None
1647 }
1648 }
1649 }
1650
1651 pub(crate) fn parse_raw_message(
1652 msg: tokio_tungstenite::tungstenite::Message,
1653 ) -> Option<OKXWsMessage> {
1654 match msg {
1655 tokio_tungstenite::tungstenite::Message::Text(text) => {
1656 if text == TEXT_PONG {
1657 tracing::trace!("Received pong from OKX");
1658 return None;
1659 }
1660 if text == TEXT_PING {
1661 tracing::trace!("Received ping from OKX (text)");
1662 return Some(OKXWsMessage::Ping);
1663 }
1664
1665 if text == RECONNECTED {
1666 tracing::debug!("Received WebSocket reconnection signal");
1667 return Some(OKXWsMessage::Reconnected);
1668 }
1669 tracing::trace!("Received WebSocket message: {text}");
1670
1671 match serde_json::from_str(&text) {
1672 Ok(ws_event) => match &ws_event {
1673 OKXWsMessage::Error { code, msg } => {
1674 tracing::error!("WebSocket error: {code} - {msg}");
1675 Some(ws_event)
1676 }
1677 OKXWsMessage::Login {
1678 event,
1679 code,
1680 msg,
1681 conn_id,
1682 } => {
1683 if code == "0" {
1684 tracing::info!(conn_id = %conn_id, "WebSocket authenticated");
1685 } else {
1686 tracing::error!(event = %event, code = %code, error = %msg, "WebSocket authentication failed");
1687 }
1688 Some(ws_event)
1689 }
1690 OKXWsMessage::Subscription {
1691 event,
1692 arg,
1693 conn_id,
1694 ..
1695 } => {
1696 let channel_str = serde_json::to_string(&arg.channel)
1697 .expect("Invalid OKX websocket channel")
1698 .trim_matches('"')
1699 .to_string();
1700 tracing::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1701 Some(ws_event)
1702 }
1703 OKXWsMessage::ChannelConnCount {
1704 event: _,
1705 channel,
1706 conn_count,
1707 conn_id,
1708 } => {
1709 let channel_str = serde_json::to_string(&channel)
1710 .expect("Invalid OKX websocket channel")
1711 .trim_matches('"')
1712 .to_string();
1713 tracing::debug!(
1714 "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1715 );
1716 None
1717 }
1718 OKXWsMessage::Ping => {
1719 tracing::trace!("Ignoring ping event parsed from text payload");
1720 None
1721 }
1722 OKXWsMessage::Data { .. } => Some(ws_event),
1723 OKXWsMessage::BookData { .. } => Some(ws_event),
1724 OKXWsMessage::OrderResponse {
1725 id,
1726 op,
1727 code,
1728 msg: _,
1729 data,
1730 } => {
1731 if code == "0" {
1732 tracing::debug!(
1733 "Order operation successful: id={:?}, op={op}, code={code}",
1734 id
1735 );
1736
1737 if let Some(order_data) = data.first() {
1738 let success_msg = order_data
1739 .get("sMsg")
1740 .and_then(|s| s.as_str())
1741 .unwrap_or("Order operation successful");
1742 tracing::debug!("Order success details: {success_msg}");
1743 }
1744 }
1745 Some(ws_event)
1746 }
1747 OKXWsMessage::Reconnected => {
1748 tracing::warn!("Unexpected Reconnected event from deserialization");
1750 None
1751 }
1752 },
1753 Err(e) => {
1754 tracing::error!("Failed to parse message: {e}: {text}");
1755 None
1756 }
1757 }
1758 }
1759 Message::Ping(_payload) => {
1760 tracing::trace!("Received binary ping frame from OKX");
1761 Some(OKXWsMessage::Ping)
1762 }
1763 Message::Pong(payload) => {
1764 tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1765 None
1766 }
1767 Message::Binary(msg) => {
1768 tracing::debug!("Raw binary: {msg:?}");
1769 None
1770 }
1771 Message::Close(_) => {
1772 tracing::debug!("Received close message");
1773 None
1774 }
1775 msg => {
1776 tracing::warn!("Unexpected message: {msg}");
1777 None
1778 }
1779 }
1780 }
1781
1782 fn generate_unique_request_id(&self) -> String {
1783 self.request_id_counter
1784 .fetch_add(1, Ordering::SeqCst)
1785 .to_string()
1786 }
1787
1788 fn get_instrument_type_and_family_from_instrument(
1789 instrument: &InstrumentAny,
1790 ) -> anyhow::Result<(OKXInstrumentType, String)> {
1791 let inst_type = okx_instrument_type(instrument)?;
1792 let symbol = instrument.symbol().inner();
1793
1794 let inst_family = match instrument {
1796 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1797 InstrumentAny::CryptoPerpetual(_) => {
1798 symbol
1800 .as_str()
1801 .strip_suffix("-SWAP")
1802 .unwrap_or(symbol.as_str())
1803 .to_string()
1804 }
1805 InstrumentAny::CryptoFuture(_) => {
1806 let s = symbol.as_str();
1809 if let Some(idx) = s.rfind('-') {
1810 s[..idx].to_string()
1811 } else {
1812 s.to_string()
1813 }
1814 }
1815 _ => {
1816 anyhow::bail!("Unsupported instrument type for OKX");
1817 }
1818 };
1819
1820 Ok((inst_type, inst_family))
1821 }
1822
1823 async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1824 let instrument = self
1825 .instruments_cache
1826 .get(&instrument_id.symbol.inner())
1827 .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1828
1829 let (inst_type, inst_family) =
1830 Self::get_instrument_type_and_family_from_instrument(instrument)?;
1831
1832 let params = WsMassCancelParams {
1833 inst_type,
1834 inst_family: Ustr::from(&inst_family),
1835 };
1836
1837 let args =
1838 vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1839
1840 let request_id = self.generate_unique_request_id();
1841
1842 self.pending_mass_cancel_requests
1843 .insert(request_id.clone(), instrument_id);
1844
1845 let request = OKXWsRequest {
1846 id: Some(request_id.clone()),
1847 op: OKXWsOperation::MassCancel,
1848 exp_time: None,
1849 args,
1850 };
1851
1852 let payload = serde_json::to_string(&request)
1853 .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1854
1855 match self
1856 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1857 .await
1858 {
1859 Ok(()) => {
1860 tracing::debug!("Sent mass cancel for {instrument_id}");
1861 Ok(())
1862 }
1863 Err(e) => {
1864 tracing::error!(error = %e, "Failed to send mass cancel after retries");
1865
1866 self.pending_mass_cancel_requests.remove(&request_id);
1867
1868 let error = OKXWebSocketError {
1869 code: "CLIENT_ERROR".to_string(),
1870 message: format!("Mass cancel failed for {}: {}", instrument_id, e),
1871 conn_id: None,
1872 timestamp: self.clock.get_time_ns().as_u64(),
1873 };
1874 let _ = self.send(NautilusWsMessage::Error(error));
1875
1876 Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1877 }
1878 }
1879 }
1880
1881 async fn handle_batch_cancel_orders(
1882 &self,
1883 args: Vec<Value>,
1884 request_id: String,
1885 ) -> anyhow::Result<()> {
1886 let request = OKXWsRequest {
1887 id: Some(request_id),
1888 op: OKXWsOperation::BatchCancelOrders,
1889 exp_time: None,
1890 args,
1891 };
1892
1893 let payload = serde_json::to_string(&request)
1894 .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1895
1896 if let Some(client) = &self.inner {
1897 client
1898 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1899 .await
1900 .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1901 tracing::debug!("Sent batch cancel orders");
1902 Ok(())
1903 } else {
1904 Err(anyhow::anyhow!("No active WebSocket client"))
1905 }
1906 }
1907
1908 async fn handle_batch_place_orders(
1909 &self,
1910 args: Vec<Value>,
1911 request_id: String,
1912 ) -> anyhow::Result<()> {
1913 let request = OKXWsRequest {
1914 id: Some(request_id),
1915 op: OKXWsOperation::BatchOrders,
1916 exp_time: None,
1917 args,
1918 };
1919
1920 let payload = serde_json::to_string(&request)
1921 .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
1922
1923 if let Some(client) = &self.inner {
1924 client
1925 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
1926 .await
1927 .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
1928 tracing::debug!("Sent batch place orders");
1929 Ok(())
1930 } else {
1931 Err(anyhow::anyhow!("No active WebSocket client"))
1932 }
1933 }
1934
1935 async fn handle_batch_amend_orders(
1936 &self,
1937 args: Vec<Value>,
1938 request_id: String,
1939 ) -> anyhow::Result<()> {
1940 let request = OKXWsRequest {
1941 id: Some(request_id),
1942 op: OKXWsOperation::BatchAmendOrders,
1943 exp_time: None,
1944 args,
1945 };
1946
1947 let payload = serde_json::to_string(&request)
1948 .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
1949
1950 if let Some(client) = &self.inner {
1951 client
1952 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
1953 .await
1954 .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
1955 tracing::debug!("Sent batch amend orders");
1956 Ok(())
1957 } else {
1958 Err(anyhow::anyhow!("No active WebSocket client"))
1959 }
1960 }
1961
1962 async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
1963 for arg in &args {
1964 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Subscribing to channel");
1965 }
1966
1967 let message = OKXSubscription {
1968 op: OKXWsOperation::Subscribe,
1969 args,
1970 };
1971
1972 let json_txt = serde_json::to_string(&message)
1973 .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
1974
1975 self.send_with_retry(
1976 json_txt,
1977 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
1978 )
1979 .await
1980 .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
1981 Ok(())
1982 }
1983
1984 async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
1985 for arg in &args {
1986 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Unsubscribing from channel");
1987 }
1988
1989 let message = OKXSubscription {
1990 op: OKXWsOperation::Unsubscribe,
1991 args,
1992 };
1993
1994 let json_txt = serde_json::to_string(&message)
1995 .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
1996
1997 self.send_with_retry(
1998 json_txt,
1999 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2000 )
2001 .await
2002 .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2003 Ok(())
2004 }
2005
2006 async fn handle_place_order(
2007 &mut self,
2008 params: WsPostOrderParams,
2009 client_order_id: ClientOrderId,
2010 trader_id: TraderId,
2011 strategy_id: StrategyId,
2012 instrument_id: InstrumentId,
2013 ) -> anyhow::Result<()> {
2014 let request_id = self.generate_unique_request_id();
2015
2016 self.pending_place_requests.insert(
2017 request_id.clone(),
2018 (
2019 PendingOrderParams::Regular(params.clone()),
2020 client_order_id,
2021 trader_id,
2022 strategy_id,
2023 instrument_id,
2024 ),
2025 );
2026
2027 let request = OKXWsRequest {
2028 id: Some(request_id.clone()),
2029 op: OKXWsOperation::Order,
2030 exp_time: None,
2031 args: vec![params],
2032 };
2033
2034 let payload = serde_json::to_string(&request)
2035 .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2036
2037 match self
2038 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2039 .await
2040 {
2041 Ok(()) => {
2042 tracing::debug!("Sent place order request");
2043 Ok(())
2044 }
2045 Err(e) => {
2046 tracing::error!(error = %e, "Failed to send place order after retries");
2047
2048 self.pending_place_requests.remove(&request_id);
2049
2050 let ts_now = self.clock.get_time_ns();
2051 let rejected = OrderRejected::new(
2052 trader_id,
2053 strategy_id,
2054 instrument_id,
2055 client_order_id,
2056 self.account_id,
2057 Ustr::from(&format!("WebSocket send failed: {e}")),
2058 UUID4::new(),
2059 ts_now, ts_now, false, false, );
2064 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2065
2066 Err(anyhow::anyhow!("Failed to send place order: {e}"))
2067 }
2068 }
2069 }
2070
2071 async fn handle_place_algo_order(
2072 &mut self,
2073 params: WsPostAlgoOrderParams,
2074 client_order_id: ClientOrderId,
2075 trader_id: TraderId,
2076 strategy_id: StrategyId,
2077 instrument_id: InstrumentId,
2078 ) -> anyhow::Result<()> {
2079 let request_id = self.generate_unique_request_id();
2080
2081 self.pending_place_requests.insert(
2082 request_id.clone(),
2083 (
2084 PendingOrderParams::Algo(params.clone()),
2085 client_order_id,
2086 trader_id,
2087 strategy_id,
2088 instrument_id,
2089 ),
2090 );
2091
2092 let request = OKXWsRequest {
2093 id: Some(request_id.clone()),
2094 op: OKXWsOperation::OrderAlgo,
2095 exp_time: None,
2096 args: vec![params],
2097 };
2098
2099 let payload = serde_json::to_string(&request)
2100 .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2101
2102 match self
2103 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2104 .await
2105 {
2106 Ok(()) => {
2107 tracing::debug!("Sent place algo order request");
2108 Ok(())
2109 }
2110 Err(e) => {
2111 tracing::error!(error = %e, "Failed to send place algo order after retries");
2112
2113 self.pending_place_requests.remove(&request_id);
2114
2115 let ts_now = self.clock.get_time_ns();
2116 let rejected = OrderRejected::new(
2117 trader_id,
2118 strategy_id,
2119 instrument_id,
2120 client_order_id,
2121 self.account_id,
2122 Ustr::from(&format!("WebSocket send failed: {e}")),
2123 UUID4::new(),
2124 ts_now, ts_now, false, false, );
2129 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2130
2131 Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2132 }
2133 }
2134 }
2135
2136 async fn handle_cancel_order(
2137 &mut self,
2138 client_order_id: Option<ClientOrderId>,
2139 venue_order_id: Option<VenueOrderId>,
2140 instrument_id: InstrumentId,
2141 trader_id: TraderId,
2142 strategy_id: StrategyId,
2143 ) -> anyhow::Result<()> {
2144 let mut builder = WsCancelOrderParamsBuilder::default();
2145 builder.inst_id(instrument_id.symbol.as_str());
2146
2147 if let Some(venue_order_id) = venue_order_id {
2148 builder.ord_id(venue_order_id.as_str());
2149 }
2150
2151 if let Some(client_order_id) = client_order_id {
2152 builder.cl_ord_id(client_order_id.as_str());
2153 }
2154
2155 let params = builder
2156 .build()
2157 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2158
2159 let request_id = self.generate_unique_request_id();
2160
2161 if let Some(client_order_id) = client_order_id {
2163 self.pending_cancel_requests.insert(
2164 request_id.clone(),
2165 (
2166 client_order_id,
2167 trader_id,
2168 strategy_id,
2169 instrument_id,
2170 venue_order_id,
2171 ),
2172 );
2173 }
2174
2175 let request = OKXWsRequest {
2176 id: Some(request_id.clone()),
2177 op: OKXWsOperation::CancelOrder,
2178 exp_time: None,
2179 args: vec![params],
2180 };
2181
2182 let payload = serde_json::to_string(&request)
2183 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2184
2185 match self
2186 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2187 .await
2188 {
2189 Ok(()) => {
2190 tracing::debug!("Sent cancel order request");
2191 Ok(())
2192 }
2193 Err(e) => {
2194 tracing::error!(error = %e, "Failed to send cancel order after retries");
2195
2196 self.pending_cancel_requests.remove(&request_id);
2197
2198 if let Some(client_order_id) = client_order_id {
2199 let ts_now = self.clock.get_time_ns();
2200 let rejected = OrderCancelRejected::new(
2201 trader_id,
2202 strategy_id,
2203 instrument_id,
2204 client_order_id,
2205 Ustr::from(&format!("WebSocket send failed: {e}")),
2206 UUID4::new(),
2207 ts_now, ts_now, false, venue_order_id,
2211 Some(self.account_id),
2212 );
2213 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2214 }
2215
2216 Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2217 }
2218 }
2219 }
2220
2221 async fn handle_amend_order(
2222 &mut self,
2223 params: WsAmendOrderParams,
2224 client_order_id: ClientOrderId,
2225 trader_id: TraderId,
2226 strategy_id: StrategyId,
2227 instrument_id: InstrumentId,
2228 venue_order_id: Option<VenueOrderId>,
2229 ) -> anyhow::Result<()> {
2230 let request_id = self.generate_unique_request_id();
2231
2232 self.pending_amend_requests.insert(
2233 request_id.clone(),
2234 (
2235 client_order_id,
2236 trader_id,
2237 strategy_id,
2238 instrument_id,
2239 venue_order_id,
2240 ),
2241 );
2242
2243 let request = OKXWsRequest {
2244 id: Some(request_id.clone()),
2245 op: OKXWsOperation::AmendOrder,
2246 exp_time: None,
2247 args: vec![params],
2248 };
2249
2250 let payload = serde_json::to_string(&request)
2251 .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2252
2253 match self
2254 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2255 .await
2256 {
2257 Ok(()) => {
2258 tracing::debug!("Sent amend order request");
2259 Ok(())
2260 }
2261 Err(e) => {
2262 tracing::error!(error = %e, "Failed to send amend order after retries");
2263
2264 self.pending_amend_requests.remove(&request_id);
2265
2266 let ts_now = self.clock.get_time_ns();
2267 let rejected = OrderModifyRejected::new(
2268 trader_id,
2269 strategy_id,
2270 instrument_id,
2271 client_order_id,
2272 Ustr::from(&format!("WebSocket send failed: {e}")),
2273 UUID4::new(),
2274 ts_now, ts_now, false, venue_order_id,
2278 Some(self.account_id),
2279 );
2280 let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2281
2282 Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2283 }
2284 }
2285 }
2286
2287 async fn handle_cancel_algo_order(
2288 &mut self,
2289 client_order_id: Option<ClientOrderId>,
2290 algo_order_id: Option<VenueOrderId>,
2291 instrument_id: InstrumentId,
2292 trader_id: TraderId,
2293 strategy_id: StrategyId,
2294 ) -> anyhow::Result<()> {
2295 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2296 builder.inst_id(instrument_id.symbol.as_str());
2297
2298 if let Some(client_order_id) = &client_order_id {
2299 builder.algo_cl_ord_id(client_order_id.as_str());
2300 }
2301
2302 if let Some(algo_id) = &algo_order_id {
2303 builder.algo_id(algo_id.as_str());
2304 }
2305
2306 let params = builder
2307 .build()
2308 .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2309
2310 let request_id = self.generate_unique_request_id();
2311
2312 if let Some(client_order_id) = client_order_id {
2314 self.pending_cancel_requests.insert(
2315 request_id.clone(),
2316 (client_order_id, trader_id, strategy_id, instrument_id, None),
2317 );
2318 }
2319
2320 let request = OKXWsRequest {
2321 id: Some(request_id.clone()),
2322 op: OKXWsOperation::CancelAlgos,
2323 exp_time: None,
2324 args: vec![params],
2325 };
2326
2327 let payload = serde_json::to_string(&request)
2328 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2329
2330 match self
2331 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2332 .await
2333 {
2334 Ok(()) => {
2335 tracing::debug!("Sent cancel algo order request");
2336 Ok(())
2337 }
2338 Err(e) => {
2339 tracing::error!(error = %e, "Failed to send cancel algo order after retries");
2340
2341 self.pending_cancel_requests.remove(&request_id);
2342
2343 if let Some(client_order_id) = client_order_id {
2344 let ts_now = self.clock.get_time_ns();
2345 let rejected = OrderCancelRejected::new(
2346 trader_id,
2347 strategy_id,
2348 instrument_id,
2349 client_order_id,
2350 Ustr::from(&format!("WebSocket send failed: {e}")),
2351 UUID4::new(),
2352 ts_now, ts_now, false, None,
2356 Some(self.account_id),
2357 );
2358 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2359 }
2360
2361 Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2362 }
2363 }
2364 }
2365}
2366
2367pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2369 if code == OKX_POST_ONLY_ERROR_CODE {
2370 return true;
2371 }
2372
2373 for entry in data {
2374 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2375 && s_code == OKX_POST_ONLY_ERROR_CODE
2376 {
2377 return true;
2378 }
2379
2380 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2381 && inner_code == OKX_POST_ONLY_ERROR_CODE
2382 {
2383 return true;
2384 }
2385 }
2386
2387 false
2388}
2389
2390fn should_retry_okx_error(error: &OKXWsError) -> bool {
2392 match error {
2393 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2394 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
2396 let msg_lower = msg.to_lowercase();
2398 msg_lower.contains("timeout")
2399 || msg_lower.contains("timed out")
2400 || msg_lower.contains("connection")
2401 || msg_lower.contains("network")
2402 }
2403 OKXWsError::AuthenticationError(_)
2404 | OKXWsError::JsonError(_)
2405 | OKXWsError::ParsingError(_) => {
2406 false
2408 }
2409 }
2410}
2411
2412fn create_okx_timeout_error(msg: String) -> OKXWsError {
2414 OKXWsError::ClientError(msg)
2415}
2416
2417#[cfg(test)]
2422mod tests {
2423 use rstest::rstest;
2424
2425 #[rstest]
2426 fn test_is_post_only_rejection_detects_by_code() {
2427 assert!(super::is_post_only_rejection("51019", &[]));
2428 }
2429
2430 #[rstest]
2431 fn test_is_post_only_rejection_detects_by_inner_code() {
2432 let data = vec![serde_json::json!({
2433 "sCode": "51019"
2434 })];
2435 assert!(super::is_post_only_rejection("50000", &data));
2436 }
2437
2438 #[rstest]
2439 fn test_is_post_only_rejection_false_for_unrelated_error() {
2440 let data = vec![serde_json::json!({
2441 "sMsg": "Insufficient balance"
2442 })];
2443 assert!(!super::is_post_only_rejection("50000", &data));
2444 }
2445}