1use std::{
32 collections::VecDeque,
33 sync::{
34 Arc,
35 atomic::{AtomicBool, AtomicU64, Ordering},
36 },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43 enums::{OrderStatus, OrderType},
44 events::{
45 AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
46 },
47 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
48 instruments::{Instrument, InstrumentAny},
49 types::{Money, Quantity},
50};
51use nautilus_network::{
52 RECONNECTED,
53 retry::{RetryManager, create_websocket_retry_manager},
54 websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
55};
56use serde_json::Value;
57use tokio_tungstenite::tungstenite::Message;
58use ustr::Ustr;
59
60use super::{
61 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
62 error::OKXWsError,
63 messages::{
64 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
65 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
66 OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
67 WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
68 },
69 parse::{
70 OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
71 parse_order_event, parse_order_msg, parse_ws_message_data,
72 },
73 subscription::topic_from_websocket_arg,
74};
75use crate::{
76 common::{
77 consts::{
78 OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
79 should_retry_error_code,
80 },
81 enums::{
82 OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
83 OKXTradeMode,
84 },
85 parse::{
86 determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
87 parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
88 parse_price, parse_quantity,
89 },
90 },
91 http::models::{OKXAccount, OKXPosition},
92 websocket::client::{
93 OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
94 OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
95 },
96};
97
98type PlaceRequestData = (
100 PendingOrderParams,
101 ClientOrderId,
102 TraderId,
103 StrategyId,
104 InstrumentId,
105);
106
107type CancelRequestData = (
109 ClientOrderId,
110 TraderId,
111 StrategyId,
112 InstrumentId,
113 Option<VenueOrderId>,
114);
115
116type AmendRequestData = (
118 ClientOrderId,
119 TraderId,
120 StrategyId,
121 InstrumentId,
122 Option<VenueOrderId>,
123);
124
125#[derive(Debug)]
126pub enum PendingOrderParams {
127 Regular(WsPostOrderParams),
128 Algo(WsPostAlgoOrderParams),
129}
130
131#[allow(
133 clippy::large_enum_variant,
134 reason = "Commands are ephemeral and immediately consumed"
135)]
136#[allow(missing_debug_implementations)]
137pub enum HandlerCommand {
138 SetClient(WebSocketClient),
139 Disconnect,
140 Authenticate {
141 payload: String,
142 },
143 InitializeInstruments(Vec<InstrumentAny>),
144 UpdateInstrument(InstrumentAny),
145 Subscribe {
146 args: Vec<OKXSubscriptionArg>,
147 },
148 Unsubscribe {
149 args: Vec<OKXSubscriptionArg>,
150 },
151 PlaceOrder {
152 params: WsPostOrderParams,
153 client_order_id: ClientOrderId,
154 trader_id: TraderId,
155 strategy_id: StrategyId,
156 instrument_id: InstrumentId,
157 },
158 PlaceAlgoOrder {
159 params: WsPostAlgoOrderParams,
160 client_order_id: ClientOrderId,
161 trader_id: TraderId,
162 strategy_id: StrategyId,
163 instrument_id: InstrumentId,
164 },
165 AmendOrder {
166 params: WsAmendOrderParams,
167 client_order_id: ClientOrderId,
168 trader_id: TraderId,
169 strategy_id: StrategyId,
170 instrument_id: InstrumentId,
171 venue_order_id: Option<VenueOrderId>,
172 },
173 CancelOrder {
174 client_order_id: Option<ClientOrderId>,
175 venue_order_id: Option<VenueOrderId>,
176 instrument_id: InstrumentId,
177 trader_id: TraderId,
178 strategy_id: StrategyId,
179 },
180 CancelAlgoOrder {
181 client_order_id: Option<ClientOrderId>,
182 algo_order_id: Option<VenueOrderId>,
183 instrument_id: InstrumentId,
184 trader_id: TraderId,
185 strategy_id: StrategyId,
186 },
187 MassCancel {
188 instrument_id: InstrumentId,
189 },
190 BatchPlaceOrders {
191 args: Vec<Value>,
192 request_id: String,
193 },
194 BatchAmendOrders {
195 args: Vec<Value>,
196 request_id: String,
197 },
198 BatchCancelOrders {
199 args: Vec<Value>,
200 request_id: String,
201 },
202}
203
204pub(super) struct OKXWsFeedHandler {
205 clock: &'static AtomicTime,
206 account_id: AccountId,
207 signal: Arc<AtomicBool>,
208 inner: Option<WebSocketClient>,
209 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
210 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
211 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
212 auth_tracker: AuthTracker,
213 subscriptions_state: SubscriptionState,
214 retry_manager: RetryManager<OKXWsError>,
215 pending_place_requests: AHashMap<String, PlaceRequestData>,
216 pending_cancel_requests: AHashMap<String, CancelRequestData>,
217 pending_amend_requests: AHashMap<String, AmendRequestData>,
218 pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
219 pending_messages: VecDeque<NautilusWsMessage>,
220 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
221 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
222 emitted_order_accepted: AHashMap<VenueOrderId, ()>,
223 instruments_cache: AHashMap<Ustr, InstrumentAny>,
224 fee_cache: AHashMap<Ustr, Money>, filled_qty_cache: AHashMap<Ustr, Quantity>, order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
227 funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, last_account_state: Option<AccountState>,
229 request_id_counter: AtomicU64,
230}
231
232impl OKXWsFeedHandler {
233 #[allow(clippy::too_many_arguments)]
235 pub fn new(
236 account_id: AccountId,
237 signal: Arc<AtomicBool>,
238 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
242 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
243 auth_tracker: AuthTracker,
244 subscriptions_state: SubscriptionState,
245 ) -> Self {
246 Self {
247 clock: get_atomic_clock_realtime(),
248 account_id,
249 signal,
250 inner: None,
251 cmd_rx,
252 raw_rx,
253 out_tx,
254 auth_tracker,
255 subscriptions_state,
256 retry_manager: create_websocket_retry_manager(),
257 pending_place_requests: AHashMap::new(),
258 pending_cancel_requests: AHashMap::new(),
259 pending_amend_requests: AHashMap::new(),
260 pending_mass_cancel_requests: AHashMap::new(),
261 pending_messages: VecDeque::new(),
262 active_client_orders,
263 client_id_aliases,
264 emitted_order_accepted: AHashMap::new(),
265 instruments_cache: AHashMap::new(),
266 fee_cache: AHashMap::new(),
267 filled_qty_cache: AHashMap::new(),
268 order_state_cache: AHashMap::new(),
269 funding_rate_cache: AHashMap::new(),
270 last_account_state: None,
271 request_id_counter: AtomicU64::new(0),
272 }
273 }
274
275 pub(super) fn is_stopped(&self) -> bool {
276 self.signal.load(std::sync::atomic::Ordering::Acquire)
277 }
278
279 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
280 self.out_tx.send(msg).map_err(|_| ())
281 }
282
283 async fn send_with_retry(
284 &self,
285 payload: String,
286 rate_limit_keys: Option<Vec<String>>,
287 ) -> Result<(), OKXWsError> {
288 if let Some(client) = &self.inner {
289 self.retry_manager
290 .execute_with_retry(
291 "websocket_send",
292 || {
293 let payload = payload.clone();
294 let keys = rate_limit_keys.clone();
295 async move {
296 client
297 .send_text(payload, keys)
298 .await
299 .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
300 }
301 },
302 should_retry_okx_error,
303 create_okx_timeout_error,
304 )
305 .await
306 } else {
307 Err(OKXWsError::ClientError(
308 "No active WebSocket client".to_string(),
309 ))
310 }
311 }
312
313 pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
314 match self.send_with_retry(TEXT_PONG.to_string(), None).await {
315 Ok(()) => {
316 log::trace!("Sent pong response to OKX text ping");
317 Ok(())
318 }
319 Err(e) => {
320 log::warn!("Failed to send pong after retries: error={e}");
321 Err(anyhow::anyhow!("Failed to send pong: {e}"))
322 }
323 }
324 }
325
326 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
327 if let Some(message) = self.pending_messages.pop_front() {
328 return Some(message);
329 }
330
331 loop {
332 tokio::select! {
333 Some(cmd) = self.cmd_rx.recv() => {
334 match cmd {
335 HandlerCommand::SetClient(client) => {
336 log::debug!("Handler received WebSocket client");
337 self.inner = Some(client);
338 }
339 HandlerCommand::Disconnect => {
340 log::debug!("Handler disconnecting WebSocket client");
341 self.inner = None;
342 return None;
343 }
344 HandlerCommand::Authenticate { payload } => {
345 if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
346 log::error!("Failed to send authentication message after retries: error={e}");
347 }
348 }
349 HandlerCommand::InitializeInstruments(instruments) => {
350 for inst in instruments {
351 self.instruments_cache.insert(inst.symbol().inner(), inst);
352 }
353 }
354 HandlerCommand::UpdateInstrument(inst) => {
355 self.instruments_cache.insert(inst.symbol().inner(), inst);
356 }
357 HandlerCommand::Subscribe { args } => {
358 if let Err(e) = self.handle_subscribe(args).await {
359 log::error!("Failed to handle subscribe command: error={e}");
360 }
361 }
362 HandlerCommand::Unsubscribe { args } => {
363 if let Err(e) = self.handle_unsubscribe(args).await {
364 log::error!("Failed to handle unsubscribe command: error={e}");
365 }
366 }
367 HandlerCommand::CancelOrder {
368 client_order_id,
369 venue_order_id,
370 instrument_id,
371 trader_id,
372 strategy_id,
373 } => {
374 if let Err(e) = self
375 .handle_cancel_order(
376 client_order_id,
377 venue_order_id,
378 instrument_id,
379 trader_id,
380 strategy_id,
381 )
382 .await
383 {
384 log::error!("Failed to handle cancel order command: error={e}");
385 }
386 }
387 HandlerCommand::CancelAlgoOrder {
388 client_order_id,
389 algo_order_id,
390 instrument_id,
391 trader_id,
392 strategy_id,
393 } => {
394 if let Err(e) = self
395 .handle_cancel_algo_order(
396 client_order_id,
397 algo_order_id,
398 instrument_id,
399 trader_id,
400 strategy_id,
401 )
402 .await
403 {
404 log::error!("Failed to handle cancel algo order command: error={e}");
405 }
406 }
407 HandlerCommand::PlaceOrder {
408 params,
409 client_order_id,
410 trader_id,
411 strategy_id,
412 instrument_id,
413 } => {
414 if let Err(e) = self
415 .handle_place_order(
416 params,
417 client_order_id,
418 trader_id,
419 strategy_id,
420 instrument_id,
421 )
422 .await
423 {
424 log::error!("Failed to handle place order command: error={e}");
425 }
426 }
427 HandlerCommand::PlaceAlgoOrder {
428 params,
429 client_order_id,
430 trader_id,
431 strategy_id,
432 instrument_id,
433 } => {
434 if let Err(e) = self
435 .handle_place_algo_order(
436 params,
437 client_order_id,
438 trader_id,
439 strategy_id,
440 instrument_id,
441 )
442 .await
443 {
444 log::error!("Failed to handle place algo order command: error={e}");
445 }
446 }
447 HandlerCommand::AmendOrder {
448 params,
449 client_order_id,
450 trader_id,
451 strategy_id,
452 instrument_id,
453 venue_order_id,
454 } => {
455 if let Err(e) = self
456 .handle_amend_order(
457 params,
458 client_order_id,
459 trader_id,
460 strategy_id,
461 instrument_id,
462 venue_order_id,
463 )
464 .await
465 {
466 log::error!("Failed to handle amend order command: error={e}");
467 }
468 }
469 HandlerCommand::MassCancel { instrument_id } => {
470 if let Err(e) = self.handle_mass_cancel(instrument_id).await {
471 log::error!("Failed to handle mass cancel command: error={e}");
472 }
473 }
474 HandlerCommand::BatchCancelOrders { args, request_id } => {
475 if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
476 log::error!("Failed to handle batch cancel orders command: error={e}");
477 }
478 }
479 HandlerCommand::BatchPlaceOrders { args, request_id } => {
480 if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
481 log::error!("Failed to handle batch place orders command: error={e}");
482 }
483 }
484 HandlerCommand::BatchAmendOrders { args, request_id } => {
485 if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
486 log::error!("Failed to handle batch amend orders command: error={e}");
487 }
488 }
489 }
490 continue;
492 }
493
494 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
495 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
496 log::debug!("Stop signal received during idle period");
497 return None;
498 }
499 continue;
500 }
501
502 msg = self.raw_rx.recv() => {
503 let event = match msg {
504 Some(msg) => match Self::parse_raw_message(msg) {
505 Some(event) => event,
506 None => continue,
507 },
508 None => {
509 log::debug!("WebSocket stream closed");
510 return None;
511 }
512 };
513
514 let ts_init = self.clock.get_time_ns();
515
516 match event {
517 OKXWsMessage::Ping => {
518 if let Err(e) = self.send_pong().await {
519 log::warn!("Failed to send pong response: error={e}");
520 }
521 continue;
522 }
523 OKXWsMessage::Login {
524 code, msg, conn_id, ..
525 } => {
526 if code == "0" {
527 self.auth_tracker.succeed();
528
529 return Some(NautilusWsMessage::Authenticated);
533 }
534
535 log::error!("WebSocket authentication failed: error={msg}");
536 self.auth_tracker.fail(msg.clone());
537
538 let error = OKXWebSocketError {
539 code,
540 message: msg,
541 conn_id: Some(conn_id),
542 timestamp: self.clock.get_time_ns().as_u64(),
543 };
544 self.pending_messages
545 .push_back(NautilusWsMessage::Error(error));
546 continue;
547 }
548 OKXWsMessage::BookData { arg, action, data } => {
549 if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
550 return Some(msg);
551 }
552 continue;
553 }
554 OKXWsMessage::OrderResponse {
555 id,
556 op,
557 code,
558 msg,
559 data,
560 } => {
561 if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
562 return Some(msg);
563 }
564 continue;
565 }
566 OKXWsMessage::Data { arg, data } => {
567 let OKXWebSocketArg {
568 channel, inst_id, ..
569 } = arg;
570
571 match channel {
572 OKXWsChannel::Account => {
573 if let Some(msg) = self.handle_account_data(data, ts_init) {
574 return Some(msg);
575 }
576 continue;
577 }
578 OKXWsChannel::Positions => {
579 self.handle_positions_data(data, ts_init);
580 continue;
581 }
582 OKXWsChannel::Orders => {
583 if let Some(msg) = self.handle_orders_data(data, ts_init) {
584 return Some(msg);
585 }
586 continue;
587 }
588 OKXWsChannel::OrdersAlgo => {
589 if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
590 return Some(msg);
591 }
592 continue;
593 }
594 _ => {
595 if let Some(msg) =
596 self.handle_other_channel_data(channel, inst_id, data, ts_init)
597 {
598 return Some(msg);
599 }
600 continue;
601 }
602 }
603 }
604 OKXWsMessage::Error { code, msg } => {
605 let error = OKXWebSocketError {
606 code,
607 message: msg,
608 conn_id: None,
609 timestamp: self.clock.get_time_ns().as_u64(),
610 };
611 return Some(NautilusWsMessage::Error(error));
612 }
613 OKXWsMessage::Reconnected => {
614 return Some(NautilusWsMessage::Reconnected);
615 }
616 OKXWsMessage::Subscription {
617 event,
618 arg,
619 code,
620 msg,
621 ..
622 } => {
623 let topic = topic_from_websocket_arg(&arg);
624 let success = code.as_deref().is_none_or(|c| c == "0");
625
626 match event {
627 OKXSubscriptionEvent::Subscribe => {
628 if success {
629 self.subscriptions_state.confirm_subscribe(&topic);
630 } else {
631 log::warn!("Subscription failed: topic={topic:?}, error={msg:?}, code={code:?}");
632 self.subscriptions_state.mark_failure(&topic);
633 }
634 }
635 OKXSubscriptionEvent::Unsubscribe => {
636 if success {
637 self.subscriptions_state.confirm_unsubscribe(&topic);
638 } else {
639 log::warn!("Unsubscription failed - restoring subscription: topic={topic:?}, error={msg:?}, code={code:?}");
640 self.subscriptions_state.confirm_unsubscribe(&topic); self.subscriptions_state.mark_subscribe(&topic); self.subscriptions_state.confirm_subscribe(&topic); }
645 }
646 }
647
648 continue;
649 }
650 OKXWsMessage::ChannelConnCount { .. } => continue,
651 }
652 }
653
654 else => {
656 log::debug!("Handler shutting down: stream ended or command channel closed");
657 return None;
658 }
659 }
660 }
661 }
662
663 pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
664 if msg.state != OKXOrderStatus::Canceled {
665 return false;
666 }
667
668 let cancel_source_matches = matches!(
669 msg.cancel_source.as_deref(),
670 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
671 );
672
673 let reason_matches = matches!(
674 msg.cancel_source_reason.as_deref(),
675 Some(reason) if reason.contains("POST_ONLY")
676 );
677
678 if !(cancel_source_matches || reason_matches) {
679 return false;
680 }
681
682 msg.acc_fill_sz
683 .as_ref()
684 .is_none_or(|filled| filled == "0" || filled.is_empty())
685 }
686
687 fn try_handle_post_only_auto_cancel(
688 &mut self,
689 msg: &OKXOrderMsg,
690 ts_init: UnixNanos,
691 exec_reports: &mut Vec<ExecutionReport>,
692 ) -> bool {
693 if !Self::is_post_only_auto_cancel(msg) {
694 return false;
695 }
696
697 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
698 return false;
699 };
700
701 let Some((_, (trader_id, strategy_id, instrument_id))) =
702 self.active_client_orders.remove(&client_order_id)
703 else {
704 return false;
705 };
706
707 self.client_id_aliases.remove(&client_order_id);
708
709 if !exec_reports.is_empty() {
710 let reports = std::mem::take(exec_reports);
711 self.pending_messages
712 .push_back(NautilusWsMessage::ExecutionReports(reports));
713 }
714
715 let reason = msg
716 .cancel_source_reason
717 .as_ref()
718 .filter(|reason| !reason.is_empty())
719 .map_or_else(
720 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
721 |reason| Ustr::from(reason.as_str()),
722 );
723
724 let ts_event = parse_millisecond_timestamp(msg.u_time);
725 let rejected = OrderRejected::new(
726 trader_id,
727 strategy_id,
728 instrument_id,
729 client_order_id,
730 self.account_id,
731 reason,
732 UUID4::new(),
733 ts_event,
734 ts_init,
735 false,
736 true,
737 );
738
739 self.pending_messages
740 .push_back(NautilusWsMessage::OrderRejected(rejected));
741
742 true
743 }
744
745 fn register_client_order_aliases(
746 &self,
747 raw_child: &Option<ClientOrderId>,
748 parent_from_msg: &Option<ClientOrderId>,
749 ) -> Option<ClientOrderId> {
750 if let Some(parent) = parent_from_msg {
751 self.client_id_aliases.insert(*parent, *parent);
752 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
753 self.client_id_aliases.insert(*child, *parent);
754 }
755 Some(*parent)
756 } else if let Some(child) = raw_child.as_ref() {
757 if let Some(mapped) = self.client_id_aliases.get(child) {
758 Some(*mapped.value())
759 } else {
760 self.client_id_aliases.insert(*child, *child);
761 Some(*child)
762 }
763 } else {
764 None
765 }
766 }
767
768 fn adjust_execution_report(
769 &self,
770 report: ExecutionReport,
771 effective_client_id: &Option<ClientOrderId>,
772 raw_child: &Option<ClientOrderId>,
773 ) -> ExecutionReport {
774 match report {
775 ExecutionReport::Order(status_report) => {
776 let mut adjusted = status_report;
777 let mut final_id = *effective_client_id;
778
779 if final_id.is_none() {
780 final_id = adjusted.client_order_id;
781 }
782
783 if final_id.is_none()
784 && let Some(child) = raw_child.as_ref()
785 && let Some(mapped) = self.client_id_aliases.get(child)
786 {
787 final_id = Some(*mapped.value());
788 }
789
790 if let Some(final_id_value) = final_id {
791 if adjusted.client_order_id != Some(final_id_value) {
792 adjusted = adjusted.with_client_order_id(final_id_value);
793 }
794 self.client_id_aliases
795 .insert(final_id_value, final_id_value);
796
797 if let Some(child) =
798 raw_child.as_ref().filter(|child| **child != final_id_value)
799 {
800 adjusted = adjusted.with_linked_order_ids(vec![*child]);
801 }
802 }
803
804 ExecutionReport::Order(adjusted)
805 }
806 ExecutionReport::Fill(mut fill_report) => {
807 let mut final_id = *effective_client_id;
808 if final_id.is_none() {
809 final_id = fill_report.client_order_id;
810 }
811 if final_id.is_none()
812 && let Some(child) = raw_child.as_ref()
813 && let Some(mapped) = self.client_id_aliases.get(child)
814 {
815 final_id = Some(*mapped.value());
816 }
817
818 if let Some(final_id_value) = final_id {
819 fill_report.client_order_id = Some(final_id_value);
820 self.client_id_aliases
821 .insert(final_id_value, final_id_value);
822 }
823
824 ExecutionReport::Fill(fill_report)
825 }
826 }
827 }
828
829 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
830 match report {
831 ExecutionReport::Fill(fill_report) => {
832 let order_id = fill_report.venue_order_id.inner();
833 let current_fee = self
834 .fee_cache
835 .get(&order_id)
836 .copied()
837 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
838 let total_fee = current_fee + fill_report.commission;
839 self.fee_cache.insert(order_id, total_fee);
840
841 let current_filled_qty = self
842 .filled_qty_cache
843 .get(&order_id)
844 .copied()
845 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
846 let total_filled_qty = current_filled_qty + fill_report.last_qty;
847 self.filled_qty_cache.insert(order_id, total_filled_qty);
848 }
849 ExecutionReport::Order(status_report) => {
850 if matches!(status_report.order_status, OrderStatus::Filled) {
851 self.fee_cache.remove(&status_report.venue_order_id.inner());
852 self.filled_qty_cache
853 .remove(&status_report.venue_order_id.inner());
854 }
855
856 if matches!(
857 status_report.order_status,
858 OrderStatus::Canceled
859 | OrderStatus::Expired
860 | OrderStatus::Filled
861 | OrderStatus::Rejected,
862 ) {
863 self.emitted_order_accepted
864 .remove(&status_report.venue_order_id);
865 if let Some(client_order_id) = status_report.client_order_id {
866 self.order_state_cache.remove(&client_order_id);
867 self.active_client_orders.remove(&client_order_id);
868 self.client_id_aliases.remove(&client_order_id);
869 }
870 if let Some(linked) = &status_report.linked_order_ids {
871 for child in linked {
872 self.client_id_aliases.remove(child);
873 }
874 }
875 }
876 }
877 }
878 }
879
880 #[allow(clippy::too_many_lines)]
881 fn handle_order_response(
882 &mut self,
883 id: Option<String>,
884 op: OKXWsOperation,
885 code: String,
886 msg: String,
887 data: Vec<Value>,
888 ts_init: UnixNanos,
889 ) -> Option<NautilusWsMessage> {
890 if code == "0" {
891 log::debug!("Order operation successful: id={id:?} op={op} code={code}");
892
893 if op == OKXWsOperation::BatchCancelOrders {
894 log::debug!(
895 "Batch cancel operation successful: id={id:?} cancel_count={}",
896 data.len()
897 );
898
899 for (idx, entry) in data.iter().enumerate() {
901 if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
902 && entry_code != "0"
903 {
904 let entry_msg = entry
905 .get("sMsg")
906 .and_then(|v| v.as_str())
907 .unwrap_or("Unknown error");
908
909 if let Some(cl_ord_id_str) = entry
910 .get("clOrdId")
911 .and_then(|v| v.as_str())
912 .filter(|s| !s.is_empty())
913 {
914 log::error!(
915 "Batch cancel partial failure for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
916 );
917 } else {
919 log::error!(
920 "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
921 );
922 }
923 }
924 }
925
926 return None;
927 } else if op == OKXWsOperation::MassCancel
928 && let Some(request_id) = &id
929 && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
930 {
931 log::debug!("Mass cancel operation successful for instrument: {instrument_id}");
932 } else if op == OKXWsOperation::Order
933 && let Some(request_id) = &id
934 && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
935 self.pending_place_requests.remove(request_id)
936 {
937 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
938 let ord_id = first
939 .get("ordId")
940 .and_then(|v| v.as_str())
941 .filter(|s| !s.is_empty())
942 .map(VenueOrderId::new);
943
944 let ts = first
945 .get("ts")
946 .and_then(|v| v.as_str())
947 .and_then(|s| s.parse::<u64>().ok())
948 .map_or_else(
949 || self.clock.get_time_ns(),
950 |ms| UnixNanos::from(ms * 1_000_000),
951 );
952
953 (ord_id, ts)
954 } else {
955 (None, self.clock.get_time_ns())
956 };
957
958 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
959 {
960 match params {
961 PendingOrderParams::Regular(order_params) => {
962 let order_type = determine_order_type(
963 order_params.ord_type,
964 order_params.px.as_deref().unwrap_or(""),
965 );
966
967 let is_explicit_quote_sized = order_params
968 .tgt_ccy
969 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
970
971 let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
973 && order_params.side == OKXSide::Buy
974 && order_type == OrderType::Market
975 && order_params.td_mode == OKXTradeMode::Cash
976 && instrument.instrument_class().as_ref() == "SPOT";
977
978 if is_explicit_quote_sized || is_implicit_quote_sized {
979 log::debug!(
984 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
985 if is_explicit_quote_sized {
986 "explicit"
987 } else {
988 "implicit"
989 },
990 );
991 return None;
992 }
993
994 let Some(v_order_id) = venue_order_id else {
995 log::error!(
996 "No venue_order_id for accepted order: client_order_id={client_order_id}"
997 );
998 return None;
999 };
1000
1001 if self.emitted_order_accepted.contains_key(&v_order_id) {
1003 log::debug!(
1004 "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1005 );
1006 return None;
1007 }
1008 self.emitted_order_accepted.insert(v_order_id, ());
1009
1010 let accepted = OrderAccepted::new(
1011 trader_id,
1012 strategy_id,
1013 instrument_id,
1014 client_order_id,
1015 v_order_id,
1016 self.account_id,
1017 UUID4::new(),
1018 ts_accepted,
1019 ts_init,
1020 false, );
1022
1023 log::debug!(
1024 "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1025 );
1026
1027 return Some(NautilusWsMessage::OrderAccepted(accepted));
1028 }
1029 PendingOrderParams::Algo(_) => {
1030 log::debug!(
1031 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
1032 );
1033 }
1034 }
1035 } else {
1036 log::error!("Instrument not found for accepted order: {instrument_id}");
1037 }
1038 }
1039
1040 if let Some(first) = data.first()
1041 && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1042 {
1043 log::debug!("Order details: {success_msg}");
1044 }
1045
1046 return None;
1047 }
1048
1049 let error_msg = data
1050 .first()
1051 .and_then(|d| d.get("sMsg"))
1052 .and_then(|s| s.as_str())
1053 .unwrap_or(&msg)
1054 .to_string();
1055
1056 if let Some(first) = data.first() {
1057 log::debug!(
1058 "Error data fields: {}",
1059 serde_json::to_string_pretty(first)
1060 .unwrap_or_else(|_| "unable to serialize".to_string())
1061 );
1062 }
1063
1064 log::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1065
1066 let ts_event = self.clock.get_time_ns();
1067
1068 if let Some(request_id) = &id {
1069 match op {
1070 OKXWsOperation::Order => {
1071 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1072 self.pending_place_requests.remove(request_id)
1073 {
1074 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1075 let rejected = OrderRejected::new(
1076 trader_id,
1077 strategy_id,
1078 instrument_id,
1079 client_order_id,
1080 self.account_id,
1081 Ustr::from(error_msg.as_str()),
1082 UUID4::new(),
1083 ts_event,
1084 ts_init,
1085 false, due_post_only,
1087 );
1088
1089 return Some(NautilusWsMessage::OrderRejected(rejected));
1090 }
1091 }
1092 OKXWsOperation::CancelOrder => {
1093 if let Some((
1094 client_order_id,
1095 trader_id,
1096 strategy_id,
1097 instrument_id,
1098 venue_order_id,
1099 )) = self.pending_cancel_requests.remove(request_id)
1100 {
1101 let rejected = OrderCancelRejected::new(
1102 trader_id,
1103 strategy_id,
1104 instrument_id,
1105 client_order_id,
1106 Ustr::from(error_msg.as_str()),
1107 UUID4::new(),
1108 ts_event,
1109 ts_init,
1110 false, venue_order_id,
1112 Some(self.account_id),
1113 );
1114
1115 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1116 }
1117 }
1118 OKXWsOperation::AmendOrder => {
1119 if let Some((
1120 client_order_id,
1121 trader_id,
1122 strategy_id,
1123 instrument_id,
1124 venue_order_id,
1125 )) = self.pending_amend_requests.remove(request_id)
1126 {
1127 let rejected = OrderModifyRejected::new(
1128 trader_id,
1129 strategy_id,
1130 instrument_id,
1131 client_order_id,
1132 Ustr::from(error_msg.as_str()),
1133 UUID4::new(),
1134 ts_event,
1135 ts_init,
1136 false, venue_order_id,
1138 Some(self.account_id),
1139 );
1140
1141 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1142 }
1143 }
1144 OKXWsOperation::OrderAlgo => {
1145 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1146 self.pending_place_requests.remove(request_id)
1147 {
1148 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1149 let rejected = OrderRejected::new(
1150 trader_id,
1151 strategy_id,
1152 instrument_id,
1153 client_order_id,
1154 self.account_id,
1155 Ustr::from(error_msg.as_str()),
1156 UUID4::new(),
1157 ts_event,
1158 ts_init,
1159 false, due_post_only,
1161 );
1162
1163 return Some(NautilusWsMessage::OrderRejected(rejected));
1164 }
1165 }
1166 OKXWsOperation::CancelAlgos => {
1167 if let Some((
1168 client_order_id,
1169 trader_id,
1170 strategy_id,
1171 instrument_id,
1172 venue_order_id,
1173 )) = self.pending_cancel_requests.remove(request_id)
1174 {
1175 let rejected = OrderCancelRejected::new(
1176 trader_id,
1177 strategy_id,
1178 instrument_id,
1179 client_order_id,
1180 Ustr::from(error_msg.as_str()),
1181 UUID4::new(),
1182 ts_event,
1183 ts_init,
1184 false, venue_order_id,
1186 Some(self.account_id),
1187 );
1188
1189 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1190 }
1191 }
1192 OKXWsOperation::MassCancel => {
1193 if let Some(instrument_id) =
1194 self.pending_mass_cancel_requests.remove(request_id)
1195 {
1196 log::error!(
1197 "Mass cancel operation failed for {instrument_id}: code={code} msg={error_msg}"
1198 );
1199 let error = OKXWebSocketError {
1200 code,
1201 message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1202 conn_id: None,
1203 timestamp: ts_event.as_u64(),
1204 };
1205 return Some(NautilusWsMessage::Error(error));
1206 } else {
1207 log::error!("Mass cancel operation failed: code={code} msg={error_msg}");
1208 }
1209 }
1210 OKXWsOperation::BatchCancelOrders => {
1211 log::warn!(
1212 "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1213 data.len()
1214 );
1215
1216 for (idx, entry) in data.iter().enumerate() {
1218 let entry_code =
1219 entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1220 let entry_msg = entry
1221 .get("sMsg")
1222 .and_then(|v| v.as_str())
1223 .unwrap_or(&error_msg);
1224
1225 if entry_code != "0" {
1226 if let Some(cl_ord_id_str) = entry
1228 .get("clOrdId")
1229 .and_then(|v| v.as_str())
1230 .filter(|s| !s.is_empty())
1231 {
1232 log::error!(
1233 "Batch cancel failed for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
1234 );
1235 } else {
1238 log::error!(
1239 "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
1240 );
1241 }
1242 }
1243 }
1244
1245 let error = OKXWebSocketError {
1247 code,
1248 message: format!("Batch cancel failed: {error_msg}"),
1249 conn_id: None,
1250 timestamp: ts_event.as_u64(),
1251 };
1252 return Some(NautilusWsMessage::Error(error));
1253 }
1254 _ => log::warn!("Unhandled operation type for rejection: {op}"),
1255 }
1256 }
1257
1258 let error = OKXWebSocketError {
1259 code,
1260 message: error_msg,
1261 conn_id: None,
1262 timestamp: ts_event.as_u64(),
1263 };
1264 Some(NautilusWsMessage::Error(error))
1265 }
1266
1267 fn handle_book_data(
1268 &self,
1269 arg: OKXWebSocketArg,
1270 action: OKXBookAction,
1271 data: Vec<OKXBookMsg>,
1272 ts_init: UnixNanos,
1273 ) -> Option<NautilusWsMessage> {
1274 let Some(inst_id) = arg.inst_id else {
1275 log::error!("Instrument ID missing for book data event");
1276 return None;
1277 };
1278
1279 let inst = self.instruments_cache.get(&inst_id)?;
1280
1281 let instrument_id = inst.id();
1282 let price_precision = inst.price_precision();
1283 let size_precision = inst.size_precision();
1284
1285 match parse_book_msg_vec(
1286 data,
1287 &instrument_id,
1288 price_precision,
1289 size_precision,
1290 action,
1291 ts_init,
1292 ) {
1293 Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1294 Err(e) => {
1295 log::error!("Failed to parse book message: {e}");
1296 None
1297 }
1298 }
1299 }
1300
1301 fn handle_account_data(
1302 &mut self,
1303 data: Value,
1304 ts_init: UnixNanos,
1305 ) -> Option<NautilusWsMessage> {
1306 let Value::Array(arr) = data else {
1307 log::error!("Account data is not an array");
1308 return None;
1309 };
1310
1311 let first = arr.into_iter().next()?;
1312
1313 let account: OKXAccount = match serde_json::from_value(first) {
1314 Ok(acc) => acc,
1315 Err(e) => {
1316 log::error!("Failed to parse account data: {e}");
1317 return None;
1318 }
1319 };
1320
1321 match parse_account_state(&account, self.account_id, ts_init) {
1322 Ok(account_state) => {
1323 if let Some(last_account_state) = &self.last_account_state
1324 && account_state.has_same_balances_and_margins(last_account_state)
1325 {
1326 return None;
1327 }
1328 self.last_account_state = Some(account_state.clone());
1329 Some(NautilusWsMessage::AccountUpdate(account_state))
1330 }
1331 Err(e) => {
1332 log::error!("Failed to parse account state: {e}");
1333 None
1334 }
1335 }
1336 }
1337
1338 fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1339 match serde_json::from_value::<Vec<OKXPosition>>(data) {
1340 Ok(positions) => {
1341 log::debug!("Received {} position update(s)", positions.len());
1342
1343 for position in positions {
1344 let instrument = match self.instruments_cache.get(&position.inst_id) {
1345 Some(inst) => inst,
1346 None => {
1347 log::warn!(
1348 "Received position update for unknown instrument {}, skipping",
1349 position.inst_id
1350 );
1351 continue;
1352 }
1353 };
1354
1355 let instrument_id = instrument.id();
1356 let size_precision = instrument.size_precision();
1357
1358 match parse_position_status_report(
1359 position,
1360 self.account_id,
1361 instrument_id,
1362 size_precision,
1363 ts_init,
1364 ) {
1365 Ok(position_report) => {
1366 self.pending_messages
1367 .push_back(NautilusWsMessage::PositionUpdate(position_report));
1368 }
1369 Err(e) => {
1370 log::error!(
1371 "Failed to parse position status report for {instrument_id}: {e}"
1372 );
1373 }
1374 }
1375 }
1376 }
1377 Err(e) => {
1378 log::error!("Failed to parse positions data: {e}");
1379 }
1380 }
1381 }
1382
1383 fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1384 let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1385 Ok(orders) => orders,
1386 Err(e) => {
1387 log::error!("Failed to deserialize orders channel payload: {e}");
1388 return None;
1389 }
1390 };
1391
1392 log::debug!(
1393 "Received {} order message(s) from orders channel",
1394 orders.len()
1395 );
1396
1397 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1398
1399 for msg in orders {
1400 log::debug!(
1401 "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1402 msg.inst_id,
1403 msg.cl_ord_id,
1404 msg.state,
1405 msg.exec_type
1406 );
1407
1408 if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1409 continue;
1410 }
1411
1412 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1413 let parent_from_msg = msg
1414 .algo_cl_ord_id
1415 .as_ref()
1416 .filter(|value| !value.is_empty())
1417 .map(ClientOrderId::new);
1418 let effective_client_id =
1419 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1420
1421 let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
1422 log::error!(
1423 "No instrument found for inst_id: {inst_id}",
1424 inst_id = msg.inst_id
1425 );
1426 continue;
1427 };
1428 let price_precision = instrument.price_precision();
1429 let size_precision = instrument.size_precision();
1430
1431 let order_metadata = effective_client_id
1432 .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));
1433
1434 let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
1435 let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
1436 let previous_state =
1437 effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());
1438
1439 if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
1441 (order_metadata, effective_client_id)
1442 {
1443 match parse_order_event(
1444 &msg,
1445 canonical_client_id,
1446 self.account_id,
1447 trader_id,
1448 strategy_id,
1449 instrument,
1450 previous_fee,
1451 previous_filled_qty,
1452 previous_state.as_ref(),
1453 ts_init,
1454 ) {
1455 Ok(event) => {
1456 self.process_parsed_order_event(
1457 event,
1458 &msg,
1459 price_precision,
1460 size_precision,
1461 canonical_client_id,
1462 &raw_child,
1463 &mut exec_reports,
1464 );
1465 }
1466 Err(e) => log::error!("Failed to parse order event: {e}"),
1467 }
1468 } else {
1469 match parse_order_msg(
1471 &msg,
1472 self.account_id,
1473 &self.instruments_cache,
1474 &self.fee_cache,
1475 &self.filled_qty_cache,
1476 ts_init,
1477 ) {
1478 Ok(report) => {
1479 log::debug!("Parsed external order as execution report: {report:?}");
1480 let adjusted =
1481 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1482 self.update_caches_with_report(&adjusted);
1483 exec_reports.push(adjusted);
1484 }
1485 Err(e) => log::error!("Failed to parse order message: {e}"),
1486 }
1487 }
1488 }
1489
1490 if !exec_reports.is_empty() {
1491 log::debug!(
1492 "Pushing {count} execution report(s) to message queue",
1493 count = exec_reports.len()
1494 );
1495 self.pending_messages
1496 .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1497 }
1498
1499 self.pending_messages.pop_front()
1500 }
1501
1502 #[allow(clippy::too_many_arguments)]
1504 fn process_parsed_order_event(
1505 &mut self,
1506 event: ParsedOrderEvent,
1507 msg: &OKXOrderMsg,
1508 price_precision: u8,
1509 size_precision: u8,
1510 canonical_client_id: ClientOrderId,
1511 raw_child: &Option<ClientOrderId>,
1512 exec_reports: &mut Vec<ExecutionReport>,
1513 ) {
1514 let venue_order_id = VenueOrderId::new(msg.ord_id);
1515
1516 match event {
1517 ParsedOrderEvent::Accepted(accepted) => {
1518 if self.emitted_order_accepted.contains_key(&venue_order_id) {
1519 log::debug!(
1520 "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1521 );
1522 return;
1523 }
1524 self.emitted_order_accepted.insert(venue_order_id, ());
1525 self.update_order_state_cache(
1526 &canonical_client_id,
1527 msg,
1528 price_precision,
1529 size_precision,
1530 );
1531
1532 self.pending_messages
1533 .push_back(NautilusWsMessage::OrderAccepted(accepted));
1534 }
1535 ParsedOrderEvent::Canceled(canceled) => {
1536 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1537 self.pending_messages
1538 .push_back(NautilusWsMessage::OrderCanceled(canceled));
1539 }
1540 ParsedOrderEvent::Expired(expired) => {
1541 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1542 self.pending_messages
1543 .push_back(NautilusWsMessage::OrderExpired(expired));
1544 }
1545 ParsedOrderEvent::Triggered(triggered) => {
1546 self.update_order_state_cache(
1547 &canonical_client_id,
1548 msg,
1549 price_precision,
1550 size_precision,
1551 );
1552 self.pending_messages
1553 .push_back(NautilusWsMessage::OrderTriggered(triggered));
1554 }
1555 ParsedOrderEvent::Updated(updated) => {
1556 self.update_order_state_cache(
1557 &canonical_client_id,
1558 msg,
1559 price_precision,
1560 size_precision,
1561 );
1562 self.pending_messages
1563 .push_back(NautilusWsMessage::OrderUpdated(updated));
1564 }
1565 ParsedOrderEvent::Fill(fill_report) => {
1566 let effective_client_id = Some(canonical_client_id);
1567 let adjusted = self.adjust_execution_report(
1568 ExecutionReport::Fill(fill_report),
1569 &effective_client_id,
1570 raw_child,
1571 );
1572 self.update_caches_with_report(&adjusted);
1573
1574 if msg.state == OKXOrderStatus::Filled {
1575 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1576 }
1577
1578 exec_reports.push(adjusted);
1579 }
1580 ParsedOrderEvent::StatusOnly(status_report) => {
1581 let effective_client_id = Some(canonical_client_id);
1582 let adjusted = self.adjust_execution_report(
1583 ExecutionReport::Order(*status_report),
1584 &effective_client_id,
1585 raw_child,
1586 );
1587 self.update_caches_with_report(&adjusted);
1588 exec_reports.push(adjusted);
1589 }
1590 }
1591 }
1592
1593 fn update_order_state_cache(
1595 &mut self,
1596 client_order_id: &ClientOrderId,
1597 msg: &OKXOrderMsg,
1598 price_precision: u8,
1599 size_precision: u8,
1600 ) {
1601 let venue_order_id = VenueOrderId::new(msg.ord_id);
1602 let quantity = parse_quantity(&msg.sz, size_precision).ok();
1603 let price = if is_market_price(&msg.px) {
1604 None
1605 } else {
1606 parse_price(&msg.px, price_precision).ok()
1607 };
1608
1609 if let Some(qty) = quantity {
1610 self.order_state_cache.insert(
1611 *client_order_id,
1612 OrderStateSnapshot {
1613 venue_order_id,
1614 quantity: qty,
1615 price,
1616 },
1617 );
1618 }
1619 }
1620
1621 fn cleanup_terminal_order(
1623 &mut self,
1624 client_order_id: &ClientOrderId,
1625 venue_order_id: &VenueOrderId,
1626 ) {
1627 self.emitted_order_accepted.remove(venue_order_id);
1628 self.order_state_cache.remove(client_order_id);
1629 self.active_client_orders.remove(client_order_id);
1630 self.client_id_aliases.remove(client_order_id);
1631 self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1632
1633 self.fee_cache.remove(&venue_order_id.inner());
1634 self.filled_qty_cache.remove(&venue_order_id.inner());
1635 }
1636
1637 fn handle_algo_orders_data(
1638 &mut self,
1639 data: Value,
1640 ts_init: UnixNanos,
1641 ) -> Option<NautilusWsMessage> {
1642 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1643 Ok(orders) => orders,
1644 Err(e) => {
1645 log::error!("Failed to deserialize algo orders payload: {e}");
1646 return None;
1647 }
1648 };
1649
1650 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1651
1652 for msg in orders {
1653 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1654 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1655 let effective_client_id =
1656 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1657
1658 match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1659 Ok(report) => {
1660 let adjusted =
1661 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1662 self.update_caches_with_report(&adjusted);
1663 exec_reports.push(adjusted);
1664 }
1665 Err(e) => {
1666 log::error!("Failed to parse algo order message: {e}");
1667 }
1668 }
1669 }
1670
1671 if exec_reports.is_empty() {
1672 None
1673 } else {
1674 Some(NautilusWsMessage::ExecutionReports(exec_reports))
1675 }
1676 }
1677
1678 fn handle_other_channel_data(
1679 &mut self,
1680 channel: OKXWsChannel,
1681 inst_id: Option<Ustr>,
1682 data: Value,
1683 ts_init: UnixNanos,
1684 ) -> Option<NautilusWsMessage> {
1685 let Some(inst_id) = inst_id else {
1686 log::error!("No instrument for channel {channel:?}");
1687 return None;
1688 };
1689
1690 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1691 log::error!("No instrument for channel {channel:?}, inst_id {inst_id:?}");
1692 return None;
1693 };
1694
1695 let instrument_id = instrument.id();
1696 let price_precision = instrument.price_precision();
1697 let size_precision = instrument.size_precision();
1698
1699 match parse_ws_message_data(
1700 &channel,
1701 data,
1702 &instrument_id,
1703 price_precision,
1704 size_precision,
1705 ts_init,
1706 &mut self.funding_rate_cache,
1707 &self.instruments_cache,
1708 ) {
1709 Ok(Some(msg)) => {
1710 if let NautilusWsMessage::Instrument(ref inst) = msg {
1711 self.instruments_cache
1712 .insert(inst.symbol().inner(), inst.as_ref().clone());
1713 }
1714 Some(msg)
1715 }
1716 Ok(None) => None,
1717 Err(e) => {
1718 log::error!("Error parsing message for channel {channel:?}: {e}");
1719 None
1720 }
1721 }
1722 }
1723
1724 pub(crate) fn parse_raw_message(
1725 msg: tokio_tungstenite::tungstenite::Message,
1726 ) -> Option<OKXWsMessage> {
1727 match msg {
1728 tokio_tungstenite::tungstenite::Message::Text(text) => {
1729 if text == TEXT_PONG {
1730 log::trace!("Received pong from OKX");
1731 return None;
1732 }
1733 if text == TEXT_PING {
1734 log::trace!("Received ping from OKX (text)");
1735 return Some(OKXWsMessage::Ping);
1736 }
1737
1738 if text == RECONNECTED {
1739 log::debug!("Received WebSocket reconnection signal");
1740 return Some(OKXWsMessage::Reconnected);
1741 }
1742 log::trace!("Received WebSocket message: {text}");
1743
1744 match serde_json::from_str(&text) {
1745 Ok(ws_event) => match &ws_event {
1746 OKXWsMessage::Error { code, msg } => {
1747 log::error!("WebSocket error: {code} - {msg}");
1748 Some(ws_event)
1749 }
1750 OKXWsMessage::Login {
1751 event,
1752 code,
1753 msg,
1754 conn_id,
1755 } => {
1756 if code == "0" {
1757 log::info!("WebSocket authenticated: conn_id={conn_id}");
1758 } else {
1759 log::error!(
1760 "WebSocket authentication failed: event={event}, code={code}, error={msg}"
1761 );
1762 }
1763 Some(ws_event)
1764 }
1765 OKXWsMessage::Subscription {
1766 event,
1767 arg,
1768 conn_id,
1769 ..
1770 } => {
1771 let channel_str = serde_json::to_string(&arg.channel)
1772 .expect("Invalid OKX websocket channel")
1773 .trim_matches('"')
1774 .to_string();
1775 log::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1776 Some(ws_event)
1777 }
1778 OKXWsMessage::ChannelConnCount {
1779 event: _,
1780 channel,
1781 conn_count,
1782 conn_id,
1783 } => {
1784 let channel_str = serde_json::to_string(&channel)
1785 .expect("Invalid OKX websocket channel")
1786 .trim_matches('"')
1787 .to_string();
1788 log::debug!(
1789 "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1790 );
1791 None
1792 }
1793 OKXWsMessage::Ping => {
1794 log::trace!("Ignoring ping event parsed from text payload");
1795 None
1796 }
1797 OKXWsMessage::Data { .. } => Some(ws_event),
1798 OKXWsMessage::BookData { .. } => Some(ws_event),
1799 OKXWsMessage::OrderResponse {
1800 id,
1801 op,
1802 code,
1803 msg: _,
1804 data,
1805 } => {
1806 if code == "0" {
1807 log::debug!(
1808 "Order operation successful: id={id:?}, op={op}, code={code}"
1809 );
1810
1811 if let Some(order_data) = data.first() {
1812 let success_msg = order_data
1813 .get("sMsg")
1814 .and_then(|s| s.as_str())
1815 .unwrap_or("Order operation successful");
1816 log::debug!("Order success details: {success_msg}");
1817 }
1818 }
1819 Some(ws_event)
1820 }
1821 OKXWsMessage::Reconnected => {
1822 log::warn!("Unexpected Reconnected event from deserialization");
1824 None
1825 }
1826 },
1827 Err(e) => {
1828 log::error!("Failed to parse message: {e}: {text}");
1829 None
1830 }
1831 }
1832 }
1833 Message::Ping(_payload) => {
1834 log::trace!("Received binary ping frame from OKX");
1835 Some(OKXWsMessage::Ping)
1836 }
1837 Message::Pong(payload) => {
1838 log::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1839 None
1840 }
1841 Message::Binary(msg) => {
1842 log::debug!("Raw binary: {msg:?}");
1843 None
1844 }
1845 Message::Close(_) => {
1846 log::debug!("Received close message");
1847 None
1848 }
1849 msg => {
1850 log::warn!("Unexpected message: {msg}");
1851 None
1852 }
1853 }
1854 }
1855
1856 fn generate_unique_request_id(&self) -> String {
1857 self.request_id_counter
1858 .fetch_add(1, Ordering::SeqCst)
1859 .to_string()
1860 }
1861
1862 fn get_instrument_type_and_family_from_instrument(
1863 instrument: &InstrumentAny,
1864 ) -> anyhow::Result<(OKXInstrumentType, String)> {
1865 let inst_type = okx_instrument_type(instrument)?;
1866 let symbol = instrument.symbol().inner();
1867
1868 let inst_family = match instrument {
1870 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1871 InstrumentAny::CryptoPerpetual(_) => {
1872 symbol
1874 .as_str()
1875 .strip_suffix("-SWAP")
1876 .unwrap_or(symbol.as_str())
1877 .to_string()
1878 }
1879 InstrumentAny::CryptoFuture(_) => {
1880 let s = symbol.as_str();
1883 if let Some(idx) = s.rfind('-') {
1884 s[..idx].to_string()
1885 } else {
1886 s.to_string()
1887 }
1888 }
1889 _ => {
1890 anyhow::bail!("Unsupported instrument type for OKX");
1891 }
1892 };
1893
1894 Ok((inst_type, inst_family))
1895 }
1896
1897 async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1898 let instrument = self
1899 .instruments_cache
1900 .get(&instrument_id.symbol.inner())
1901 .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1902
1903 let (inst_type, inst_family) =
1904 Self::get_instrument_type_and_family_from_instrument(instrument)?;
1905
1906 let params = WsMassCancelParams {
1907 inst_type,
1908 inst_family: Ustr::from(&inst_family),
1909 };
1910
1911 let args =
1912 vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1913
1914 let request_id = self.generate_unique_request_id();
1915
1916 self.pending_mass_cancel_requests
1917 .insert(request_id.clone(), instrument_id);
1918
1919 let request = OKXWsRequest {
1920 id: Some(request_id.clone()),
1921 op: OKXWsOperation::MassCancel,
1922 exp_time: None,
1923 args,
1924 };
1925
1926 let payload = serde_json::to_string(&request)
1927 .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1928
1929 match self
1930 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1931 .await
1932 {
1933 Ok(()) => {
1934 log::debug!("Sent mass cancel for {instrument_id}");
1935 Ok(())
1936 }
1937 Err(e) => {
1938 log::error!("Failed to send mass cancel after retries: error={e}");
1939
1940 self.pending_mass_cancel_requests.remove(&request_id);
1941
1942 let error = OKXWebSocketError {
1943 code: "CLIENT_ERROR".to_string(),
1944 message: format!("Mass cancel failed for {instrument_id}: {e}"),
1945 conn_id: None,
1946 timestamp: self.clock.get_time_ns().as_u64(),
1947 };
1948 let _ = self.send(NautilusWsMessage::Error(error));
1949
1950 Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1951 }
1952 }
1953 }
1954
1955 async fn handle_batch_cancel_orders(
1956 &self,
1957 args: Vec<Value>,
1958 request_id: String,
1959 ) -> anyhow::Result<()> {
1960 let request = OKXWsRequest {
1961 id: Some(request_id),
1962 op: OKXWsOperation::BatchCancelOrders,
1963 exp_time: None,
1964 args,
1965 };
1966
1967 let payload = serde_json::to_string(&request)
1968 .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1969
1970 if let Some(client) = &self.inner {
1971 client
1972 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1973 .await
1974 .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1975 log::debug!("Sent batch cancel orders");
1976 Ok(())
1977 } else {
1978 Err(anyhow::anyhow!("No active WebSocket client"))
1979 }
1980 }
1981
1982 async fn handle_batch_place_orders(
1983 &self,
1984 args: Vec<Value>,
1985 request_id: String,
1986 ) -> anyhow::Result<()> {
1987 let request = OKXWsRequest {
1988 id: Some(request_id),
1989 op: OKXWsOperation::BatchOrders,
1990 exp_time: None,
1991 args,
1992 };
1993
1994 let payload = serde_json::to_string(&request)
1995 .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
1996
1997 if let Some(client) = &self.inner {
1998 client
1999 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2000 .await
2001 .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2002 log::debug!("Sent batch place orders");
2003 Ok(())
2004 } else {
2005 Err(anyhow::anyhow!("No active WebSocket client"))
2006 }
2007 }
2008
2009 async fn handle_batch_amend_orders(
2010 &self,
2011 args: Vec<Value>,
2012 request_id: String,
2013 ) -> anyhow::Result<()> {
2014 let request = OKXWsRequest {
2015 id: Some(request_id),
2016 op: OKXWsOperation::BatchAmendOrders,
2017 exp_time: None,
2018 args,
2019 };
2020
2021 let payload = serde_json::to_string(&request)
2022 .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2023
2024 if let Some(client) = &self.inner {
2025 client
2026 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2027 .await
2028 .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2029 log::debug!("Sent batch amend orders");
2030 Ok(())
2031 } else {
2032 Err(anyhow::anyhow!("No active WebSocket client"))
2033 }
2034 }
2035
2036 async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2037 for arg in &args {
2038 log::debug!(
2039 "Subscribing to channel: channel={:?}, inst_id={:?}",
2040 arg.channel,
2041 arg.inst_id
2042 );
2043 }
2044
2045 let message = OKXSubscription {
2046 op: OKXWsOperation::Subscribe,
2047 args,
2048 };
2049
2050 let json_txt = serde_json::to_string(&message)
2051 .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2052
2053 self.send_with_retry(
2054 json_txt,
2055 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2056 )
2057 .await
2058 .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2059 Ok(())
2060 }
2061
2062 async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2063 for arg in &args {
2064 log::debug!(
2065 "Unsubscribing from channel: channel={:?}, inst_id={:?}",
2066 arg.channel,
2067 arg.inst_id
2068 );
2069 }
2070
2071 let message = OKXSubscription {
2072 op: OKXWsOperation::Unsubscribe,
2073 args,
2074 };
2075
2076 let json_txt = serde_json::to_string(&message)
2077 .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2078
2079 self.send_with_retry(
2080 json_txt,
2081 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2082 )
2083 .await
2084 .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2085 Ok(())
2086 }
2087
2088 async fn handle_place_order(
2089 &mut self,
2090 params: WsPostOrderParams,
2091 client_order_id: ClientOrderId,
2092 trader_id: TraderId,
2093 strategy_id: StrategyId,
2094 instrument_id: InstrumentId,
2095 ) -> anyhow::Result<()> {
2096 let request_id = self.generate_unique_request_id();
2097
2098 self.pending_place_requests.insert(
2099 request_id.clone(),
2100 (
2101 PendingOrderParams::Regular(params.clone()),
2102 client_order_id,
2103 trader_id,
2104 strategy_id,
2105 instrument_id,
2106 ),
2107 );
2108
2109 let request = OKXWsRequest {
2110 id: Some(request_id.clone()),
2111 op: OKXWsOperation::Order,
2112 exp_time: None,
2113 args: vec![params],
2114 };
2115
2116 let payload = serde_json::to_string(&request)
2117 .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2118
2119 match self
2120 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2121 .await
2122 {
2123 Ok(()) => {
2124 log::debug!("Sent place order request");
2125 Ok(())
2126 }
2127 Err(e) => {
2128 log::error!("Failed to send place order after retries: error={e}");
2129
2130 self.pending_place_requests.remove(&request_id);
2131
2132 let ts_now = self.clock.get_time_ns();
2133 let rejected = OrderRejected::new(
2134 trader_id,
2135 strategy_id,
2136 instrument_id,
2137 client_order_id,
2138 self.account_id,
2139 Ustr::from(&format!("WebSocket send failed: {e}")),
2140 UUID4::new(),
2141 ts_now, ts_now, false, false, );
2146 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2147
2148 Err(anyhow::anyhow!("Failed to send place order: {e}"))
2149 }
2150 }
2151 }
2152
2153 async fn handle_place_algo_order(
2154 &mut self,
2155 params: WsPostAlgoOrderParams,
2156 client_order_id: ClientOrderId,
2157 trader_id: TraderId,
2158 strategy_id: StrategyId,
2159 instrument_id: InstrumentId,
2160 ) -> anyhow::Result<()> {
2161 let request_id = self.generate_unique_request_id();
2162
2163 self.pending_place_requests.insert(
2164 request_id.clone(),
2165 (
2166 PendingOrderParams::Algo(params.clone()),
2167 client_order_id,
2168 trader_id,
2169 strategy_id,
2170 instrument_id,
2171 ),
2172 );
2173
2174 let request = OKXWsRequest {
2175 id: Some(request_id.clone()),
2176 op: OKXWsOperation::OrderAlgo,
2177 exp_time: None,
2178 args: vec![params],
2179 };
2180
2181 let payload = serde_json::to_string(&request)
2182 .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2183
2184 match self
2185 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2186 .await
2187 {
2188 Ok(()) => {
2189 log::debug!("Sent place algo order request");
2190 Ok(())
2191 }
2192 Err(e) => {
2193 log::error!("Failed to send place algo order after retries: error={e}");
2194
2195 self.pending_place_requests.remove(&request_id);
2196
2197 let ts_now = self.clock.get_time_ns();
2198 let rejected = OrderRejected::new(
2199 trader_id,
2200 strategy_id,
2201 instrument_id,
2202 client_order_id,
2203 self.account_id,
2204 Ustr::from(&format!("WebSocket send failed: {e}")),
2205 UUID4::new(),
2206 ts_now, ts_now, false, false, );
2211 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2212
2213 Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2214 }
2215 }
2216 }
2217
2218 async fn handle_cancel_order(
2219 &mut self,
2220 client_order_id: Option<ClientOrderId>,
2221 venue_order_id: Option<VenueOrderId>,
2222 instrument_id: InstrumentId,
2223 trader_id: TraderId,
2224 strategy_id: StrategyId,
2225 ) -> anyhow::Result<()> {
2226 let mut builder = WsCancelOrderParamsBuilder::default();
2227 builder.inst_id(instrument_id.symbol.as_str());
2228
2229 if let Some(venue_order_id) = venue_order_id {
2230 builder.ord_id(venue_order_id.as_str());
2231 }
2232
2233 if let Some(client_order_id) = client_order_id {
2234 builder.cl_ord_id(client_order_id.as_str());
2235 }
2236
2237 let params = builder
2238 .build()
2239 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2240
2241 let request_id = self.generate_unique_request_id();
2242
2243 if let Some(client_order_id) = client_order_id {
2245 self.pending_cancel_requests.insert(
2246 request_id.clone(),
2247 (
2248 client_order_id,
2249 trader_id,
2250 strategy_id,
2251 instrument_id,
2252 venue_order_id,
2253 ),
2254 );
2255 }
2256
2257 let request = OKXWsRequest {
2258 id: Some(request_id.clone()),
2259 op: OKXWsOperation::CancelOrder,
2260 exp_time: None,
2261 args: vec![params],
2262 };
2263
2264 let payload = serde_json::to_string(&request)
2265 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2266
2267 match self
2268 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2269 .await
2270 {
2271 Ok(()) => {
2272 log::debug!("Sent cancel order request");
2273 Ok(())
2274 }
2275 Err(e) => {
2276 log::error!("Failed to send cancel order after retries: error={e}");
2277
2278 self.pending_cancel_requests.remove(&request_id);
2279
2280 if let Some(client_order_id) = client_order_id {
2281 let ts_now = self.clock.get_time_ns();
2282 let rejected = OrderCancelRejected::new(
2283 trader_id,
2284 strategy_id,
2285 instrument_id,
2286 client_order_id,
2287 Ustr::from(&format!("WebSocket send failed: {e}")),
2288 UUID4::new(),
2289 ts_now, ts_now, false, venue_order_id,
2293 Some(self.account_id),
2294 );
2295 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2296 }
2297
2298 Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2299 }
2300 }
2301 }
2302
2303 async fn handle_amend_order(
2304 &mut self,
2305 params: WsAmendOrderParams,
2306 client_order_id: ClientOrderId,
2307 trader_id: TraderId,
2308 strategy_id: StrategyId,
2309 instrument_id: InstrumentId,
2310 venue_order_id: Option<VenueOrderId>,
2311 ) -> anyhow::Result<()> {
2312 let request_id = self.generate_unique_request_id();
2313
2314 self.pending_amend_requests.insert(
2315 request_id.clone(),
2316 (
2317 client_order_id,
2318 trader_id,
2319 strategy_id,
2320 instrument_id,
2321 venue_order_id,
2322 ),
2323 );
2324
2325 let request = OKXWsRequest {
2326 id: Some(request_id.clone()),
2327 op: OKXWsOperation::AmendOrder,
2328 exp_time: None,
2329 args: vec![params],
2330 };
2331
2332 let payload = serde_json::to_string(&request)
2333 .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2334
2335 match self
2336 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2337 .await
2338 {
2339 Ok(()) => {
2340 log::debug!("Sent amend order request");
2341 Ok(())
2342 }
2343 Err(e) => {
2344 log::error!("Failed to send amend order after retries: error={e}");
2345
2346 self.pending_amend_requests.remove(&request_id);
2347
2348 let ts_now = self.clock.get_time_ns();
2349 let rejected = OrderModifyRejected::new(
2350 trader_id,
2351 strategy_id,
2352 instrument_id,
2353 client_order_id,
2354 Ustr::from(&format!("WebSocket send failed: {e}")),
2355 UUID4::new(),
2356 ts_now, ts_now, false, venue_order_id,
2360 Some(self.account_id),
2361 );
2362 let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2363
2364 Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2365 }
2366 }
2367 }
2368
2369 async fn handle_cancel_algo_order(
2370 &mut self,
2371 client_order_id: Option<ClientOrderId>,
2372 algo_order_id: Option<VenueOrderId>,
2373 instrument_id: InstrumentId,
2374 trader_id: TraderId,
2375 strategy_id: StrategyId,
2376 ) -> anyhow::Result<()> {
2377 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2378 builder.inst_id(instrument_id.symbol.as_str());
2379
2380 if let Some(client_order_id) = &client_order_id {
2381 builder.algo_cl_ord_id(client_order_id.as_str());
2382 }
2383
2384 if let Some(algo_id) = &algo_order_id {
2385 builder.algo_id(algo_id.as_str());
2386 }
2387
2388 let params = builder
2389 .build()
2390 .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2391
2392 let request_id = self.generate_unique_request_id();
2393
2394 if let Some(client_order_id) = client_order_id {
2396 self.pending_cancel_requests.insert(
2397 request_id.clone(),
2398 (client_order_id, trader_id, strategy_id, instrument_id, None),
2399 );
2400 }
2401
2402 let request = OKXWsRequest {
2403 id: Some(request_id.clone()),
2404 op: OKXWsOperation::CancelAlgos,
2405 exp_time: None,
2406 args: vec![params],
2407 };
2408
2409 let payload = serde_json::to_string(&request)
2410 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2411
2412 match self
2413 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2414 .await
2415 {
2416 Ok(()) => {
2417 log::debug!("Sent cancel algo order request");
2418 Ok(())
2419 }
2420 Err(e) => {
2421 log::error!("Failed to send cancel algo order after retries: error={e}");
2422
2423 self.pending_cancel_requests.remove(&request_id);
2424
2425 if let Some(client_order_id) = client_order_id {
2426 let ts_now = self.clock.get_time_ns();
2427 let rejected = OrderCancelRejected::new(
2428 trader_id,
2429 strategy_id,
2430 instrument_id,
2431 client_order_id,
2432 Ustr::from(&format!("WebSocket send failed: {e}")),
2433 UUID4::new(),
2434 ts_now, ts_now, false, None,
2438 Some(self.account_id),
2439 );
2440 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2441 }
2442
2443 Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2444 }
2445 }
2446 }
2447}
2448
2449pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2451 if code == OKX_POST_ONLY_ERROR_CODE {
2452 return true;
2453 }
2454
2455 for entry in data {
2456 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2457 && s_code == OKX_POST_ONLY_ERROR_CODE
2458 {
2459 return true;
2460 }
2461
2462 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2463 && inner_code == OKX_POST_ONLY_ERROR_CODE
2464 {
2465 return true;
2466 }
2467 }
2468
2469 false
2470}
2471
/// Returns `true` if `haystack` contains `needle`, ignoring ASCII case.
///
/// The comparison is byte-wise: ASCII letters match regardless of case, every other
/// byte must match exactly. An empty `needle` matches any haystack, mirroring
/// [`str::contains`].
#[inline]
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    let needle = needle.as_bytes();

    // `slice::windows` panics when asked for zero-sized windows, so the empty needle
    // has to be answered before building the window iterator.
    if needle.is_empty() {
        return true;
    }

    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
