1use std::{
32 collections::VecDeque,
33 sync::{
34 Arc,
35 atomic::{AtomicBool, AtomicU64, Ordering},
36 },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43 enums::{OrderStatus, OrderType},
44 events::{
45 AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
46 },
47 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
48 instruments::{Instrument, InstrumentAny},
49 types::{Money, Quantity},
50};
51use nautilus_network::{
52 RECONNECTED,
53 retry::{RetryManager, create_websocket_retry_manager},
54 websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
55};
56use serde_json::Value;
57use tokio_tungstenite::tungstenite::Message;
58use ustr::Ustr;
59
60use super::{
61 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
62 error::OKXWsError,
63 messages::{
64 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
65 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
66 OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
67 WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
68 },
69 parse::{
70 OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
71 parse_order_event, parse_order_msg, parse_ws_message_data,
72 },
73 subscription::topic_from_websocket_arg,
74};
75use crate::{
76 common::{
77 consts::{
78 OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
79 should_retry_error_code,
80 },
81 enums::{
82 OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
83 OKXTradeMode,
84 },
85 parse::{
86 determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
87 parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
88 parse_price, parse_quantity,
89 },
90 },
91 http::models::{OKXAccount, OKXPosition},
92 websocket::client::{
93 OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
94 OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
95 },
96};
97
/// Context kept for an in-flight place-order request.
///
/// Stored in `pending_place_requests` under the request ID and looked up again
/// when the venue's response for that request arrives. Holds the original
/// request parameters together with the identifiers needed to build an
/// `OrderAccepted` or `OrderRejected` event.
type PlaceRequestData = (
    PendingOrderParams,
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
);
106
/// Context kept for an in-flight cancel request.
///
/// Stored in `pending_cancel_requests` under the request ID; supplies the
/// identifiers (and the venue order ID, when known) used to build an
/// `OrderCancelRejected` event if the venue reports a failure.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
115
/// Context kept for an in-flight amend request.
///
/// Stored in `pending_amend_requests` under the request ID; supplies the
/// identifiers (and the venue order ID, when known) used to build an
/// `OrderModifyRejected` event if the venue reports a failure.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
124
/// Parameters of an order placement that is awaiting a venue response.
#[derive(Debug)]
pub enum PendingOrderParams {
    /// Parameters of a regular order request.
    Regular(WsPostOrderParams),
    /// Parameters of an algo order request.
    Algo(WsPostAlgoOrderParams),
}
130
/// Commands delivered to the feed handler over its command channel.
///
/// Each command is received and dispatched inside `OKXWsFeedHandler::next`.
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
#[allow(missing_debug_implementations)]
pub enum HandlerCommand {
    /// Hand the handler the WebSocket client it should send through.
    SetClient(WebSocketClient),
    /// Drop the current WebSocket client and stop the handler loop.
    Disconnect,
    /// Send an already-built authentication payload.
    Authenticate {
        payload: String,
    },
    /// Populate the instrument cache (keyed by instrument symbol).
    InitializeInstruments(Vec<InstrumentAny>),
    /// Insert or replace a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Subscribe to the given channel arguments.
    Subscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Unsubscribe from the given channel arguments.
    Unsubscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Place a regular order.
    PlaceOrder {
        params: WsPostOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Place an algo order.
    PlaceAlgoOrder {
        params: WsPostAlgoOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Amend an existing order.
    AmendOrder {
        params: WsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Cancel a regular order, identified by client and/or venue order ID.
    CancelOrder {
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Cancel an algo order, identified by client and/or algo order ID.
    CancelAlgoOrder {
        client_order_id: Option<ClientOrderId>,
        algo_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Request a mass cancel for one instrument.
    MassCancel {
        instrument_id: InstrumentId,
    },
    /// Place several orders in one request; `args` are pre-serialized JSON entries.
    BatchPlaceOrders {
        args: Vec<Value>,
        request_id: String,
    },
    /// Amend several orders in one request; `args` are pre-serialized JSON entries.
    BatchAmendOrders {
        args: Vec<Value>,
        request_id: String,
    },
    /// Cancel several orders in one request; `args` are pre-serialized JSON entries.
    BatchCancelOrders {
        args: Vec<Value>,
        request_id: String,
    },
}
203
/// Message-processing core of the OKX WebSocket client.
///
/// Receives commands and raw WebSocket messages over channels, keeps the
/// bookkeeping needed to correlate venue responses with outstanding requests,
/// and yields `NautilusWsMessage` values from `next`.
pub(super) struct OKXWsFeedHandler {
    /// Clock used to timestamp events and errors.
    clock: &'static AtomicTime,
    /// Account ID stamped on the order events this handler builds.
    account_id: AccountId,
    /// Stop flag; when set, `next` returns `None` on its next idle tick.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until `SetClient` and after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Incoming commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Records the outcome of login attempts.
    auth_tracker: AuthTracker,
    /// Tracks pending/confirmed subscription topics.
    subscriptions_state: SubscriptionState,
    /// Retry policy applied to outbound sends.
    retry_manager: RetryManager<OKXWsError>,
    /// In-flight place requests keyed by request ID.
    pending_place_requests: AHashMap<String, PlaceRequestData>,
    /// In-flight cancel requests keyed by request ID.
    pending_cancel_requests: AHashMap<String, CancelRequestData>,
    /// In-flight amend requests keyed by request ID.
    pending_amend_requests: AHashMap<String, AmendRequestData>,
    /// In-flight mass-cancel requests keyed by request ID.
    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
    /// Output messages queued for delivery by later `next` calls.
    pending_messages: VecDeque<NautilusWsMessage>,
    /// Identifiers of orders currently considered active, keyed by client order ID.
    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
    /// Maps a client order ID (possibly a child ID) to the ID reports should carry.
    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
    /// Venue order IDs for which an `OrderAccepted` was already emitted (used as a set).
    emitted_order_accepted: AHashMap<VenueOrderId, ()>,
    /// Instruments keyed by symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Accumulated commission per venue order ID.
    fee_cache: AHashMap<Ustr, Money>,
    /// Accumulated filled quantity per venue order ID.
    filled_qty_cache: AHashMap<Ustr, Quantity>,
    /// Last known order state per client order ID.
    order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
    // NOTE(review): not used in the visible part of the file; presumably the last
    // funding-rate value seen per instrument — confirm against its consumer.
    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>,
    // NOTE(review): not used in the visible part of the file; presumably the most
    // recent account state emitted — confirm against `handle_account_data`.
    last_account_state: Option<AccountState>,
    /// Counter for generating request IDs; starts at zero.
    request_id_counter: AtomicU64,
}
231
232impl OKXWsFeedHandler {
233 #[allow(clippy::too_many_arguments)]
235 pub fn new(
236 account_id: AccountId,
237 signal: Arc<AtomicBool>,
238 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
242 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
243 auth_tracker: AuthTracker,
244 subscriptions_state: SubscriptionState,
245 ) -> Self {
246 Self {
247 clock: get_atomic_clock_realtime(),
248 account_id,
249 signal,
250 inner: None,
251 cmd_rx,
252 raw_rx,
253 out_tx,
254 auth_tracker,
255 subscriptions_state,
256 retry_manager: create_websocket_retry_manager(),
257 pending_place_requests: AHashMap::new(),
258 pending_cancel_requests: AHashMap::new(),
259 pending_amend_requests: AHashMap::new(),
260 pending_mass_cancel_requests: AHashMap::new(),
261 pending_messages: VecDeque::new(),
262 active_client_orders,
263 client_id_aliases,
264 emitted_order_accepted: AHashMap::new(),
265 instruments_cache: AHashMap::new(),
266 fee_cache: AHashMap::new(),
267 filled_qty_cache: AHashMap::new(),
268 order_state_cache: AHashMap::new(),
269 funding_rate_cache: AHashMap::new(),
270 last_account_state: None,
271 request_id_counter: AtomicU64::new(0),
272 }
273 }
274
275 pub(super) fn is_stopped(&self) -> bool {
276 self.signal.load(std::sync::atomic::Ordering::Acquire)
277 }
278
279 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
280 self.out_tx.send(msg).map_err(|_| ())
281 }
282
283 async fn send_with_retry(
284 &self,
285 payload: String,
286 rate_limit_keys: Option<Vec<String>>,
287 ) -> Result<(), OKXWsError> {
288 if let Some(client) = &self.inner {
289 self.retry_manager
290 .execute_with_retry(
291 "websocket_send",
292 || {
293 let payload = payload.clone();
294 let keys = rate_limit_keys.clone();
295 async move {
296 client
297 .send_text(payload, keys)
298 .await
299 .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
300 }
301 },
302 should_retry_okx_error,
303 create_okx_timeout_error,
304 )
305 .await
306 } else {
307 Err(OKXWsError::ClientError(
308 "No active WebSocket client".to_string(),
309 ))
310 }
311 }
312
313 pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
314 match self.send_with_retry(TEXT_PONG.to_string(), None).await {
315 Ok(()) => {
316 tracing::trace!("Sent pong response to OKX text ping");
317 Ok(())
318 }
319 Err(e) => {
320 tracing::warn!(error = %e, "Failed to send pong after retries");
321 Err(anyhow::anyhow!("Failed to send pong: {e}"))
322 }
323 }
324 }
325
326 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
327 if let Some(message) = self.pending_messages.pop_front() {
328 return Some(message);
329 }
330
331 loop {
332 tokio::select! {
333 Some(cmd) = self.cmd_rx.recv() => {
334 match cmd {
335 HandlerCommand::SetClient(client) => {
336 tracing::debug!("Handler received WebSocket client");
337 self.inner = Some(client);
338 }
339 HandlerCommand::Disconnect => {
340 tracing::debug!("Handler disconnecting WebSocket client");
341 self.inner = None;
342 return None;
343 }
344 HandlerCommand::Authenticate { payload } => {
345 if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
346 tracing::error!(error = %e, "Failed to send authentication message after retries");
347 }
348 }
349 HandlerCommand::InitializeInstruments(instruments) => {
350 for inst in instruments {
351 self.instruments_cache.insert(inst.symbol().inner(), inst);
352 }
353 }
354 HandlerCommand::UpdateInstrument(inst) => {
355 self.instruments_cache.insert(inst.symbol().inner(), inst);
356 }
357 HandlerCommand::Subscribe { args } => {
358 if let Err(e) = self.handle_subscribe(args).await {
359 tracing::error!(error = %e, "Failed to handle subscribe command");
360 }
361 }
362 HandlerCommand::Unsubscribe { args } => {
363 if let Err(e) = self.handle_unsubscribe(args).await {
364 tracing::error!(error = %e, "Failed to handle unsubscribe command");
365 }
366 }
367 HandlerCommand::CancelOrder {
368 client_order_id,
369 venue_order_id,
370 instrument_id,
371 trader_id,
372 strategy_id,
373 } => {
374 if let Err(e) = self
375 .handle_cancel_order(
376 client_order_id,
377 venue_order_id,
378 instrument_id,
379 trader_id,
380 strategy_id,
381 )
382 .await
383 {
384 tracing::error!(error = %e, "Failed to handle cancel order command");
385 }
386 }
387 HandlerCommand::CancelAlgoOrder {
388 client_order_id,
389 algo_order_id,
390 instrument_id,
391 trader_id,
392 strategy_id,
393 } => {
394 if let Err(e) = self
395 .handle_cancel_algo_order(
396 client_order_id,
397 algo_order_id,
398 instrument_id,
399 trader_id,
400 strategy_id,
401 )
402 .await
403 {
404 tracing::error!(error = %e, "Failed to handle cancel algo order command");
405 }
406 }
407 HandlerCommand::PlaceOrder {
408 params,
409 client_order_id,
410 trader_id,
411 strategy_id,
412 instrument_id,
413 } => {
414 if let Err(e) = self
415 .handle_place_order(
416 params,
417 client_order_id,
418 trader_id,
419 strategy_id,
420 instrument_id,
421 )
422 .await
423 {
424 tracing::error!(error = %e, "Failed to handle place order command");
425 }
426 }
427 HandlerCommand::PlaceAlgoOrder {
428 params,
429 client_order_id,
430 trader_id,
431 strategy_id,
432 instrument_id,
433 } => {
434 if let Err(e) = self
435 .handle_place_algo_order(
436 params,
437 client_order_id,
438 trader_id,
439 strategy_id,
440 instrument_id,
441 )
442 .await
443 {
444 tracing::error!(error = %e, "Failed to handle place algo order command");
445 }
446 }
447 HandlerCommand::AmendOrder {
448 params,
449 client_order_id,
450 trader_id,
451 strategy_id,
452 instrument_id,
453 venue_order_id,
454 } => {
455 if let Err(e) = self
456 .handle_amend_order(
457 params,
458 client_order_id,
459 trader_id,
460 strategy_id,
461 instrument_id,
462 venue_order_id,
463 )
464 .await
465 {
466 tracing::error!(error = %e, "Failed to handle amend order command");
467 }
468 }
469 HandlerCommand::MassCancel { instrument_id } => {
470 if let Err(e) = self.handle_mass_cancel(instrument_id).await {
471 tracing::error!(error = %e, "Failed to handle mass cancel command");
472 }
473 }
474 HandlerCommand::BatchCancelOrders { args, request_id } => {
475 if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
476 tracing::error!(error = %e, "Failed to handle batch cancel orders command");
477 }
478 }
479 HandlerCommand::BatchPlaceOrders { args, request_id } => {
480 if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
481 tracing::error!(error = %e, "Failed to handle batch place orders command");
482 }
483 }
484 HandlerCommand::BatchAmendOrders { args, request_id } => {
485 if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
486 tracing::error!(error = %e, "Failed to handle batch amend orders command");
487 }
488 }
489 }
490 continue;
492 }
493
494 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
495 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
496 tracing::debug!("Stop signal received during idle period");
497 return None;
498 }
499 continue;
500 }
501
502 msg = self.raw_rx.recv() => {
503 let event = match msg {
504 Some(msg) => match Self::parse_raw_message(msg) {
505 Some(event) => event,
506 None => continue,
507 },
508 None => {
509 tracing::debug!("WebSocket stream closed");
510 return None;
511 }
512 };
513
514 let ts_init = self.clock.get_time_ns();
515
516 match event {
517 OKXWsMessage::Ping => {
518 if let Err(e) = self.send_pong().await {
519 tracing::warn!(error = %e, "Failed to send pong response");
520 }
521 continue;
522 }
523 OKXWsMessage::Login {
524 code, msg, conn_id, ..
525 } => {
526 if code == "0" {
527 self.auth_tracker.succeed();
528
529 return Some(NautilusWsMessage::Authenticated);
533 }
534
535 tracing::error!(error = %msg, "WebSocket authentication failed");
536 self.auth_tracker.fail(msg.clone());
537
538 let error = OKXWebSocketError {
539 code,
540 message: msg,
541 conn_id: Some(conn_id),
542 timestamp: self.clock.get_time_ns().as_u64(),
543 };
544 self.pending_messages
545 .push_back(NautilusWsMessage::Error(error));
546 continue;
547 }
548 OKXWsMessage::BookData { arg, action, data } => {
549 if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
550 return Some(msg);
551 }
552 continue;
553 }
554 OKXWsMessage::OrderResponse {
555 id,
556 op,
557 code,
558 msg,
559 data,
560 } => {
561 if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
562 return Some(msg);
563 }
564 continue;
565 }
566 OKXWsMessage::Data { arg, data } => {
567 let OKXWebSocketArg {
568 channel, inst_id, ..
569 } = arg;
570
571 match channel {
572 OKXWsChannel::Account => {
573 if let Some(msg) = self.handle_account_data(data, ts_init) {
574 return Some(msg);
575 }
576 continue;
577 }
578 OKXWsChannel::Positions => {
579 self.handle_positions_data(data, ts_init);
580 continue;
581 }
582 OKXWsChannel::Orders => {
583 if let Some(msg) = self.handle_orders_data(data, ts_init) {
584 return Some(msg);
585 }
586 continue;
587 }
588 OKXWsChannel::OrdersAlgo => {
589 if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
590 return Some(msg);
591 }
592 continue;
593 }
594 _ => {
595 if let Some(msg) =
596 self.handle_other_channel_data(channel, inst_id, data, ts_init)
597 {
598 return Some(msg);
599 }
600 continue;
601 }
602 }
603 }
604 OKXWsMessage::Error { code, msg } => {
605 let error = OKXWebSocketError {
606 code,
607 message: msg,
608 conn_id: None,
609 timestamp: self.clock.get_time_ns().as_u64(),
610 };
611 return Some(NautilusWsMessage::Error(error));
612 }
613 OKXWsMessage::Reconnected => {
614 return Some(NautilusWsMessage::Reconnected);
615 }
616 OKXWsMessage::Subscription {
617 event,
618 arg,
619 code,
620 msg,
621 ..
622 } => {
623 let topic = topic_from_websocket_arg(&arg);
624 let success = code.as_deref().is_none_or(|c| c == "0");
625
626 match event {
627 OKXSubscriptionEvent::Subscribe => {
628 if success {
629 self.subscriptions_state.confirm_subscribe(&topic);
630 } else {
631 tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
632 self.subscriptions_state.mark_failure(&topic);
633 }
634 }
635 OKXSubscriptionEvent::Unsubscribe => {
636 if success {
637 self.subscriptions_state.confirm_unsubscribe(&topic);
638 } else {
639 tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed - restoring subscription");
640 self.subscriptions_state.confirm_unsubscribe(&topic); self.subscriptions_state.mark_subscribe(&topic); self.subscriptions_state.confirm_subscribe(&topic); }
645 }
646 }
647
648 continue;
649 }
650 OKXWsMessage::ChannelConnCount { .. } => continue,
651 }
652 }
653
654 else => {
656 tracing::debug!("Handler shutting down: stream ended or command channel closed");
657 return None;
658 }
659 }
660 }
661 }
662
663 pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
664 if msg.state != OKXOrderStatus::Canceled {
665 return false;
666 }
667
668 let cancel_source_matches = matches!(
669 msg.cancel_source.as_deref(),
670 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
671 );
672
673 let reason_matches = matches!(
674 msg.cancel_source_reason.as_deref(),
675 Some(reason) if reason.contains("POST_ONLY")
676 );
677
678 if !(cancel_source_matches || reason_matches) {
679 return false;
680 }
681
682 msg.acc_fill_sz
683 .as_ref()
684 .is_none_or(|filled| filled == "0" || filled.is_empty())
685 }
686
687 fn try_handle_post_only_auto_cancel(
688 &mut self,
689 msg: &OKXOrderMsg,
690 ts_init: UnixNanos,
691 exec_reports: &mut Vec<ExecutionReport>,
692 ) -> bool {
693 if !Self::is_post_only_auto_cancel(msg) {
694 return false;
695 }
696
697 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
698 return false;
699 };
700
701 let Some((_, (trader_id, strategy_id, instrument_id))) =
702 self.active_client_orders.remove(&client_order_id)
703 else {
704 return false;
705 };
706
707 self.client_id_aliases.remove(&client_order_id);
708
709 if !exec_reports.is_empty() {
710 let reports = std::mem::take(exec_reports);
711 self.pending_messages
712 .push_back(NautilusWsMessage::ExecutionReports(reports));
713 }
714
715 let reason = msg
716 .cancel_source_reason
717 .as_ref()
718 .filter(|reason| !reason.is_empty())
719 .map_or_else(
720 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
721 |reason| Ustr::from(reason.as_str()),
722 );
723
724 let ts_event = parse_millisecond_timestamp(msg.u_time);
725 let rejected = OrderRejected::new(
726 trader_id,
727 strategy_id,
728 instrument_id,
729 client_order_id,
730 self.account_id,
731 reason,
732 UUID4::new(),
733 ts_event,
734 ts_init,
735 false,
736 true,
737 );
738
739 self.pending_messages
740 .push_back(NautilusWsMessage::OrderRejected(rejected));
741
742 true
743 }
744
745 fn register_client_order_aliases(
746 &self,
747 raw_child: &Option<ClientOrderId>,
748 parent_from_msg: &Option<ClientOrderId>,
749 ) -> Option<ClientOrderId> {
750 if let Some(parent) = parent_from_msg {
751 self.client_id_aliases.insert(*parent, *parent);
752 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
753 self.client_id_aliases.insert(*child, *parent);
754 }
755 Some(*parent)
756 } else if let Some(child) = raw_child.as_ref() {
757 if let Some(mapped) = self.client_id_aliases.get(child) {
758 Some(*mapped.value())
759 } else {
760 self.client_id_aliases.insert(*child, *child);
761 Some(*child)
762 }
763 } else {
764 None
765 }
766 }
767
768 fn adjust_execution_report(
769 &self,
770 report: ExecutionReport,
771 effective_client_id: &Option<ClientOrderId>,
772 raw_child: &Option<ClientOrderId>,
773 ) -> ExecutionReport {
774 match report {
775 ExecutionReport::Order(status_report) => {
776 let mut adjusted = status_report;
777 let mut final_id = *effective_client_id;
778
779 if final_id.is_none() {
780 final_id = adjusted.client_order_id;
781 }
782
783 if final_id.is_none()
784 && let Some(child) = raw_child.as_ref()
785 && let Some(mapped) = self.client_id_aliases.get(child)
786 {
787 final_id = Some(*mapped.value());
788 }
789
790 if let Some(final_id_value) = final_id {
791 if adjusted.client_order_id != Some(final_id_value) {
792 adjusted = adjusted.with_client_order_id(final_id_value);
793 }
794 self.client_id_aliases
795 .insert(final_id_value, final_id_value);
796
797 if let Some(child) =
798 raw_child.as_ref().filter(|child| **child != final_id_value)
799 {
800 adjusted = adjusted.with_linked_order_ids(vec![*child]);
801 }
802 }
803
804 ExecutionReport::Order(adjusted)
805 }
806 ExecutionReport::Fill(mut fill_report) => {
807 let mut final_id = *effective_client_id;
808 if final_id.is_none() {
809 final_id = fill_report.client_order_id;
810 }
811 if final_id.is_none()
812 && let Some(child) = raw_child.as_ref()
813 && let Some(mapped) = self.client_id_aliases.get(child)
814 {
815 final_id = Some(*mapped.value());
816 }
817
818 if let Some(final_id_value) = final_id {
819 fill_report.client_order_id = Some(final_id_value);
820 self.client_id_aliases
821 .insert(final_id_value, final_id_value);
822 }
823
824 ExecutionReport::Fill(fill_report)
825 }
826 }
827 }
828
829 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
830 match report {
831 ExecutionReport::Fill(fill_report) => {
832 let order_id = fill_report.venue_order_id.inner();
833 let current_fee = self
834 .fee_cache
835 .get(&order_id)
836 .copied()
837 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
838 let total_fee = current_fee + fill_report.commission;
839 self.fee_cache.insert(order_id, total_fee);
840
841 let current_filled_qty = self
842 .filled_qty_cache
843 .get(&order_id)
844 .copied()
845 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
846 let total_filled_qty = current_filled_qty + fill_report.last_qty;
847 self.filled_qty_cache.insert(order_id, total_filled_qty);
848 }
849 ExecutionReport::Order(status_report) => {
850 if matches!(status_report.order_status, OrderStatus::Filled) {
851 self.fee_cache.remove(&status_report.venue_order_id.inner());
852 self.filled_qty_cache
853 .remove(&status_report.venue_order_id.inner());
854 }
855
856 if matches!(
857 status_report.order_status,
858 OrderStatus::Canceled
859 | OrderStatus::Expired
860 | OrderStatus::Filled
861 | OrderStatus::Rejected,
862 ) {
863 self.emitted_order_accepted
864 .remove(&status_report.venue_order_id);
865 if let Some(client_order_id) = status_report.client_order_id {
866 self.order_state_cache.remove(&client_order_id);
867 self.active_client_orders.remove(&client_order_id);
868 self.client_id_aliases.remove(&client_order_id);
869 }
870 if let Some(linked) = &status_report.linked_order_ids {
871 for child in linked {
872 self.client_id_aliases.remove(child);
873 }
874 }
875 }
876 }
877 }
878 }
879
880 #[allow(clippy::too_many_lines)]
881 fn handle_order_response(
882 &mut self,
883 id: Option<String>,
884 op: OKXWsOperation,
885 code: String,
886 msg: String,
887 data: Vec<Value>,
888 ts_init: UnixNanos,
889 ) -> Option<NautilusWsMessage> {
890 if code == "0" {
891 tracing::debug!("Order operation successful: id={id:?} op={op} code={code}");
892
893 if op == OKXWsOperation::BatchCancelOrders {
894 tracing::debug!(
895 "Batch cancel operation successful: id={id:?} cancel_count={}",
896 data.len()
897 );
898
899 for (idx, entry) in data.iter().enumerate() {
901 if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
902 && entry_code != "0"
903 {
904 let entry_msg = entry
905 .get("sMsg")
906 .and_then(|v| v.as_str())
907 .unwrap_or("Unknown error");
908
909 if let Some(cl_ord_id_str) = entry
910 .get("clOrdId")
911 .and_then(|v| v.as_str())
912 .filter(|s| !s.is_empty())
913 {
914 tracing::error!(
915 "Batch cancel partial failure for order {}: sCode={} sMsg={}",
916 cl_ord_id_str,
917 entry_code,
918 entry_msg
919 );
920 } else {
922 tracing::error!(
923 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
924 idx,
925 entry_code,
926 entry_msg,
927 entry
928 );
929 }
930 }
931 }
932
933 return None;
934 } else if op == OKXWsOperation::MassCancel
935 && let Some(request_id) = &id
936 && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
937 {
938 tracing::debug!(
939 "Mass cancel operation successful for instrument: {}",
940 instrument_id
941 );
942 } else if op == OKXWsOperation::Order
943 && let Some(request_id) = &id
944 && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
945 self.pending_place_requests.remove(request_id)
946 {
947 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
948 let ord_id = first
949 .get("ordId")
950 .and_then(|v| v.as_str())
951 .filter(|s| !s.is_empty())
952 .map(VenueOrderId::new);
953
954 let ts = first
955 .get("ts")
956 .and_then(|v| v.as_str())
957 .and_then(|s| s.parse::<u64>().ok())
958 .map_or_else(
959 || self.clock.get_time_ns(),
960 |ms| UnixNanos::from(ms * 1_000_000),
961 );
962
963 (ord_id, ts)
964 } else {
965 (None, self.clock.get_time_ns())
966 };
967
968 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
969 {
970 match params {
971 PendingOrderParams::Regular(order_params) => {
972 let order_type = determine_order_type(
973 order_params.ord_type,
974 order_params.px.as_deref().unwrap_or(""),
975 );
976
977 let is_explicit_quote_sized = order_params
978 .tgt_ccy
979 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
980
981 let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
983 && order_params.side == OKXSide::Buy
984 && order_type == OrderType::Market
985 && order_params.td_mode == OKXTradeMode::Cash
986 && instrument.instrument_class().as_ref() == "SPOT";
987
988 if is_explicit_quote_sized || is_implicit_quote_sized {
989 tracing::debug!(
994 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
995 if is_explicit_quote_sized {
996 "explicit"
997 } else {
998 "implicit"
999 },
1000 );
1001 return None;
1002 }
1003
1004 let Some(v_order_id) = venue_order_id else {
1005 tracing::error!(
1006 "No venue_order_id for accepted order: client_order_id={client_order_id}"
1007 );
1008 return None;
1009 };
1010
1011 if self.emitted_order_accepted.contains_key(&v_order_id) {
1013 tracing::debug!(
1014 "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1015 );
1016 return None;
1017 }
1018 self.emitted_order_accepted.insert(v_order_id, ());
1019
1020 let accepted = OrderAccepted::new(
1021 trader_id,
1022 strategy_id,
1023 instrument_id,
1024 client_order_id,
1025 v_order_id,
1026 self.account_id,
1027 UUID4::new(),
1028 ts_accepted,
1029 ts_init,
1030 false, );
1032
1033 tracing::debug!(
1034 "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1035 );
1036
1037 return Some(NautilusWsMessage::OrderAccepted(accepted));
1038 }
1039 PendingOrderParams::Algo(_) => {
1040 tracing::debug!(
1041 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
1042 venue_order_id
1043 );
1044 }
1045 }
1046 } else {
1047 tracing::error!("Instrument not found for accepted order: {instrument_id}");
1048 }
1049 }
1050
1051 if let Some(first) = data.first()
1052 && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1053 {
1054 tracing::debug!("Order details: {success_msg}");
1055 }
1056
1057 return None;
1058 }
1059
1060 let error_msg = data
1061 .first()
1062 .and_then(|d| d.get("sMsg"))
1063 .and_then(|s| s.as_str())
1064 .unwrap_or(&msg)
1065 .to_string();
1066
1067 if let Some(first) = data.first() {
1068 tracing::debug!(
1069 "Error data fields: {}",
1070 serde_json::to_string_pretty(first)
1071 .unwrap_or_else(|_| "unable to serialize".to_string())
1072 );
1073 }
1074
1075 tracing::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1076
1077 let ts_event = self.clock.get_time_ns();
1078
1079 if let Some(request_id) = &id {
1080 match op {
1081 OKXWsOperation::Order => {
1082 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1083 self.pending_place_requests.remove(request_id)
1084 {
1085 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1086 let rejected = OrderRejected::new(
1087 trader_id,
1088 strategy_id,
1089 instrument_id,
1090 client_order_id,
1091 self.account_id,
1092 Ustr::from(error_msg.as_str()),
1093 UUID4::new(),
1094 ts_event,
1095 ts_init,
1096 false, due_post_only,
1098 );
1099
1100 return Some(NautilusWsMessage::OrderRejected(rejected));
1101 }
1102 }
1103 OKXWsOperation::CancelOrder => {
1104 if let Some((
1105 client_order_id,
1106 trader_id,
1107 strategy_id,
1108 instrument_id,
1109 venue_order_id,
1110 )) = self.pending_cancel_requests.remove(request_id)
1111 {
1112 let rejected = OrderCancelRejected::new(
1113 trader_id,
1114 strategy_id,
1115 instrument_id,
1116 client_order_id,
1117 Ustr::from(error_msg.as_str()),
1118 UUID4::new(),
1119 ts_event,
1120 ts_init,
1121 false, venue_order_id,
1123 Some(self.account_id),
1124 );
1125
1126 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1127 }
1128 }
1129 OKXWsOperation::AmendOrder => {
1130 if let Some((
1131 client_order_id,
1132 trader_id,
1133 strategy_id,
1134 instrument_id,
1135 venue_order_id,
1136 )) = self.pending_amend_requests.remove(request_id)
1137 {
1138 let rejected = OrderModifyRejected::new(
1139 trader_id,
1140 strategy_id,
1141 instrument_id,
1142 client_order_id,
1143 Ustr::from(error_msg.as_str()),
1144 UUID4::new(),
1145 ts_event,
1146 ts_init,
1147 false, venue_order_id,
1149 Some(self.account_id),
1150 );
1151
1152 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1153 }
1154 }
1155 OKXWsOperation::OrderAlgo => {
1156 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1157 self.pending_place_requests.remove(request_id)
1158 {
1159 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1160 let rejected = OrderRejected::new(
1161 trader_id,
1162 strategy_id,
1163 instrument_id,
1164 client_order_id,
1165 self.account_id,
1166 Ustr::from(error_msg.as_str()),
1167 UUID4::new(),
1168 ts_event,
1169 ts_init,
1170 false, due_post_only,
1172 );
1173
1174 return Some(NautilusWsMessage::OrderRejected(rejected));
1175 }
1176 }
1177 OKXWsOperation::CancelAlgos => {
1178 if let Some((
1179 client_order_id,
1180 trader_id,
1181 strategy_id,
1182 instrument_id,
1183 venue_order_id,
1184 )) = self.pending_cancel_requests.remove(request_id)
1185 {
1186 let rejected = OrderCancelRejected::new(
1187 trader_id,
1188 strategy_id,
1189 instrument_id,
1190 client_order_id,
1191 Ustr::from(error_msg.as_str()),
1192 UUID4::new(),
1193 ts_event,
1194 ts_init,
1195 false, venue_order_id,
1197 Some(self.account_id),
1198 );
1199
1200 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1201 }
1202 }
1203 OKXWsOperation::MassCancel => {
1204 if let Some(instrument_id) =
1205 self.pending_mass_cancel_requests.remove(request_id)
1206 {
1207 tracing::error!(
1208 "Mass cancel operation failed for {}: code={code} msg={error_msg}",
1209 instrument_id
1210 );
1211 let error = OKXWebSocketError {
1212 code,
1213 message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1214 conn_id: None,
1215 timestamp: ts_event.as_u64(),
1216 };
1217 return Some(NautilusWsMessage::Error(error));
1218 } else {
1219 tracing::error!(
1220 "Mass cancel operation failed: code={code} msg={error_msg}"
1221 );
1222 }
1223 }
1224 OKXWsOperation::BatchCancelOrders => {
1225 tracing::warn!(
1226 "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1227 data.len()
1228 );
1229
1230 for (idx, entry) in data.iter().enumerate() {
1232 let entry_code =
1233 entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1234 let entry_msg = entry
1235 .get("sMsg")
1236 .and_then(|v| v.as_str())
1237 .unwrap_or(&error_msg);
1238
1239 if entry_code != "0" {
1240 if let Some(cl_ord_id_str) = entry
1242 .get("clOrdId")
1243 .and_then(|v| v.as_str())
1244 .filter(|s| !s.is_empty())
1245 {
1246 tracing::error!(
1247 "Batch cancel failed for order {}: sCode={} sMsg={}",
1248 cl_ord_id_str,
1249 entry_code,
1250 entry_msg
1251 );
1252 } else {
1255 tracing::error!(
1256 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
1257 idx,
1258 entry_code,
1259 entry_msg,
1260 entry
1261 );
1262 }
1263 }
1264 }
1265
1266 let error = OKXWebSocketError {
1268 code,
1269 message: format!("Batch cancel failed: {error_msg}"),
1270 conn_id: None,
1271 timestamp: ts_event.as_u64(),
1272 };
1273 return Some(NautilusWsMessage::Error(error));
1274 }
1275 _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
1276 }
1277 }
1278
1279 let error = OKXWebSocketError {
1280 code,
1281 message: error_msg,
1282 conn_id: None,
1283 timestamp: ts_event.as_u64(),
1284 };
1285 Some(NautilusWsMessage::Error(error))
1286 }
1287
1288 fn handle_book_data(
1289 &self,
1290 arg: OKXWebSocketArg,
1291 action: OKXBookAction,
1292 data: Vec<OKXBookMsg>,
1293 ts_init: UnixNanos,
1294 ) -> Option<NautilusWsMessage> {
1295 let Some(inst_id) = arg.inst_id else {
1296 tracing::error!("Instrument ID missing for book data event");
1297 return None;
1298 };
1299
1300 let inst = self.instruments_cache.get(&inst_id)?;
1301
1302 let instrument_id = inst.id();
1303 let price_precision = inst.price_precision();
1304 let size_precision = inst.size_precision();
1305
1306 match parse_book_msg_vec(
1307 data,
1308 &instrument_id,
1309 price_precision,
1310 size_precision,
1311 action,
1312 ts_init,
1313 ) {
1314 Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1315 Err(e) => {
1316 tracing::error!("Failed to parse book message: {e}");
1317 None
1318 }
1319 }
1320 }
1321
1322 fn handle_account_data(
1323 &mut self,
1324 data: Value,
1325 ts_init: UnixNanos,
1326 ) -> Option<NautilusWsMessage> {
1327 let Value::Array(arr) = data else {
1328 tracing::error!("Account data is not an array");
1329 return None;
1330 };
1331
1332 let first = arr.into_iter().next()?;
1333
1334 let account: OKXAccount = match serde_json::from_value(first) {
1335 Ok(acc) => acc,
1336 Err(e) => {
1337 tracing::error!("Failed to parse account data: {e}");
1338 return None;
1339 }
1340 };
1341
1342 match parse_account_state(&account, self.account_id, ts_init) {
1343 Ok(account_state) => {
1344 if let Some(last_account_state) = &self.last_account_state
1345 && account_state.has_same_balances_and_margins(last_account_state)
1346 {
1347 return None;
1348 }
1349 self.last_account_state = Some(account_state.clone());
1350 Some(NautilusWsMessage::AccountUpdate(account_state))
1351 }
1352 Err(e) => {
1353 tracing::error!("Failed to parse account state: {e}");
1354 None
1355 }
1356 }
1357 }
1358
1359 fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1360 match serde_json::from_value::<Vec<OKXPosition>>(data) {
1361 Ok(positions) => {
1362 tracing::debug!("Received {} position update(s)", positions.len());
1363
1364 for position in positions {
1365 let instrument = match self.instruments_cache.get(&position.inst_id) {
1366 Some(inst) => inst,
1367 None => {
1368 tracing::warn!(
1369 "Received position update for unknown instrument {}, skipping",
1370 position.inst_id
1371 );
1372 continue;
1373 }
1374 };
1375
1376 let instrument_id = instrument.id();
1377 let size_precision = instrument.size_precision();
1378
1379 match parse_position_status_report(
1380 position,
1381 self.account_id,
1382 instrument_id,
1383 size_precision,
1384 ts_init,
1385 ) {
1386 Ok(position_report) => {
1387 self.pending_messages
1388 .push_back(NautilusWsMessage::PositionUpdate(position_report));
1389 }
1390 Err(e) => {
1391 tracing::error!(
1392 "Failed to parse position status report for {}: {e}",
1393 instrument_id
1394 );
1395 }
1396 }
1397 }
1398 }
1399 Err(e) => {
1400 tracing::error!("Failed to parse positions data: {e}");
1401 }
1402 }
1403 }
1404
/// Processes a payload from the orders channel.
///
/// Each order message is either turned into a typed order event (when the order is
/// tracked in `active_client_orders`) or into an execution report (otherwise). Typed
/// events are queued on `pending_messages` by `process_parsed_order_event`; execution
/// reports are collected and queued as a single `ExecutionReports` message after the
/// loop.
///
/// Returns the message at the front of `pending_messages` (if any), or `None` when the
/// payload cannot be deserialized.
fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
    let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
        Ok(orders) => orders,
        Err(e) => {
            tracing::error!("Failed to deserialize orders channel payload: {e}");
            return None;
        }
    };

    tracing::debug!(
        "Received {} order message(s) from orders channel",
        orders.len()
    );

    let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());

    for msg in orders {
        tracing::debug!(
            "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
            msg.inst_id,
            msg.cl_ord_id,
            msg.state,
            msg.exec_type
        );

        // When the post-only auto-cancel path reports the message as handled, skip the
        // regular processing below.
        if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
            continue;
        }

        // Resolve the client order ID to use for this message: the ID on the message
        // itself, plus the (non-empty) algo client order ID when present.
        let raw_child = parse_client_order_id(&msg.cl_ord_id);
        let parent_from_msg = msg
            .algo_cl_ord_id
            .as_ref()
            .filter(|value| !value.is_empty())
            .map(ClientOrderId::new);
        let effective_client_id =
            self.register_client_order_aliases(&raw_child, &parent_from_msg);

        let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
            tracing::error!(
                "No instrument found for inst_id: {inst_id}",
                inst_id = msg.inst_id
            );
            continue;
        };
        let price_precision = instrument.price_precision();
        let size_precision = instrument.size_precision();

        // Metadata is only available for orders registered in `active_client_orders`.
        let order_metadata = effective_client_id
            .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));

        // Previously cached values for this order, passed to the parser below.
        let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
        let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
        let previous_state =
            effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());

        if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
            (order_metadata, effective_client_id)
        {
            // Tracked order: parse into a typed event and dispatch it.
            match parse_order_event(
                &msg,
                canonical_client_id,
                self.account_id,
                trader_id,
                strategy_id,
                instrument,
                previous_fee,
                previous_filled_qty,
                previous_state.as_ref(),
                ts_init,
            ) {
                Ok(event) => {
                    self.process_parsed_order_event(
                        event,
                        &msg,
                        price_precision,
                        size_precision,
                        canonical_client_id,
                        &raw_child,
                        &mut exec_reports,
                    );
                }
                Err(e) => tracing::error!("Failed to parse order event: {e}"),
            }
        } else {
            // No metadata for this order: fall back to a plain execution report.
            match parse_order_msg(
                &msg,
                self.account_id,
                &self.instruments_cache,
                &self.fee_cache,
                &self.filled_qty_cache,
                ts_init,
            ) {
                Ok(report) => {
                    tracing::debug!("Parsed external order as execution report: {report:?}");
                    let adjusted =
                        self.adjust_execution_report(report, &effective_client_id, &raw_child);
                    self.update_caches_with_report(&adjusted);
                    exec_reports.push(adjusted);
                }
                Err(e) => tracing::error!("Failed to parse order message: {e}"),
            }
        }
    }

    // Execution reports go to the back of the queue, behind any typed events queued
    // while processing this payload.
    if !exec_reports.is_empty() {
        tracing::debug!(
            "Pushing {count} execution report(s) to message queue",
            count = exec_reports.len()
        );
        self.pending_messages
            .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
    }

    // Hand back the oldest queued message; the rest stay in `pending_messages`.
    self.pending_messages.pop_front()
}
1523
/// Applies a parsed order event to the handler's caches and queues the resulting output.
///
/// Typed order events (accepted, canceled, expired, triggered, updated) are pushed to
/// `pending_messages`; fills and status-only reports are adjusted and appended to
/// `exec_reports`. Canceled, expired, and fully filled orders have their cache entries
/// removed via `cleanup_terminal_order`.
// The argument count mirrors the per-message context the caller already has at hand.
#[allow(clippy::too_many_arguments)]
fn process_parsed_order_event(
    &mut self,
    event: ParsedOrderEvent,
    msg: &OKXOrderMsg,
    price_precision: u8,
    size_precision: u8,
    canonical_client_id: ClientOrderId,
    raw_child: &Option<ClientOrderId>,
    exec_reports: &mut Vec<ExecutionReport>,
) {
    let venue_order_id = VenueOrderId::new(msg.ord_id);

    match event {
        ParsedOrderEvent::Accepted(accepted) => {
            // Emit at most one OrderAccepted per venue order ID.
            if self.emitted_order_accepted.contains_key(&venue_order_id) {
                tracing::debug!(
                    "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
                );
                return;
            }
            self.emitted_order_accepted.insert(venue_order_id, ());
            self.update_order_state_cache(
                &canonical_client_id,
                msg,
                price_precision,
                size_precision,
            );

            self.pending_messages
                .push_back(NautilusWsMessage::OrderAccepted(accepted));
        }
        ParsedOrderEvent::Canceled(canceled) => {
            self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
            self.pending_messages
                .push_back(NautilusWsMessage::OrderCanceled(canceled));
        }
        ParsedOrderEvent::Expired(expired) => {
            self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
            self.pending_messages
                .push_back(NautilusWsMessage::OrderExpired(expired));
        }
        ParsedOrderEvent::Triggered(triggered) => {
            self.update_order_state_cache(
                &canonical_client_id,
                msg,
                price_precision,
                size_precision,
            );
            self.pending_messages
                .push_back(NautilusWsMessage::OrderTriggered(triggered));
        }
        ParsedOrderEvent::Updated(updated) => {
            self.update_order_state_cache(
                &canonical_client_id,
                msg,
                price_precision,
                size_precision,
            );
            self.pending_messages
                .push_back(NautilusWsMessage::OrderUpdated(updated));
        }
        ParsedOrderEvent::Fill(fill_report) => {
            let effective_client_id = Some(canonical_client_id);
            let adjusted = self.adjust_execution_report(
                ExecutionReport::Fill(fill_report),
                &effective_client_id,
                raw_child,
            );
            // Caches are updated before cleanup so a terminal fill ends with them cleared.
            self.update_caches_with_report(&adjusted);

            if msg.state == OKXOrderStatus::Filled {
                self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
            }

            exec_reports.push(adjusted);
        }
        ParsedOrderEvent::StatusOnly(status_report) => {
            let effective_client_id = Some(canonical_client_id);
            let adjusted = self.adjust_execution_report(
                ExecutionReport::Order(*status_report),
                &effective_client_id,
                raw_child,
            );
            self.update_caches_with_report(&adjusted);
            exec_reports.push(adjusted);
        }
    }
}
1614
1615 fn update_order_state_cache(
1617 &mut self,
1618 client_order_id: &ClientOrderId,
1619 msg: &OKXOrderMsg,
1620 price_precision: u8,
1621 size_precision: u8,
1622 ) {
1623 let venue_order_id = VenueOrderId::new(msg.ord_id);
1624 let quantity = parse_quantity(&msg.sz, size_precision).ok();
1625 let price = if !is_market_price(&msg.px) {
1626 parse_price(&msg.px, price_precision).ok()
1627 } else {
1628 None
1629 };
1630
1631 if let Some(qty) = quantity {
1632 self.order_state_cache.insert(
1633 *client_order_id,
1634 OrderStateSnapshot {
1635 venue_order_id,
1636 quantity: qty,
1637 price,
1638 },
1639 );
1640 }
1641 }
1642
1643 fn cleanup_terminal_order(
1645 &mut self,
1646 client_order_id: &ClientOrderId,
1647 venue_order_id: &VenueOrderId,
1648 ) {
1649 self.emitted_order_accepted.remove(venue_order_id);
1650 self.order_state_cache.remove(client_order_id);
1651 self.active_client_orders.remove(client_order_id);
1652 self.client_id_aliases.remove(client_order_id);
1653 self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1654
1655 self.fee_cache.remove(&venue_order_id.inner());
1656 self.filled_qty_cache.remove(&venue_order_id.inner());
1657 }
1658
1659 fn handle_algo_orders_data(
1660 &mut self,
1661 data: Value,
1662 ts_init: UnixNanos,
1663 ) -> Option<NautilusWsMessage> {
1664 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1665 Ok(orders) => orders,
1666 Err(e) => {
1667 tracing::error!("Failed to deserialize algo orders payload: {e}");
1668 return None;
1669 }
1670 };
1671
1672 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1673
1674 for msg in orders {
1675 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1676 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1677 let effective_client_id =
1678 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1679
1680 match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1681 Ok(report) => {
1682 let adjusted =
1683 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1684 self.update_caches_with_report(&adjusted);
1685 exec_reports.push(adjusted);
1686 }
1687 Err(e) => {
1688 tracing::error!("Failed to parse algo order message: {e}");
1689 }
1690 }
1691 }
1692
1693 if !exec_reports.is_empty() {
1694 Some(NautilusWsMessage::ExecutionReports(exec_reports))
1695 } else {
1696 None
1697 }
1698 }
1699
1700 fn handle_other_channel_data(
1701 &mut self,
1702 channel: OKXWsChannel,
1703 inst_id: Option<Ustr>,
1704 data: Value,
1705 ts_init: UnixNanos,
1706 ) -> Option<NautilusWsMessage> {
1707 let Some(inst_id) = inst_id else {
1708 tracing::error!("No instrument for channel {:?}", channel);
1709 return None;
1710 };
1711
1712 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1713 tracing::error!(
1714 "No instrument for channel {:?}, inst_id {:?}",
1715 channel,
1716 inst_id
1717 );
1718 return None;
1719 };
1720
1721 let instrument_id = instrument.id();
1722 let price_precision = instrument.price_precision();
1723 let size_precision = instrument.size_precision();
1724
1725 match parse_ws_message_data(
1726 &channel,
1727 data,
1728 &instrument_id,
1729 price_precision,
1730 size_precision,
1731 ts_init,
1732 &mut self.funding_rate_cache,
1733 &self.instruments_cache,
1734 ) {
1735 Ok(Some(msg)) => {
1736 if let NautilusWsMessage::Instrument(ref inst) = msg {
1737 self.instruments_cache
1738 .insert(inst.symbol().inner(), inst.as_ref().clone());
1739 }
1740 Some(msg)
1741 }
1742 Ok(None) => None,
1743 Err(e) => {
1744 tracing::error!("Error parsing message for channel {:?}: {e}", channel);
1745 None
1746 }
1747 }
1748 }
1749
1750 pub(crate) fn parse_raw_message(
1751 msg: tokio_tungstenite::tungstenite::Message,
1752 ) -> Option<OKXWsMessage> {
1753 match msg {
1754 tokio_tungstenite::tungstenite::Message::Text(text) => {
1755 if text == TEXT_PONG {
1756 tracing::trace!("Received pong from OKX");
1757 return None;
1758 }
1759 if text == TEXT_PING {
1760 tracing::trace!("Received ping from OKX (text)");
1761 return Some(OKXWsMessage::Ping);
1762 }
1763
1764 if text == RECONNECTED {
1765 tracing::debug!("Received WebSocket reconnection signal");
1766 return Some(OKXWsMessage::Reconnected);
1767 }
1768 tracing::trace!("Received WebSocket message: {text}");
1769
1770 match serde_json::from_str(&text) {
1771 Ok(ws_event) => match &ws_event {
1772 OKXWsMessage::Error { code, msg } => {
1773 tracing::error!("WebSocket error: {code} - {msg}");
1774 Some(ws_event)
1775 }
1776 OKXWsMessage::Login {
1777 event,
1778 code,
1779 msg,
1780 conn_id,
1781 } => {
1782 if code == "0" {
1783 tracing::info!(conn_id = %conn_id, "WebSocket authenticated");
1784 } else {
1785 tracing::error!(event = %event, code = %code, error = %msg, "WebSocket authentication failed");
1786 }
1787 Some(ws_event)
1788 }
1789 OKXWsMessage::Subscription {
1790 event,
1791 arg,
1792 conn_id,
1793 ..
1794 } => {
1795 let channel_str = serde_json::to_string(&arg.channel)
1796 .expect("Invalid OKX websocket channel")
1797 .trim_matches('"')
1798 .to_string();
1799 tracing::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
1800 Some(ws_event)
1801 }
1802 OKXWsMessage::ChannelConnCount {
1803 event: _,
1804 channel,
1805 conn_count,
1806 conn_id,
1807 } => {
1808 let channel_str = serde_json::to_string(&channel)
1809 .expect("Invalid OKX websocket channel")
1810 .trim_matches('"')
1811 .to_string();
1812 tracing::debug!(
1813 "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
1814 );
1815 None
1816 }
1817 OKXWsMessage::Ping => {
1818 tracing::trace!("Ignoring ping event parsed from text payload");
1819 None
1820 }
1821 OKXWsMessage::Data { .. } => Some(ws_event),
1822 OKXWsMessage::BookData { .. } => Some(ws_event),
1823 OKXWsMessage::OrderResponse {
1824 id,
1825 op,
1826 code,
1827 msg: _,
1828 data,
1829 } => {
1830 if code == "0" {
1831 tracing::debug!(
1832 "Order operation successful: id={:?}, op={op}, code={code}",
1833 id
1834 );
1835
1836 if let Some(order_data) = data.first() {
1837 let success_msg = order_data
1838 .get("sMsg")
1839 .and_then(|s| s.as_str())
1840 .unwrap_or("Order operation successful");
1841 tracing::debug!("Order success details: {success_msg}");
1842 }
1843 }
1844 Some(ws_event)
1845 }
1846 OKXWsMessage::Reconnected => {
1847 tracing::warn!("Unexpected Reconnected event from deserialization");
1849 None
1850 }
1851 },
1852 Err(e) => {
1853 tracing::error!("Failed to parse message: {e}: {text}");
1854 None
1855 }
1856 }
1857 }
1858 Message::Ping(_payload) => {
1859 tracing::trace!("Received binary ping frame from OKX");
1860 Some(OKXWsMessage::Ping)
1861 }
1862 Message::Pong(payload) => {
1863 tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
1864 None
1865 }
1866 Message::Binary(msg) => {
1867 tracing::debug!("Raw binary: {msg:?}");
1868 None
1869 }
1870 Message::Close(_) => {
1871 tracing::debug!("Received close message");
1872 None
1873 }
1874 msg => {
1875 tracing::warn!("Unexpected message: {msg}");
1876 None
1877 }
1878 }
1879 }
1880
1881 fn generate_unique_request_id(&self) -> String {
1882 self.request_id_counter
1883 .fetch_add(1, Ordering::SeqCst)
1884 .to_string()
1885 }
1886
1887 fn get_instrument_type_and_family_from_instrument(
1888 instrument: &InstrumentAny,
1889 ) -> anyhow::Result<(OKXInstrumentType, String)> {
1890 let inst_type = okx_instrument_type(instrument)?;
1891 let symbol = instrument.symbol().inner();
1892
1893 let inst_family = match instrument {
1895 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1896 InstrumentAny::CryptoPerpetual(_) => {
1897 symbol
1899 .as_str()
1900 .strip_suffix("-SWAP")
1901 .unwrap_or(symbol.as_str())
1902 .to_string()
1903 }
1904 InstrumentAny::CryptoFuture(_) => {
1905 let s = symbol.as_str();
1908 if let Some(idx) = s.rfind('-') {
1909 s[..idx].to_string()
1910 } else {
1911 s.to_string()
1912 }
1913 }
1914 _ => {
1915 anyhow::bail!("Unsupported instrument type for OKX");
1916 }
1917 };
1918
1919 Ok((inst_type, inst_family))
1920 }
1921
1922 async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1923 let instrument = self
1924 .instruments_cache
1925 .get(&instrument_id.symbol.inner())
1926 .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1927
1928 let (inst_type, inst_family) =
1929 Self::get_instrument_type_and_family_from_instrument(instrument)?;
1930
1931 let params = WsMassCancelParams {
1932 inst_type,
1933 inst_family: Ustr::from(&inst_family),
1934 };
1935
1936 let args =
1937 vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1938
1939 let request_id = self.generate_unique_request_id();
1940
1941 self.pending_mass_cancel_requests
1942 .insert(request_id.clone(), instrument_id);
1943
1944 let request = OKXWsRequest {
1945 id: Some(request_id.clone()),
1946 op: OKXWsOperation::MassCancel,
1947 exp_time: None,
1948 args,
1949 };
1950
1951 let payload = serde_json::to_string(&request)
1952 .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1953
1954 match self
1955 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1956 .await
1957 {
1958 Ok(()) => {
1959 tracing::debug!("Sent mass cancel for {instrument_id}");
1960 Ok(())
1961 }
1962 Err(e) => {
1963 tracing::error!(error = %e, "Failed to send mass cancel after retries");
1964
1965 self.pending_mass_cancel_requests.remove(&request_id);
1966
1967 let error = OKXWebSocketError {
1968 code: "CLIENT_ERROR".to_string(),
1969 message: format!("Mass cancel failed for {instrument_id}: {e}"),
1970 conn_id: None,
1971 timestamp: self.clock.get_time_ns().as_u64(),
1972 };
1973 let _ = self.send(NautilusWsMessage::Error(error));
1974
1975 Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1976 }
1977 }
1978 }
1979
1980 async fn handle_batch_cancel_orders(
1981 &self,
1982 args: Vec<Value>,
1983 request_id: String,
1984 ) -> anyhow::Result<()> {
1985 let request = OKXWsRequest {
1986 id: Some(request_id),
1987 op: OKXWsOperation::BatchCancelOrders,
1988 exp_time: None,
1989 args,
1990 };
1991
1992 let payload = serde_json::to_string(&request)
1993 .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1994
1995 if let Some(client) = &self.inner {
1996 client
1997 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1998 .await
1999 .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
2000 tracing::debug!("Sent batch cancel orders");
2001 Ok(())
2002 } else {
2003 Err(anyhow::anyhow!("No active WebSocket client"))
2004 }
2005 }
2006
2007 async fn handle_batch_place_orders(
2008 &self,
2009 args: Vec<Value>,
2010 request_id: String,
2011 ) -> anyhow::Result<()> {
2012 let request = OKXWsRequest {
2013 id: Some(request_id),
2014 op: OKXWsOperation::BatchOrders,
2015 exp_time: None,
2016 args,
2017 };
2018
2019 let payload = serde_json::to_string(&request)
2020 .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
2021
2022 if let Some(client) = &self.inner {
2023 client
2024 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2025 .await
2026 .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2027 tracing::debug!("Sent batch place orders");
2028 Ok(())
2029 } else {
2030 Err(anyhow::anyhow!("No active WebSocket client"))
2031 }
2032 }
2033
2034 async fn handle_batch_amend_orders(
2035 &self,
2036 args: Vec<Value>,
2037 request_id: String,
2038 ) -> anyhow::Result<()> {
2039 let request = OKXWsRequest {
2040 id: Some(request_id),
2041 op: OKXWsOperation::BatchAmendOrders,
2042 exp_time: None,
2043 args,
2044 };
2045
2046 let payload = serde_json::to_string(&request)
2047 .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2048
2049 if let Some(client) = &self.inner {
2050 client
2051 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2052 .await
2053 .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2054 tracing::debug!("Sent batch amend orders");
2055 Ok(())
2056 } else {
2057 Err(anyhow::anyhow!("No active WebSocket client"))
2058 }
2059 }
2060
2061 async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2062 for arg in &args {
2063 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Subscribing to channel");
2064 }
2065
2066 let message = OKXSubscription {
2067 op: OKXWsOperation::Subscribe,
2068 args,
2069 };
2070
2071 let json_txt = serde_json::to_string(&message)
2072 .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2073
2074 self.send_with_retry(
2075 json_txt,
2076 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2077 )
2078 .await
2079 .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2080 Ok(())
2081 }
2082
2083 async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2084 for arg in &args {
2085 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Unsubscribing from channel");
2086 }
2087
2088 let message = OKXSubscription {
2089 op: OKXWsOperation::Unsubscribe,
2090 args,
2091 };
2092
2093 let json_txt = serde_json::to_string(&message)
2094 .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2095
2096 self.send_with_retry(
2097 json_txt,
2098 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2099 )
2100 .await
2101 .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2102 Ok(())
2103 }
2104
2105 async fn handle_place_order(
2106 &mut self,
2107 params: WsPostOrderParams,
2108 client_order_id: ClientOrderId,
2109 trader_id: TraderId,
2110 strategy_id: StrategyId,
2111 instrument_id: InstrumentId,
2112 ) -> anyhow::Result<()> {
2113 let request_id = self.generate_unique_request_id();
2114
2115 self.pending_place_requests.insert(
2116 request_id.clone(),
2117 (
2118 PendingOrderParams::Regular(params.clone()),
2119 client_order_id,
2120 trader_id,
2121 strategy_id,
2122 instrument_id,
2123 ),
2124 );
2125
2126 let request = OKXWsRequest {
2127 id: Some(request_id.clone()),
2128 op: OKXWsOperation::Order,
2129 exp_time: None,
2130 args: vec![params],
2131 };
2132
2133 let payload = serde_json::to_string(&request)
2134 .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2135
2136 match self
2137 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2138 .await
2139 {
2140 Ok(()) => {
2141 tracing::debug!("Sent place order request");
2142 Ok(())
2143 }
2144 Err(e) => {
2145 tracing::error!(error = %e, "Failed to send place order after retries");
2146
2147 self.pending_place_requests.remove(&request_id);
2148
2149 let ts_now = self.clock.get_time_ns();
2150 let rejected = OrderRejected::new(
2151 trader_id,
2152 strategy_id,
2153 instrument_id,
2154 client_order_id,
2155 self.account_id,
2156 Ustr::from(&format!("WebSocket send failed: {e}")),
2157 UUID4::new(),
2158 ts_now, ts_now, false, false, );
2163 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2164
2165 Err(anyhow::anyhow!("Failed to send place order: {e}"))
2166 }
2167 }
2168 }
2169
2170 async fn handle_place_algo_order(
2171 &mut self,
2172 params: WsPostAlgoOrderParams,
2173 client_order_id: ClientOrderId,
2174 trader_id: TraderId,
2175 strategy_id: StrategyId,
2176 instrument_id: InstrumentId,
2177 ) -> anyhow::Result<()> {
2178 let request_id = self.generate_unique_request_id();
2179
2180 self.pending_place_requests.insert(
2181 request_id.clone(),
2182 (
2183 PendingOrderParams::Algo(params.clone()),
2184 client_order_id,
2185 trader_id,
2186 strategy_id,
2187 instrument_id,
2188 ),
2189 );
2190
2191 let request = OKXWsRequest {
2192 id: Some(request_id.clone()),
2193 op: OKXWsOperation::OrderAlgo,
2194 exp_time: None,
2195 args: vec![params],
2196 };
2197
2198 let payload = serde_json::to_string(&request)
2199 .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2200
2201 match self
2202 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2203 .await
2204 {
2205 Ok(()) => {
2206 tracing::debug!("Sent place algo order request");
2207 Ok(())
2208 }
2209 Err(e) => {
2210 tracing::error!(error = %e, "Failed to send place algo order after retries");
2211
2212 self.pending_place_requests.remove(&request_id);
2213
2214 let ts_now = self.clock.get_time_ns();
2215 let rejected = OrderRejected::new(
2216 trader_id,
2217 strategy_id,
2218 instrument_id,
2219 client_order_id,
2220 self.account_id,
2221 Ustr::from(&format!("WebSocket send failed: {e}")),
2222 UUID4::new(),
2223 ts_now, ts_now, false, false, );
2228 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2229
2230 Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2231 }
2232 }
2233 }
2234
2235 async fn handle_cancel_order(
2236 &mut self,
2237 client_order_id: Option<ClientOrderId>,
2238 venue_order_id: Option<VenueOrderId>,
2239 instrument_id: InstrumentId,
2240 trader_id: TraderId,
2241 strategy_id: StrategyId,
2242 ) -> anyhow::Result<()> {
2243 let mut builder = WsCancelOrderParamsBuilder::default();
2244 builder.inst_id(instrument_id.symbol.as_str());
2245
2246 if let Some(venue_order_id) = venue_order_id {
2247 builder.ord_id(venue_order_id.as_str());
2248 }
2249
2250 if let Some(client_order_id) = client_order_id {
2251 builder.cl_ord_id(client_order_id.as_str());
2252 }
2253
2254 let params = builder
2255 .build()
2256 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2257
2258 let request_id = self.generate_unique_request_id();
2259
2260 if let Some(client_order_id) = client_order_id {
2262 self.pending_cancel_requests.insert(
2263 request_id.clone(),
2264 (
2265 client_order_id,
2266 trader_id,
2267 strategy_id,
2268 instrument_id,
2269 venue_order_id,
2270 ),
2271 );
2272 }
2273
2274 let request = OKXWsRequest {
2275 id: Some(request_id.clone()),
2276 op: OKXWsOperation::CancelOrder,
2277 exp_time: None,
2278 args: vec![params],
2279 };
2280
2281 let payload = serde_json::to_string(&request)
2282 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2283
2284 match self
2285 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2286 .await
2287 {
2288 Ok(()) => {
2289 tracing::debug!("Sent cancel order request");
2290 Ok(())
2291 }
2292 Err(e) => {
2293 tracing::error!(error = %e, "Failed to send cancel order after retries");
2294
2295 self.pending_cancel_requests.remove(&request_id);
2296
2297 if let Some(client_order_id) = client_order_id {
2298 let ts_now = self.clock.get_time_ns();
2299 let rejected = OrderCancelRejected::new(
2300 trader_id,
2301 strategy_id,
2302 instrument_id,
2303 client_order_id,
2304 Ustr::from(&format!("WebSocket send failed: {e}")),
2305 UUID4::new(),
2306 ts_now, ts_now, false, venue_order_id,
2310 Some(self.account_id),
2311 );
2312 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2313 }
2314
2315 Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2316 }
2317 }
2318 }
2319
2320 async fn handle_amend_order(
2321 &mut self,
2322 params: WsAmendOrderParams,
2323 client_order_id: ClientOrderId,
2324 trader_id: TraderId,
2325 strategy_id: StrategyId,
2326 instrument_id: InstrumentId,
2327 venue_order_id: Option<VenueOrderId>,
2328 ) -> anyhow::Result<()> {
2329 let request_id = self.generate_unique_request_id();
2330
2331 self.pending_amend_requests.insert(
2332 request_id.clone(),
2333 (
2334 client_order_id,
2335 trader_id,
2336 strategy_id,
2337 instrument_id,
2338 venue_order_id,
2339 ),
2340 );
2341
2342 let request = OKXWsRequest {
2343 id: Some(request_id.clone()),
2344 op: OKXWsOperation::AmendOrder,
2345 exp_time: None,
2346 args: vec![params],
2347 };
2348
2349 let payload = serde_json::to_string(&request)
2350 .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2351
2352 match self
2353 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2354 .await
2355 {
2356 Ok(()) => {
2357 tracing::debug!("Sent amend order request");
2358 Ok(())
2359 }
2360 Err(e) => {
2361 tracing::error!(error = %e, "Failed to send amend order after retries");
2362
2363 self.pending_amend_requests.remove(&request_id);
2364
2365 let ts_now = self.clock.get_time_ns();
2366 let rejected = OrderModifyRejected::new(
2367 trader_id,
2368 strategy_id,
2369 instrument_id,
2370 client_order_id,
2371 Ustr::from(&format!("WebSocket send failed: {e}")),
2372 UUID4::new(),
2373 ts_now, ts_now, false, venue_order_id,
2377 Some(self.account_id),
2378 );
2379 let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2380
2381 Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2382 }
2383 }
2384 }
2385
2386 async fn handle_cancel_algo_order(
2387 &mut self,
2388 client_order_id: Option<ClientOrderId>,
2389 algo_order_id: Option<VenueOrderId>,
2390 instrument_id: InstrumentId,
2391 trader_id: TraderId,
2392 strategy_id: StrategyId,
2393 ) -> anyhow::Result<()> {
2394 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2395 builder.inst_id(instrument_id.symbol.as_str());
2396
2397 if let Some(client_order_id) = &client_order_id {
2398 builder.algo_cl_ord_id(client_order_id.as_str());
2399 }
2400
2401 if let Some(algo_id) = &algo_order_id {
2402 builder.algo_id(algo_id.as_str());
2403 }
2404
2405 let params = builder
2406 .build()
2407 .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2408
2409 let request_id = self.generate_unique_request_id();
2410
2411 if let Some(client_order_id) = client_order_id {
2413 self.pending_cancel_requests.insert(
2414 request_id.clone(),
2415 (client_order_id, trader_id, strategy_id, instrument_id, None),
2416 );
2417 }
2418
2419 let request = OKXWsRequest {
2420 id: Some(request_id.clone()),
2421 op: OKXWsOperation::CancelAlgos,
2422 exp_time: None,
2423 args: vec![params],
2424 };
2425
2426 let payload = serde_json::to_string(&request)
2427 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2428
2429 match self
2430 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2431 .await
2432 {
2433 Ok(()) => {
2434 tracing::debug!("Sent cancel algo order request");
2435 Ok(())
2436 }
2437 Err(e) => {
2438 tracing::error!(error = %e, "Failed to send cancel algo order after retries");
2439
2440 self.pending_cancel_requests.remove(&request_id);
2441
2442 if let Some(client_order_id) = client_order_id {
2443 let ts_now = self.clock.get_time_ns();
2444 let rejected = OrderCancelRejected::new(
2445 trader_id,
2446 strategy_id,
2447 instrument_id,
2448 client_order_id,
2449 Ustr::from(&format!("WebSocket send failed: {e}")),
2450 UUID4::new(),
2451 ts_now, ts_now, false, None,
2455 Some(self.account_id),
2456 );
2457 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2458 }
2459
2460 Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2461 }
2462 }
2463 }
2464}
2465
2466pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2468 if code == OKX_POST_ONLY_ERROR_CODE {
2469 return true;
2470 }
2471
2472 for entry in data {
2473 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2474 && s_code == OKX_POST_ONLY_ERROR_CODE
2475 {
2476 return true;
2477 }
2478
2479 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2480 && inner_code == OKX_POST_ONLY_ERROR_CODE
2481 {
2482 return true;
2483 }
2484 }
2485
2486 false
2487}
2488
/// Returns `true` if `haystack` contains `needle`, ignoring ASCII letter case.
///
/// The comparison works on raw bytes, so non-ASCII bytes must match exactly.
/// An empty `needle` is considered to be contained in every haystack, mirroring
/// `str::contains`.
#[inline]
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    let needle = needle.as_bytes();

    // `slice::windows` panics when asked for zero-sized windows, so the empty needle
    // has to be answered before the scan.
    if needle.is_empty() {
        return true;
    }

    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
