1use std::{
32 collections::VecDeque,
33 sync::{
34 Arc,
35 atomic::{AtomicBool, AtomicU64, Ordering},
36 },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_common::cache::fifo::FifoCache;
42use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
43use nautilus_model::{
44 enums::{OrderStatus, OrderType},
45 events::{
46 AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
47 },
48 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
49 instruments::{Instrument, InstrumentAny},
50 types::{Money, Quantity},
51};
52use nautilus_network::{
53 RECONNECTED,
54 retry::{RetryManager, create_websocket_retry_manager},
55 websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
56};
57use serde_json::Value;
58use tokio_tungstenite::tungstenite::Message;
59use ustr::Ustr;
60
61use super::{
62 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
63 error::OKXWsError,
64 messages::{
65 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
66 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
67 OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
68 WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
69 },
70 parse::{
71 OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
72 parse_order_event, parse_order_msg, parse_ws_message_data,
73 },
74 subscription::topic_from_websocket_arg,
75};
76use crate::{
77 common::{
78 consts::{
79 OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
80 should_retry_error_code,
81 },
82 enums::{
83 OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
84 OKXTradeMode,
85 },
86 parse::{
87 determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
88 parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
89 parse_price, parse_quantity,
90 },
91 },
92 http::models::{OKXAccount, OKXPosition},
93 websocket::client::{
94 OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
95 OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
96 },
97};
98
99type PlaceRequestData = (
101 PendingOrderParams,
102 ClientOrderId,
103 TraderId,
104 StrategyId,
105 InstrumentId,
106);
107
108type CancelRequestData = (
110 ClientOrderId,
111 TraderId,
112 StrategyId,
113 InstrumentId,
114 Option<VenueOrderId>,
115);
116
117type AmendRequestData = (
119 ClientOrderId,
120 TraderId,
121 StrategyId,
122 InstrumentId,
123 Option<VenueOrderId>,
124);
125
126#[derive(Debug)]
127pub enum PendingOrderParams {
128 Regular(WsPostOrderParams),
129 Algo(WsPostAlgoOrderParams),
130}
131
132#[allow(
134 clippy::large_enum_variant,
135 reason = "Commands are ephemeral and immediately consumed"
136)]
137#[allow(missing_debug_implementations)]
138pub enum HandlerCommand {
139 SetClient(WebSocketClient),
140 Disconnect,
141 Authenticate {
142 payload: String,
143 },
144 InitializeInstruments(Vec<InstrumentAny>),
145 UpdateInstrument(InstrumentAny),
146 Subscribe {
147 args: Vec<OKXSubscriptionArg>,
148 },
149 Unsubscribe {
150 args: Vec<OKXSubscriptionArg>,
151 },
152 PlaceOrder {
153 params: WsPostOrderParams,
154 client_order_id: ClientOrderId,
155 trader_id: TraderId,
156 strategy_id: StrategyId,
157 instrument_id: InstrumentId,
158 },
159 PlaceAlgoOrder {
160 params: WsPostAlgoOrderParams,
161 client_order_id: ClientOrderId,
162 trader_id: TraderId,
163 strategy_id: StrategyId,
164 instrument_id: InstrumentId,
165 },
166 AmendOrder {
167 params: WsAmendOrderParams,
168 client_order_id: ClientOrderId,
169 trader_id: TraderId,
170 strategy_id: StrategyId,
171 instrument_id: InstrumentId,
172 venue_order_id: Option<VenueOrderId>,
173 },
174 CancelOrder {
175 client_order_id: Option<ClientOrderId>,
176 venue_order_id: Option<VenueOrderId>,
177 instrument_id: InstrumentId,
178 trader_id: TraderId,
179 strategy_id: StrategyId,
180 },
181 CancelAlgoOrder {
182 client_order_id: Option<ClientOrderId>,
183 algo_order_id: Option<VenueOrderId>,
184 instrument_id: InstrumentId,
185 trader_id: TraderId,
186 strategy_id: StrategyId,
187 },
188 MassCancel {
189 instrument_id: InstrumentId,
190 },
191 BatchPlaceOrders {
192 args: Vec<Value>,
193 request_id: String,
194 },
195 BatchAmendOrders {
196 args: Vec<Value>,
197 request_id: String,
198 },
199 BatchCancelOrders {
200 args: Vec<Value>,
201 request_id: String,
202 },
203}
204
205pub(super) struct OKXWsFeedHandler {
206 clock: &'static AtomicTime,
207 account_id: AccountId,
208 signal: Arc<AtomicBool>,
209 inner: Option<WebSocketClient>,
210 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
211 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
212 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
213 auth_tracker: AuthTracker,
214 subscriptions_state: SubscriptionState,
215 retry_manager: RetryManager<OKXWsError>,
216 pending_place_requests: AHashMap<String, PlaceRequestData>,
217 pending_cancel_requests: AHashMap<String, CancelRequestData>,
218 pending_amend_requests: AHashMap<String, AmendRequestData>,
219 pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
220 pending_messages: VecDeque<NautilusWsMessage>,
221 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
222 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
223 inst_id_code_cache: Arc<DashMap<Ustr, u64>>,
224 emitted_accepted: FifoCache<VenueOrderId, 10_000>,
225 instruments_cache: AHashMap<Ustr, InstrumentAny>,
226 fee_cache: AHashMap<Ustr, Money>,
227 filled_qty_cache: AHashMap<Ustr, Quantity>,
228 order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
229 funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>, last_account_state: Option<AccountState>,
231 request_id_counter: AtomicU64,
232}
233
234impl OKXWsFeedHandler {
235 #[allow(clippy::too_many_arguments)]
237 pub fn new(
238 account_id: AccountId,
239 signal: Arc<AtomicBool>,
240 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
241 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
242 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
243 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
244 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
245 inst_id_code_cache: Arc<DashMap<Ustr, u64>>,
246 auth_tracker: AuthTracker,
247 subscriptions_state: SubscriptionState,
248 ) -> Self {
249 Self {
250 clock: get_atomic_clock_realtime(),
251 account_id,
252 signal,
253 inner: None,
254 cmd_rx,
255 raw_rx,
256 out_tx,
257 auth_tracker,
258 subscriptions_state,
259 retry_manager: create_websocket_retry_manager(),
260 pending_place_requests: AHashMap::new(),
261 pending_cancel_requests: AHashMap::new(),
262 pending_amend_requests: AHashMap::new(),
263 pending_mass_cancel_requests: AHashMap::new(),
264 pending_messages: VecDeque::new(),
265 active_client_orders,
266 client_id_aliases,
267 inst_id_code_cache,
268 emitted_accepted: FifoCache::new(),
269 instruments_cache: AHashMap::new(),
270 fee_cache: AHashMap::new(),
271 filled_qty_cache: AHashMap::new(),
272 order_state_cache: AHashMap::new(),
273 funding_rate_cache: AHashMap::new(),
274 last_account_state: None,
275 request_id_counter: AtomicU64::new(0),
276 }
277 }
278
279 pub(super) fn is_stopped(&self) -> bool {
280 self.signal.load(std::sync::atomic::Ordering::Acquire)
281 }
282
283 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
284 self.out_tx.send(msg).map_err(|_| ())
285 }
286
287 async fn send_with_retry(
288 &self,
289 payload: String,
290 rate_limit_keys: Option<&[Ustr]>,
291 ) -> Result<(), OKXWsError> {
292 if let Some(client) = &self.inner {
293 let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
294 self.retry_manager
295 .execute_with_retry(
296 "websocket_send",
297 || {
298 let payload = payload.clone();
299 let keys = keys_owned.clone();
300 async move {
301 client
302 .send_text(payload, keys.as_deref())
303 .await
304 .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
305 }
306 },
307 should_retry_okx_error,
308 create_okx_timeout_error,
309 )
310 .await
311 } else {
312 Err(OKXWsError::ClientError(
313 "No active WebSocket client".to_string(),
314 ))
315 }
316 }
317
318 pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
319 match self.send_with_retry(TEXT_PONG.to_string(), None).await {
320 Ok(()) => {
321 log::trace!("Sent pong response to OKX text ping");
322 Ok(())
323 }
324 Err(e) => {
325 log::warn!("Failed to send pong after retries: error={e}");
326 Err(anyhow::anyhow!("Failed to send pong: {e}"))
327 }
328 }
329 }
330
331 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
332 if let Some(message) = self.pending_messages.pop_front() {
333 return Some(message);
334 }
335
336 loop {
337 tokio::select! {
338 Some(cmd) = self.cmd_rx.recv() => {
339 match cmd {
340 HandlerCommand::SetClient(client) => {
341 log::debug!("Handler received WebSocket client");
342 self.inner = Some(client);
343 }
344 HandlerCommand::Disconnect => {
345 log::debug!("Handler disconnecting WebSocket client");
346 self.inner = None;
347 return None;
348 }
349 HandlerCommand::Authenticate { payload } => {
350 if let Err(e) = self.send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice())).await {
351 log::error!("Failed to send authentication message after retries: error={e}");
352 }
353 }
354 HandlerCommand::InitializeInstruments(instruments) => {
355 for inst in instruments {
356 self.instruments_cache.insert(inst.symbol().inner(), inst);
357 }
358 }
359 HandlerCommand::UpdateInstrument(inst) => {
360 self.instruments_cache.insert(inst.symbol().inner(), inst);
361 }
362 HandlerCommand::Subscribe { args } => {
363 if let Err(e) = self.handle_subscribe(args).await {
364 log::error!("Failed to handle subscribe command: error={e}");
365 }
366 }
367 HandlerCommand::Unsubscribe { args } => {
368 if let Err(e) = self.handle_unsubscribe(args).await {
369 log::error!("Failed to handle unsubscribe command: error={e}");
370 }
371 }
372 HandlerCommand::CancelOrder {
373 client_order_id,
374 venue_order_id,
375 instrument_id,
376 trader_id,
377 strategy_id,
378 } => {
379 if let Err(e) = self
380 .handle_cancel_order(
381 client_order_id,
382 venue_order_id,
383 instrument_id,
384 trader_id,
385 strategy_id,
386 )
387 .await
388 {
389 log::error!("Failed to handle cancel order command: error={e}");
390 }
391 }
392 HandlerCommand::CancelAlgoOrder {
393 client_order_id,
394 algo_order_id,
395 instrument_id,
396 trader_id,
397 strategy_id,
398 } => {
399 if let Err(e) = self
400 .handle_cancel_algo_order(
401 client_order_id,
402 algo_order_id,
403 instrument_id,
404 trader_id,
405 strategy_id,
406 )
407 .await
408 {
409 log::error!("Failed to handle cancel algo order command: error={e}");
410 }
411 }
412 HandlerCommand::PlaceOrder {
413 params,
414 client_order_id,
415 trader_id,
416 strategy_id,
417 instrument_id,
418 } => {
419 if let Err(e) = self
420 .handle_place_order(
421 params,
422 client_order_id,
423 trader_id,
424 strategy_id,
425 instrument_id,
426 )
427 .await
428 {
429 log::error!("Failed to handle place order command: error={e}");
430 }
431 }
432 HandlerCommand::PlaceAlgoOrder {
433 params,
434 client_order_id,
435 trader_id,
436 strategy_id,
437 instrument_id,
438 } => {
439 if let Err(e) = self
440 .handle_place_algo_order(
441 params,
442 client_order_id,
443 trader_id,
444 strategy_id,
445 instrument_id,
446 )
447 .await
448 {
449 log::error!("Failed to handle place algo order command: error={e}");
450 }
451 }
452 HandlerCommand::AmendOrder {
453 params,
454 client_order_id,
455 trader_id,
456 strategy_id,
457 instrument_id,
458 venue_order_id,
459 } => {
460 if let Err(e) = self
461 .handle_amend_order(
462 params,
463 client_order_id,
464 trader_id,
465 strategy_id,
466 instrument_id,
467 venue_order_id,
468 )
469 .await
470 {
471 log::error!("Failed to handle amend order command: error={e}");
472 }
473 }
474 HandlerCommand::MassCancel { instrument_id } => {
475 if let Err(e) = self.handle_mass_cancel(instrument_id).await {
476 log::error!("Failed to handle mass cancel command: error={e}");
477 }
478 }
479 HandlerCommand::BatchCancelOrders { args, request_id } => {
480 if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
481 log::error!("Failed to handle batch cancel orders command: error={e}");
482 }
483 }
484 HandlerCommand::BatchPlaceOrders { args, request_id } => {
485 if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
486 log::error!("Failed to handle batch place orders command: error={e}");
487 }
488 }
489 HandlerCommand::BatchAmendOrders { args, request_id } => {
490 if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
491 log::error!("Failed to handle batch amend orders command: error={e}");
492 }
493 }
494 }
495 continue;
497 }
498
499 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
500 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
501 log::debug!("Stop signal received during idle period");
502 return None;
503 }
504 continue;
505 }
506
507 msg = self.raw_rx.recv() => {
508 let event = match msg {
509 Some(msg) => match Self::parse_raw_message(msg) {
510 Some(event) => event,
511 None => continue,
512 },
513 None => {
514 log::debug!("WebSocket stream closed");
515 return None;
516 }
517 };
518
519 let ts_init = self.clock.get_time_ns();
520
521 match event {
522 OKXWsMessage::Ping => {
523 if let Err(e) = self.send_pong().await {
524 log::warn!("Failed to send pong response: error={e}");
525 }
526 continue;
527 }
528 OKXWsMessage::Login {
529 code, msg, conn_id, ..
530 } => {
531 if code == "0" {
532 self.auth_tracker.succeed();
533
534 return Some(NautilusWsMessage::Authenticated);
538 }
539
540 log::error!("WebSocket authentication failed: error={msg}");
541 self.auth_tracker.fail(msg.clone());
542
543 let error = OKXWebSocketError {
544 code,
545 message: msg,
546 conn_id: Some(conn_id),
547 timestamp: self.clock.get_time_ns().as_u64(),
548 };
549 self.pending_messages
550 .push_back(NautilusWsMessage::Error(error));
551 continue;
552 }
553 OKXWsMessage::BookData { arg, action, data } => {
554 if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
555 return Some(msg);
556 }
557 continue;
558 }
559 OKXWsMessage::OrderResponse {
560 id,
561 op,
562 code,
563 msg,
564 data,
565 } => {
566 if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
567 return Some(msg);
568 }
569 continue;
570 }
571 OKXWsMessage::Data { arg, data } => {
572 let OKXWebSocketArg {
573 channel, inst_id, ..
574 } = arg;
575
576 match channel {
577 OKXWsChannel::Account => {
578 if let Some(msg) = self.handle_account_data(data, ts_init) {
579 return Some(msg);
580 }
581 continue;
582 }
583 OKXWsChannel::Positions => {
584 self.handle_positions_data(data, ts_init);
585 continue;
586 }
587 OKXWsChannel::Orders => {
588 if let Some(msg) = self.handle_orders_data(data, ts_init) {
589 return Some(msg);
590 }
591 continue;
592 }
593 OKXWsChannel::OrdersAlgo => {
594 if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
595 return Some(msg);
596 }
597 continue;
598 }
599 _ => {
600 if let Some(msg) =
601 self.handle_other_channel_data(channel, inst_id, data, ts_init)
602 {
603 return Some(msg);
604 }
605 continue;
606 }
607 }
608 }
609 OKXWsMessage::Error { code, msg } => {
610 let error = OKXWebSocketError {
611 code,
612 message: msg,
613 conn_id: None,
614 timestamp: self.clock.get_time_ns().as_u64(),
615 };
616 return Some(NautilusWsMessage::Error(error));
617 }
618 OKXWsMessage::Reconnected => {
619 return Some(NautilusWsMessage::Reconnected);
620 }
621 OKXWsMessage::Subscription {
622 event,
623 arg,
624 code,
625 msg,
626 ..
627 } => {
628 let topic = topic_from_websocket_arg(&arg);
629 let success = code.as_deref().is_none_or(|c| c == "0");
630
631 match event {
632 OKXSubscriptionEvent::Subscribe => {
633 if success {
634 self.subscriptions_state.confirm_subscribe(&topic);
635 } else {
636 log::warn!("Subscription failed: topic={topic:?}, error={msg:?}, code={code:?}");
637 self.subscriptions_state.mark_failure(&topic);
638 }
639 }
640 OKXSubscriptionEvent::Unsubscribe => {
641 if success {
642 self.subscriptions_state.confirm_unsubscribe(&topic);
643 } else {
644 log::warn!("Unsubscription failed - restoring subscription: topic={topic:?}, error={msg:?}, code={code:?}");
645 self.subscriptions_state.confirm_unsubscribe(&topic); self.subscriptions_state.mark_subscribe(&topic); self.subscriptions_state.confirm_subscribe(&topic); }
650 }
651 }
652
653 continue;
654 }
655 OKXWsMessage::ChannelConnCount { .. } => continue,
656 }
657 }
658
659 else => {
661 log::debug!("Handler shutting down: stream ended or command channel closed");
662 return None;
663 }
664 }
665 }
666 }
667
668 pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
669 if msg.state != OKXOrderStatus::Canceled {
670 return false;
671 }
672
673 let cancel_source_matches = matches!(
674 msg.cancel_source.as_deref(),
675 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
676 );
677
678 let reason_matches = matches!(
679 msg.cancel_source_reason.as_deref(),
680 Some(reason) if reason.contains("POST_ONLY")
681 );
682
683 if !(cancel_source_matches || reason_matches) {
684 return false;
685 }
686
687 msg.acc_fill_sz
688 .as_ref()
689 .is_none_or(|filled| filled == "0" || filled.is_empty())
690 }
691
692 fn try_handle_post_only_auto_cancel(
693 &mut self,
694 msg: &OKXOrderMsg,
695 ts_init: UnixNanos,
696 exec_reports: &mut Vec<ExecutionReport>,
697 ) -> bool {
698 if !Self::is_post_only_auto_cancel(msg) {
699 return false;
700 }
701
702 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
703 return false;
704 };
705
706 let Some((_, (trader_id, strategy_id, instrument_id))) =
707 self.active_client_orders.remove(&client_order_id)
708 else {
709 return false;
710 };
711
712 self.client_id_aliases.remove(&client_order_id);
713
714 if !exec_reports.is_empty() {
715 let reports = std::mem::take(exec_reports);
716 self.pending_messages
717 .push_back(NautilusWsMessage::ExecutionReports(reports));
718 }
719
720 let reason = msg
721 .cancel_source_reason
722 .as_ref()
723 .filter(|reason| !reason.is_empty())
724 .map_or_else(
725 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
726 |reason| Ustr::from(reason.as_str()),
727 );
728
729 let ts_event = parse_millisecond_timestamp(msg.u_time);
730 let rejected = OrderRejected::new(
731 trader_id,
732 strategy_id,
733 instrument_id,
734 client_order_id,
735 self.account_id,
736 reason,
737 UUID4::new(),
738 ts_event,
739 ts_init,
740 false,
741 true,
742 );
743
744 self.pending_messages
745 .push_back(NautilusWsMessage::OrderRejected(rejected));
746
747 true
748 }
749
750 fn register_client_order_aliases(
751 &self,
752 raw_child: &Option<ClientOrderId>,
753 parent_from_msg: &Option<ClientOrderId>,
754 ) -> Option<ClientOrderId> {
755 if let Some(parent) = parent_from_msg {
756 self.client_id_aliases.insert(*parent, *parent);
757 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
758 self.client_id_aliases.insert(*child, *parent);
759 }
760 Some(*parent)
761 } else if let Some(child) = raw_child.as_ref() {
762 if let Some(mapped) = self.client_id_aliases.get(child) {
763 Some(*mapped.value())
764 } else {
765 self.client_id_aliases.insert(*child, *child);
766 Some(*child)
767 }
768 } else {
769 None
770 }
771 }
772
773 fn adjust_execution_report(
774 &self,
775 report: ExecutionReport,
776 effective_client_id: &Option<ClientOrderId>,
777 raw_child: &Option<ClientOrderId>,
778 ) -> ExecutionReport {
779 match report {
780 ExecutionReport::Order(status_report) => {
781 let mut adjusted = status_report;
782 let mut final_id = *effective_client_id;
783
784 if final_id.is_none() {
785 final_id = adjusted.client_order_id;
786 }
787
788 if final_id.is_none()
789 && let Some(child) = raw_child.as_ref()
790 && let Some(mapped) = self.client_id_aliases.get(child)
791 {
792 final_id = Some(*mapped.value());
793 }
794
795 if let Some(final_id_value) = final_id {
796 if adjusted.client_order_id != Some(final_id_value) {
797 adjusted = adjusted.with_client_order_id(final_id_value);
798 }
799 self.client_id_aliases
800 .insert(final_id_value, final_id_value);
801
802 if let Some(child) =
803 raw_child.as_ref().filter(|child| **child != final_id_value)
804 {
805 adjusted = adjusted.with_linked_order_ids(vec![*child]);
806 }
807 }
808
809 ExecutionReport::Order(adjusted)
810 }
811 ExecutionReport::Fill(mut fill_report) => {
812 let mut final_id = *effective_client_id;
813 if final_id.is_none() {
814 final_id = fill_report.client_order_id;
815 }
816 if final_id.is_none()
817 && let Some(child) = raw_child.as_ref()
818 && let Some(mapped) = self.client_id_aliases.get(child)
819 {
820 final_id = Some(*mapped.value());
821 }
822
823 if let Some(final_id_value) = final_id {
824 fill_report.client_order_id = Some(final_id_value);
825 self.client_id_aliases
826 .insert(final_id_value, final_id_value);
827 }
828
829 ExecutionReport::Fill(fill_report)
830 }
831 }
832 }
833
834 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
835 match report {
836 ExecutionReport::Fill(fill_report) => {
837 let order_id = fill_report.venue_order_id.inner();
838 let current_fee = self
839 .fee_cache
840 .get(&order_id)
841 .copied()
842 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
843 let total_fee = current_fee + fill_report.commission;
844 self.fee_cache.insert(order_id, total_fee);
845
846 let current_filled_qty = self
847 .filled_qty_cache
848 .get(&order_id)
849 .copied()
850 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
851 let total_filled_qty = current_filled_qty + fill_report.last_qty;
852 self.filled_qty_cache.insert(order_id, total_filled_qty);
853 }
854 ExecutionReport::Order(status_report) => {
855 if matches!(status_report.order_status, OrderStatus::Filled) {
856 self.fee_cache.remove(&status_report.venue_order_id.inner());
857 self.filled_qty_cache
858 .remove(&status_report.venue_order_id.inner());
859 }
860
861 if matches!(
862 status_report.order_status,
863 OrderStatus::Canceled
864 | OrderStatus::Expired
865 | OrderStatus::Filled
866 | OrderStatus::Rejected,
867 ) {
868 self.emitted_accepted.remove(&status_report.venue_order_id);
869 if let Some(client_order_id) = status_report.client_order_id {
870 self.order_state_cache.remove(&client_order_id);
871 self.active_client_orders.remove(&client_order_id);
872 self.client_id_aliases.remove(&client_order_id);
873 }
874 if let Some(linked) = &status_report.linked_order_ids {
875 for child in linked {
876 self.client_id_aliases.remove(child);
877 }
878 }
879 }
880 }
881 }
882 }
883
884 #[allow(clippy::too_many_lines)]
885 fn handle_order_response(
886 &mut self,
887 id: Option<String>,
888 op: OKXWsOperation,
889 code: String,
890 msg: String,
891 data: Vec<Value>,
892 ts_init: UnixNanos,
893 ) -> Option<NautilusWsMessage> {
894 if code == "0" {
895 log::debug!("Order operation successful: id={id:?} op={op} code={code}");
896
897 if op == OKXWsOperation::BatchCancelOrders {
898 log::debug!(
899 "Batch cancel operation successful: id={id:?} cancel_count={}",
900 data.len()
901 );
902
903 for (idx, entry) in data.iter().enumerate() {
905 if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
906 && entry_code != "0"
907 {
908 let entry_msg = entry
909 .get("sMsg")
910 .and_then(|v| v.as_str())
911 .unwrap_or("Unknown error");
912
913 if let Some(cl_ord_id_str) = entry
914 .get("clOrdId")
915 .and_then(|v| v.as_str())
916 .filter(|s| !s.is_empty())
917 {
918 log::error!(
919 "Batch cancel partial failure for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
920 );
921 } else {
923 log::error!(
924 "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
925 );
926 }
927 }
928 }
929
930 return None;
931 } else if op == OKXWsOperation::MassCancel
932 && let Some(request_id) = &id
933 && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
934 {
935 log::debug!("Mass cancel operation successful for instrument: {instrument_id}");
936 } else if op == OKXWsOperation::Order
937 && let Some(request_id) = &id
938 && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
939 self.pending_place_requests.remove(request_id)
940 {
941 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
942 let ord_id = first
943 .get("ordId")
944 .and_then(|v| v.as_str())
945 .filter(|s| !s.is_empty())
946 .map(VenueOrderId::new);
947
948 let ts = first
949 .get("ts")
950 .and_then(|v| v.as_str())
951 .and_then(|s| s.parse::<u64>().ok())
952 .map_or_else(
953 || self.clock.get_time_ns(),
954 |ms| UnixNanos::from(ms * 1_000_000),
955 );
956
957 (ord_id, ts)
958 } else {
959 (None, self.clock.get_time_ns())
960 };
961
962 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
963 {
964 match params {
965 PendingOrderParams::Regular(order_params) => {
966 let order_type = determine_order_type(
967 order_params.ord_type,
968 order_params.px.as_deref().unwrap_or(""),
969 );
970
971 let is_explicit_quote_sized = order_params
972 .tgt_ccy
973 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
974
975 let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
977 && order_params.side == OKXSide::Buy
978 && order_type == OrderType::Market
979 && order_params.td_mode == OKXTradeMode::Cash
980 && instrument.instrument_class().as_ref() == "SPOT";
981
982 if is_explicit_quote_sized || is_implicit_quote_sized {
983 log::debug!(
988 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
989 if is_explicit_quote_sized {
990 "explicit"
991 } else {
992 "implicit"
993 },
994 );
995 return None;
996 }
997
998 let Some(v_order_id) = venue_order_id else {
999 log::error!(
1000 "No venue_order_id for accepted order: client_order_id={client_order_id}"
1001 );
1002 return None;
1003 };
1004
1005 if self.emitted_accepted.contains(&v_order_id) {
1007 log::debug!(
1008 "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1009 );
1010 return None;
1011 }
1012 self.emitted_accepted.add(v_order_id);
1013
1014 let accepted = OrderAccepted::new(
1015 trader_id,
1016 strategy_id,
1017 instrument_id,
1018 client_order_id,
1019 v_order_id,
1020 self.account_id,
1021 UUID4::new(),
1022 ts_accepted,
1023 ts_init,
1024 false, );
1026
1027 log::debug!(
1028 "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1029 );
1030
1031 return Some(NautilusWsMessage::OrderAccepted(accepted));
1032 }
1033 PendingOrderParams::Algo(_) => {
1034 log::debug!(
1035 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
1036 );
1037 }
1038 }
1039 } else {
1040 log::error!("Instrument not found for accepted order: {instrument_id}");
1041 }
1042 }
1043
1044 if let Some(first) = data.first()
1045 && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1046 {
1047 log::debug!("Order details: {success_msg}");
1048 }
1049
1050 return None;
1051 }
1052
1053 let error_msg = data
1054 .first()
1055 .and_then(|d| d.get("sMsg"))
1056 .and_then(|s| s.as_str())
1057 .unwrap_or(&msg)
1058 .to_string();
1059
1060 if let Some(first) = data.first() {
1061 log::debug!(
1062 "Error data fields: {}",
1063 serde_json::to_string_pretty(first)
1064 .unwrap_or_else(|_| "unable to serialize".to_string())
1065 );
1066 }
1067
1068 log::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1069
1070 let ts_event = self.clock.get_time_ns();
1071
1072 if let Some(request_id) = &id {
1073 match op {
1074 OKXWsOperation::Order => {
1075 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1076 self.pending_place_requests.remove(request_id)
1077 {
1078 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1079 let rejected = OrderRejected::new(
1080 trader_id,
1081 strategy_id,
1082 instrument_id,
1083 client_order_id,
1084 self.account_id,
1085 Ustr::from(error_msg.as_str()),
1086 UUID4::new(),
1087 ts_event,
1088 ts_init,
1089 false, due_post_only,
1091 );
1092
1093 return Some(NautilusWsMessage::OrderRejected(rejected));
1094 }
1095 }
1096 OKXWsOperation::CancelOrder => {
1097 if let Some((
1098 client_order_id,
1099 trader_id,
1100 strategy_id,
1101 instrument_id,
1102 venue_order_id,
1103 )) = self.pending_cancel_requests.remove(request_id)
1104 {
1105 let rejected = OrderCancelRejected::new(
1106 trader_id,
1107 strategy_id,
1108 instrument_id,
1109 client_order_id,
1110 Ustr::from(error_msg.as_str()),
1111 UUID4::new(),
1112 ts_event,
1113 ts_init,
1114 false, venue_order_id,
1116 Some(self.account_id),
1117 );
1118
1119 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1120 }
1121 }
1122 OKXWsOperation::AmendOrder => {
1123 if let Some((
1124 client_order_id,
1125 trader_id,
1126 strategy_id,
1127 instrument_id,
1128 venue_order_id,
1129 )) = self.pending_amend_requests.remove(request_id)
1130 {
1131 let rejected = OrderModifyRejected::new(
1132 trader_id,
1133 strategy_id,
1134 instrument_id,
1135 client_order_id,
1136 Ustr::from(error_msg.as_str()),
1137 UUID4::new(),
1138 ts_event,
1139 ts_init,
1140 false, venue_order_id,
1142 Some(self.account_id),
1143 );
1144
1145 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1146 }
1147 }
1148 OKXWsOperation::OrderAlgo => {
1149 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1150 self.pending_place_requests.remove(request_id)
1151 {
1152 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1153 let rejected = OrderRejected::new(
1154 trader_id,
1155 strategy_id,
1156 instrument_id,
1157 client_order_id,
1158 self.account_id,
1159 Ustr::from(error_msg.as_str()),
1160 UUID4::new(),
1161 ts_event,
1162 ts_init,
1163 false, due_post_only,
1165 );
1166
1167 return Some(NautilusWsMessage::OrderRejected(rejected));
1168 }
1169 }
1170 OKXWsOperation::CancelAlgos => {
1171 if let Some((
1172 client_order_id,
1173 trader_id,
1174 strategy_id,
1175 instrument_id,
1176 venue_order_id,
1177 )) = self.pending_cancel_requests.remove(request_id)
1178 {
1179 let rejected = OrderCancelRejected::new(
1180 trader_id,
1181 strategy_id,
1182 instrument_id,
1183 client_order_id,
1184 Ustr::from(error_msg.as_str()),
1185 UUID4::new(),
1186 ts_event,
1187 ts_init,
1188 false, venue_order_id,
1190 Some(self.account_id),
1191 );
1192
1193 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1194 }
1195 }
1196 OKXWsOperation::MassCancel => {
1197 if let Some(instrument_id) =
1198 self.pending_mass_cancel_requests.remove(request_id)
1199 {
1200 log::error!(
1201 "Mass cancel operation failed for {instrument_id}: code={code} msg={error_msg}"
1202 );
1203 let error = OKXWebSocketError {
1204 code,
1205 message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1206 conn_id: None,
1207 timestamp: ts_event.as_u64(),
1208 };
1209 return Some(NautilusWsMessage::Error(error));
1210 } else {
1211 log::error!("Mass cancel operation failed: code={code} msg={error_msg}");
1212 }
1213 }
1214 OKXWsOperation::BatchCancelOrders => {
1215 log::warn!(
1216 "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1217 data.len()
1218 );
1219
1220 for (idx, entry) in data.iter().enumerate() {
1222 let entry_code =
1223 entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1224 let entry_msg = entry
1225 .get("sMsg")
1226 .and_then(|v| v.as_str())
1227 .unwrap_or(&error_msg);
1228
1229 if entry_code != "0" {
1230 if let Some(cl_ord_id_str) = entry
1232 .get("clOrdId")
1233 .and_then(|v| v.as_str())
1234 .filter(|s| !s.is_empty())
1235 {
1236 log::error!(
1237 "Batch cancel failed for order {cl_ord_id_str}: sCode={entry_code} sMsg={entry_msg}"
1238 );
1239 } else {
1242 log::error!(
1243 "Batch cancel entry[{idx}] failed: sCode={entry_code} sMsg={entry_msg} data={entry:?}"
1244 );
1245 }
1246 }
1247 }
1248
1249 let error = OKXWebSocketError {
1251 code,
1252 message: format!("Batch cancel failed: {error_msg}"),
1253 conn_id: None,
1254 timestamp: ts_event.as_u64(),
1255 };
1256 return Some(NautilusWsMessage::Error(error));
1257 }
1258 _ => log::warn!("Unhandled operation type for rejection: {op}"),
1259 }
1260 }
1261
1262 let error = OKXWebSocketError {
1263 code,
1264 message: error_msg,
1265 conn_id: None,
1266 timestamp: ts_event.as_u64(),
1267 };
1268 Some(NautilusWsMessage::Error(error))
1269 }
1270
1271 fn handle_book_data(
1272 &self,
1273 arg: OKXWebSocketArg,
1274 action: OKXBookAction,
1275 data: Vec<OKXBookMsg>,
1276 ts_init: UnixNanos,
1277 ) -> Option<NautilusWsMessage> {
1278 let Some(inst_id) = arg.inst_id else {
1279 log::error!("Instrument ID missing for book data event");
1280 return None;
1281 };
1282
1283 let inst = self.instruments_cache.get(&inst_id)?;
1284
1285 let instrument_id = inst.id();
1286 let price_precision = inst.price_precision();
1287 let size_precision = inst.size_precision();
1288
1289 match parse_book_msg_vec(
1290 data,
1291 &instrument_id,
1292 price_precision,
1293 size_precision,
1294 action,
1295 ts_init,
1296 ) {
1297 Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1298 Err(e) => {
1299 log::error!("Failed to parse book message: {e}");
1300 None
1301 }
1302 }
1303 }
1304
1305 fn handle_account_data(
1306 &mut self,
1307 data: Value,
1308 ts_init: UnixNanos,
1309 ) -> Option<NautilusWsMessage> {
1310 let Value::Array(arr) = data else {
1311 log::error!("Account data is not an array");
1312 return None;
1313 };
1314
1315 let first = arr.into_iter().next()?;
1316
1317 let account: OKXAccount = match serde_json::from_value(first) {
1318 Ok(acc) => acc,
1319 Err(e) => {
1320 log::error!("Failed to parse account data: {e}");
1321 return None;
1322 }
1323 };
1324
1325 match parse_account_state(&account, self.account_id, ts_init) {
1326 Ok(account_state) => {
1327 if let Some(last_account_state) = &self.last_account_state
1328 && account_state.has_same_balances_and_margins(last_account_state)
1329 {
1330 return None;
1331 }
1332 self.last_account_state = Some(account_state.clone());
1333 Some(NautilusWsMessage::AccountUpdate(account_state))
1334 }
1335 Err(e) => {
1336 log::error!("Failed to parse account state: {e}");
1337 None
1338 }
1339 }
1340 }
1341
1342 fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1343 match serde_json::from_value::<Vec<OKXPosition>>(data) {
1344 Ok(positions) => {
1345 log::debug!("Received {} position update(s)", positions.len());
1346
1347 for position in positions {
1348 let instrument = match self.instruments_cache.get(&position.inst_id) {
1349 Some(inst) => inst,
1350 None => {
1351 log::warn!(
1352 "Received position update for unknown instrument {}, skipping",
1353 position.inst_id
1354 );
1355 continue;
1356 }
1357 };
1358
1359 let instrument_id = instrument.id();
1360 let size_precision = instrument.size_precision();
1361
1362 match parse_position_status_report(
1363 position,
1364 self.account_id,
1365 instrument_id,
1366 size_precision,
1367 ts_init,
1368 ) {
1369 Ok(position_report) => {
1370 self.pending_messages
1371 .push_back(NautilusWsMessage::PositionUpdate(position_report));
1372 }
1373 Err(e) => {
1374 log::error!(
1375 "Failed to parse position status report for {instrument_id}: {e}"
1376 );
1377 }
1378 }
1379 }
1380 }
1381 Err(e) => {
1382 log::error!("Failed to parse positions data: {e}");
1383 }
1384 }
1385 }
1386
1387 fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
1388 let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
1389 Ok(orders) => orders,
1390 Err(e) => {
1391 log::error!("Failed to deserialize orders channel payload: {e}");
1392 return None;
1393 }
1394 };
1395
1396 log::debug!(
1397 "Received {} order message(s) from orders channel",
1398 orders.len()
1399 );
1400
1401 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1402
1403 for msg in orders {
1404 log::debug!(
1405 "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
1406 msg.inst_id,
1407 msg.cl_ord_id,
1408 msg.state,
1409 msg.exec_type
1410 );
1411
1412 if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
1413 continue;
1414 }
1415
1416 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1417 let parent_from_msg = msg
1418 .algo_cl_ord_id
1419 .as_ref()
1420 .filter(|value| !value.is_empty())
1421 .map(ClientOrderId::new);
1422 let effective_client_id =
1423 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1424
1425 let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
1426 log::error!(
1427 "No instrument found for inst_id: {inst_id}",
1428 inst_id = msg.inst_id
1429 );
1430 continue;
1431 };
1432 let price_precision = instrument.price_precision();
1433 let size_precision = instrument.size_precision();
1434
1435 let order_metadata = effective_client_id
1436 .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));
1437
1438 let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
1439 let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
1440 let previous_state =
1441 effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());
1442
1443 if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
1445 (order_metadata, effective_client_id)
1446 {
1447 match parse_order_event(
1448 &msg,
1449 canonical_client_id,
1450 self.account_id,
1451 trader_id,
1452 strategy_id,
1453 instrument,
1454 previous_fee,
1455 previous_filled_qty,
1456 previous_state.as_ref(),
1457 ts_init,
1458 ) {
1459 Ok(event) => {
1460 self.process_parsed_order_event(
1461 event,
1462 &msg,
1463 price_precision,
1464 size_precision,
1465 canonical_client_id,
1466 &raw_child,
1467 &mut exec_reports,
1468 );
1469 }
1470 Err(e) => log::error!("Failed to parse order event: {e}"),
1471 }
1472 } else {
1473 match parse_order_msg(
1475 &msg,
1476 self.account_id,
1477 &self.instruments_cache,
1478 &self.fee_cache,
1479 &self.filled_qty_cache,
1480 ts_init,
1481 ) {
1482 Ok(report) => {
1483 log::debug!("Parsed external order as execution report: {report:?}");
1484 let adjusted =
1485 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1486 self.update_caches_with_report(&adjusted);
1487 exec_reports.push(adjusted);
1488 }
1489 Err(e) => log::error!("Failed to parse order message: {e}"),
1490 }
1491 }
1492 }
1493
1494 if !exec_reports.is_empty() {
1495 log::debug!(
1496 "Pushing {count} execution report(s) to message queue",
1497 count = exec_reports.len()
1498 );
1499 self.pending_messages
1500 .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
1501 }
1502
1503 self.pending_messages.pop_front()
1504 }
1505
1506 #[allow(clippy::too_many_arguments)]
1508 fn process_parsed_order_event(
1509 &mut self,
1510 event: ParsedOrderEvent,
1511 msg: &OKXOrderMsg,
1512 price_precision: u8,
1513 size_precision: u8,
1514 canonical_client_id: ClientOrderId,
1515 raw_child: &Option<ClientOrderId>,
1516 exec_reports: &mut Vec<ExecutionReport>,
1517 ) {
1518 let venue_order_id = VenueOrderId::new(msg.ord_id);
1519
1520 match event {
1521 ParsedOrderEvent::Accepted(accepted) => {
1522 if self.emitted_accepted.contains(&venue_order_id) {
1523 log::debug!(
1524 "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1525 );
1526 return;
1527 }
1528 self.emitted_accepted.add(venue_order_id);
1529 self.update_order_state_cache(
1530 &canonical_client_id,
1531 msg,
1532 price_precision,
1533 size_precision,
1534 );
1535
1536 self.pending_messages
1537 .push_back(NautilusWsMessage::OrderAccepted(accepted));
1538 }
1539 ParsedOrderEvent::Canceled(canceled) => {
1540 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1541 self.pending_messages
1542 .push_back(NautilusWsMessage::OrderCanceled(canceled));
1543 }
1544 ParsedOrderEvent::Expired(expired) => {
1545 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1546 self.pending_messages
1547 .push_back(NautilusWsMessage::OrderExpired(expired));
1548 }
1549 ParsedOrderEvent::Triggered(triggered) => {
1550 self.update_order_state_cache(
1551 &canonical_client_id,
1552 msg,
1553 price_precision,
1554 size_precision,
1555 );
1556 self.pending_messages
1557 .push_back(NautilusWsMessage::OrderTriggered(triggered));
1558 }
1559 ParsedOrderEvent::Updated(updated) => {
1560 self.update_order_state_cache(
1561 &canonical_client_id,
1562 msg,
1563 price_precision,
1564 size_precision,
1565 );
1566 self.pending_messages
1567 .push_back(NautilusWsMessage::OrderUpdated(updated));
1568 }
1569 ParsedOrderEvent::Fill(fill_report) => {
1570 let effective_client_id = Some(canonical_client_id);
1571 let adjusted = self.adjust_execution_report(
1572 ExecutionReport::Fill(fill_report),
1573 &effective_client_id,
1574 raw_child,
1575 );
1576 self.update_caches_with_report(&adjusted);
1577
1578 if msg.state == OKXOrderStatus::Filled {
1579 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1580 }
1581
1582 exec_reports.push(adjusted);
1583 }
1584 ParsedOrderEvent::StatusOnly(status_report) => {
1585 let effective_client_id = Some(canonical_client_id);
1586 let adjusted = self.adjust_execution_report(
1587 ExecutionReport::Order(*status_report),
1588 &effective_client_id,
1589 raw_child,
1590 );
1591 self.update_caches_with_report(&adjusted);
1592 exec_reports.push(adjusted);
1593 }
1594 }
1595 }
1596
1597 fn update_order_state_cache(
1599 &mut self,
1600 client_order_id: &ClientOrderId,
1601 msg: &OKXOrderMsg,
1602 price_precision: u8,
1603 size_precision: u8,
1604 ) {
1605 let venue_order_id = VenueOrderId::new(msg.ord_id);
1606 let quantity = parse_quantity(&msg.sz, size_precision).ok();
1607 let price = if is_market_price(&msg.px) {
1608 None
1609 } else {
1610 parse_price(&msg.px, price_precision).ok()
1611 };
1612
1613 if let Some(qty) = quantity {
1614 self.order_state_cache.insert(
1615 *client_order_id,
1616 OrderStateSnapshot {
1617 venue_order_id,
1618 quantity: qty,
1619 price,
1620 },
1621 );
1622 }
1623 }
1624
1625 fn cleanup_terminal_order(
1627 &mut self,
1628 client_order_id: &ClientOrderId,
1629 venue_order_id: &VenueOrderId,
1630 ) {
1631 self.emitted_accepted.remove(venue_order_id);
1632 self.order_state_cache.remove(client_order_id);
1633 self.active_client_orders.remove(client_order_id);
1634 self.client_id_aliases.remove(client_order_id);
1635 self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1636
1637 self.fee_cache.remove(&venue_order_id.inner());
1638 self.filled_qty_cache.remove(&venue_order_id.inner());
1639 }
1640
1641 fn handle_algo_orders_data(
1642 &mut self,
1643 data: Value,
1644 ts_init: UnixNanos,
1645 ) -> Option<NautilusWsMessage> {
1646 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1647 Ok(orders) => orders,
1648 Err(e) => {
1649 log::error!("Failed to deserialize algo orders payload: {e}");
1650 return None;
1651 }
1652 };
1653
1654 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1655
1656 for msg in orders {
1657 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1658 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1659 let effective_client_id =
1660 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1661
1662 match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1663 Ok(report) => {
1664 let adjusted =
1665 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1666 self.update_caches_with_report(&adjusted);
1667 exec_reports.push(adjusted);
1668 }
1669 Err(e) => {
1670 log::error!("Failed to parse algo order message: {e}");
1671 }
1672 }
1673 }
1674
1675 if exec_reports.is_empty() {
1676 None
1677 } else {
1678 Some(NautilusWsMessage::ExecutionReports(exec_reports))
1679 }
1680 }
1681
1682 fn handle_other_channel_data(
1683 &mut self,
1684 channel: OKXWsChannel,
1685 inst_id: Option<Ustr>,
1686 data: Value,
1687 ts_init: UnixNanos,
1688 ) -> Option<NautilusWsMessage> {
1689 let Some(inst_id) = inst_id else {
1690 log::error!("No instrument for channel {channel:?}");
1691 return None;
1692 };
1693
1694 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1695 log::error!("No instrument for channel {channel:?}, inst_id {inst_id:?}");
1696 return None;
1697 };
1698
1699 let instrument_id = instrument.id();
1700 let price_precision = instrument.price_precision();
1701 let size_precision = instrument.size_precision();
1702
1703 match parse_ws_message_data(
1704 &channel,
1705 data,
1706 &instrument_id,
1707 price_precision,
1708 size_precision,
1709 ts_init,
1710 &mut self.funding_rate_cache,
1711 &self.instruments_cache,
1712 ) {
1713 Ok(Some(msg)) => {
1714 if let NautilusWsMessage::Instrument(ref inst) = msg {
1715 self.instruments_cache
1716 .insert(inst.symbol().inner(), inst.as_ref().clone());
1717 }
1718 Some(msg)
1719 }
1720 Ok(None) => None,
1721 Err(e) => {
1722 log::error!("Error parsing message for channel {channel:?}: {e}");
1723 None
1724 }
1725 }
1726 }
1727
1728 pub(crate) fn parse_raw_message(
1729 msg: tokio_tungstenite::tungstenite::Message,
1730 ) -> Option<OKXWsMessage> {
1731 match msg {
1732 tokio_tungstenite::tungstenite::Message::Text(text) => {
1733 if text == TEXT_PONG {
1734 log::trace!("Received pong from OKX");
1735 return None;
1736 }
1737 if text == TEXT_PING {
1738 log::trace!("Received ping from OKX (text)");
1739 return Some(OKXWsMessage::Ping);
1740 }
1741
1742 if text == RECONNECTED {
1743 log::debug!("Received WebSocket reconnection signal");
1744 return Some(OKXWsMessage::Reconnected);
1745 }
1746 log::trace!("Received WebSocket message: {text}");
1747
1748 match serde_json::from_str(&text) {
1749 Ok(ws_event) => match &ws_event {
1750 OKXWsMessage::Error { code, msg } => {
1751 log::error!("WebSocket error: {code} - {msg}");
1752 Some(ws_event)
1753 }
1754 OKXWsMessage::Login {
1755 event,
1756 code,
1757 msg,
1758 conn_id,
1759 } => {
1760 if code == "0" {
1761 log::info!("WebSocket authenticated: conn_id={conn_id}");
1762 } else {
1763 log::error!(
1764 "WebSocket authentication failed: event={event}, code={code}, error={msg}"
1765 );
1766 }
1767 Some(ws_event)
1768 }
1769 OKXWsMessage::Subscription {
1770 event,
1771 arg,
1772 conn_id,
1773 ..
1774 } => {
1775 let channel_str = serde_json::to_string(&arg.channel)
1776 .expect("Invalid OKX websocket channel")
1777 .trim_matches('"')
1778 .to_string();
1779 log::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1780 Some(ws_event)
1781 }
1782 OKXWsMessage::ChannelConnCount {
1783 event: _,
1784 channel,
1785 conn_count,
1786 conn_id,
1787 } => {
1788 let channel_str = serde_json::to_string(&channel)
1789 .expect("Invalid OKX websocket channel")
1790 .trim_matches('"')
1791 .to_string();
1792 log::debug!(
1793 "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1794 );
1795 None
1796 }
1797 OKXWsMessage::Ping => {
1798 log::trace!("Ignoring ping event parsed from text payload");
1799 None
1800 }
1801 OKXWsMessage::Data { .. } => Some(ws_event),
1802 OKXWsMessage::BookData { .. } => Some(ws_event),
1803 OKXWsMessage::OrderResponse {
1804 id,
1805 op,
1806 code,
1807 msg: _,
1808 data,
1809 } => {
1810 if code == "0" {
1811 log::debug!(
1812 "Order operation successful: id={id:?}, op={op}, code={code}"
1813 );
1814
1815 if let Some(order_data) = data.first() {
1816 let success_msg = order_data
1817 .get("sMsg")
1818 .and_then(|s| s.as_str())
1819 .unwrap_or("Order operation successful");
1820 log::debug!("Order success details: {success_msg}");
1821 }
1822 }
1823 Some(ws_event)
1824 }
1825 OKXWsMessage::Reconnected => {
1826 log::warn!("Unexpected Reconnected event from deserialization");
1828 None
1829 }
1830 },
1831 Err(e) => {
1832 log::error!("Failed to parse message: {e}: {text}");
1833 None
1834 }
1835 }
1836 }
1837 Message::Ping(_payload) => {
1838 log::trace!("Received binary ping frame from OKX");
1839 Some(OKXWsMessage::Ping)
1840 }
1841 Message::Pong(payload) => {
1842 log::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1843 None
1844 }
1845 Message::Binary(msg) => {
1846 log::debug!("Raw binary: {msg:?}");
1847 None
1848 }
1849 Message::Close(_) => {
1850 log::debug!("Received close message");
1851 None
1852 }
1853 msg => {
1854 log::warn!("Unexpected message: {msg}");
1855 None
1856 }
1857 }
1858 }
1859
1860 fn generate_unique_request_id(&self) -> String {
1861 self.request_id_counter
1862 .fetch_add(1, Ordering::SeqCst)
1863 .to_string()
1864 }
1865
1866 fn get_instrument_type_and_family_from_instrument(
1867 instrument: &InstrumentAny,
1868 ) -> anyhow::Result<(OKXInstrumentType, String)> {
1869 let inst_type = okx_instrument_type(instrument)?;
1870 let symbol = instrument.symbol().inner();
1871
1872 let inst_family = match instrument {
1874 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1875 InstrumentAny::CryptoPerpetual(_) => {
1876 symbol
1878 .as_str()
1879 .strip_suffix("-SWAP")
1880 .unwrap_or(symbol.as_str())
1881 .to_string()
1882 }
1883 InstrumentAny::CryptoFuture(_) => {
1884 let s = symbol.as_str();
1887 if let Some(idx) = s.rfind('-') {
1888 s[..idx].to_string()
1889 } else {
1890 s.to_string()
1891 }
1892 }
1893 _ => {
1894 anyhow::bail!("Unsupported instrument type for OKX");
1895 }
1896 };
1897
1898 Ok((inst_type, inst_family))
1899 }
1900
1901 async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1902 let instrument = self
1903 .instruments_cache
1904 .get(&instrument_id.symbol.inner())
1905 .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1906
1907 let (inst_type, inst_family) =
1908 Self::get_instrument_type_and_family_from_instrument(instrument)?;
1909
1910 let params = WsMassCancelParams {
1911 inst_type,
1912 inst_family: Ustr::from(&inst_family),
1913 };
1914
1915 let args =
1916 vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1917
1918 let request_id = self.generate_unique_request_id();
1919
1920 self.pending_mass_cancel_requests
1921 .insert(request_id.clone(), instrument_id);
1922
1923 let request = OKXWsRequest {
1924 id: Some(request_id.clone()),
1925 op: OKXWsOperation::MassCancel,
1926 exp_time: None,
1927 args,
1928 };
1929
1930 let payload = serde_json::to_string(&request)
1931 .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1932
1933 match self
1934 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
1935 .await
1936 {
1937 Ok(()) => {
1938 log::debug!("Sent mass cancel for {instrument_id}");
1939 Ok(())
1940 }
1941 Err(e) => {
1942 log::error!("Failed to send mass cancel after retries: error={e}");
1943
1944 self.pending_mass_cancel_requests.remove(&request_id);
1945
1946 let error = OKXWebSocketError {
1947 code: "CLIENT_ERROR".to_string(),
1948 message: format!("Mass cancel failed for {instrument_id}: {e}"),
1949 conn_id: None,
1950 timestamp: self.clock.get_time_ns().as_u64(),
1951 };
1952 let _ = self.send(NautilusWsMessage::Error(error));
1953
1954 Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1955 }
1956 }
1957 }
1958
1959 async fn handle_batch_cancel_orders(
1960 &self,
1961 args: Vec<Value>,
1962 request_id: String,
1963 ) -> anyhow::Result<()> {
1964 let request = OKXWsRequest {
1965 id: Some(request_id),
1966 op: OKXWsOperation::BatchCancelOrders,
1967 exp_time: None,
1968 args,
1969 };
1970
1971 let payload = serde_json::to_string(&request)
1972 .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1973
1974 if let Some(client) = &self.inner {
1975 client
1976 .send_text(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
1977 .await
1978 .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
1979 log::debug!("Sent batch cancel orders");
1980 Ok(())
1981 } else {
1982 Err(anyhow::anyhow!("No active WebSocket client"))
1983 }
1984 }
1985
1986 async fn handle_batch_place_orders(
1987 &self,
1988 args: Vec<Value>,
1989 request_id: String,
1990 ) -> anyhow::Result<()> {
1991 let request = OKXWsRequest {
1992 id: Some(request_id),
1993 op: OKXWsOperation::BatchOrders,
1994 exp_time: None,
1995 args,
1996 };
1997
1998 let payload = serde_json::to_string(&request)
1999 .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
2000
2001 if let Some(client) = &self.inner {
2002 client
2003 .send_text(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2004 .await
2005 .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2006 log::debug!("Sent batch place orders");
2007 Ok(())
2008 } else {
2009 Err(anyhow::anyhow!("No active WebSocket client"))
2010 }
2011 }
2012
2013 async fn handle_batch_amend_orders(
2014 &self,
2015 args: Vec<Value>,
2016 request_id: String,
2017 ) -> anyhow::Result<()> {
2018 let request = OKXWsRequest {
2019 id: Some(request_id),
2020 op: OKXWsOperation::BatchAmendOrders,
2021 exp_time: None,
2022 args,
2023 };
2024
2025 let payload = serde_json::to_string(&request)
2026 .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2027
2028 if let Some(client) = &self.inner {
2029 client
2030 .send_text(payload, Some(OKX_RATE_LIMIT_KEY_AMEND.as_slice()))
2031 .await
2032 .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2033 log::debug!("Sent batch amend orders");
2034 Ok(())
2035 } else {
2036 Err(anyhow::anyhow!("No active WebSocket client"))
2037 }
2038 }
2039
2040 async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2041 for arg in &args {
2042 log::debug!(
2043 "Subscribing to channel: channel={:?}, inst_id={:?}",
2044 arg.channel,
2045 arg.inst_id
2046 );
2047 }
2048
2049 let message = OKXSubscription {
2050 op: OKXWsOperation::Subscribe,
2051 args,
2052 };
2053
2054 let json_txt = serde_json::to_string(&message)
2055 .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2056
2057 self.send_with_retry(json_txt, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
2058 .await
2059 .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2060 Ok(())
2061 }
2062
2063 async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2064 for arg in &args {
2065 log::debug!(
2066 "Unsubscribing from channel: channel={:?}, inst_id={:?}",
2067 arg.channel,
2068 arg.inst_id
2069 );
2070 }
2071
2072 let message = OKXSubscription {
2073 op: OKXWsOperation::Unsubscribe,
2074 args,
2075 };
2076
2077 let json_txt = serde_json::to_string(&message)
2078 .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2079
2080 self.send_with_retry(json_txt, Some(OKX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
2081 .await
2082 .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2083 Ok(())
2084 }
2085
2086 async fn handle_place_order(
2087 &mut self,
2088 params: WsPostOrderParams,
2089 client_order_id: ClientOrderId,
2090 trader_id: TraderId,
2091 strategy_id: StrategyId,
2092 instrument_id: InstrumentId,
2093 ) -> anyhow::Result<()> {
2094 let request_id = self.generate_unique_request_id();
2095
2096 self.pending_place_requests.insert(
2097 request_id.clone(),
2098 (
2099 PendingOrderParams::Regular(params.clone()),
2100 client_order_id,
2101 trader_id,
2102 strategy_id,
2103 instrument_id,
2104 ),
2105 );
2106
2107 let request = OKXWsRequest {
2108 id: Some(request_id.clone()),
2109 op: OKXWsOperation::Order,
2110 exp_time: None,
2111 args: vec![params],
2112 };
2113
2114 let payload = serde_json::to_string(&request)
2115 .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2116
2117 match self
2118 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2119 .await
2120 {
2121 Ok(()) => {
2122 log::debug!("Sent place order request");
2123 Ok(())
2124 }
2125 Err(e) => {
2126 log::error!("Failed to send place order after retries: error={e}");
2127
2128 self.pending_place_requests.remove(&request_id);
2129
2130 let ts_now = self.clock.get_time_ns();
2131 let rejected = OrderRejected::new(
2132 trader_id,
2133 strategy_id,
2134 instrument_id,
2135 client_order_id,
2136 self.account_id,
2137 Ustr::from(&format!("WebSocket send failed: {e}")),
2138 UUID4::new(),
2139 ts_now, ts_now, false, false, );
2144 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2145
2146 Err(anyhow::anyhow!("Failed to send place order: {e}"))
2147 }
2148 }
2149 }
2150
2151 async fn handle_place_algo_order(
2152 &mut self,
2153 params: WsPostAlgoOrderParams,
2154 client_order_id: ClientOrderId,
2155 trader_id: TraderId,
2156 strategy_id: StrategyId,
2157 instrument_id: InstrumentId,
2158 ) -> anyhow::Result<()> {
2159 let request_id = self.generate_unique_request_id();
2160
2161 self.pending_place_requests.insert(
2162 request_id.clone(),
2163 (
2164 PendingOrderParams::Algo(params.clone()),
2165 client_order_id,
2166 trader_id,
2167 strategy_id,
2168 instrument_id,
2169 ),
2170 );
2171
2172 let request = OKXWsRequest {
2173 id: Some(request_id.clone()),
2174 op: OKXWsOperation::OrderAlgo,
2175 exp_time: None,
2176 args: vec![params],
2177 };
2178
2179 let payload = serde_json::to_string(&request)
2180 .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2181
2182 match self
2183 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_ORDER.as_slice()))
2184 .await
2185 {
2186 Ok(()) => {
2187 log::debug!("Sent place algo order request");
2188 Ok(())
2189 }
2190 Err(e) => {
2191 log::error!("Failed to send place algo order after retries: error={e}");
2192
2193 self.pending_place_requests.remove(&request_id);
2194
2195 let ts_now = self.clock.get_time_ns();
2196 let rejected = OrderRejected::new(
2197 trader_id,
2198 strategy_id,
2199 instrument_id,
2200 client_order_id,
2201 self.account_id,
2202 Ustr::from(&format!("WebSocket send failed: {e}")),
2203 UUID4::new(),
2204 ts_now, ts_now, false, false, );
2209 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2210
2211 Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2212 }
2213 }
2214 }
2215
2216 async fn handle_cancel_order(
2217 &mut self,
2218 client_order_id: Option<ClientOrderId>,
2219 venue_order_id: Option<VenueOrderId>,
2220 instrument_id: InstrumentId,
2221 trader_id: TraderId,
2222 strategy_id: StrategyId,
2223 ) -> anyhow::Result<()> {
2224 let mut builder = WsCancelOrderParamsBuilder::default();
2225 builder.inst_id(instrument_id.symbol.as_str());
2226
2227 if let Some(inst_id_code) = self.inst_id_code_cache.get(&instrument_id.symbol.inner()) {
2229 builder.inst_id_code(*inst_id_code.value());
2230 }
2231
2232 if let Some(venue_order_id) = venue_order_id {
2233 builder.ord_id(venue_order_id.as_str());
2234 }
2235
2236 if let Some(client_order_id) = client_order_id {
2237 builder.cl_ord_id(client_order_id.as_str());
2238 }
2239
2240 let params = builder
2241 .build()
2242 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2243
2244 let request_id = self.generate_unique_request_id();
2245
2246 if let Some(client_order_id) = client_order_id {
2248 self.pending_cancel_requests.insert(
2249 request_id.clone(),
2250 (
2251 client_order_id,
2252 trader_id,
2253 strategy_id,
2254 instrument_id,
2255 venue_order_id,
2256 ),
2257 );
2258 }
2259
2260 let request = OKXWsRequest {
2261 id: Some(request_id.clone()),
2262 op: OKXWsOperation::CancelOrder,
2263 exp_time: None,
2264 args: vec![params],
2265 };
2266
2267 let payload = serde_json::to_string(&request)
2268 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2269
2270 match self
2271 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
2272 .await
2273 {
2274 Ok(()) => {
2275 log::debug!("Sent cancel order request");
2276 Ok(())
2277 }
2278 Err(e) => {
2279 log::error!("Failed to send cancel order after retries: error={e}");
2280
2281 self.pending_cancel_requests.remove(&request_id);
2282
2283 if let Some(client_order_id) = client_order_id {
2284 let ts_now = self.clock.get_time_ns();
2285 let rejected = OrderCancelRejected::new(
2286 trader_id,
2287 strategy_id,
2288 instrument_id,
2289 client_order_id,
2290 Ustr::from(&format!("WebSocket send failed: {e}")),
2291 UUID4::new(),
2292 ts_now, ts_now, false, venue_order_id,
2296 Some(self.account_id),
2297 );
2298 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2299 }
2300
2301 Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2302 }
2303 }
2304 }
2305
2306 async fn handle_amend_order(
2307 &mut self,
2308 params: WsAmendOrderParams,
2309 client_order_id: ClientOrderId,
2310 trader_id: TraderId,
2311 strategy_id: StrategyId,
2312 instrument_id: InstrumentId,
2313 venue_order_id: Option<VenueOrderId>,
2314 ) -> anyhow::Result<()> {
2315 let request_id = self.generate_unique_request_id();
2316
2317 self.pending_amend_requests.insert(
2318 request_id.clone(),
2319 (
2320 client_order_id,
2321 trader_id,
2322 strategy_id,
2323 instrument_id,
2324 venue_order_id,
2325 ),
2326 );
2327
2328 let request = OKXWsRequest {
2329 id: Some(request_id.clone()),
2330 op: OKXWsOperation::AmendOrder,
2331 exp_time: None,
2332 args: vec![params],
2333 };
2334
2335 let payload = serde_json::to_string(&request)
2336 .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2337
2338 match self
2339 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_AMEND.as_slice()))
2340 .await
2341 {
2342 Ok(()) => {
2343 log::debug!("Sent amend order request");
2344 Ok(())
2345 }
2346 Err(e) => {
2347 log::error!("Failed to send amend order after retries: error={e}");
2348
2349 self.pending_amend_requests.remove(&request_id);
2350
2351 let ts_now = self.clock.get_time_ns();
2352 let rejected = OrderModifyRejected::new(
2353 trader_id,
2354 strategy_id,
2355 instrument_id,
2356 client_order_id,
2357 Ustr::from(&format!("WebSocket send failed: {e}")),
2358 UUID4::new(),
2359 ts_now, ts_now, false, venue_order_id,
2363 Some(self.account_id),
2364 );
2365 let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2366
2367 Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2368 }
2369 }
2370 }
2371
2372 async fn handle_cancel_algo_order(
2373 &mut self,
2374 client_order_id: Option<ClientOrderId>,
2375 algo_order_id: Option<VenueOrderId>,
2376 instrument_id: InstrumentId,
2377 trader_id: TraderId,
2378 strategy_id: StrategyId,
2379 ) -> anyhow::Result<()> {
2380 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2381 builder.inst_id(instrument_id.symbol.as_str());
2382
2383 if let Some(inst_id_code) = self.inst_id_code_cache.get(&instrument_id.symbol.inner()) {
2385 builder.inst_id_code(*inst_id_code.value());
2386 }
2387
2388 if let Some(client_order_id) = &client_order_id {
2389 builder.algo_cl_ord_id(client_order_id.as_str());
2390 }
2391
2392 if let Some(algo_id) = &algo_order_id {
2393 builder.algo_id(algo_id.as_str());
2394 }
2395
2396 let params = builder
2397 .build()
2398 .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2399
2400 let request_id = self.generate_unique_request_id();
2401
2402 if let Some(client_order_id) = client_order_id {
2404 self.pending_cancel_requests.insert(
2405 request_id.clone(),
2406 (client_order_id, trader_id, strategy_id, instrument_id, None),
2407 );
2408 }
2409
2410 let request = OKXWsRequest {
2411 id: Some(request_id.clone()),
2412 op: OKXWsOperation::CancelAlgos,
2413 exp_time: None,
2414 args: vec![params],
2415 };
2416
2417 let payload = serde_json::to_string(&request)
2418 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2419
2420 match self
2421 .send_with_retry(payload, Some(OKX_RATE_LIMIT_KEY_CANCEL.as_slice()))
2422 .await
2423 {
2424 Ok(()) => {
2425 log::debug!("Sent cancel algo order request");
2426 Ok(())
2427 }
2428 Err(e) => {
2429 log::error!("Failed to send cancel algo order after retries: error={e}");
2430
2431 self.pending_cancel_requests.remove(&request_id);
2432
2433 if let Some(client_order_id) = client_order_id {
2434 let ts_now = self.clock.get_time_ns();
2435 let rejected = OrderCancelRejected::new(
2436 trader_id,
2437 strategy_id,
2438 instrument_id,
2439 client_order_id,
2440 Ustr::from(&format!("WebSocket send failed: {e}")),
2441 UUID4::new(),
2442 ts_now, ts_now, false, None,
2446 Some(self.account_id),
2447 );
2448 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2449 }
2450
2451 Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2452 }
2453 }
2454 }
2455}
2456
2457pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2459 if code == OKX_POST_ONLY_ERROR_CODE {
2460 return true;
2461 }
2462
2463 for entry in data {
2464 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2465 && s_code == OKX_POST_ONLY_ERROR_CODE
2466 {
2467 return true;
2468 }
2469
2470 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2471 && inner_code == OKX_POST_ONLY_ERROR_CODE
2472 {
2473 return true;
2474 }
2475 }
2476
2477 false
2478}
2479
2480#[inline]
2482fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
2483 haystack
2484 .as_bytes()
2485 .windows(needle.len())
2486 .any(|window| window.eq_ignore_ascii_case(needle.as_bytes()))
2487}
2488
2489fn should_retry_okx_error(error: &OKXWsError) -> bool {
2491 match error {
2492 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2493 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
2495 contains_ignore_ascii_case(msg, "timeout")
2497 || contains_ignore_ascii_case(msg, "timed out")
2498 || contains_ignore_ascii_case(msg, "connection")
2499 || contains_ignore_ascii_case(msg, "network")
2500 }
2501 OKXWsError::AuthenticationError(_)
2502 | OKXWsError::JsonError(_)
2503 | OKXWsError::ParsingError(_) => {
2504 false
2506 }
2507 }
2508}
2509
2510fn create_okx_timeout_error(msg: String) -> OKXWsError {
2512 OKXWsError::ClientError(msg)
2513}
2514
2515#[cfg(test)]
2516mod tests {
2517 use std::sync::{Arc, atomic::AtomicBool};
2518
2519 use dashmap::DashMap;
2520 use nautilus_model::{
2521 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2522 types::{Money, Quantity},
2523 };
2524 use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2525 use rstest::rstest;
2526
2527 use super::{NautilusWsMessage, OKXWsFeedHandler};
2528 use crate::websocket::parse::OrderStateSnapshot;
2529
2530 const OKX_WS_TOPIC_DELIMITER: char = ':';
2531
2532 #[allow(clippy::type_complexity)]
2533 fn create_test_handler() -> (
2534 OKXWsFeedHandler,
2535 tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2536 Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2537 Arc<DashMap<ClientOrderId, ClientOrderId>>,
2538 ) {
2539 let account_id = AccountId::new("OKX-001");
2540 let signal = Arc::new(AtomicBool::new(false));
2541 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2542 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2543 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2544 let active_client_orders = Arc::new(DashMap::new());
2545 let client_id_aliases = Arc::new(DashMap::new());
2546 let inst_id_code_cache = Arc::new(DashMap::new());
2547 let auth_tracker = AuthTracker::new();
2548 let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2549
2550 let handler = OKXWsFeedHandler::new(
2551 account_id,
2552 signal,
2553 cmd_rx,
2554 raw_rx,
2555 out_tx,
2556 active_client_orders.clone(),
2557 client_id_aliases.clone(),
2558 inst_id_code_cache,
2559 auth_tracker,
2560 subscriptions_state,
2561 );
2562
2563 (handler, out_rx, active_client_orders, client_id_aliases)
2564 }
2565
2566 #[rstest]
2567 fn test_is_post_only_rejection_detects_by_code() {
2568 assert!(super::is_post_only_rejection("51019", &[]));
2569 }
2570
2571 #[rstest]
2572 fn test_is_post_only_rejection_detects_by_inner_code() {
2573 let data = vec![serde_json::json!({
2574 "sCode": "51019"
2575 })];
2576 assert!(super::is_post_only_rejection("50000", &data));
2577 }
2578
2579 #[rstest]
2580 fn test_is_post_only_rejection_false_for_unrelated_error() {
2581 let data = vec![serde_json::json!({
2582 "sMsg": "Insufficient balance"
2583 })];
2584 assert!(!super::is_post_only_rejection("50000", &data));
2585 }
2586
2587 #[rstest]
2588 fn test_handler_register_client_order_aliases_with_parent() {
2589 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2590
2591 let child = Some(ClientOrderId::new("CHILD-001"));
2592 let parent = Some(ClientOrderId::new("PARENT-001"));
2593
2594 let result = handler.register_client_order_aliases(&child, &parent);
2595
2596 assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2597 assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2598 assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2599 assert_eq!(
2600 *client_id_aliases
2601 .get(&ClientOrderId::new("CHILD-001"))
2602 .unwrap(),
2603 ClientOrderId::new("PARENT-001")
2604 );
2605 }
2606
2607 #[rstest]
2608 fn test_handler_register_client_order_aliases_without_parent() {
2609 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2610
2611 let child = Some(ClientOrderId::new("ORDER-001"));
2612 let parent: Option<ClientOrderId> = None;
2613
2614 let result = handler.register_client_order_aliases(&child, &parent);
2615
2616 assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2617 assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2618 assert_eq!(
2619 *client_id_aliases
2620 .get(&ClientOrderId::new("ORDER-001"))
2621 .unwrap(),
2622 ClientOrderId::new("ORDER-001")
2623 );
2624 }
2625
2626 #[rstest]
2627 fn test_handler_cleanup_terminal_order_removes_all_state() {
2628 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2629
2630 let canonical = ClientOrderId::new("PARENT-001");
2631 let child = ClientOrderId::new("CHILD-001");
2632 let venue_id = VenueOrderId::new("VENUE-001");
2633 let trader_id = TraderId::new("TRADER-001");
2634 let strategy_id = StrategyId::new("STRATEGY-001");
2635 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2636
2637 active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2638 client_id_aliases.insert(canonical, canonical);
2639 client_id_aliases.insert(child, canonical);
2640 handler
2641 .fee_cache
2642 .insert(venue_id.inner(), Money::from("0.001 USDT"));
2643 handler
2644 .filled_qty_cache
2645 .insert(venue_id.inner(), Quantity::from("1.0"));
2646 handler.order_state_cache.insert(
2647 canonical,
2648 OrderStateSnapshot {
2649 venue_order_id: venue_id,
2650 quantity: Quantity::from("1.0"),
2651 price: None,
2652 },
2653 );
2654
2655 handler.cleanup_terminal_order(&canonical, &venue_id);
2656
2657 assert!(!active_client_orders.contains_key(&canonical));
2658 assert!(!client_id_aliases.contains_key(&canonical));
2659 assert!(!client_id_aliases.contains_key(&child));
2660 assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2661 assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2662 assert!(!handler.order_state_cache.contains_key(&canonical));
2663 }
2664
2665 #[rstest]
2666 fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2667 let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2668
2669 let canonical = ClientOrderId::new("PARENT-001");
2670 let child1 = ClientOrderId::new("CHILD-001");
2671 let child2 = ClientOrderId::new("CHILD-002");
2672 let child3 = ClientOrderId::new("CHILD-003");
2673 let venue_id = VenueOrderId::new("VENUE-001");
2674
2675 client_id_aliases.insert(canonical, canonical);
2676 client_id_aliases.insert(child1, canonical);
2677 client_id_aliases.insert(child2, canonical);
2678 client_id_aliases.insert(child3, canonical);
2679
2680 handler.cleanup_terminal_order(&canonical, &venue_id);
2681
2682 assert!(!client_id_aliases.contains_key(&canonical));
2683 assert!(!client_id_aliases.contains_key(&child1));
2684 assert!(!client_id_aliases.contains_key(&child2));
2685 assert!(!client_id_aliases.contains_key(&child3));
2686 assert!(client_id_aliases.is_empty());
2687 }
2688
2689 #[rstest]
2690 fn test_handler_cleanup_does_not_affect_other_orders() {
2691 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2692
2693 let canonical1 = ClientOrderId::new("PARENT-001");
2694 let child1 = ClientOrderId::new("CHILD-001");
2695 let venue_id1 = VenueOrderId::new("VENUE-001");
2696
2697 let canonical2 = ClientOrderId::new("PARENT-002");
2698 let child2 = ClientOrderId::new("CHILD-002");
2699 let venue_id2 = VenueOrderId::new("VENUE-002");
2700
2701 let trader_id = TraderId::new("TRADER-001");
2702 let strategy_id = StrategyId::new("STRATEGY-001");
2703 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2704
2705 active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2706 active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2707 client_id_aliases.insert(canonical1, canonical1);
2708 client_id_aliases.insert(child1, canonical1);
2709 client_id_aliases.insert(canonical2, canonical2);
2710 client_id_aliases.insert(child2, canonical2);
2711 handler
2712 .fee_cache
2713 .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2714 handler
2715 .fee_cache
2716 .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2717
2718 handler.cleanup_terminal_order(&canonical1, &venue_id1);
2719
2720 assert!(!active_client_orders.contains_key(&canonical1));
2721 assert!(!client_id_aliases.contains_key(&canonical1));
2722 assert!(!client_id_aliases.contains_key(&child1));
2723 assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2724
2725 assert!(active_client_orders.contains_key(&canonical2));
2726 assert!(client_id_aliases.contains_key(&canonical2));
2727 assert!(client_id_aliases.contains_key(&child2));
2728 assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2729 }
2730
2731 mod channel_routing {
2732 use nautilus_core::nanos::UnixNanos;
2733 use nautilus_model::{
2734 identifiers::{InstrumentId, Symbol},
2735 instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2736 types::{Currency, Price, Quantity},
2737 };
2738 use rstest::rstest;
2739 use ustr::Ustr;
2740
2741 use super::*;
2742 use crate::{
2743 common::{enums::OKXBookAction, testing::load_test_json},
2744 websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2745 };
2746
2747 fn create_spot_instrument() -> InstrumentAny {
2748 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2749 InstrumentAny::CurrencyPair(CurrencyPair::new(
2750 instrument_id,
2751 Symbol::from("BTC-USDT"),
2752 Currency::BTC(),
2753 Currency::USDT(),
2754 2,
2755 8,
2756 Price::from("0.01"),
2757 Quantity::from("0.00000001"),
2758 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
2771 UnixNanos::default(),
2772 ))
2773 }
2774
2775 fn create_swap_instrument() -> InstrumentAny {
2776 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2777 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2778 instrument_id,
2779 Symbol::from("BTC-USDT-SWAP"),
2780 Currency::BTC(),
2781 Currency::USDT(),
2782 Currency::USDT(),
2783 false,
2784 2,
2785 8,
2786 Price::from("0.01"),
2787 Quantity::from("0.00000001"),
2788 None,
2789 None,
2790 None,
2791 None,
2792 None,
2793 None,
2794 None,
2795 None,
2796 None,
2797 None,
2798 None,
2799 None,
2800 UnixNanos::default(),
2801 UnixNanos::default(),
2802 ))
2803 }
2804
2805 fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2806 let (mut handler, _, _, _) = create_test_handler();
2807 for inst in instruments {
2808 handler
2809 .instruments_cache
2810 .insert(inst.symbol().inner(), inst);
2811 }
2812 handler
2813 }
2814
2815 #[rstest]
2816 fn test_parse_raw_message_ticker_channel() {
2817 let json = load_test_json("ws_tickers.json");
2818 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2819
2820 match msg {
2821 OKXWsMessage::Data { arg, data } => {
2822 assert!(
2823 matches!(arg.channel, OKXWsChannel::Tickers),
2824 "Expected Tickers channel"
2825 );
2826 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2827 assert!(data.is_array());
2828 }
2829 _ => panic!("Expected OKXWsMessage::Data variant"),
2830 }
2831 }
2832
2833 #[rstest]
2834 fn test_parse_raw_message_trades_channel() {
2835 let json = load_test_json("ws_trades.json");
2836 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2837
2838 match msg {
2839 OKXWsMessage::Data { arg, data } => {
2840 assert!(
2841 matches!(arg.channel, OKXWsChannel::Trades),
2842 "Expected Trades channel"
2843 );
2844 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2845 assert!(data.is_array());
2846 }
2847 _ => panic!("Expected OKXWsMessage::Data variant"),
2848 }
2849 }
2850
2851 #[rstest]
2852 fn test_parse_raw_message_books_channel() {
2853 let json = load_test_json("ws_books_snapshot.json");
2854 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2855
2856 match msg {
2857 OKXWsMessage::BookData { arg, action, data } => {
2858 assert!(
2859 matches!(arg.channel, OKXWsChannel::Books),
2860 "Expected Books channel"
2861 );
2862 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2863 assert!(
2864 matches!(action, OKXBookAction::Snapshot),
2865 "Expected snapshot action"
2866 );
2867 assert!(!data.is_empty());
2868 }
2869 _ => panic!("Expected OKXWsMessage::BookData variant"),
2870 }
2871 }
2872
2873 #[rstest]
2874 fn test_parse_raw_message_candle_channel() {
2875 let json = load_test_json("ws_candle.json");
2876 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2877
2878 match msg {
2879 OKXWsMessage::Data { arg, data } => {
2880 assert!(
2882 matches!(arg.channel, OKXWsChannel::Candle1Day),
2883 "Expected Candle1Day channel, was {:?}",
2884 arg.channel
2885 );
2886 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2887 assert!(data.is_array());
2888 }
2889 _ => panic!("Expected OKXWsMessage::Data variant"),
2890 }
2891 }
2892
2893 #[rstest]
2894 fn test_parse_raw_message_funding_rate_channel() {
2895 let json = load_test_json("ws_funding_rate.json");
2896 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2897
2898 match msg {
2899 OKXWsMessage::Data { arg, data } => {
2900 assert!(
2901 matches!(arg.channel, OKXWsChannel::FundingRate),
2902 "Expected FundingRate channel"
2903 );
2904 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
2905 assert!(data.is_array());
2906 }
2907 _ => panic!("Expected OKXWsMessage::Data variant"),
2908 }
2909 }
2910
2911 #[rstest]
2912 fn test_parse_raw_message_bbo_tbt_channel() {
2913 let json = load_test_json("ws_bbo_tbt.json");
2914 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2915
2916 match msg {
2917 OKXWsMessage::Data { arg, data } => {
2918 assert!(
2919 matches!(arg.channel, OKXWsChannel::BboTbt),
2920 "Expected BboTbt channel"
2921 );
2922 assert!(data.is_array());
2923 }
2924 _ => panic!("Expected OKXWsMessage::Data variant"),
2925 }
2926 }
2927
2928 #[rstest]
2929 fn test_handle_other_channel_data_tickers() {
2930 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
2931 let json = load_test_json("ws_tickers.json");
2932 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2933
2934 let OKXWsMessage::Data { arg, data } = msg else {
2935 panic!("Expected OKXWsMessage::Data");
2936 };
2937
2938 let ts_init = UnixNanos::from(1_000_000_000u64);
2939 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
2940
2941 assert!(result.is_some());
2942 match result.unwrap() {
2943 NautilusWsMessage::Data(payloads) => {
2944 assert!(!payloads.is_empty(), "Should produce data payloads");
2945 }
2946 other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
2947 }
2948 }
2949
2950 #[rstest]
2951 fn test_handle_other_channel_data_trades() {
2952 let instrument_id = InstrumentId::from("BTC-USD.OKX");
2954 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
2955 instrument_id,
2956 Symbol::from("BTC-USD"),
2957 Currency::BTC(),
2958 Currency::USD(),
2959 1,
2960 8,
2961 Price::from("0.1"),
2962 Quantity::from("0.00000001"),
2963 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
2976 UnixNanos::default(),
2977 ));
2978
2979 let mut handler = create_handler_with_instruments(vec![instrument]);
2980 let json = load_test_json("ws_trades.json");
2981 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2982
2983 let OKXWsMessage::Data { arg, data } = msg else {
2984 panic!("Expected OKXWsMessage::Data");
2985 };
2986
2987 let ts_init = UnixNanos::from(1_000_000_000u64);
2988 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
2989
2990 assert!(result.is_some());
2991 match result.unwrap() {
2992 NautilusWsMessage::Data(payloads) => {
2993 assert!(!payloads.is_empty(), "Should produce trade data payloads");
2994 }
2995 other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
2996 }
2997 }
2998
2999 #[rstest]
3000 fn test_handle_book_data_snapshot() {
3001 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3002 let json = load_test_json("ws_books_snapshot.json");
3003 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3004
3005 let OKXWsMessage::BookData { arg, action, data } = msg else {
3006 panic!("Expected OKXWsMessage::BookData");
3007 };
3008
3009 let ts_init = UnixNanos::from(1_000_000_000u64);
3010 let result = handler.handle_book_data(arg, action, data, ts_init);
3011
3012 assert!(result.is_some());
3013 match result.unwrap() {
3014 NautilusWsMessage::Data(payloads) => {
3015 assert!(!payloads.is_empty(), "Should produce order book payloads");
3016 }
3017 other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3018 }
3019 }
3020
3021 #[rstest]
3022 fn test_handle_book_data_update() {
3023 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3024 let json = load_test_json("ws_books_update.json");
3025 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3026
3027 let OKXWsMessage::BookData { arg, action, data } = msg else {
3028 panic!("Expected OKXWsMessage::BookData");
3029 };
3030
3031 let ts_init = UnixNanos::from(1_000_000_000u64);
3032 let result = handler.handle_book_data(arg, action, data, ts_init);
3033
3034 assert!(result.is_some());
3035 match result.unwrap() {
3036 NautilusWsMessage::Data(payloads) => {
3037 assert!(
3038 !payloads.is_empty(),
3039 "Should produce order book delta payloads"
3040 );
3041 }
3042 other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3043 }
3044 }
3045
3046 #[rstest]
3047 fn test_handle_other_channel_data_candles() {
3048 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3049 let json = load_test_json("ws_candle.json");
3050 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3051
3052 let OKXWsMessage::Data { arg, data } = msg else {
3053 panic!("Expected OKXWsMessage::Data");
3054 };
3055
3056 let ts_init = UnixNanos::from(1_000_000_000u64);
3057 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3058
3059 assert!(result.is_some());
3060 match result.unwrap() {
3061 NautilusWsMessage::Data(payloads) => {
3062 assert!(!payloads.is_empty(), "Should produce bar data payloads");
3063 }
3064 other => panic!("Expected NautilusWsMessage::Data, was {other:?}"),
3065 }
3066 }
3067
3068 #[rstest]
3069 fn test_handle_other_channel_data_funding_rate() {
3070 let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3071 let json = load_test_json("ws_funding_rate.json");
3072 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3073
3074 let OKXWsMessage::Data { arg, data } = msg else {
3075 panic!("Expected OKXWsMessage::Data");
3076 };
3077
3078 let ts_init = UnixNanos::from(1_000_000_000u64);
3079 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3080
3081 assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3083 }
3084
3085 #[rstest]
3086 fn test_handle_account_data_parses_successfully() {
3087 let mut handler = create_handler_with_instruments(vec![]);
3088 let json = load_test_json("ws_account.json");
3089 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3090
3091 let OKXWsMessage::Data { data, .. } = msg else {
3092 panic!("Expected OKXWsMessage::Data");
3093 };
3094
3095 let ts_init = UnixNanos::from(1_000_000_000u64);
3096 let result = handler.handle_account_data(data, ts_init);
3097
3098 assert!(result.is_some());
3099 match result.unwrap() {
3100 NautilusWsMessage::AccountUpdate(account_state) => {
3101 assert!(
3102 !account_state.balances.is_empty(),
3103 "Should have balance data"
3104 );
3105 }
3106 other => panic!("Expected NautilusWsMessage::AccountUpdate, was {other:?}"),
3107 }
3108 }
3109
3110 #[rstest]
3111 fn test_handle_other_channel_data_missing_instrument() {
3112 let mut handler = create_handler_with_instruments(vec![]);
3113 let json = load_test_json("ws_tickers.json");
3114 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3115
3116 let OKXWsMessage::Data { arg, data } = msg else {
3117 panic!("Expected OKXWsMessage::Data");
3118 };
3119
3120 let ts_init = UnixNanos::from(1_000_000_000u64);
3121 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3122
3123 assert!(result.is_none());
3125 }
3126
3127 #[rstest]
3128 fn test_handle_book_data_missing_instrument() {
3129 let handler = create_handler_with_instruments(vec![]);
3130 let json = load_test_json("ws_books_snapshot.json");
3131 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3132
3133 let OKXWsMessage::BookData { arg, action, data } = msg else {
3134 panic!("Expected OKXWsMessage::BookData");
3135 };
3136
3137 let ts_init = UnixNanos::from(1_000_000_000u64);
3138 let result = handler.handle_book_data(arg, action, data, ts_init);
3139
3140 assert!(result.is_none());
3142 }
3143 }
3144}