2480
2481fn should_retry_okx_error(error: &OKXWsError) -> bool {
2483 match error {
2484 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2485 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
2487 contains_ignore_ascii_case(msg, "timeout")
2489 || contains_ignore_ascii_case(msg, "timed out")
2490 || contains_ignore_ascii_case(msg, "connection")
2491 || contains_ignore_ascii_case(msg, "network")
2492 }
2493 OKXWsError::AuthenticationError(_)
2494 | OKXWsError::JsonError(_)
2495 | OKXWsError::ParsingError(_) => {
2496 false
2498 }
2499 }
2500}
2501
/// Wraps `msg` in [`OKXWsError::ClientError`].
///
/// NOTE(review): the name suggests this is the constructor used when an operation times
/// out — confirm at the call site. `should_retry_okx_error` only treats a `ClientError`
/// as retryable when its message contains one of the substrings it checks (for example
/// "timeout" or "timed out"), so retry classification depends on the wording of `msg`.
fn create_okx_timeout_error(msg: String) -> OKXWsError {
    OKXWsError::ClientError(msg)
}
2506
2507#[cfg(test)]
2508mod tests {
2509 use std::sync::{Arc, atomic::AtomicBool};
2510
2511 use ahash::AHashMap;
2512 use dashmap::DashMap;
2513 use nautilus_model::{
2514 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2515 types::{Money, Quantity},
2516 };
2517 use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2518 use rstest::rstest;
2519 use ustr::Ustr;
2520
2521 use super::{NautilusWsMessage, OKXWsFeedHandler};
2522 use crate::websocket::parse::OrderStateSnapshot;
2523
    // Delimiter passed to `SubscriptionState::new` when `create_test_handler` builds a handler.
    const OKX_WS_TOPIC_DELIMITER: char = ':';
2525
2526 #[allow(clippy::type_complexity)]
2527 fn create_test_handler() -> (
2528 OKXWsFeedHandler,
2529 tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2530 Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2531 Arc<DashMap<ClientOrderId, ClientOrderId>>,
2532 ) {
2533 let account_id = AccountId::new("OKX-001");
2534 let signal = Arc::new(AtomicBool::new(false));
2535 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2536 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2537 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2538 let active_client_orders = Arc::new(DashMap::new());
2539 let client_id_aliases = Arc::new(DashMap::new());
2540 let auth_tracker = AuthTracker::new();
2541 let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2542
2543 let handler = OKXWsFeedHandler::new(
2544 account_id,
2545 signal,
2546 cmd_rx,
2547 raw_rx,
2548 out_tx,
2549 active_client_orders.clone(),
2550 client_id_aliases.clone(),
2551 auth_tracker,
2552 subscriptions_state,
2553 );
2554
2555 (handler, out_rx, active_client_orders, client_id_aliases)
2556 }
2557
2558 #[rstest]
2559 fn test_is_post_only_rejection_detects_by_code() {
2560 assert!(super::is_post_only_rejection("51019", &[]));
2561 }
2562
2563 #[rstest]
2564 fn test_is_post_only_rejection_detects_by_inner_code() {
2565 let data = vec![serde_json::json!({
2566 "sCode": "51019"
2567 })];
2568 assert!(super::is_post_only_rejection("50000", &data));
2569 }
2570
2571 #[rstest]
2572 fn test_is_post_only_rejection_false_for_unrelated_error() {
2573 let data = vec![serde_json::json!({
2574 "sMsg": "Insufficient balance"
2575 })];
2576 assert!(!super::is_post_only_rejection("50000", &data));
2577 }
2578
2579 #[rstest]
2580 fn test_cleanup_alias_removes_canonical_entry() {
2581 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2582 let canonical = ClientOrderId::new("PARENT-001");
2583 aliases.insert(canonical, canonical);
2584
2585 aliases.remove(&canonical);
2586 aliases.retain(|_, v| *v != canonical);
2587
2588 assert!(!aliases.contains_key(&canonical));
2589 assert!(aliases.is_empty());
2590 }
2591
2592 #[rstest]
2593 fn test_cleanup_alias_removes_child_alias_pointing_to_canonical() {
2594 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2595 let canonical = ClientOrderId::new("PARENT-001");
2596 let child = ClientOrderId::new("CHILD-001");
2597 aliases.insert(canonical, canonical);
2598 aliases.insert(child, canonical);
2599
2600 aliases.remove(&canonical);
2601 aliases.retain(|_, v| *v != canonical);
2602
2603 assert!(!aliases.contains_key(&canonical));
2604 assert!(!aliases.contains_key(&child));
2605 assert!(aliases.is_empty());
2606 }
2607
2608 #[rstest]
2609 fn test_cleanup_alias_does_not_affect_unrelated_entries() {
2610 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2611 let canonical1 = ClientOrderId::new("PARENT-001");
2612 let child1 = ClientOrderId::new("CHILD-001");
2613 let canonical2 = ClientOrderId::new("PARENT-002");
2614 let child2 = ClientOrderId::new("CHILD-002");
2615 aliases.insert(canonical1, canonical1);
2616 aliases.insert(child1, canonical1);
2617 aliases.insert(canonical2, canonical2);
2618 aliases.insert(child2, canonical2);
2619
2620 aliases.remove(&canonical1);
2621 aliases.retain(|_, v| *v != canonical1);
2622
2623 assert!(!aliases.contains_key(&canonical1));
2624 assert!(!aliases.contains_key(&child1));
2625 assert!(aliases.contains_key(&canonical2));
2626 assert!(aliases.contains_key(&child2));
2627 assert_eq!(aliases.len(), 2);
2628 }
2629
2630 #[rstest]
2631 fn test_cleanup_alias_handles_multiple_children() {
2632 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2633 let canonical = ClientOrderId::new("PARENT-001");
2634 let child1 = ClientOrderId::new("CHILD-001");
2635 let child2 = ClientOrderId::new("CHILD-002");
2636 let child3 = ClientOrderId::new("CHILD-003");
2637 aliases.insert(canonical, canonical);
2638 aliases.insert(child1, canonical);
2639 aliases.insert(child2, canonical);
2640 aliases.insert(child3, canonical);
2641
2642 aliases.remove(&canonical);
2643 aliases.retain(|_, v| *v != canonical);
2644
2645 assert!(aliases.is_empty());
2646 }
2647
2648 #[rstest]
2649 fn test_cleanup_removes_from_all_caches() {
2650 let emitted_accepted: DashMap<VenueOrderId, ()> = DashMap::new();
2651 let order_state_cache: AHashMap<ClientOrderId, u32> = AHashMap::new();
2652 let active_orders: DashMap<ClientOrderId, ()> = DashMap::new();
2653 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2654 let fee_cache: AHashMap<Ustr, f64> = AHashMap::new();
2655 let filled_qty_cache: AHashMap<Ustr, f64> = AHashMap::new();
2656 let canonical = ClientOrderId::new("PARENT-001");
2657 let child = ClientOrderId::new("CHILD-001");
2658 let venue_id = VenueOrderId::new("VENUE-001");
2659
2660 emitted_accepted.insert(venue_id, ());
2661 let mut order_state = order_state_cache;
2662 order_state.insert(canonical, 1);
2663 active_orders.insert(canonical, ());
2664 aliases.insert(canonical, canonical);
2665 aliases.insert(child, canonical);
2666 let mut fees = fee_cache;
2667 fees.insert(venue_id.inner(), 0.001);
2668 let mut filled = filled_qty_cache;
2669 filled.insert(venue_id.inner(), 1.0);
2670
2671 emitted_accepted.remove(&venue_id);
2672 order_state.remove(&canonical);
2673 active_orders.remove(&canonical);
2674 aliases.remove(&canonical);
2675 aliases.retain(|_, v| *v != canonical);
2676 fees.remove(&venue_id.inner());
2677 filled.remove(&venue_id.inner());
2678
2679 assert!(emitted_accepted.is_empty());
2680 assert!(order_state.is_empty());
2681 assert!(active_orders.is_empty());
2682 assert!(aliases.is_empty());
2683 assert!(fees.is_empty());
2684 assert!(filled.is_empty());
2685 }
2686
2687 #[rstest]
2688 fn test_alias_registration_parent_with_child() {
2689 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2690 let parent = ClientOrderId::new("PARENT-001");
2691 let child = ClientOrderId::new("CHILD-001");
2692 aliases.insert(parent, parent);
2693 aliases.insert(child, parent);
2694
2695 assert_eq!(*aliases.get(&parent).unwrap(), parent);
2696 assert_eq!(*aliases.get(&child).unwrap(), parent);
2697 }
2698
2699 #[rstest]
2700 fn test_alias_registration_standalone_order() {
2701 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2702 let order_id = ClientOrderId::new("ORDER-001");
2703 aliases.insert(order_id, order_id);
2704
2705 assert_eq!(*aliases.get(&order_id).unwrap(), order_id);
2706 }
2707
2708 #[rstest]
2709 fn test_alias_lookup_returns_canonical() {
2710 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2711 let canonical = ClientOrderId::new("PARENT-001");
2712 let child = ClientOrderId::new("CHILD-001");
2713
2714 aliases.insert(canonical, canonical);
2715 aliases.insert(child, canonical);
2716
2717 let resolved = aliases.get(&child).map(|v| *v);
2718 assert_eq!(resolved, Some(canonical));
2719 }
2720
2721 #[rstest]
2722 fn test_handler_register_client_order_aliases_with_parent() {
2723 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2724
2725 let child = Some(ClientOrderId::new("CHILD-001"));
2726 let parent = Some(ClientOrderId::new("PARENT-001"));
2727
2728 let result = handler.register_client_order_aliases(&child, &parent);
2729
2730 assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2731 assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2732 assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2733 assert_eq!(
2734 *client_id_aliases
2735 .get(&ClientOrderId::new("CHILD-001"))
2736 .unwrap(),
2737 ClientOrderId::new("PARENT-001")
2738 );
2739 }
2740
2741 #[rstest]
2742 fn test_handler_register_client_order_aliases_without_parent() {
2743 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2744
2745 let child = Some(ClientOrderId::new("ORDER-001"));
2746 let parent: Option<ClientOrderId> = None;
2747
2748 let result = handler.register_client_order_aliases(&child, &parent);
2749
2750 assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2751 assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2752 assert_eq!(
2753 *client_id_aliases
2754 .get(&ClientOrderId::new("ORDER-001"))
2755 .unwrap(),
2756 ClientOrderId::new("ORDER-001")
2757 );
2758 }
2759
2760 #[rstest]
2761 fn test_handler_cleanup_terminal_order_removes_all_state() {
2762 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2763
2764 let canonical = ClientOrderId::new("PARENT-001");
2765 let child = ClientOrderId::new("CHILD-001");
2766 let venue_id = VenueOrderId::new("VENUE-001");
2767 let trader_id = TraderId::new("TRADER-001");
2768 let strategy_id = StrategyId::new("STRATEGY-001");
2769 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2770
2771 active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2772 client_id_aliases.insert(canonical, canonical);
2773 client_id_aliases.insert(child, canonical);
2774 handler
2775 .fee_cache
2776 .insert(venue_id.inner(), Money::from("0.001 USDT"));
2777 handler
2778 .filled_qty_cache
2779 .insert(venue_id.inner(), Quantity::from("1.0"));
2780 handler.order_state_cache.insert(
2781 canonical,
2782 OrderStateSnapshot {
2783 venue_order_id: venue_id,
2784 quantity: Quantity::from("1.0"),
2785 price: None,
2786 },
2787 );
2788
2789 handler.cleanup_terminal_order(&canonical, &venue_id);
2790
2791 assert!(!active_client_orders.contains_key(&canonical));
2792 assert!(!client_id_aliases.contains_key(&canonical));
2793 assert!(!client_id_aliases.contains_key(&child));
2794 assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2795 assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2796 assert!(!handler.order_state_cache.contains_key(&canonical));
2797 }
2798
2799 #[rstest]
2800 fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2801 let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2802
2803 let canonical = ClientOrderId::new("PARENT-001");
2804 let child1 = ClientOrderId::new("CHILD-001");
2805 let child2 = ClientOrderId::new("CHILD-002");
2806 let child3 = ClientOrderId::new("CHILD-003");
2807 let venue_id = VenueOrderId::new("VENUE-001");
2808
2809 client_id_aliases.insert(canonical, canonical);
2810 client_id_aliases.insert(child1, canonical);
2811 client_id_aliases.insert(child2, canonical);
2812 client_id_aliases.insert(child3, canonical);
2813
2814 handler.cleanup_terminal_order(&canonical, &venue_id);
2815
2816 assert!(!client_id_aliases.contains_key(&canonical));
2817 assert!(!client_id_aliases.contains_key(&child1));
2818 assert!(!client_id_aliases.contains_key(&child2));
2819 assert!(!client_id_aliases.contains_key(&child3));
2820 assert!(client_id_aliases.is_empty());
2821 }
2822
2823 #[rstest]
2824 fn test_handler_cleanup_does_not_affect_other_orders() {
2825 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2826
2827 let canonical1 = ClientOrderId::new("PARENT-001");
2828 let child1 = ClientOrderId::new("CHILD-001");
2829 let venue_id1 = VenueOrderId::new("VENUE-001");
2830
2831 let canonical2 = ClientOrderId::new("PARENT-002");
2832 let child2 = ClientOrderId::new("CHILD-002");
2833 let venue_id2 = VenueOrderId::new("VENUE-002");
2834
2835 let trader_id = TraderId::new("TRADER-001");
2836 let strategy_id = StrategyId::new("STRATEGY-001");
2837 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2838
2839 active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2840 active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2841 client_id_aliases.insert(canonical1, canonical1);
2842 client_id_aliases.insert(child1, canonical1);
2843 client_id_aliases.insert(canonical2, canonical2);
2844 client_id_aliases.insert(child2, canonical2);
2845 handler
2846 .fee_cache
2847 .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2848 handler
2849 .fee_cache
2850 .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2851
2852 handler.cleanup_terminal_order(&canonical1, &venue_id1);
2853
2854 assert!(!active_client_orders.contains_key(&canonical1));
2855 assert!(!client_id_aliases.contains_key(&canonical1));
2856 assert!(!client_id_aliases.contains_key(&child1));
2857 assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2858
2859 assert!(active_client_orders.contains_key(&canonical2));
2860 assert!(client_id_aliases.contains_key(&canonical2));
2861 assert!(client_id_aliases.contains_key(&child2));
2862 assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2863 }
2864
2865 mod channel_routing {
2866 use nautilus_core::nanos::UnixNanos;
2867 use nautilus_model::{
2868 identifiers::{InstrumentId, Symbol},
2869 instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2870 types::{Currency, Price, Quantity},
2871 };
2872 use rstest::rstest;
2873 use ustr::Ustr;
2874
2875 use super::*;
2876 use crate::{
2877 common::{enums::OKXBookAction, testing::load_test_json},
2878 websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2879 };
2880
        /// Builds the `BTC-USDT.OKX` [`CurrencyPair`] fixture (BTC base, USDT quote) wrapped
        /// in [`InstrumentAny`] for the routing tests.
        fn create_spot_instrument() -> InstrumentAny {
            let instrument_id = InstrumentId::from("BTC-USDT.OKX");
            InstrumentAny::CurrencyPair(CurrencyPair::new(
                instrument_id,
                Symbol::from("BTC-USDT"),
                Currency::BTC(),
                Currency::USDT(),
                // NOTE(review): presumably price and size precision, matching the decimal
                // places of the two increments below — confirm against `CurrencyPair::new`.
                2,
                8,
                Price::from("0.01"),
                Quantity::from("0.00000001"),
                // All optional parameters are left unset; both timestamps use the default.
                None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
                UnixNanos::default(),
            ))
        }
2908
        /// Builds the `BTC-USDT-SWAP.OKX` [`CryptoPerpetual`] fixture wrapped in
        /// [`InstrumentAny`] for the routing tests.
        fn create_swap_instrument() -> InstrumentAny {
            let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
            InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
                instrument_id,
                Symbol::from("BTC-USDT-SWAP"),
                // NOTE(review): presumably base, quote and settlement currency followed by
                // the inverse flag — confirm against `CryptoPerpetual::new`.
                Currency::BTC(),
                Currency::USDT(),
                Currency::USDT(),
                false,
                // NOTE(review): presumably price and size precision, matching the decimal
                // places of the two increments below.
                2,
                8,
                Price::from("0.01"),
                Quantity::from("0.00000001"),
                // All optional parameters are left unset.
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                // Both timestamps use the default value.
                UnixNanos::default(),
                UnixNanos::default(),
            ))
        }