2497
2498fn should_retry_okx_error(error: &OKXWsError) -> bool {
2500 match error {
2501 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2502 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
2504 contains_ignore_ascii_case(msg, "timeout")
2506 || contains_ignore_ascii_case(msg, "timed out")
2507 || contains_ignore_ascii_case(msg, "connection")
2508 || contains_ignore_ascii_case(msg, "network")
2509 }
2510 OKXWsError::AuthenticationError(_)
2511 | OKXWsError::JsonError(_)
2512 | OKXWsError::ParsingError(_) => {
2513 false
2515 }
2516 }
2517}
2518
2519fn create_okx_timeout_error(msg: String) -> OKXWsError {
2521 OKXWsError::ClientError(msg)
2522}
2523
2524#[cfg(test)]
2525mod tests {
2526 use std::sync::{Arc, atomic::AtomicBool};
2527
2528 use ahash::AHashMap;
2529 use dashmap::DashMap;
2530 use nautilus_model::{
2531 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2532 types::{Money, Quantity},
2533 };
2534 use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2535 use rstest::rstest;
2536 use ustr::Ustr;
2537
2538 use super::{NautilusWsMessage, OKXWsFeedHandler};
2539 use crate::websocket::parse::OrderStateSnapshot;
2540
2541 const OKX_WS_TOPIC_DELIMITER: char = ':';
2542
2543 #[allow(clippy::type_complexity)]
2544 fn create_test_handler() -> (
2545 OKXWsFeedHandler,
2546 tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2547 Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2548 Arc<DashMap<ClientOrderId, ClientOrderId>>,
2549 ) {
2550 let account_id = AccountId::new("OKX-001");
2551 let signal = Arc::new(AtomicBool::new(false));
2552 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2553 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2554 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2555 let active_client_orders = Arc::new(DashMap::new());
2556 let client_id_aliases = Arc::new(DashMap::new());
2557 let auth_tracker = AuthTracker::new();
2558 let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2559
2560 let handler = OKXWsFeedHandler::new(
2561 account_id,
2562 signal,
2563 cmd_rx,
2564 raw_rx,
2565 out_tx,
2566 active_client_orders.clone(),
2567 client_id_aliases.clone(),
2568 auth_tracker,
2569 subscriptions_state,
2570 );
2571
2572 (handler, out_rx, active_client_orders, client_id_aliases)
2573 }
2574
2575 #[rstest]
2576 fn test_is_post_only_rejection_detects_by_code() {
2577 assert!(super::is_post_only_rejection("51019", &[]));
2578 }
2579
2580 #[rstest]
2581 fn test_is_post_only_rejection_detects_by_inner_code() {
2582 let data = vec![serde_json::json!({
2583 "sCode": "51019"
2584 })];
2585 assert!(super::is_post_only_rejection("50000", &data));
2586 }
2587
2588 #[rstest]
2589 fn test_is_post_only_rejection_false_for_unrelated_error() {
2590 let data = vec![serde_json::json!({
2591 "sMsg": "Insufficient balance"
2592 })];
2593 assert!(!super::is_post_only_rejection("50000", &data));
2594 }
2595
2596 #[rstest]
2597 fn test_cleanup_alias_removes_canonical_entry() {
2598 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2599 let canonical = ClientOrderId::new("PARENT-001");
2600 aliases.insert(canonical, canonical);
2601
2602 aliases.remove(&canonical);
2603 aliases.retain(|_, v| *v != canonical);
2604
2605 assert!(!aliases.contains_key(&canonical));
2606 assert!(aliases.is_empty());
2607 }
2608
2609 #[rstest]
2610 fn test_cleanup_alias_removes_child_alias_pointing_to_canonical() {
2611 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2612 let canonical = ClientOrderId::new("PARENT-001");
2613 let child = ClientOrderId::new("CHILD-001");
2614 aliases.insert(canonical, canonical);
2615 aliases.insert(child, canonical);
2616
2617 aliases.remove(&canonical);
2618 aliases.retain(|_, v| *v != canonical);
2619
2620 assert!(!aliases.contains_key(&canonical));
2621 assert!(!aliases.contains_key(&child));
2622 assert!(aliases.is_empty());
2623 }
2624
2625 #[rstest]
2626 fn test_cleanup_alias_does_not_affect_unrelated_entries() {
2627 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2628 let canonical1 = ClientOrderId::new("PARENT-001");
2629 let child1 = ClientOrderId::new("CHILD-001");
2630 let canonical2 = ClientOrderId::new("PARENT-002");
2631 let child2 = ClientOrderId::new("CHILD-002");
2632 aliases.insert(canonical1, canonical1);
2633 aliases.insert(child1, canonical1);
2634 aliases.insert(canonical2, canonical2);
2635 aliases.insert(child2, canonical2);
2636
2637 aliases.remove(&canonical1);
2638 aliases.retain(|_, v| *v != canonical1);
2639
2640 assert!(!aliases.contains_key(&canonical1));
2641 assert!(!aliases.contains_key(&child1));
2642 assert!(aliases.contains_key(&canonical2));
2643 assert!(aliases.contains_key(&child2));
2644 assert_eq!(aliases.len(), 2);
2645 }
2646
2647 #[rstest]
2648 fn test_cleanup_alias_handles_multiple_children() {
2649 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2650 let canonical = ClientOrderId::new("PARENT-001");
2651 let child1 = ClientOrderId::new("CHILD-001");
2652 let child2 = ClientOrderId::new("CHILD-002");
2653 let child3 = ClientOrderId::new("CHILD-003");
2654 aliases.insert(canonical, canonical);
2655 aliases.insert(child1, canonical);
2656 aliases.insert(child2, canonical);
2657 aliases.insert(child3, canonical);
2658
2659 aliases.remove(&canonical);
2660 aliases.retain(|_, v| *v != canonical);
2661
2662 assert!(aliases.is_empty());
2663 }
2664
2665 #[rstest]
2666 fn test_cleanup_removes_from_all_caches() {
2667 let emitted_accepted: DashMap<VenueOrderId, ()> = DashMap::new();
2668 let order_state_cache: AHashMap<ClientOrderId, u32> = AHashMap::new();
2669 let active_orders: DashMap<ClientOrderId, ()> = DashMap::new();
2670 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2671 let fee_cache: AHashMap<Ustr, f64> = AHashMap::new();
2672 let filled_qty_cache: AHashMap<Ustr, f64> = AHashMap::new();
2673 let canonical = ClientOrderId::new("PARENT-001");
2674 let child = ClientOrderId::new("CHILD-001");
2675 let venue_id = VenueOrderId::new("VENUE-001");
2676
2677 emitted_accepted.insert(venue_id, ());
2678 let mut order_state = order_state_cache;
2679 order_state.insert(canonical, 1);
2680 active_orders.insert(canonical, ());
2681 aliases.insert(canonical, canonical);
2682 aliases.insert(child, canonical);
2683 let mut fees = fee_cache;
2684 fees.insert(venue_id.inner(), 0.001);
2685 let mut filled = filled_qty_cache;
2686 filled.insert(venue_id.inner(), 1.0);
2687
2688 emitted_accepted.remove(&venue_id);
2689 order_state.remove(&canonical);
2690 active_orders.remove(&canonical);
2691 aliases.remove(&canonical);
2692 aliases.retain(|_, v| *v != canonical);
2693 fees.remove(&venue_id.inner());
2694 filled.remove(&venue_id.inner());
2695
2696 assert!(emitted_accepted.is_empty());
2697 assert!(order_state.is_empty());
2698 assert!(active_orders.is_empty());
2699 assert!(aliases.is_empty());
2700 assert!(fees.is_empty());
2701 assert!(filled.is_empty());
2702 }
2703
2704 #[rstest]
2705 fn test_alias_registration_parent_with_child() {
2706 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2707 let parent = ClientOrderId::new("PARENT-001");
2708 let child = ClientOrderId::new("CHILD-001");
2709 aliases.insert(parent, parent);
2710 aliases.insert(child, parent);
2711
2712 assert_eq!(*aliases.get(&parent).unwrap(), parent);
2713 assert_eq!(*aliases.get(&child).unwrap(), parent);
2714 }
2715
2716 #[rstest]
2717 fn test_alias_registration_standalone_order() {
2718 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2719 let order_id = ClientOrderId::new("ORDER-001");
2720 aliases.insert(order_id, order_id);
2721
2722 assert_eq!(*aliases.get(&order_id).unwrap(), order_id);
2723 }
2724
2725 #[rstest]
2726 fn test_alias_lookup_returns_canonical() {
2727 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2728 let canonical = ClientOrderId::new("PARENT-001");
2729 let child = ClientOrderId::new("CHILD-001");
2730
2731 aliases.insert(canonical, canonical);
2732 aliases.insert(child, canonical);
2733
2734 let resolved = aliases.get(&child).map(|v| *v);
2735 assert_eq!(resolved, Some(canonical));
2736 }
2737
2738 #[rstest]
2739 fn test_handler_register_client_order_aliases_with_parent() {
2740 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2741
2742 let child = Some(ClientOrderId::new("CHILD-001"));
2743 let parent = Some(ClientOrderId::new("PARENT-001"));
2744
2745 let result = handler.register_client_order_aliases(&child, &parent);
2746
2747 assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2748 assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2749 assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2750 assert_eq!(
2751 *client_id_aliases
2752 .get(&ClientOrderId::new("CHILD-001"))
2753 .unwrap(),
2754 ClientOrderId::new("PARENT-001")
2755 );
2756 }
2757
2758 #[rstest]
2759 fn test_handler_register_client_order_aliases_without_parent() {
2760 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2761
2762 let child = Some(ClientOrderId::new("ORDER-001"));
2763 let parent: Option<ClientOrderId> = None;
2764
2765 let result = handler.register_client_order_aliases(&child, &parent);
2766
2767 assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2768 assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2769 assert_eq!(
2770 *client_id_aliases
2771 .get(&ClientOrderId::new("ORDER-001"))
2772 .unwrap(),
2773 ClientOrderId::new("ORDER-001")
2774 );
2775 }
2776
2777 #[rstest]
2778 fn test_handler_cleanup_terminal_order_removes_all_state() {
2779 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2780
2781 let canonical = ClientOrderId::new("PARENT-001");
2782 let child = ClientOrderId::new("CHILD-001");
2783 let venue_id = VenueOrderId::new("VENUE-001");
2784 let trader_id = TraderId::new("TRADER-001");
2785 let strategy_id = StrategyId::new("STRATEGY-001");
2786 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2787
2788 active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2789 client_id_aliases.insert(canonical, canonical);
2790 client_id_aliases.insert(child, canonical);
2791 handler
2792 .fee_cache
2793 .insert(venue_id.inner(), Money::from("0.001 USDT"));
2794 handler
2795 .filled_qty_cache
2796 .insert(venue_id.inner(), Quantity::from("1.0"));
2797 handler.order_state_cache.insert(
2798 canonical,
2799 OrderStateSnapshot {
2800 venue_order_id: venue_id,
2801 quantity: Quantity::from("1.0"),
2802 price: None,
2803 },
2804 );
2805
2806 handler.cleanup_terminal_order(&canonical, &venue_id);
2807
2808 assert!(!active_client_orders.contains_key(&canonical));
2809 assert!(!client_id_aliases.contains_key(&canonical));
2810 assert!(!client_id_aliases.contains_key(&child));
2811 assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2812 assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2813 assert!(!handler.order_state_cache.contains_key(&canonical));
2814 }
2815
2816 #[rstest]
2817 fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2818 let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2819
2820 let canonical = ClientOrderId::new("PARENT-001");
2821 let child1 = ClientOrderId::new("CHILD-001");
2822 let child2 = ClientOrderId::new("CHILD-002");
2823 let child3 = ClientOrderId::new("CHILD-003");
2824 let venue_id = VenueOrderId::new("VENUE-001");
2825
2826 client_id_aliases.insert(canonical, canonical);
2827 client_id_aliases.insert(child1, canonical);
2828 client_id_aliases.insert(child2, canonical);
2829 client_id_aliases.insert(child3, canonical);
2830
2831 handler.cleanup_terminal_order(&canonical, &venue_id);
2832
2833 assert!(!client_id_aliases.contains_key(&canonical));
2834 assert!(!client_id_aliases.contains_key(&child1));
2835 assert!(!client_id_aliases.contains_key(&child2));
2836 assert!(!client_id_aliases.contains_key(&child3));
2837 assert!(client_id_aliases.is_empty());
2838 }
2839
2840 #[rstest]
2841 fn test_handler_cleanup_does_not_affect_other_orders() {
2842 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2843
2844 let canonical1 = ClientOrderId::new("PARENT-001");
2845 let child1 = ClientOrderId::new("CHILD-001");
2846 let venue_id1 = VenueOrderId::new("VENUE-001");
2847
2848 let canonical2 = ClientOrderId::new("PARENT-002");
2849 let child2 = ClientOrderId::new("CHILD-002");
2850 let venue_id2 = VenueOrderId::new("VENUE-002");
2851
2852 let trader_id = TraderId::new("TRADER-001");
2853 let strategy_id = StrategyId::new("STRATEGY-001");
2854 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2855
2856 active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2857 active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2858 client_id_aliases.insert(canonical1, canonical1);
2859 client_id_aliases.insert(child1, canonical1);
2860 client_id_aliases.insert(canonical2, canonical2);
2861 client_id_aliases.insert(child2, canonical2);
2862 handler
2863 .fee_cache
2864 .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2865 handler
2866 .fee_cache
2867 .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2868
2869 handler.cleanup_terminal_order(&canonical1, &venue_id1);
2870
2871 assert!(!active_client_orders.contains_key(&canonical1));
2872 assert!(!client_id_aliases.contains_key(&canonical1));
2873 assert!(!client_id_aliases.contains_key(&child1));
2874 assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2875
2876 assert!(active_client_orders.contains_key(&canonical2));
2877 assert!(client_id_aliases.contains_key(&canonical2));
2878 assert!(client_id_aliases.contains_key(&child2));
2879 assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2880 }
2881
2882 mod channel_routing {
2883 use nautilus_core::nanos::UnixNanos;
2884 use nautilus_model::{
2885 identifiers::{InstrumentId, Symbol},
2886 instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2887 types::{Currency, Price, Quantity},
2888 };
2889 use rstest::rstest;
2890 use ustr::Ustr;
2891
2892 use super::*;
2893 use crate::{
2894 common::{enums::OKXBookAction, testing::load_test_json},
2895 websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2896 };
2897
2898 fn create_spot_instrument() -> InstrumentAny {
2899 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2900 InstrumentAny::CurrencyPair(CurrencyPair::new(
2901 instrument_id,
2902 Symbol::from("BTC-USDT"),
2903 Currency::BTC(),
2904 Currency::USDT(),
2905 2,
2906 8,
2907 Price::from("0.01"),
2908 Quantity::from("0.00000001"),
2909 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
2922 UnixNanos::default(),
2923 ))
2924 }
2925
2926 fn create_swap_instrument() -> InstrumentAny {
2927 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2928 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2929 instrument_id,
2930 Symbol::from("BTC-USDT-SWAP"),
2931 Currency::BTC(),
2932 Currency::USDT(),
2933 Currency::USDT(),
2934 false,
2935 2,
2936 8,
2937 Price::from("0.01"),
2938 Quantity::from("0.00000001"),
2939 None,
2940 None,
2941 None,
2942 None,
2943 None,
2944 None,
2945 None,
2946 None,
2947 None,
2948 None,
2949 None,
2950 None,
2951 UnixNanos::default(),
2952 UnixNanos::default(),
2953 ))
2954 }
2955
2956 fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2957 let (mut handler, _, _, _) = create_test_handler();
2958 for inst in instruments {
2959 handler
2960 .instruments_cache
2961 .insert(inst.symbol().inner(), inst);
2962 }
2963 handler
2964 }
2965
2966 #[rstest]
2967 fn test_parse_raw_message_ticker_channel() {
2968 let json = load_test_json("ws_tickers.json");
2969 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2970
2971 match msg {
2972 OKXWsMessage::Data { arg, data } => {
2973 assert!(
2974 matches!(arg.channel, OKXWsChannel::Tickers),
2975 "Expected Tickers channel"
2976 );
2977 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2978 assert!(data.is_array());
2979 }
2980 _ => panic!("Expected OKXWsMessage::Data variant"),
2981 }
2982 }
2983
2984 #[rstest]
2985 fn test_parse_raw_message_trades_channel() {
2986 let json = load_test_json("ws_trades.json");
2987 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2988
2989 match msg {
2990 OKXWsMessage::Data { arg, data } => {
2991 assert!(
2992 matches!(arg.channel, OKXWsChannel::Trades),
2993 "Expected Trades channel"
2994 );
2995 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2996 assert!(data.is_array());
2997 }
2998 _ => panic!("Expected OKXWsMessage::Data variant"),
2999 }
3000 }
3001
3002 #[rstest]
3003 fn test_parse_raw_message_books_channel() {
3004 let json = load_test_json("ws_books_snapshot.json");
3005 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3006
3007 match msg {
3008 OKXWsMessage::BookData { arg, action, data } => {
3009 assert!(
3010 matches!(arg.channel, OKXWsChannel::Books),
3011 "Expected Books channel"
3012 );
3013 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3014 assert!(
3015 matches!(action, OKXBookAction::Snapshot),
3016 "Expected snapshot action"
3017 );
3018 assert!(!data.is_empty());
3019 }
3020 _ => panic!("Expected OKXWsMessage::BookData variant"),
3021 }
3022 }
3023
3024 #[rstest]
3025 fn test_parse_raw_message_candle_channel() {
3026 let json = load_test_json("ws_candle.json");
3027 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3028
3029 match msg {
3030 OKXWsMessage::Data { arg, data } => {
3031 assert!(
3033 matches!(arg.channel, OKXWsChannel::Candle1Day),
3034 "Expected Candle1Day channel, got {:?}",
3035 arg.channel
3036 );
3037 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3038 assert!(data.is_array());
3039 }
3040 _ => panic!("Expected OKXWsMessage::Data variant"),
3041 }
3042 }
3043
3044 #[rstest]
3045 fn test_parse_raw_message_funding_rate_channel() {
3046 let json = load_test_json("ws_funding_rate.json");
3047 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3048
3049 match msg {
3050 OKXWsMessage::Data { arg, data } => {
3051 assert!(
3052 matches!(arg.channel, OKXWsChannel::FundingRate),
3053 "Expected FundingRate channel"
3054 );
3055 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
3056 assert!(data.is_array());
3057 }
3058 _ => panic!("Expected OKXWsMessage::Data variant"),
3059 }
3060 }
3061
3062 #[rstest]
3063 fn test_parse_raw_message_bbo_tbt_channel() {
3064 let json = load_test_json("ws_bbo_tbt.json");
3065 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3066
3067 match msg {
3068 OKXWsMessage::Data { arg, data } => {
3069 assert!(
3070 matches!(arg.channel, OKXWsChannel::BboTbt),
3071 "Expected BboTbt channel"
3072 );
3073 assert!(data.is_array());
3074 }
3075 _ => panic!("Expected OKXWsMessage::Data variant"),
3076 }
3077 }
3078
3079 #[rstest]
3080 fn test_handle_other_channel_data_tickers() {
3081 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3082 let json = load_test_json("ws_tickers.json");
3083 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3084
3085 let OKXWsMessage::Data { arg, data } = msg else {
3086 panic!("Expected OKXWsMessage::Data");
3087 };
3088
3089 let ts_init = UnixNanos::from(1_000_000_000u64);
3090 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3091
3092 assert!(result.is_some());
3093 match result.unwrap() {
3094 NautilusWsMessage::Data(payloads) => {
3095 assert!(!payloads.is_empty(), "Should produce data payloads");
3096 }
3097 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3098 }
3099 }
3100
3101 #[rstest]
3102 fn test_handle_other_channel_data_trades() {
3103 let instrument_id = InstrumentId::from("BTC-USD.OKX");
3105 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
3106 instrument_id,
3107 Symbol::from("BTC-USD"),
3108 Currency::BTC(),
3109 Currency::USD(),
3110 1,
3111 8,
3112 Price::from("0.1"),
3113 Quantity::from("0.00000001"),
3114 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
3127 UnixNanos::default(),
3128 ));
3129
3130 let mut handler = create_handler_with_instruments(vec![instrument]);
3131 let json = load_test_json("ws_trades.json");
3132 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3133
3134 let OKXWsMessage::Data { arg, data } = msg else {
3135 panic!("Expected OKXWsMessage::Data");
3136 };
3137
3138 let ts_init = UnixNanos::from(1_000_000_000u64);
3139 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3140
3141 assert!(result.is_some());
3142 match result.unwrap() {
3143 NautilusWsMessage::Data(payloads) => {
3144 assert!(!payloads.is_empty(), "Should produce trade data payloads");
3145 }
3146 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3147 }
3148 }
3149
3150 #[rstest]
3151 fn test_handle_book_data_snapshot() {
3152 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3153 let json = load_test_json("ws_books_snapshot.json");
3154 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3155
3156 let OKXWsMessage::BookData { arg, action, data } = msg else {
3157 panic!("Expected OKXWsMessage::BookData");
3158 };
3159
3160 let ts_init = UnixNanos::from(1_000_000_000u64);
3161 let result = handler.handle_book_data(arg, action, data, ts_init);
3162
3163 assert!(result.is_some());
3164 match result.unwrap() {
3165 NautilusWsMessage::Data(payloads) => {
3166 assert!(!payloads.is_empty(), "Should produce order book payloads");
3167 }
3168 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3169 }
3170 }
3171
3172 #[rstest]
3173 fn test_handle_book_data_update() {
3174 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3175 let json = load_test_json("ws_books_update.json");
3176 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3177
3178 let OKXWsMessage::BookData { arg, action, data } = msg else {
3179 panic!("Expected OKXWsMessage::BookData");
3180 };
3181
3182 let ts_init = UnixNanos::from(1_000_000_000u64);
3183 let result = handler.handle_book_data(arg, action, data, ts_init);
3184
3185 assert!(result.is_some());
3186 match result.unwrap() {
3187 NautilusWsMessage::Data(payloads) => {
3188 assert!(
3189 !payloads.is_empty(),
3190 "Should produce order book delta payloads"
3191 );
3192 }
3193 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3194 }
3195 }
3196
3197 #[rstest]
3198 fn test_handle_other_channel_data_candles() {
3199 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3200 let json = load_test_json("ws_candle.json");
3201 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3202
3203 let OKXWsMessage::Data { arg, data } = msg else {
3204 panic!("Expected OKXWsMessage::Data");
3205 };
3206
3207 let ts_init = UnixNanos::from(1_000_000_000u64);
3208 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3209
3210 assert!(result.is_some());
3211 match result.unwrap() {
3212 NautilusWsMessage::Data(payloads) => {
3213 assert!(!payloads.is_empty(), "Should produce bar data payloads");
3214 }
3215 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3216 }
3217 }
3218
3219 #[rstest]
3220 fn test_handle_other_channel_data_funding_rate() {
3221 let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3222 let json = load_test_json("ws_funding_rate.json");
3223 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3224
3225 let OKXWsMessage::Data { arg, data } = msg else {
3226 panic!("Expected OKXWsMessage::Data");
3227 };
3228
3229 let ts_init = UnixNanos::from(1_000_000_000u64);
3230 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3231
3232 assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3234 }
3235
3236 #[rstest]
3237 fn test_handle_account_data_parses_successfully() {
3238 let mut handler = create_handler_with_instruments(vec![]);
3239 let json = load_test_json("ws_account.json");
3240 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3241
3242 let OKXWsMessage::Data { data, .. } = msg else {
3243 panic!("Expected OKXWsMessage::Data");
3244 };
3245
3246 let ts_init = UnixNanos::from(1_000_000_000u64);
3247 let result = handler.handle_account_data(data, ts_init);
3248
3249 assert!(result.is_some());
3250 match result.unwrap() {
3251 NautilusWsMessage::AccountUpdate(account_state) => {
3252 assert!(
3253 !account_state.balances.is_empty(),
3254 "Should have balance data"
3255 );
3256 }
3257 other => panic!("Expected NautilusWsMessage::AccountUpdate, got {other:?}"),
3258 }
3259 }
3260
3261 #[rstest]
3262 fn test_handle_other_channel_data_missing_instrument() {
3263 let mut handler = create_handler_with_instruments(vec![]);
3264 let json = load_test_json("ws_tickers.json");
3265 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3266
3267 let OKXWsMessage::Data { arg, data } = msg else {
3268 panic!("Expected OKXWsMessage::Data");
3269 };
3270
3271 let ts_init = UnixNanos::from(1_000_000_000u64);
3272 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3273
3274 assert!(result.is_none());
3276 }
3277
3278 #[rstest]
3279 fn test_handle_book_data_missing_instrument() {
3280 let handler = create_handler_with_instruments(vec![]);
3281 let json = load_test_json("ws_books_snapshot.json");
3282 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3283
3284 let OKXWsMessage::BookData { arg, action, data } = msg else {
3285 panic!("Expected OKXWsMessage::BookData");
3286 };
3287
3288 let ts_init = UnixNanos::from(1_000_000_000u64);
3289 let result = handler.handle_book_data(arg, action, data, ts_init);
3290
3291 assert!(result.is_none());
3293 }
3294 }
3295}