2938
2939 fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2940 let (mut handler, _, _, _) = create_test_handler();
2941 for inst in instruments {
2942 handler
2943 .instruments_cache
2944 .insert(inst.symbol().inner(), inst);
2945 }
2946 handler
2947 }
2948
2949 #[rstest]
2950 fn test_parse_raw_message_ticker_channel() {
2951 let json = load_test_json("ws_tickers.json");
2952 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2953
2954 match msg {
2955 OKXWsMessage::Data { arg, data } => {
2956 assert!(
2957 matches!(arg.channel, OKXWsChannel::Tickers),
2958 "Expected Tickers channel"
2959 );
2960 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2961 assert!(data.is_array());
2962 }
2963 _ => panic!("Expected OKXWsMessage::Data variant"),
2964 }
2965 }
2966
2967 #[rstest]
2968 fn test_parse_raw_message_trades_channel() {
2969 let json = load_test_json("ws_trades.json");
2970 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2971
2972 match msg {
2973 OKXWsMessage::Data { arg, data } => {
2974 assert!(
2975 matches!(arg.channel, OKXWsChannel::Trades),
2976 "Expected Trades channel"
2977 );
2978 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2979 assert!(data.is_array());
2980 }
2981 _ => panic!("Expected OKXWsMessage::Data variant"),
2982 }
2983 }
2984
2985 #[rstest]
2986 fn test_parse_raw_message_books_channel() {
2987 let json = load_test_json("ws_books_snapshot.json");
2988 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2989
2990 match msg {
2991 OKXWsMessage::BookData { arg, action, data } => {
2992 assert!(
2993 matches!(arg.channel, OKXWsChannel::Books),
2994 "Expected Books channel"
2995 );
2996 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2997 assert!(
2998 matches!(action, OKXBookAction::Snapshot),
2999 "Expected snapshot action"
3000 );
3001 assert!(!data.is_empty());
3002 }
3003 _ => panic!("Expected OKXWsMessage::BookData variant"),
3004 }
3005 }
3006
3007 #[rstest]
3008 fn test_parse_raw_message_candle_channel() {
3009 let json = load_test_json("ws_candle.json");
3010 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3011
3012 match msg {
3013 OKXWsMessage::Data { arg, data } => {
3014 assert!(
3016 matches!(arg.channel, OKXWsChannel::Candle1Day),
3017 "Expected Candle1Day channel, got {:?}",
3018 arg.channel
3019 );
3020 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3021 assert!(data.is_array());
3022 }
3023 _ => panic!("Expected OKXWsMessage::Data variant"),
3024 }
3025 }
3026
3027 #[rstest]
3028 fn test_parse_raw_message_funding_rate_channel() {
3029 let json = load_test_json("ws_funding_rate.json");
3030 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3031
3032 match msg {
3033 OKXWsMessage::Data { arg, data } => {
3034 assert!(
3035 matches!(arg.channel, OKXWsChannel::FundingRate),
3036 "Expected FundingRate channel"
3037 );
3038 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
3039 assert!(data.is_array());
3040 }
3041 _ => panic!("Expected OKXWsMessage::Data variant"),
3042 }
3043 }
3044
3045 #[rstest]
3046 fn test_parse_raw_message_bbo_tbt_channel() {
3047 let json = load_test_json("ws_bbo_tbt.json");
3048 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3049
3050 match msg {
3051 OKXWsMessage::Data { arg, data } => {
3052 assert!(
3053 matches!(arg.channel, OKXWsChannel::BboTbt),
3054 "Expected BboTbt channel"
3055 );
3056 assert!(data.is_array());
3057 }
3058 _ => panic!("Expected OKXWsMessage::Data variant"),
3059 }
3060 }
3061
3062 #[rstest]
3063 fn test_handle_other_channel_data_tickers() {
3064 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3065 let json = load_test_json("ws_tickers.json");
3066 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3067
3068 let OKXWsMessage::Data { arg, data } = msg else {
3069 panic!("Expected OKXWsMessage::Data");
3070 };
3071
3072 let ts_init = UnixNanos::from(1_000_000_000u64);
3073 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3074
3075 assert!(result.is_some());
3076 match result.unwrap() {
3077 NautilusWsMessage::Data(payloads) => {
3078 assert!(!payloads.is_empty(), "Should produce data payloads");
3079 }
3080 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3081 }
3082 }
3083
3084 #[rstest]
3085 fn test_handle_other_channel_data_trades() {
3086 let instrument_id = InstrumentId::from("BTC-USD.OKX");
3088 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
3089 instrument_id,
3090 Symbol::from("BTC-USD"),
3091 Currency::BTC(),
3092 Currency::USD(),
3093 1,
3094 8,
3095 Price::from("0.1"),
3096 Quantity::from("0.00000001"),
3097 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
3110 UnixNanos::default(),
3111 ));
3112
3113 let mut handler = create_handler_with_instruments(vec![instrument]);
3114 let json = load_test_json("ws_trades.json");
3115 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3116
3117 let OKXWsMessage::Data { arg, data } = msg else {
3118 panic!("Expected OKXWsMessage::Data");
3119 };
3120
3121 let ts_init = UnixNanos::from(1_000_000_000u64);
3122 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3123
3124 assert!(result.is_some());
3125 match result.unwrap() {
3126 NautilusWsMessage::Data(payloads) => {
3127 assert!(!payloads.is_empty(), "Should produce trade data payloads");
3128 }
3129 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3130 }
3131 }
3132
3133 #[rstest]
3134 fn test_handle_book_data_snapshot() {
3135 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3136 let json = load_test_json("ws_books_snapshot.json");
3137 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3138
3139 let OKXWsMessage::BookData { arg, action, data } = msg else {
3140 panic!("Expected OKXWsMessage::BookData");
3141 };
3142
3143 let ts_init = UnixNanos::from(1_000_000_000u64);
3144 let result = handler.handle_book_data(arg, action, data, ts_init);
3145
3146 assert!(result.is_some());
3147 match result.unwrap() {
3148 NautilusWsMessage::Data(payloads) => {
3149 assert!(!payloads.is_empty(), "Should produce order book payloads");
3150 }
3151 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3152 }
3153 }
3154
3155 #[rstest]
3156 fn test_handle_book_data_update() {
3157 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3158 let json = load_test_json("ws_books_update.json");
3159 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3160
3161 let OKXWsMessage::BookData { arg, action, data } = msg else {
3162 panic!("Expected OKXWsMessage::BookData");
3163 };
3164
3165 let ts_init = UnixNanos::from(1_000_000_000u64);
3166 let result = handler.handle_book_data(arg, action, data, ts_init);
3167
3168 assert!(result.is_some());
3169 match result.unwrap() {
3170 NautilusWsMessage::Data(payloads) => {
3171 assert!(
3172 !payloads.is_empty(),
3173 "Should produce order book delta payloads"
3174 );
3175 }
3176 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3177 }
3178 }
3179
3180 #[rstest]
3181 fn test_handle_other_channel_data_candles() {
3182 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3183 let json = load_test_json("ws_candle.json");
3184 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3185
3186 let OKXWsMessage::Data { arg, data } = msg else {
3187 panic!("Expected OKXWsMessage::Data");
3188 };
3189
3190 let ts_init = UnixNanos::from(1_000_000_000u64);
3191 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3192
3193 assert!(result.is_some());
3194 match result.unwrap() {
3195 NautilusWsMessage::Data(payloads) => {
3196 assert!(!payloads.is_empty(), "Should produce bar data payloads");
3197 }
3198 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3199 }
3200 }
3201
3202 #[rstest]
3203 fn test_handle_other_channel_data_funding_rate() {
3204 let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3205 let json = load_test_json("ws_funding_rate.json");
3206 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3207
3208 let OKXWsMessage::Data { arg, data } = msg else {
3209 panic!("Expected OKXWsMessage::Data");
3210 };
3211
3212 let ts_init = UnixNanos::from(1_000_000_000u64);
3213 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3214
3215 assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3217 }
3218
3219 #[rstest]
3220 fn test_handle_account_data_parses_successfully() {
3221 let mut handler = create_handler_with_instruments(vec![]);
3222 let json = load_test_json("ws_account.json");
3223 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3224
3225 let OKXWsMessage::Data { data, .. } = msg else {
3226 panic!("Expected OKXWsMessage::Data");
3227 };
3228
3229 let ts_init = UnixNanos::from(1_000_000_000u64);
3230 let result = handler.handle_account_data(data, ts_init);
3231
3232 assert!(result.is_some());
3233 match result.unwrap() {
3234 NautilusWsMessage::AccountUpdate(account_state) => {
3235 assert!(
3236 !account_state.balances.is_empty(),
3237 "Should have balance data"
3238 );
3239 }
3240 other => panic!("Expected NautilusWsMessage::AccountUpdate, got {other:?}"),
3241 }
3242 }
3243
3244 #[rstest]
3245 fn test_handle_other_channel_data_missing_instrument() {
3246 let mut handler = create_handler_with_instruments(vec![]);
3247 let json = load_test_json("ws_tickers.json");
3248 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3249
3250 let OKXWsMessage::Data { arg, data } = msg else {
3251 panic!("Expected OKXWsMessage::Data");
3252 };
3253
3254 let ts_init = UnixNanos::from(1_000_000_000u64);
3255 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3256
3257 assert!(result.is_none());
3259 }
3260
3261 #[rstest]
3262 fn test_handle_book_data_missing_instrument() {
3263 let handler = create_handler_with_instruments(vec![]);
3264 let json = load_test_json("ws_books_snapshot.json");
3265 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3266
3267 let OKXWsMessage::BookData { arg, action, data } = msg else {
3268 panic!("Expected OKXWsMessage::BookData");
3269 };
3270
3271 let ts_init = UnixNanos::from(1_000_000_000u64);
3272 let result = handler.handle_book_data(arg, action, data, ts_init);
3273
3274 assert!(result.is_none());
3276 }
3277 }
3278}