1use std::{
32 collections::VecDeque,
33 sync::{
34 Arc,
35 atomic::{AtomicBool, AtomicU64, Ordering},
36 },
37};
38
39use ahash::AHashMap;
40use dashmap::DashMap;
41use nautilus_core::{AtomicTime, UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_model::{
43 enums::{OrderStatus, OrderType},
44 events::{
45 AccountState, OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected,
46 },
47 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
48 instruments::{Instrument, InstrumentAny},
49 types::{Money, Quantity},
50};
51use nautilus_network::{
52 RECONNECTED,
53 retry::{RetryManager, create_websocket_retry_manager},
54 websocket::{AuthTracker, SubscriptionState, TEXT_PING, TEXT_PONG, WebSocketClient},
55};
56use serde_json::Value;
57use tokio_tungstenite::tungstenite::Message;
58use ustr::Ustr;
59
60use super::{
61 enums::{OKXSubscriptionEvent, OKXWsChannel, OKXWsOperation},
62 error::OKXWsError,
63 messages::{
64 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOrderMsg,
65 OKXSubscription, OKXSubscriptionArg, OKXWebSocketArg, OKXWebSocketError, OKXWsMessage,
66 OKXWsRequest, WsAmendOrderParams, WsCancelAlgoOrderParamsBuilder,
67 WsCancelOrderParamsBuilder, WsMassCancelParams, WsPostAlgoOrderParams, WsPostOrderParams,
68 },
69 parse::{
70 OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_book_msg_vec,
71 parse_order_event, parse_order_msg, parse_ws_message_data,
72 },
73 subscription::topic_from_websocket_arg,
74};
75use crate::{
76 common::{
77 consts::{
78 OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE, OKX_POST_ONLY_ERROR_CODE,
79 should_retry_error_code,
80 },
81 enums::{
82 OKXBookAction, OKXInstrumentType, OKXOrderStatus, OKXSide, OKXTargetCurrency,
83 OKXTradeMode,
84 },
85 parse::{
86 determine_order_type, is_market_price, okx_instrument_type, parse_account_state,
87 parse_client_order_id, parse_millisecond_timestamp, parse_position_status_report,
88 parse_price, parse_quantity,
89 },
90 },
91 http::models::{OKXAccount, OKXPosition},
92 websocket::client::{
93 OKX_RATE_LIMIT_KEY_AMEND, OKX_RATE_LIMIT_KEY_CANCEL, OKX_RATE_LIMIT_KEY_ORDER,
94 OKX_RATE_LIMIT_KEY_SUBSCRIPTION,
95 },
96};
97
/// Context remembered for an in-flight place-order request.
///
/// Entries are stored in `pending_place_requests` keyed by request ID and are
/// looked up again when the matching order response is handled.
///
/// Fields, in order: submitted parameters, client order ID, trader ID,
/// strategy ID, instrument ID.
type PlaceRequestData = (
    PendingOrderParams,
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
);
106
/// Context remembered for an in-flight cancel request.
///
/// Entries are stored in `pending_cancel_requests` keyed by request ID and are
/// used to build an `OrderCancelRejected` event if the venue rejects the cancel.
///
/// Fields, in order: client order ID, trader ID, strategy ID, instrument ID,
/// optional venue order ID.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
115
/// Context remembered for an in-flight amend request.
///
/// Entries are stored in `pending_amend_requests` keyed by request ID and are
/// used to build an `OrderModifyRejected` event if the venue rejects the amend.
///
/// Fields, in order: client order ID, trader ID, strategy ID, instrument ID,
/// optional venue order ID.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
124
/// Parameters of a place request that is still awaiting its response.
#[derive(Debug)]
pub enum PendingOrderParams {
    /// Parameters of a regular order placement.
    Regular(WsPostOrderParams),
    /// Parameters of an algo order placement.
    Algo(WsPostAlgoOrderParams),
}
130
/// Commands delivered to the feed handler over its command channel.
///
/// Each command is consumed by the handler's `next` loop, which either updates
/// handler state directly or dispatches to the matching `handle_*` method.
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
#[allow(missing_debug_implementations)]
pub enum HandlerCommand {
    /// Supplies the WebSocket client the handler should use for sending.
    SetClient(WebSocketClient),
    /// Drops the current client and makes the handler's `next` return `None`.
    Disconnect,
    /// Sends the given pre-built authentication payload.
    Authenticate {
        payload: String,
    },
    /// Replaces/extends the instrument cache with the given instruments,
    /// keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts or overwrites a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Requests subscription for the given channel arguments.
    Subscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Requests unsubscription for the given channel arguments.
    Unsubscribe {
        args: Vec<OKXSubscriptionArg>,
    },
    /// Places a regular order.
    PlaceOrder {
        params: WsPostOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Places an algo order.
    PlaceAlgoOrder {
        params: WsPostAlgoOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Amends an existing order.
    AmendOrder {
        params: WsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Cancels a regular order, identified by client and/or venue order ID.
    CancelOrder {
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Cancels an algo order, identified by client and/or algo order ID.
    CancelAlgoOrder {
        client_order_id: Option<ClientOrderId>,
        algo_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
    /// Requests a mass cancel for the given instrument.
    MassCancel {
        instrument_id: InstrumentId,
    },
    /// Places several orders in one request; `args` are pre-built JSON values.
    BatchPlaceOrders {
        args: Vec<Value>,
        request_id: String,
    },
    /// Amends several orders in one request; `args` are pre-built JSON values.
    BatchAmendOrders {
        args: Vec<Value>,
        request_id: String,
    },
    /// Cancels several orders in one request; `args` are pre-built JSON values.
    BatchCancelOrders {
        args: Vec<Value>,
        request_id: String,
    },
}
203
/// State for the OKX WebSocket feed handler.
///
/// The handler receives commands and raw WebSocket messages over channels and
/// turns them into `NautilusWsMessage` values.
pub(super) struct OKXWsFeedHandler {
    /// Clock used to timestamp generated events.
    clock: &'static AtomicTime,
    /// Account ID attached to generated order events.
    account_id: AccountId,
    /// Stop flag checked by `is_stopped` and during idle periods in `next`.
    signal: Arc<AtomicBool>,
    /// WebSocket client used for sending; `None` until `SetClient` is received.
    inner: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Records login success or failure.
    auth_tracker: AuthTracker,
    /// Tracks subscription confirmations and failures per topic.
    subscriptions_state: SubscriptionState,
    /// Retry policy applied to outbound sends.
    retry_manager: RetryManager<OKXWsError>,
    /// In-flight place requests keyed by request ID.
    pending_place_requests: AHashMap<String, PlaceRequestData>,
    /// In-flight cancel requests keyed by request ID.
    pending_cancel_requests: AHashMap<String, CancelRequestData>,
    /// In-flight amend requests keyed by request ID.
    pending_amend_requests: AHashMap<String, AmendRequestData>,
    /// In-flight mass-cancel requests keyed by request ID.
    pending_mass_cancel_requests: AHashMap<String, InstrumentId>,
    /// Messages queued for delivery by a later call to `next`.
    pending_messages: VecDeque<NautilusWsMessage>,
    /// Shared map of active client orders to their trader, strategy and instrument.
    active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
    /// Shared map from a client order ID to the ID it should be reported under.
    client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
    /// Venue order IDs for which an `OrderAccepted` has already been emitted.
    emitted_order_accepted: AHashMap<VenueOrderId, ()>,
    /// Instruments keyed by symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Accumulated commission per venue order ID.
    fee_cache: AHashMap<Ustr, Money>,
    /// Accumulated filled quantity per venue order ID.
    filled_qty_cache: AHashMap<Ustr, Quantity>,
    /// Last known order state snapshot per client order ID.
    order_state_cache: AHashMap<ClientOrderId, OrderStateSnapshot>,
    // NOTE(review): key/value meaning is not visible in this part of the file;
    // presumably last funding rate and timestamp per symbol — confirm.
    funding_rate_cache: AHashMap<Ustr, (Ustr, u64)>,
    // NOTE(review): presumably the most recently emitted account state — confirm.
    last_account_state: Option<AccountState>,
    /// Counter starting at zero; presumably used to generate request IDs — confirm.
    request_id_counter: AtomicU64,
}
231
232impl OKXWsFeedHandler {
233 #[allow(clippy::too_many_arguments)]
235 pub fn new(
236 account_id: AccountId,
237 signal: Arc<AtomicBool>,
238 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
239 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
240 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
241 active_client_orders: Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
242 client_id_aliases: Arc<DashMap<ClientOrderId, ClientOrderId>>,
243 auth_tracker: AuthTracker,
244 subscriptions_state: SubscriptionState,
245 ) -> Self {
246 Self {
247 clock: get_atomic_clock_realtime(),
248 account_id,
249 signal,
250 inner: None,
251 cmd_rx,
252 raw_rx,
253 out_tx,
254 auth_tracker,
255 subscriptions_state,
256 retry_manager: create_websocket_retry_manager(),
257 pending_place_requests: AHashMap::new(),
258 pending_cancel_requests: AHashMap::new(),
259 pending_amend_requests: AHashMap::new(),
260 pending_mass_cancel_requests: AHashMap::new(),
261 pending_messages: VecDeque::new(),
262 active_client_orders,
263 client_id_aliases,
264 emitted_order_accepted: AHashMap::new(),
265 instruments_cache: AHashMap::new(),
266 fee_cache: AHashMap::new(),
267 filled_qty_cache: AHashMap::new(),
268 order_state_cache: AHashMap::new(),
269 funding_rate_cache: AHashMap::new(),
270 last_account_state: None,
271 request_id_counter: AtomicU64::new(0),
272 }
273 }
274
275 pub(super) fn is_stopped(&self) -> bool {
276 self.signal.load(std::sync::atomic::Ordering::Acquire)
277 }
278
279 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
280 self.out_tx.send(msg).map_err(|_| ())
281 }
282
283 async fn send_with_retry(
284 &self,
285 payload: String,
286 rate_limit_keys: Option<Vec<String>>,
287 ) -> Result<(), OKXWsError> {
288 if let Some(client) = &self.inner {
289 self.retry_manager
290 .execute_with_retry(
291 "websocket_send",
292 || {
293 let payload = payload.clone();
294 let keys = rate_limit_keys.clone();
295 async move {
296 client
297 .send_text(payload, keys)
298 .await
299 .map_err(|e| OKXWsError::ClientError(format!("Send failed: {e}")))
300 }
301 },
302 should_retry_okx_error,
303 create_okx_timeout_error,
304 )
305 .await
306 } else {
307 Err(OKXWsError::ClientError(
308 "No active WebSocket client".to_string(),
309 ))
310 }
311 }
312
313 pub(super) async fn send_pong(&self) -> anyhow::Result<()> {
314 match self.send_with_retry(TEXT_PONG.to_string(), None).await {
315 Ok(()) => {
316 tracing::trace!("Sent pong response to OKX text ping");
317 Ok(())
318 }
319 Err(e) => {
320 tracing::warn!(error = %e, "Failed to send pong after retries");
321 Err(anyhow::anyhow!("Failed to send pong: {e}"))
322 }
323 }
324 }
325
326 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
327 if let Some(message) = self.pending_messages.pop_front() {
328 return Some(message);
329 }
330
331 loop {
332 tokio::select! {
333 Some(cmd) = self.cmd_rx.recv() => {
334 match cmd {
335 HandlerCommand::SetClient(client) => {
336 tracing::debug!("Handler received WebSocket client");
337 self.inner = Some(client);
338 }
339 HandlerCommand::Disconnect => {
340 tracing::debug!("Handler disconnecting WebSocket client");
341 self.inner = None;
342 return None;
343 }
344 HandlerCommand::Authenticate { payload } => {
345 if let Err(e) = self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()])).await {
346 tracing::error!(error = %e, "Failed to send authentication message after retries");
347 }
348 }
349 HandlerCommand::InitializeInstruments(instruments) => {
350 for inst in instruments {
351 self.instruments_cache.insert(inst.symbol().inner(), inst);
352 }
353 }
354 HandlerCommand::UpdateInstrument(inst) => {
355 self.instruments_cache.insert(inst.symbol().inner(), inst);
356 }
357 HandlerCommand::Subscribe { args } => {
358 if let Err(e) = self.handle_subscribe(args).await {
359 tracing::error!(error = %e, "Failed to handle subscribe command");
360 }
361 }
362 HandlerCommand::Unsubscribe { args } => {
363 if let Err(e) = self.handle_unsubscribe(args).await {
364 tracing::error!(error = %e, "Failed to handle unsubscribe command");
365 }
366 }
367 HandlerCommand::CancelOrder {
368 client_order_id,
369 venue_order_id,
370 instrument_id,
371 trader_id,
372 strategy_id,
373 } => {
374 if let Err(e) = self
375 .handle_cancel_order(
376 client_order_id,
377 venue_order_id,
378 instrument_id,
379 trader_id,
380 strategy_id,
381 )
382 .await
383 {
384 tracing::error!(error = %e, "Failed to handle cancel order command");
385 }
386 }
387 HandlerCommand::CancelAlgoOrder {
388 client_order_id,
389 algo_order_id,
390 instrument_id,
391 trader_id,
392 strategy_id,
393 } => {
394 if let Err(e) = self
395 .handle_cancel_algo_order(
396 client_order_id,
397 algo_order_id,
398 instrument_id,
399 trader_id,
400 strategy_id,
401 )
402 .await
403 {
404 tracing::error!(error = %e, "Failed to handle cancel algo order command");
405 }
406 }
407 HandlerCommand::PlaceOrder {
408 params,
409 client_order_id,
410 trader_id,
411 strategy_id,
412 instrument_id,
413 } => {
414 if let Err(e) = self
415 .handle_place_order(
416 params,
417 client_order_id,
418 trader_id,
419 strategy_id,
420 instrument_id,
421 )
422 .await
423 {
424 tracing::error!(error = %e, "Failed to handle place order command");
425 }
426 }
427 HandlerCommand::PlaceAlgoOrder {
428 params,
429 client_order_id,
430 trader_id,
431 strategy_id,
432 instrument_id,
433 } => {
434 if let Err(e) = self
435 .handle_place_algo_order(
436 params,
437 client_order_id,
438 trader_id,
439 strategy_id,
440 instrument_id,
441 )
442 .await
443 {
444 tracing::error!(error = %e, "Failed to handle place algo order command");
445 }
446 }
447 HandlerCommand::AmendOrder {
448 params,
449 client_order_id,
450 trader_id,
451 strategy_id,
452 instrument_id,
453 venue_order_id,
454 } => {
455 if let Err(e) = self
456 .handle_amend_order(
457 params,
458 client_order_id,
459 trader_id,
460 strategy_id,
461 instrument_id,
462 venue_order_id,
463 )
464 .await
465 {
466 tracing::error!(error = %e, "Failed to handle amend order command");
467 }
468 }
469 HandlerCommand::MassCancel { instrument_id } => {
470 if let Err(e) = self.handle_mass_cancel(instrument_id).await {
471 tracing::error!(error = %e, "Failed to handle mass cancel command");
472 }
473 }
474 HandlerCommand::BatchCancelOrders { args, request_id } => {
475 if let Err(e) = self.handle_batch_cancel_orders(args, request_id).await {
476 tracing::error!(error = %e, "Failed to handle batch cancel orders command");
477 }
478 }
479 HandlerCommand::BatchPlaceOrders { args, request_id } => {
480 if let Err(e) = self.handle_batch_place_orders(args, request_id).await {
481 tracing::error!(error = %e, "Failed to handle batch place orders command");
482 }
483 }
484 HandlerCommand::BatchAmendOrders { args, request_id } => {
485 if let Err(e) = self.handle_batch_amend_orders(args, request_id).await {
486 tracing::error!(error = %e, "Failed to handle batch amend orders command");
487 }
488 }
489 }
490 continue;
492 }
493
494 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
495 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
496 tracing::debug!("Stop signal received during idle period");
497 return None;
498 }
499 continue;
500 }
501
502 msg = self.raw_rx.recv() => {
503 let event = match msg {
504 Some(msg) => match Self::parse_raw_message(msg) {
505 Some(event) => event,
506 None => continue,
507 },
508 None => {
509 tracing::debug!("WebSocket stream closed");
510 return None;
511 }
512 };
513
514 let ts_init = self.clock.get_time_ns();
515
516 match event {
517 OKXWsMessage::Ping => {
518 if let Err(e) = self.send_pong().await {
519 tracing::warn!(error = %e, "Failed to send pong response");
520 }
521 continue;
522 }
523 OKXWsMessage::Login {
524 code, msg, conn_id, ..
525 } => {
526 if code == "0" {
527 self.auth_tracker.succeed();
528
529 return Some(NautilusWsMessage::Authenticated);
533 }
534
535 tracing::error!(error = %msg, "WebSocket authentication failed");
536 self.auth_tracker.fail(msg.clone());
537
538 let error = OKXWebSocketError {
539 code,
540 message: msg,
541 conn_id: Some(conn_id),
542 timestamp: self.clock.get_time_ns().as_u64(),
543 };
544 self.pending_messages
545 .push_back(NautilusWsMessage::Error(error));
546 continue;
547 }
548 OKXWsMessage::BookData { arg, action, data } => {
549 if let Some(msg) = self.handle_book_data(arg, action, data, ts_init) {
550 return Some(msg);
551 }
552 continue;
553 }
554 OKXWsMessage::OrderResponse {
555 id,
556 op,
557 code,
558 msg,
559 data,
560 } => {
561 if let Some(msg) = self.handle_order_response(id, op, code, msg, data, ts_init) {
562 return Some(msg);
563 }
564 continue;
565 }
566 OKXWsMessage::Data { arg, data } => {
567 let OKXWebSocketArg {
568 channel, inst_id, ..
569 } = arg;
570
571 match channel {
572 OKXWsChannel::Account => {
573 if let Some(msg) = self.handle_account_data(data, ts_init) {
574 return Some(msg);
575 }
576 continue;
577 }
578 OKXWsChannel::Positions => {
579 self.handle_positions_data(data, ts_init);
580 continue;
581 }
582 OKXWsChannel::Orders => {
583 if let Some(msg) = self.handle_orders_data(data, ts_init) {
584 return Some(msg);
585 }
586 continue;
587 }
588 OKXWsChannel::OrdersAlgo => {
589 if let Some(msg) = self.handle_algo_orders_data(data, ts_init) {
590 return Some(msg);
591 }
592 continue;
593 }
594 _ => {
595 if let Some(msg) =
596 self.handle_other_channel_data(channel, inst_id, data, ts_init)
597 {
598 return Some(msg);
599 }
600 continue;
601 }
602 }
603 }
604 OKXWsMessage::Error { code, msg } => {
605 let error = OKXWebSocketError {
606 code,
607 message: msg,
608 conn_id: None,
609 timestamp: self.clock.get_time_ns().as_u64(),
610 };
611 return Some(NautilusWsMessage::Error(error));
612 }
613 OKXWsMessage::Reconnected => {
614 return Some(NautilusWsMessage::Reconnected);
615 }
616 OKXWsMessage::Subscription {
617 event,
618 arg,
619 code,
620 msg,
621 ..
622 } => {
623 let topic = topic_from_websocket_arg(&arg);
624 let success = code.as_deref().is_none_or(|c| c == "0");
625
626 match event {
627 OKXSubscriptionEvent::Subscribe => {
628 if success {
629 self.subscriptions_state.confirm_subscribe(&topic);
630 } else {
631 tracing::warn!(?topic, error = ?msg, code = ?code, "Subscription failed");
632 self.subscriptions_state.mark_failure(&topic);
633 }
634 }
635 OKXSubscriptionEvent::Unsubscribe => {
636 if success {
637 self.subscriptions_state.confirm_unsubscribe(&topic);
638 } else {
639 tracing::warn!(?topic, error = ?msg, code = ?code, "Unsubscription failed - restoring subscription");
640 self.subscriptions_state.confirm_unsubscribe(&topic); self.subscriptions_state.mark_subscribe(&topic); self.subscriptions_state.confirm_subscribe(&topic); }
645 }
646 }
647
648 continue;
649 }
650 OKXWsMessage::ChannelConnCount { .. } => continue,
651 }
652 }
653
654 else => {
656 tracing::debug!("Handler shutting down: stream ended or command channel closed");
657 return None;
658 }
659 }
660 }
661 }
662
663 pub(super) fn is_post_only_auto_cancel(msg: &OKXOrderMsg) -> bool {
664 if msg.state != OKXOrderStatus::Canceled {
665 return false;
666 }
667
668 let cancel_source_matches = matches!(
669 msg.cancel_source.as_deref(),
670 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
671 );
672
673 let reason_matches = matches!(
674 msg.cancel_source_reason.as_deref(),
675 Some(reason) if reason.contains("POST_ONLY")
676 );
677
678 if !(cancel_source_matches || reason_matches) {
679 return false;
680 }
681
682 msg.acc_fill_sz
683 .as_ref()
684 .is_none_or(|filled| filled == "0" || filled.is_empty())
685 }
686
687 fn try_handle_post_only_auto_cancel(
688 &mut self,
689 msg: &OKXOrderMsg,
690 ts_init: UnixNanos,
691 exec_reports: &mut Vec<ExecutionReport>,
692 ) -> bool {
693 if !Self::is_post_only_auto_cancel(msg) {
694 return false;
695 }
696
697 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
698 return false;
699 };
700
701 let Some((_, (trader_id, strategy_id, instrument_id))) =
702 self.active_client_orders.remove(&client_order_id)
703 else {
704 return false;
705 };
706
707 self.client_id_aliases.remove(&client_order_id);
708
709 if !exec_reports.is_empty() {
710 let reports = std::mem::take(exec_reports);
711 self.pending_messages
712 .push_back(NautilusWsMessage::ExecutionReports(reports));
713 }
714
715 let reason = msg
716 .cancel_source_reason
717 .as_ref()
718 .filter(|reason| !reason.is_empty())
719 .map_or_else(
720 || Ustr::from(OKX_POST_ONLY_CANCEL_REASON),
721 |reason| Ustr::from(reason.as_str()),
722 );
723
724 let ts_event = parse_millisecond_timestamp(msg.u_time);
725 let rejected = OrderRejected::new(
726 trader_id,
727 strategy_id,
728 instrument_id,
729 client_order_id,
730 self.account_id,
731 reason,
732 UUID4::new(),
733 ts_event,
734 ts_init,
735 false,
736 true,
737 );
738
739 self.pending_messages
740 .push_back(NautilusWsMessage::OrderRejected(rejected));
741
742 true
743 }
744
745 fn register_client_order_aliases(
746 &self,
747 raw_child: &Option<ClientOrderId>,
748 parent_from_msg: &Option<ClientOrderId>,
749 ) -> Option<ClientOrderId> {
750 if let Some(parent) = parent_from_msg {
751 self.client_id_aliases.insert(*parent, *parent);
752 if let Some(child) = raw_child.as_ref().filter(|child| **child != *parent) {
753 self.client_id_aliases.insert(*child, *parent);
754 }
755 Some(*parent)
756 } else if let Some(child) = raw_child.as_ref() {
757 if let Some(mapped) = self.client_id_aliases.get(child) {
758 Some(*mapped.value())
759 } else {
760 self.client_id_aliases.insert(*child, *child);
761 Some(*child)
762 }
763 } else {
764 None
765 }
766 }
767
768 fn adjust_execution_report(
769 &self,
770 report: ExecutionReport,
771 effective_client_id: &Option<ClientOrderId>,
772 raw_child: &Option<ClientOrderId>,
773 ) -> ExecutionReport {
774 match report {
775 ExecutionReport::Order(status_report) => {
776 let mut adjusted = status_report;
777 let mut final_id = *effective_client_id;
778
779 if final_id.is_none() {
780 final_id = adjusted.client_order_id;
781 }
782
783 if final_id.is_none()
784 && let Some(child) = raw_child.as_ref()
785 && let Some(mapped) = self.client_id_aliases.get(child)
786 {
787 final_id = Some(*mapped.value());
788 }
789
790 if let Some(final_id_value) = final_id {
791 if adjusted.client_order_id != Some(final_id_value) {
792 adjusted = adjusted.with_client_order_id(final_id_value);
793 }
794 self.client_id_aliases
795 .insert(final_id_value, final_id_value);
796
797 if let Some(child) =
798 raw_child.as_ref().filter(|child| **child != final_id_value)
799 {
800 adjusted = adjusted.with_linked_order_ids(vec![*child]);
801 }
802 }
803
804 ExecutionReport::Order(adjusted)
805 }
806 ExecutionReport::Fill(mut fill_report) => {
807 let mut final_id = *effective_client_id;
808 if final_id.is_none() {
809 final_id = fill_report.client_order_id;
810 }
811 if final_id.is_none()
812 && let Some(child) = raw_child.as_ref()
813 && let Some(mapped) = self.client_id_aliases.get(child)
814 {
815 final_id = Some(*mapped.value());
816 }
817
818 if let Some(final_id_value) = final_id {
819 fill_report.client_order_id = Some(final_id_value);
820 self.client_id_aliases
821 .insert(final_id_value, final_id_value);
822 }
823
824 ExecutionReport::Fill(fill_report)
825 }
826 }
827 }
828
829 fn update_caches_with_report(&mut self, report: &ExecutionReport) {
830 match report {
831 ExecutionReport::Fill(fill_report) => {
832 let order_id = fill_report.venue_order_id.inner();
833 let current_fee = self
834 .fee_cache
835 .get(&order_id)
836 .copied()
837 .unwrap_or_else(|| Money::new(0.0, fill_report.commission.currency));
838 let total_fee = current_fee + fill_report.commission;
839 self.fee_cache.insert(order_id, total_fee);
840
841 let current_filled_qty = self
842 .filled_qty_cache
843 .get(&order_id)
844 .copied()
845 .unwrap_or_else(|| Quantity::zero(fill_report.last_qty.precision));
846 let total_filled_qty = current_filled_qty + fill_report.last_qty;
847 self.filled_qty_cache.insert(order_id, total_filled_qty);
848 }
849 ExecutionReport::Order(status_report) => {
850 if matches!(status_report.order_status, OrderStatus::Filled) {
851 self.fee_cache.remove(&status_report.venue_order_id.inner());
852 self.filled_qty_cache
853 .remove(&status_report.venue_order_id.inner());
854 }
855
856 if matches!(
857 status_report.order_status,
858 OrderStatus::Canceled
859 | OrderStatus::Expired
860 | OrderStatus::Filled
861 | OrderStatus::Rejected,
862 ) {
863 self.emitted_order_accepted
864 .remove(&status_report.venue_order_id);
865 if let Some(client_order_id) = status_report.client_order_id {
866 self.order_state_cache.remove(&client_order_id);
867 self.active_client_orders.remove(&client_order_id);
868 self.client_id_aliases.remove(&client_order_id);
869 }
870 if let Some(linked) = &status_report.linked_order_ids {
871 for child in linked {
872 self.client_id_aliases.remove(child);
873 }
874 }
875 }
876 }
877 }
878 }
879
880 #[allow(clippy::too_many_lines)]
881 fn handle_order_response(
882 &mut self,
883 id: Option<String>,
884 op: OKXWsOperation,
885 code: String,
886 msg: String,
887 data: Vec<Value>,
888 ts_init: UnixNanos,
889 ) -> Option<NautilusWsMessage> {
890 if code == "0" {
891 tracing::debug!("Order operation successful: id={id:?} op={op} code={code}");
892
893 if op == OKXWsOperation::BatchCancelOrders {
894 tracing::debug!(
895 "Batch cancel operation successful: id={id:?} cancel_count={}",
896 data.len()
897 );
898
899 for (idx, entry) in data.iter().enumerate() {
901 if let Some(entry_code) = entry.get("sCode").and_then(|v| v.as_str())
902 && entry_code != "0"
903 {
904 let entry_msg = entry
905 .get("sMsg")
906 .and_then(|v| v.as_str())
907 .unwrap_or("Unknown error");
908
909 if let Some(cl_ord_id_str) = entry
910 .get("clOrdId")
911 .and_then(|v| v.as_str())
912 .filter(|s| !s.is_empty())
913 {
914 tracing::error!(
915 "Batch cancel partial failure for order {}: sCode={} sMsg={}",
916 cl_ord_id_str,
917 entry_code,
918 entry_msg
919 );
920 } else {
922 tracing::error!(
923 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
924 idx,
925 entry_code,
926 entry_msg,
927 entry
928 );
929 }
930 }
931 }
932
933 return None;
934 } else if op == OKXWsOperation::MassCancel
935 && let Some(request_id) = &id
936 && let Some(instrument_id) = self.pending_mass_cancel_requests.remove(request_id)
937 {
938 tracing::debug!(
939 "Mass cancel operation successful for instrument: {}",
940 instrument_id
941 );
942 } else if op == OKXWsOperation::Order
943 && let Some(request_id) = &id
944 && let Some((params, client_order_id, trader_id, strategy_id, instrument_id)) =
945 self.pending_place_requests.remove(request_id)
946 {
947 let (venue_order_id, ts_accepted) = if let Some(first) = data.first() {
948 let ord_id = first
949 .get("ordId")
950 .and_then(|v| v.as_str())
951 .filter(|s| !s.is_empty())
952 .map(VenueOrderId::new);
953
954 let ts = first
955 .get("ts")
956 .and_then(|v| v.as_str())
957 .and_then(|s| s.parse::<u64>().ok())
958 .map_or_else(
959 || self.clock.get_time_ns(),
960 |ms| UnixNanos::from(ms * 1_000_000),
961 );
962
963 (ord_id, ts)
964 } else {
965 (None, self.clock.get_time_ns())
966 };
967
968 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner())
969 {
970 match params {
971 PendingOrderParams::Regular(order_params) => {
972 let order_type = determine_order_type(
973 order_params.ord_type,
974 order_params.px.as_deref().unwrap_or(""),
975 );
976
977 let is_explicit_quote_sized = order_params
978 .tgt_ccy
979 .is_some_and(|tgt| tgt == OKXTargetCurrency::QuoteCcy);
980
981 let is_implicit_quote_sized = order_params.tgt_ccy.is_none()
983 && order_params.side == OKXSide::Buy
984 && order_type == OrderType::Market
985 && order_params.td_mode == OKXTradeMode::Cash
986 && instrument.instrument_class().as_ref() == "SPOT";
987
988 if is_explicit_quote_sized || is_implicit_quote_sized {
989 tracing::debug!(
994 "Skipping synthetic OrderAccepted for {} quote-sized order: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
995 if is_explicit_quote_sized {
996 "explicit"
997 } else {
998 "implicit"
999 },
1000 );
1001 return None;
1002 }
1003
1004 let Some(v_order_id) = venue_order_id else {
1005 tracing::error!(
1006 "No venue_order_id for accepted order: client_order_id={client_order_id}"
1007 );
1008 return None;
1009 };
1010
1011 if self.emitted_order_accepted.contains_key(&v_order_id) {
1013 tracing::debug!(
1014 "Skipping duplicate OrderAccepted from operation response for venue_order_id={v_order_id}"
1015 );
1016 return None;
1017 }
1018 self.emitted_order_accepted.insert(v_order_id, ());
1019
1020 let accepted = OrderAccepted::new(
1021 trader_id,
1022 strategy_id,
1023 instrument_id,
1024 client_order_id,
1025 v_order_id,
1026 self.account_id,
1027 UUID4::new(),
1028 ts_accepted,
1029 ts_init,
1030 false, );
1032
1033 tracing::debug!(
1034 "Order accepted: client_order_id={client_order_id}, venue_order_id={v_order_id}"
1035 );
1036
1037 return Some(NautilusWsMessage::OrderAccepted(accepted));
1038 }
1039 PendingOrderParams::Algo(_) => {
1040 tracing::debug!(
1041 "Algo order placement confirmed: client_order_id={client_order_id}, venue_order_id={:?}",
1042 venue_order_id
1043 );
1044 }
1045 }
1046 } else {
1047 tracing::error!("Instrument not found for accepted order: {instrument_id}");
1048 }
1049 }
1050
1051 if let Some(first) = data.first()
1052 && let Some(success_msg) = first.get("sMsg").and_then(|value| value.as_str())
1053 {
1054 tracing::debug!("Order details: {success_msg}");
1055 }
1056
1057 return None;
1058 }
1059
1060 let error_msg = data
1061 .first()
1062 .and_then(|d| d.get("sMsg"))
1063 .and_then(|s| s.as_str())
1064 .unwrap_or(&msg)
1065 .to_string();
1066
1067 if let Some(first) = data.first() {
1068 tracing::debug!(
1069 "Error data fields: {}",
1070 serde_json::to_string_pretty(first)
1071 .unwrap_or_else(|_| "unable to serialize".to_string())
1072 );
1073 }
1074
1075 tracing::warn!("Order operation failed: id={id:?} op={op} code={code} msg={error_msg}");
1076
1077 let ts_event = self.clock.get_time_ns();
1078
1079 if let Some(request_id) = &id {
1080 match op {
1081 OKXWsOperation::Order => {
1082 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1083 self.pending_place_requests.remove(request_id)
1084 {
1085 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1086 let rejected = OrderRejected::new(
1087 trader_id,
1088 strategy_id,
1089 instrument_id,
1090 client_order_id,
1091 self.account_id,
1092 Ustr::from(error_msg.as_str()),
1093 UUID4::new(),
1094 ts_event,
1095 ts_init,
1096 false, due_post_only,
1098 );
1099
1100 return Some(NautilusWsMessage::OrderRejected(rejected));
1101 }
1102 }
1103 OKXWsOperation::CancelOrder => {
1104 if let Some((
1105 client_order_id,
1106 trader_id,
1107 strategy_id,
1108 instrument_id,
1109 venue_order_id,
1110 )) = self.pending_cancel_requests.remove(request_id)
1111 {
1112 let rejected = OrderCancelRejected::new(
1113 trader_id,
1114 strategy_id,
1115 instrument_id,
1116 client_order_id,
1117 Ustr::from(error_msg.as_str()),
1118 UUID4::new(),
1119 ts_event,
1120 ts_init,
1121 false, venue_order_id,
1123 Some(self.account_id),
1124 );
1125
1126 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1127 }
1128 }
1129 OKXWsOperation::AmendOrder => {
1130 if let Some((
1131 client_order_id,
1132 trader_id,
1133 strategy_id,
1134 instrument_id,
1135 venue_order_id,
1136 )) = self.pending_amend_requests.remove(request_id)
1137 {
1138 let rejected = OrderModifyRejected::new(
1139 trader_id,
1140 strategy_id,
1141 instrument_id,
1142 client_order_id,
1143 Ustr::from(error_msg.as_str()),
1144 UUID4::new(),
1145 ts_event,
1146 ts_init,
1147 false, venue_order_id,
1149 Some(self.account_id),
1150 );
1151
1152 return Some(NautilusWsMessage::OrderModifyRejected(rejected));
1153 }
1154 }
1155 OKXWsOperation::OrderAlgo => {
1156 if let Some((_params, client_order_id, trader_id, strategy_id, instrument_id)) =
1157 self.pending_place_requests.remove(request_id)
1158 {
1159 let due_post_only = is_post_only_rejection(code.as_str(), &data);
1160 let rejected = OrderRejected::new(
1161 trader_id,
1162 strategy_id,
1163 instrument_id,
1164 client_order_id,
1165 self.account_id,
1166 Ustr::from(error_msg.as_str()),
1167 UUID4::new(),
1168 ts_event,
1169 ts_init,
1170 false, due_post_only,
1172 );
1173
1174 return Some(NautilusWsMessage::OrderRejected(rejected));
1175 }
1176 }
1177 OKXWsOperation::CancelAlgos => {
1178 if let Some((
1179 client_order_id,
1180 trader_id,
1181 strategy_id,
1182 instrument_id,
1183 venue_order_id,
1184 )) = self.pending_cancel_requests.remove(request_id)
1185 {
1186 let rejected = OrderCancelRejected::new(
1187 trader_id,
1188 strategy_id,
1189 instrument_id,
1190 client_order_id,
1191 Ustr::from(error_msg.as_str()),
1192 UUID4::new(),
1193 ts_event,
1194 ts_init,
1195 false, venue_order_id,
1197 Some(self.account_id),
1198 );
1199
1200 return Some(NautilusWsMessage::OrderCancelRejected(rejected));
1201 }
1202 }
1203 OKXWsOperation::MassCancel => {
1204 if let Some(instrument_id) =
1205 self.pending_mass_cancel_requests.remove(request_id)
1206 {
1207 tracing::error!(
1208 "Mass cancel operation failed for {}: code={code} msg={error_msg}",
1209 instrument_id
1210 );
1211 let error = OKXWebSocketError {
1212 code,
1213 message: format!("Mass cancel failed for {instrument_id}: {error_msg}"),
1214 conn_id: None,
1215 timestamp: ts_event.as_u64(),
1216 };
1217 return Some(NautilusWsMessage::Error(error));
1218 } else {
1219 tracing::error!(
1220 "Mass cancel operation failed: code={code} msg={error_msg}"
1221 );
1222 }
1223 }
1224 OKXWsOperation::BatchCancelOrders => {
1225 tracing::warn!(
1226 "Batch cancel operation failed: id={id:?} code={code} msg={error_msg} data_count={}",
1227 data.len()
1228 );
1229
1230 for (idx, entry) in data.iter().enumerate() {
1232 let entry_code =
1233 entry.get("sCode").and_then(|v| v.as_str()).unwrap_or(&code);
1234 let entry_msg = entry
1235 .get("sMsg")
1236 .and_then(|v| v.as_str())
1237 .unwrap_or(&error_msg);
1238
1239 if entry_code != "0" {
1240 if let Some(cl_ord_id_str) = entry
1242 .get("clOrdId")
1243 .and_then(|v| v.as_str())
1244 .filter(|s| !s.is_empty())
1245 {
1246 tracing::error!(
1247 "Batch cancel failed for order {}: sCode={} sMsg={}",
1248 cl_ord_id_str,
1249 entry_code,
1250 entry_msg
1251 );
1252 } else {
1255 tracing::error!(
1256 "Batch cancel entry[{}] failed: sCode={} sMsg={} data={:?}",
1257 idx,
1258 entry_code,
1259 entry_msg,
1260 entry
1261 );
1262 }
1263 }
1264 }
1265
1266 let error = OKXWebSocketError {
1268 code,
1269 message: format!("Batch cancel failed: {error_msg}"),
1270 conn_id: None,
1271 timestamp: ts_event.as_u64(),
1272 };
1273 return Some(NautilusWsMessage::Error(error));
1274 }
1275 _ => tracing::warn!("Unhandled operation type for rejection: {op}"),
1276 }
1277 }
1278
1279 let error = OKXWebSocketError {
1280 code,
1281 message: error_msg,
1282 conn_id: None,
1283 timestamp: ts_event.as_u64(),
1284 };
1285 Some(NautilusWsMessage::Error(error))
1286 }
1287
1288 fn handle_book_data(
1289 &self,
1290 arg: OKXWebSocketArg,
1291 action: OKXBookAction,
1292 data: Vec<OKXBookMsg>,
1293 ts_init: UnixNanos,
1294 ) -> Option<NautilusWsMessage> {
1295 let Some(inst_id) = arg.inst_id else {
1296 tracing::error!("Instrument ID missing for book data event");
1297 return None;
1298 };
1299
1300 let inst = self.instruments_cache.get(&inst_id)?;
1301
1302 let instrument_id = inst.id();
1303 let price_precision = inst.price_precision();
1304 let size_precision = inst.size_precision();
1305
1306 match parse_book_msg_vec(
1307 data,
1308 &instrument_id,
1309 price_precision,
1310 size_precision,
1311 action,
1312 ts_init,
1313 ) {
1314 Ok(payloads) => Some(NautilusWsMessage::Data(payloads)),
1315 Err(e) => {
1316 tracing::error!("Failed to parse book message: {e}");
1317 None
1318 }
1319 }
1320 }
1321
1322 fn handle_account_data(
1323 &mut self,
1324 data: Value,
1325 ts_init: UnixNanos,
1326 ) -> Option<NautilusWsMessage> {
1327 let Value::Array(arr) = data else {
1328 tracing::error!("Account data is not an array");
1329 return None;
1330 };
1331
1332 let first = arr.into_iter().next()?;
1333
1334 let account: OKXAccount = match serde_json::from_value(first) {
1335 Ok(acc) => acc,
1336 Err(e) => {
1337 tracing::error!("Failed to parse account data: {e}");
1338 return None;
1339 }
1340 };
1341
1342 match parse_account_state(&account, self.account_id, ts_init) {
1343 Ok(account_state) => {
1344 if let Some(last_account_state) = &self.last_account_state
1345 && account_state.has_same_balances_and_margins(last_account_state)
1346 {
1347 return None;
1348 }
1349 self.last_account_state = Some(account_state.clone());
1350 Some(NautilusWsMessage::AccountUpdate(account_state))
1351 }
1352 Err(e) => {
1353 tracing::error!("Failed to parse account state: {e}");
1354 None
1355 }
1356 }
1357 }
1358
1359 fn handle_positions_data(&mut self, data: Value, ts_init: UnixNanos) {
1360 match serde_json::from_value::<Vec<OKXPosition>>(data) {
1361 Ok(positions) => {
1362 tracing::debug!("Received {} position update(s)", positions.len());
1363
1364 for position in positions {
1365 let instrument = match self.instruments_cache.get(&position.inst_id) {
1366 Some(inst) => inst,
1367 None => {
1368 tracing::warn!(
1369 "Received position update for unknown instrument {}, skipping",
1370 position.inst_id
1371 );
1372 continue;
1373 }
1374 };
1375
1376 let instrument_id = instrument.id();
1377 let size_precision = instrument.size_precision();
1378
1379 match parse_position_status_report(
1380 position,
1381 self.account_id,
1382 instrument_id,
1383 size_precision,
1384 ts_init,
1385 ) {
1386 Ok(position_report) => {
1387 self.pending_messages
1388 .push_back(NautilusWsMessage::PositionUpdate(position_report));
1389 }
1390 Err(e) => {
1391 tracing::error!(
1392 "Failed to parse position status report for {}: {e}",
1393 instrument_id
1394 );
1395 }
1396 }
1397 }
1398 }
1399 Err(e) => {
1400 tracing::error!("Failed to parse positions data: {e}");
1401 }
1402 }
1403 }
1404
    /// Processes a payload from the orders channel.
    ///
    /// Each order message is first offered to `try_handle_post_only_auto_cancel`; messages
    /// it does not claim are parsed either as a typed order event (when metadata for the
    /// resolved client order ID exists in `active_client_orders`) or as a generic execution
    /// report. Execution reports gathered during the pass are queued as a single
    /// `NautilusWsMessage::ExecutionReports`, after which the front of `pending_messages`
    /// is popped and returned.
    ///
    /// Returns `None` when the payload fails to deserialize or when nothing is pending.
    fn handle_orders_data(&mut self, data: Value, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
        let orders: Vec<OKXOrderMsg> = match serde_json::from_value(data) {
            Ok(orders) => orders,
            Err(e) => {
                tracing::error!("Failed to deserialize orders channel payload: {e}");
                return None;
            }
        };

        tracing::debug!(
            "Received {} order message(s) from orders channel",
            orders.len()
        );

        let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());

        for msg in orders {
            tracing::debug!(
                "Processing order message: inst_id={}, cl_ord_id={}, state={:?}, exec_type={:?}",
                msg.inst_id,
                msg.cl_ord_id,
                msg.state,
                msg.exec_type
            );

            // A `true` return means the helper consumed this message; skip normal parsing.
            if self.try_handle_post_only_auto_cancel(&msg, ts_init, &mut exec_reports) {
                continue;
            }

            // Resolve the client order ID to use, treating an empty algo client ID as absent.
            let raw_child = parse_client_order_id(&msg.cl_ord_id);
            let parent_from_msg = msg
                .algo_cl_ord_id
                .as_ref()
                .filter(|value| !value.is_empty())
                .map(ClientOrderId::new);
            let effective_client_id =
                self.register_client_order_aliases(&raw_child, &parent_from_msg);

            let Some(instrument) = self.instruments_cache.get(&msg.inst_id) else {
                tracing::error!(
                    "No instrument found for inst_id: {inst_id}",
                    inst_id = msg.inst_id
                );
                continue;
            };
            let price_precision = instrument.price_precision();
            let size_precision = instrument.size_precision();

            let order_metadata = effective_client_id
                .and_then(|cid| self.active_client_orders.get(&cid).map(|e| *e.value()));

            // Read the cached values before parsing; the handlers below update these caches.
            let previous_fee = self.fee_cache.get(&msg.ord_id).copied();
            let previous_filled_qty = self.filled_qty_cache.get(&msg.ord_id).copied();
            let previous_state =
                effective_client_id.and_then(|cid| self.order_state_cache.get(&cid).cloned());

            // Orders with registered metadata yield typed events; everything else is
            // reported as a generic execution report.
            if let (Some((trader_id, strategy_id, _instrument_id)), Some(canonical_client_id)) =
                (order_metadata, effective_client_id)
            {
                match parse_order_event(
                    &msg,
                    canonical_client_id,
                    self.account_id,
                    trader_id,
                    strategy_id,
                    instrument,
                    previous_fee,
                    previous_filled_qty,
                    previous_state.as_ref(),
                    ts_init,
                ) {
                    Ok(event) => {
                        self.process_parsed_order_event(
                            event,
                            &msg,
                            price_precision,
                            size_precision,
                            canonical_client_id,
                            &raw_child,
                            &mut exec_reports,
                        );
                    }
                    Err(e) => tracing::error!("Failed to parse order event: {e}"),
                }
            } else {
                match parse_order_msg(
                    &msg,
                    self.account_id,
                    &self.instruments_cache,
                    &self.fee_cache,
                    &self.filled_qty_cache,
                    ts_init,
                ) {
                    Ok(report) => {
                        tracing::debug!("Parsed external order as execution report: {report:?}");
                        let adjusted =
                            self.adjust_execution_report(report, &effective_client_id, &raw_child);
                        self.update_caches_with_report(&adjusted);
                        exec_reports.push(adjusted);
                    }
                    Err(e) => tracing::error!("Failed to parse order message: {e}"),
                }
            }
        }

        // Reports are queued behind any events pushed while processing the batch.
        if !exec_reports.is_empty() {
            tracing::debug!(
                "Pushing {count} execution report(s) to message queue",
                count = exec_reports.len()
            );
            self.pending_messages
                .push_back(NautilusWsMessage::ExecutionReports(exec_reports));
        }

        // Only the front of the queue is returned here; anything else stays queued.
        self.pending_messages.pop_front()
    }
1523
1524 #[allow(clippy::too_many_arguments)]
1526 fn process_parsed_order_event(
1527 &mut self,
1528 event: ParsedOrderEvent,
1529 msg: &OKXOrderMsg,
1530 price_precision: u8,
1531 size_precision: u8,
1532 canonical_client_id: ClientOrderId,
1533 raw_child: &Option<ClientOrderId>,
1534 exec_reports: &mut Vec<ExecutionReport>,
1535 ) {
1536 let venue_order_id = VenueOrderId::new(msg.ord_id);
1537
1538 match event {
1539 ParsedOrderEvent::Accepted(accepted) => {
1540 if self.emitted_order_accepted.contains_key(&venue_order_id) {
1541 tracing::debug!(
1542 "Skipping duplicate OrderAccepted for venue_order_id={venue_order_id}"
1543 );
1544 return;
1545 }
1546 self.emitted_order_accepted.insert(venue_order_id, ());
1547 self.update_order_state_cache(
1548 &canonical_client_id,
1549 msg,
1550 price_precision,
1551 size_precision,
1552 );
1553
1554 self.pending_messages
1555 .push_back(NautilusWsMessage::OrderAccepted(accepted));
1556 }
1557 ParsedOrderEvent::Canceled(canceled) => {
1558 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1559 self.pending_messages
1560 .push_back(NautilusWsMessage::OrderCanceled(canceled));
1561 }
1562 ParsedOrderEvent::Expired(expired) => {
1563 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1564 self.pending_messages
1565 .push_back(NautilusWsMessage::OrderExpired(expired));
1566 }
1567 ParsedOrderEvent::Triggered(triggered) => {
1568 self.update_order_state_cache(
1569 &canonical_client_id,
1570 msg,
1571 price_precision,
1572 size_precision,
1573 );
1574 self.pending_messages
1575 .push_back(NautilusWsMessage::OrderTriggered(triggered));
1576 }
1577 ParsedOrderEvent::Updated(updated) => {
1578 self.update_order_state_cache(
1579 &canonical_client_id,
1580 msg,
1581 price_precision,
1582 size_precision,
1583 );
1584 self.pending_messages
1585 .push_back(NautilusWsMessage::OrderUpdated(updated));
1586 }
1587 ParsedOrderEvent::Fill(fill_report) => {
1588 let effective_client_id = Some(canonical_client_id);
1589 let adjusted = self.adjust_execution_report(
1590 ExecutionReport::Fill(fill_report),
1591 &effective_client_id,
1592 raw_child,
1593 );
1594 self.update_caches_with_report(&adjusted);
1595
1596 if msg.state == OKXOrderStatus::Filled {
1597 self.cleanup_terminal_order(&canonical_client_id, &venue_order_id);
1598 }
1599
1600 exec_reports.push(adjusted);
1601 }
1602 ParsedOrderEvent::StatusOnly(status_report) => {
1603 let effective_client_id = Some(canonical_client_id);
1604 let adjusted = self.adjust_execution_report(
1605 ExecutionReport::Order(*status_report),
1606 &effective_client_id,
1607 raw_child,
1608 );
1609 self.update_caches_with_report(&adjusted);
1610 exec_reports.push(adjusted);
1611 }
1612 }
1613 }
1614
1615 fn update_order_state_cache(
1617 &mut self,
1618 client_order_id: &ClientOrderId,
1619 msg: &OKXOrderMsg,
1620 price_precision: u8,
1621 size_precision: u8,
1622 ) {
1623 let venue_order_id = VenueOrderId::new(msg.ord_id);
1624 let quantity = parse_quantity(&msg.sz, size_precision).ok();
1625 let price = if !is_market_price(&msg.px) {
1626 parse_price(&msg.px, price_precision).ok()
1627 } else {
1628 None
1629 };
1630
1631 if let Some(qty) = quantity {
1632 self.order_state_cache.insert(
1633 *client_order_id,
1634 OrderStateSnapshot {
1635 venue_order_id,
1636 quantity: qty,
1637 price,
1638 },
1639 );
1640 }
1641 }
1642
1643 fn cleanup_terminal_order(
1645 &mut self,
1646 client_order_id: &ClientOrderId,
1647 venue_order_id: &VenueOrderId,
1648 ) {
1649 self.emitted_order_accepted.remove(venue_order_id);
1650 self.order_state_cache.remove(client_order_id);
1651 self.active_client_orders.remove(client_order_id);
1652 self.client_id_aliases.remove(client_order_id);
1653 self.client_id_aliases.retain(|_, v| *v != *client_order_id);
1654
1655 self.fee_cache.remove(&venue_order_id.inner());
1656 self.filled_qty_cache.remove(&venue_order_id.inner());
1657 }
1658
1659 fn handle_algo_orders_data(
1660 &mut self,
1661 data: Value,
1662 ts_init: UnixNanos,
1663 ) -> Option<NautilusWsMessage> {
1664 let orders: Vec<OKXAlgoOrderMsg> = match serde_json::from_value(data) {
1665 Ok(orders) => orders,
1666 Err(e) => {
1667 tracing::error!("Failed to deserialize algo orders payload: {e}");
1668 return None;
1669 }
1670 };
1671
1672 let mut exec_reports: Vec<ExecutionReport> = Vec::with_capacity(orders.len());
1673
1674 for msg in orders {
1675 let raw_child = parse_client_order_id(&msg.cl_ord_id);
1676 let parent_from_msg = parse_client_order_id(&msg.algo_cl_ord_id);
1677 let effective_client_id =
1678 self.register_client_order_aliases(&raw_child, &parent_from_msg);
1679
1680 match parse_algo_order_msg(msg, self.account_id, &self.instruments_cache, ts_init) {
1681 Ok(report) => {
1682 let adjusted =
1683 self.adjust_execution_report(report, &effective_client_id, &raw_child);
1684 self.update_caches_with_report(&adjusted);
1685 exec_reports.push(adjusted);
1686 }
1687 Err(e) => {
1688 tracing::error!("Failed to parse algo order message: {e}");
1689 }
1690 }
1691 }
1692
1693 if !exec_reports.is_empty() {
1694 Some(NautilusWsMessage::ExecutionReports(exec_reports))
1695 } else {
1696 None
1697 }
1698 }
1699
1700 fn handle_other_channel_data(
1701 &mut self,
1702 channel: OKXWsChannel,
1703 inst_id: Option<Ustr>,
1704 data: Value,
1705 ts_init: UnixNanos,
1706 ) -> Option<NautilusWsMessage> {
1707 let Some(inst_id) = inst_id else {
1708 tracing::error!("No instrument for channel {:?}", channel);
1709 return None;
1710 };
1711
1712 let Some(instrument) = self.instruments_cache.get(&inst_id) else {
1713 tracing::error!(
1714 "No instrument for channel {:?}, inst_id {:?}",
1715 channel,
1716 inst_id
1717 );
1718 return None;
1719 };
1720
1721 let instrument_id = instrument.id();
1722 let price_precision = instrument.price_precision();
1723 let size_precision = instrument.size_precision();
1724
1725 match parse_ws_message_data(
1726 &channel,
1727 data,
1728 &instrument_id,
1729 price_precision,
1730 size_precision,
1731 ts_init,
1732 &mut self.funding_rate_cache,
1733 &self.instruments_cache,
1734 ) {
1735 Ok(Some(msg)) => {
1736 if let NautilusWsMessage::Instrument(ref inst) = msg {
1737 self.instruments_cache
1738 .insert(inst.symbol().inner(), inst.as_ref().clone());
1739 }
1740 Some(msg)
1741 }
1742 Ok(None) => None,
1743 Err(e) => {
1744 tracing::error!("Error parsing message for channel {:?}: {e}", channel);
1745 None
1746 }
1747 }
1748 }
1749
    /// Converts a raw WebSocket frame into an `OKXWsMessage`, logging as it goes.
    ///
    /// Text frames equal to `TEXT_PONG` are dropped, while `TEXT_PING` and `RECONNECTED`
    /// map to `OKXWsMessage::Ping` and `OKXWsMessage::Reconnected`. Any other text frame is
    /// deserialized as JSON; error, login, subscription, data, book data and order response
    /// events are returned, while channel connection counts, deserialized ping events and
    /// deserialized `Reconnected` events are logged and dropped.
    ///
    /// Ping frames map to `OKXWsMessage::Ping`; pong, binary, close and any other frame
    /// yield `None`.
    ///
    /// # Panics
    ///
    /// Panics if the channel of a subscription or channel-connection-count event cannot
    /// be serialized to JSON for logging.
    pub(crate) fn parse_raw_message(
        msg: tokio_tungstenite::tungstenite::Message,
    ) -> Option<OKXWsMessage> {
        match msg {
            tokio_tungstenite::tungstenite::Message::Text(text) => {
                // Control strings are matched before any JSON parsing is attempted.
                if text == TEXT_PONG {
                    tracing::trace!("Received pong from OKX");
                    return None;
                }
                if text == TEXT_PING {
                    tracing::trace!("Received ping from OKX (text)");
                    return Some(OKXWsMessage::Ping);
                }

                if text == RECONNECTED {
                    tracing::debug!("Received WebSocket reconnection signal");
                    return Some(OKXWsMessage::Reconnected);
                }
                tracing::trace!("Received WebSocket message: {text}");

                match serde_json::from_str(&text) {
                    Ok(ws_event) => match &ws_event {
                        OKXWsMessage::Error { code, msg } => {
                            tracing::error!("WebSocket error: {code} - {msg}");
                            Some(ws_event)
                        }
                        OKXWsMessage::Login {
                            event,
                            code,
                            msg,
                            conn_id,
                        } => {
                            // The login event is forwarded whether or not the code is "0".
                            if code == "0" {
                                tracing::info!(conn_id = %conn_id, "WebSocket authenticated");
                            } else {
                                tracing::error!(event = %event, code = %code, error = %msg, "WebSocket authentication failed");
                            }
                            Some(ws_event)
                        }
                        OKXWsMessage::Subscription {
                            event,
                            arg,
                            conn_id,
                            ..
                        } => {
                            // Serialize the channel and strip the JSON quotes for logging.
                            let channel_str = serde_json::to_string(&arg.channel)
                                .expect("Invalid OKX websocket channel")
                                .trim_matches('"')
                                .to_string();
                            tracing::debug!("{event}d: channel={channel_str}, conn_id={conn_id}");
                            Some(ws_event)
                        }
                        OKXWsMessage::ChannelConnCount {
                            event: _,
                            channel,
                            conn_count,
                            conn_id,
                        } => {
                            let channel_str = serde_json::to_string(&channel)
                                .expect("Invalid OKX websocket channel")
                                .trim_matches('"')
                                .to_string();
                            tracing::debug!(
                                "Channel connection status: channel={channel_str}, connections={conn_count}, conn_id={conn_id}",
                            );
                            None
                        }
                        OKXWsMessage::Ping => {
                            tracing::trace!("Ignoring ping event parsed from text payload");
                            None
                        }
                        OKXWsMessage::Data { .. } => Some(ws_event),
                        OKXWsMessage::BookData { .. } => Some(ws_event),
                        OKXWsMessage::OrderResponse {
                            id,
                            op,
                            code,
                            msg: _,
                            data,
                        } => {
                            // Only successes are logged here; the response is always forwarded.
                            if code == "0" {
                                tracing::debug!(
                                    "Order operation successful: id={:?}, op={op}, code={code}",
                                    id
                                );

                                if let Some(order_data) = data.first() {
                                    let success_msg = order_data
                                        .get("sMsg")
                                        .and_then(|s| s.as_str())
                                        .unwrap_or("Order operation successful");
                                    tracing::debug!("Order success details: {success_msg}");
                                }
                            }
                            Some(ws_event)
                        }
                        OKXWsMessage::Reconnected => {
                            tracing::warn!("Unexpected Reconnected event from deserialization");
                            None
                        }
                    },
                    Err(e) => {
                        tracing::error!("Failed to parse message: {e}: {text}");
                        None
                    }
                }
            }
            Message::Ping(_payload) => {
                tracing::trace!("Received binary ping frame from OKX");
                Some(OKXWsMessage::Ping)
            }
            Message::Pong(payload) => {
                tracing::trace!("Received pong frame from OKX ({} bytes)", payload.len());
                None
            }
            Message::Binary(msg) => {
                tracing::debug!("Raw binary: {msg:?}");
                None
            }
            Message::Close(_) => {
                tracing::debug!("Received close message");
                None
            }
            // Any remaining frame kind is logged and dropped.
            msg => {
                tracing::warn!("Unexpected message: {msg}");
                None
            }
        }
    }
1880
1881 fn generate_unique_request_id(&self) -> String {
1882 self.request_id_counter
1883 .fetch_add(1, Ordering::SeqCst)
1884 .to_string()
1885 }
1886
1887 fn get_instrument_type_and_family_from_instrument(
1888 instrument: &InstrumentAny,
1889 ) -> anyhow::Result<(OKXInstrumentType, String)> {
1890 let inst_type = okx_instrument_type(instrument)?;
1891 let symbol = instrument.symbol().inner();
1892
1893 let inst_family = match instrument {
1895 InstrumentAny::CurrencyPair(_) => symbol.as_str().to_string(),
1896 InstrumentAny::CryptoPerpetual(_) => {
1897 symbol
1899 .as_str()
1900 .strip_suffix("-SWAP")
1901 .unwrap_or(symbol.as_str())
1902 .to_string()
1903 }
1904 InstrumentAny::CryptoFuture(_) => {
1905 let s = symbol.as_str();
1908 if let Some(idx) = s.rfind('-') {
1909 s[..idx].to_string()
1910 } else {
1911 s.to_string()
1912 }
1913 }
1914 _ => {
1915 anyhow::bail!("Unsupported instrument type for OKX");
1916 }
1917 };
1918
1919 Ok((inst_type, inst_family))
1920 }
1921
1922 async fn handle_mass_cancel(&mut self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1923 let instrument = self
1924 .instruments_cache
1925 .get(&instrument_id.symbol.inner())
1926 .ok_or_else(|| anyhow::anyhow!("Unknown instrument {instrument_id}"))?;
1927
1928 let (inst_type, inst_family) =
1929 Self::get_instrument_type_and_family_from_instrument(instrument)?;
1930
1931 let params = WsMassCancelParams {
1932 inst_type,
1933 inst_family: Ustr::from(&inst_family),
1934 };
1935
1936 let args =
1937 vec![serde_json::to_value(params).map_err(|e| anyhow::anyhow!("JSON error: {e}"))?];
1938
1939 let request_id = self.generate_unique_request_id();
1940
1941 self.pending_mass_cancel_requests
1942 .insert(request_id.clone(), instrument_id);
1943
1944 let request = OKXWsRequest {
1945 id: Some(request_id.clone()),
1946 op: OKXWsOperation::MassCancel,
1947 exp_time: None,
1948 args,
1949 };
1950
1951 let payload = serde_json::to_string(&request)
1952 .map_err(|e| anyhow::anyhow!("Failed to serialize mass cancel request: {e}"))?;
1953
1954 match self
1955 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1956 .await
1957 {
1958 Ok(()) => {
1959 tracing::debug!("Sent mass cancel for {instrument_id}");
1960 Ok(())
1961 }
1962 Err(e) => {
1963 tracing::error!(error = %e, "Failed to send mass cancel after retries");
1964
1965 self.pending_mass_cancel_requests.remove(&request_id);
1966
1967 let error = OKXWebSocketError {
1968 code: "CLIENT_ERROR".to_string(),
1969 message: format!("Mass cancel failed for {instrument_id}: {e}"),
1970 conn_id: None,
1971 timestamp: self.clock.get_time_ns().as_u64(),
1972 };
1973 let _ = self.send(NautilusWsMessage::Error(error));
1974
1975 Err(anyhow::anyhow!("Failed to send mass cancel: {e}"))
1976 }
1977 }
1978 }
1979
1980 async fn handle_batch_cancel_orders(
1981 &self,
1982 args: Vec<Value>,
1983 request_id: String,
1984 ) -> anyhow::Result<()> {
1985 let request = OKXWsRequest {
1986 id: Some(request_id),
1987 op: OKXWsOperation::BatchCancelOrders,
1988 exp_time: None,
1989 args,
1990 };
1991
1992 let payload = serde_json::to_string(&request)
1993 .map_err(|e| anyhow::anyhow!("Failed to serialize batch cancel request: {e}"))?;
1994
1995 if let Some(client) = &self.inner {
1996 client
1997 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
1998 .await
1999 .map_err(|e| anyhow::anyhow!("Failed to send batch cancel: {e}"))?;
2000 tracing::debug!("Sent batch cancel orders");
2001 Ok(())
2002 } else {
2003 Err(anyhow::anyhow!("No active WebSocket client"))
2004 }
2005 }
2006
2007 async fn handle_batch_place_orders(
2008 &self,
2009 args: Vec<Value>,
2010 request_id: String,
2011 ) -> anyhow::Result<()> {
2012 let request = OKXWsRequest {
2013 id: Some(request_id),
2014 op: OKXWsOperation::BatchOrders,
2015 exp_time: None,
2016 args,
2017 };
2018
2019 let payload = serde_json::to_string(&request)
2020 .map_err(|e| anyhow::anyhow!("Failed to serialize batch place request: {e}"))?;
2021
2022 if let Some(client) = &self.inner {
2023 client
2024 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2025 .await
2026 .map_err(|e| anyhow::anyhow!("Failed to send batch place: {e}"))?;
2027 tracing::debug!("Sent batch place orders");
2028 Ok(())
2029 } else {
2030 Err(anyhow::anyhow!("No active WebSocket client"))
2031 }
2032 }
2033
2034 async fn handle_batch_amend_orders(
2035 &self,
2036 args: Vec<Value>,
2037 request_id: String,
2038 ) -> anyhow::Result<()> {
2039 let request = OKXWsRequest {
2040 id: Some(request_id),
2041 op: OKXWsOperation::BatchAmendOrders,
2042 exp_time: None,
2043 args,
2044 };
2045
2046 let payload = serde_json::to_string(&request)
2047 .map_err(|e| anyhow::anyhow!("Failed to serialize batch amend request: {e}"))?;
2048
2049 if let Some(client) = &self.inner {
2050 client
2051 .send_text(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2052 .await
2053 .map_err(|e| anyhow::anyhow!("Failed to send batch amend: {e}"))?;
2054 tracing::debug!("Sent batch amend orders");
2055 Ok(())
2056 } else {
2057 Err(anyhow::anyhow!("No active WebSocket client"))
2058 }
2059 }
2060
2061 async fn handle_subscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2062 for arg in &args {
2063 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Subscribing to channel");
2064 }
2065
2066 let message = OKXSubscription {
2067 op: OKXWsOperation::Subscribe,
2068 args,
2069 };
2070
2071 let json_txt = serde_json::to_string(&message)
2072 .map_err(|e| anyhow::anyhow!("Failed to serialize subscription: {e}"))?;
2073
2074 self.send_with_retry(
2075 json_txt,
2076 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2077 )
2078 .await
2079 .map_err(|e| anyhow::anyhow!("Failed to send subscription after retries: {e}"))?;
2080 Ok(())
2081 }
2082
2083 async fn handle_unsubscribe(&self, args: Vec<OKXSubscriptionArg>) -> anyhow::Result<()> {
2084 for arg in &args {
2085 tracing::debug!(channel = ?arg.channel, inst_id = ?arg.inst_id, "Unsubscribing from channel");
2086 }
2087
2088 let message = OKXSubscription {
2089 op: OKXWsOperation::Unsubscribe,
2090 args,
2091 };
2092
2093 let json_txt = serde_json::to_string(&message)
2094 .map_err(|e| anyhow::anyhow!("Failed to serialize unsubscription: {e}"))?;
2095
2096 self.send_with_retry(
2097 json_txt,
2098 Some(vec![OKX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
2099 )
2100 .await
2101 .map_err(|e| anyhow::anyhow!("Failed to send unsubscription after retries: {e}"))?;
2102 Ok(())
2103 }
2104
2105 async fn handle_place_order(
2106 &mut self,
2107 params: WsPostOrderParams,
2108 client_order_id: ClientOrderId,
2109 trader_id: TraderId,
2110 strategy_id: StrategyId,
2111 instrument_id: InstrumentId,
2112 ) -> anyhow::Result<()> {
2113 let request_id = self.generate_unique_request_id();
2114
2115 self.pending_place_requests.insert(
2116 request_id.clone(),
2117 (
2118 PendingOrderParams::Regular(params.clone()),
2119 client_order_id,
2120 trader_id,
2121 strategy_id,
2122 instrument_id,
2123 ),
2124 );
2125
2126 let request = OKXWsRequest {
2127 id: Some(request_id.clone()),
2128 op: OKXWsOperation::Order,
2129 exp_time: None,
2130 args: vec![params],
2131 };
2132
2133 let payload = serde_json::to_string(&request)
2134 .map_err(|e| anyhow::anyhow!("Failed to serialize place order request: {e}"))?;
2135
2136 match self
2137 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2138 .await
2139 {
2140 Ok(()) => {
2141 tracing::debug!("Sent place order request");
2142 Ok(())
2143 }
2144 Err(e) => {
2145 tracing::error!(error = %e, "Failed to send place order after retries");
2146
2147 self.pending_place_requests.remove(&request_id);
2148
2149 let ts_now = self.clock.get_time_ns();
2150 let rejected = OrderRejected::new(
2151 trader_id,
2152 strategy_id,
2153 instrument_id,
2154 client_order_id,
2155 self.account_id,
2156 Ustr::from(&format!("WebSocket send failed: {e}")),
2157 UUID4::new(),
2158 ts_now, ts_now, false, false, );
2163 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2164
2165 Err(anyhow::anyhow!("Failed to send place order: {e}"))
2166 }
2167 }
2168 }
2169
2170 async fn handle_place_algo_order(
2171 &mut self,
2172 params: WsPostAlgoOrderParams,
2173 client_order_id: ClientOrderId,
2174 trader_id: TraderId,
2175 strategy_id: StrategyId,
2176 instrument_id: InstrumentId,
2177 ) -> anyhow::Result<()> {
2178 let request_id = self.generate_unique_request_id();
2179
2180 self.pending_place_requests.insert(
2181 request_id.clone(),
2182 (
2183 PendingOrderParams::Algo(params.clone()),
2184 client_order_id,
2185 trader_id,
2186 strategy_id,
2187 instrument_id,
2188 ),
2189 );
2190
2191 let request = OKXWsRequest {
2192 id: Some(request_id.clone()),
2193 op: OKXWsOperation::OrderAlgo,
2194 exp_time: None,
2195 args: vec![params],
2196 };
2197
2198 let payload = serde_json::to_string(&request)
2199 .map_err(|e| anyhow::anyhow!("Failed to serialize place algo order request: {e}"))?;
2200
2201 match self
2202 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()]))
2203 .await
2204 {
2205 Ok(()) => {
2206 tracing::debug!("Sent place algo order request");
2207 Ok(())
2208 }
2209 Err(e) => {
2210 tracing::error!(error = %e, "Failed to send place algo order after retries");
2211
2212 self.pending_place_requests.remove(&request_id);
2213
2214 let ts_now = self.clock.get_time_ns();
2215 let rejected = OrderRejected::new(
2216 trader_id,
2217 strategy_id,
2218 instrument_id,
2219 client_order_id,
2220 self.account_id,
2221 Ustr::from(&format!("WebSocket send failed: {e}")),
2222 UUID4::new(),
2223 ts_now, ts_now, false, false, );
2228 let _ = self.send(NautilusWsMessage::OrderRejected(rejected));
2229
2230 Err(anyhow::anyhow!("Failed to send place algo order: {e}"))
2231 }
2232 }
2233 }
2234
2235 async fn handle_cancel_order(
2236 &mut self,
2237 client_order_id: Option<ClientOrderId>,
2238 venue_order_id: Option<VenueOrderId>,
2239 instrument_id: InstrumentId,
2240 trader_id: TraderId,
2241 strategy_id: StrategyId,
2242 ) -> anyhow::Result<()> {
2243 let mut builder = WsCancelOrderParamsBuilder::default();
2244 builder.inst_id(instrument_id.symbol.as_str());
2245
2246 if let Some(venue_order_id) = venue_order_id {
2247 builder.ord_id(venue_order_id.as_str());
2248 }
2249
2250 if let Some(client_order_id) = client_order_id {
2251 builder.cl_ord_id(client_order_id.as_str());
2252 }
2253
2254 let params = builder
2255 .build()
2256 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2257
2258 let request_id = self.generate_unique_request_id();
2259
2260 if let Some(client_order_id) = client_order_id {
2262 self.pending_cancel_requests.insert(
2263 request_id.clone(),
2264 (
2265 client_order_id,
2266 trader_id,
2267 strategy_id,
2268 instrument_id,
2269 venue_order_id,
2270 ),
2271 );
2272 }
2273
2274 let request = OKXWsRequest {
2275 id: Some(request_id.clone()),
2276 op: OKXWsOperation::CancelOrder,
2277 exp_time: None,
2278 args: vec![params],
2279 };
2280
2281 let payload = serde_json::to_string(&request)
2282 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel request: {e}"))?;
2283
2284 match self
2285 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2286 .await
2287 {
2288 Ok(()) => {
2289 tracing::debug!("Sent cancel order request");
2290 Ok(())
2291 }
2292 Err(e) => {
2293 tracing::error!(error = %e, "Failed to send cancel order after retries");
2294
2295 self.pending_cancel_requests.remove(&request_id);
2296
2297 if let Some(client_order_id) = client_order_id {
2298 let ts_now = self.clock.get_time_ns();
2299 let rejected = OrderCancelRejected::new(
2300 trader_id,
2301 strategy_id,
2302 instrument_id,
2303 client_order_id,
2304 Ustr::from(&format!("WebSocket send failed: {e}")),
2305 UUID4::new(),
2306 ts_now, ts_now, false, venue_order_id,
2310 Some(self.account_id),
2311 );
2312 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2313 }
2314
2315 Err(anyhow::anyhow!("Failed to send cancel order: {e}"))
2316 }
2317 }
2318 }
2319
2320 async fn handle_amend_order(
2321 &mut self,
2322 params: WsAmendOrderParams,
2323 client_order_id: ClientOrderId,
2324 trader_id: TraderId,
2325 strategy_id: StrategyId,
2326 instrument_id: InstrumentId,
2327 venue_order_id: Option<VenueOrderId>,
2328 ) -> anyhow::Result<()> {
2329 let request_id = self.generate_unique_request_id();
2330
2331 self.pending_amend_requests.insert(
2332 request_id.clone(),
2333 (
2334 client_order_id,
2335 trader_id,
2336 strategy_id,
2337 instrument_id,
2338 venue_order_id,
2339 ),
2340 );
2341
2342 let request = OKXWsRequest {
2343 id: Some(request_id.clone()),
2344 op: OKXWsOperation::AmendOrder,
2345 exp_time: None,
2346 args: vec![params],
2347 };
2348
2349 let payload = serde_json::to_string(&request)
2350 .map_err(|e| anyhow::anyhow!("Failed to serialize amend order request: {e}"))?;
2351
2352 match self
2353 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_AMEND.to_string()]))
2354 .await
2355 {
2356 Ok(()) => {
2357 tracing::debug!("Sent amend order request");
2358 Ok(())
2359 }
2360 Err(e) => {
2361 tracing::error!(error = %e, "Failed to send amend order after retries");
2362
2363 self.pending_amend_requests.remove(&request_id);
2364
2365 let ts_now = self.clock.get_time_ns();
2366 let rejected = OrderModifyRejected::new(
2367 trader_id,
2368 strategy_id,
2369 instrument_id,
2370 client_order_id,
2371 Ustr::from(&format!("WebSocket send failed: {e}")),
2372 UUID4::new(),
2373 ts_now, ts_now, false, venue_order_id,
2377 Some(self.account_id),
2378 );
2379 let _ = self.send(NautilusWsMessage::OrderModifyRejected(rejected));
2380
2381 Err(anyhow::anyhow!("Failed to send amend order: {e}"))
2382 }
2383 }
2384 }
2385
2386 async fn handle_cancel_algo_order(
2387 &mut self,
2388 client_order_id: Option<ClientOrderId>,
2389 algo_order_id: Option<VenueOrderId>,
2390 instrument_id: InstrumentId,
2391 trader_id: TraderId,
2392 strategy_id: StrategyId,
2393 ) -> anyhow::Result<()> {
2394 let mut builder = WsCancelAlgoOrderParamsBuilder::default();
2395 builder.inst_id(instrument_id.symbol.as_str());
2396
2397 if let Some(client_order_id) = &client_order_id {
2398 builder.algo_cl_ord_id(client_order_id.as_str());
2399 }
2400
2401 if let Some(algo_id) = &algo_order_id {
2402 builder.algo_id(algo_id.as_str());
2403 }
2404
2405 let params = builder
2406 .build()
2407 .map_err(|e| anyhow::anyhow!("Failed to build cancel algo params: {e}"))?;
2408
2409 let request_id = self.generate_unique_request_id();
2410
2411 if let Some(client_order_id) = client_order_id {
2413 self.pending_cancel_requests.insert(
2414 request_id.clone(),
2415 (client_order_id, trader_id, strategy_id, instrument_id, None),
2416 );
2417 }
2418
2419 let request = OKXWsRequest {
2420 id: Some(request_id.clone()),
2421 op: OKXWsOperation::CancelAlgos,
2422 exp_time: None,
2423 args: vec![params],
2424 };
2425
2426 let payload = serde_json::to_string(&request)
2427 .map_err(|e| anyhow::anyhow!("Failed to serialize cancel algo request: {e}"))?;
2428
2429 match self
2430 .send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_CANCEL.to_string()]))
2431 .await
2432 {
2433 Ok(()) => {
2434 tracing::debug!("Sent cancel algo order request");
2435 Ok(())
2436 }
2437 Err(e) => {
2438 tracing::error!(error = %e, "Failed to send cancel algo order after retries");
2439
2440 self.pending_cancel_requests.remove(&request_id);
2441
2442 if let Some(client_order_id) = client_order_id {
2443 let ts_now = self.clock.get_time_ns();
2444 let rejected = OrderCancelRejected::new(
2445 trader_id,
2446 strategy_id,
2447 instrument_id,
2448 client_order_id,
2449 Ustr::from(&format!("WebSocket send failed: {e}")),
2450 UUID4::new(),
2451 ts_now, ts_now, false, None,
2455 Some(self.account_id),
2456 );
2457 let _ = self.send(NautilusWsMessage::OrderCancelRejected(rejected));
2458 }
2459
2460 Err(anyhow::anyhow!("Failed to send cancel algo order: {e}"))
2461 }
2462 }
2463 }
2464}
2465
2466pub fn is_post_only_rejection(code: &str, data: &[Value]) -> bool {
2468 if code == OKX_POST_ONLY_ERROR_CODE {
2469 return true;
2470 }
2471
2472 for entry in data {
2473 if let Some(s_code) = entry.get("sCode").and_then(|value| value.as_str())
2474 && s_code == OKX_POST_ONLY_ERROR_CODE
2475 {
2476 return true;
2477 }
2478
2479 if let Some(inner_code) = entry.get("code").and_then(|value| value.as_str())
2480 && inner_code == OKX_POST_ONLY_ERROR_CODE
2481 {
2482 return true;
2483 }
2484 }
2485
2486 false
2487}
2488
/// Returns `true` if `haystack` contains `needle`, comparing bytes without
/// regard to ASCII case.
///
/// An empty `needle` is contained in every haystack (including an empty one),
/// mirroring `str::contains`.
#[inline]
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    let needle = needle.as_bytes();

    // `slice::windows` panics when asked for zero-sized windows, so the empty
    // needle has to be answered before scanning.
    if needle.is_empty() {
        return true;
    }

    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
2497
2498fn should_retry_okx_error(error: &OKXWsError) -> bool {
2500 match error {
2501 OKXWsError::OkxError { error_code, .. } => should_retry_error_code(error_code),
2502 OKXWsError::TungsteniteError(_) => true, OKXWsError::ClientError(msg) => {
2504 contains_ignore_ascii_case(msg, "timeout")
2506 || contains_ignore_ascii_case(msg, "timed out")
2507 || contains_ignore_ascii_case(msg, "connection")
2508 || contains_ignore_ascii_case(msg, "network")
2509 }
2510 OKXWsError::AuthenticationError(_)
2511 | OKXWsError::JsonError(_)
2512 | OKXWsError::ParsingError(_) => {
2513 false
2515 }
2516 }
2517}
2518
2519fn create_okx_timeout_error(msg: String) -> OKXWsError {
2521 OKXWsError::ClientError(msg)
2522}
2523
2524#[cfg(test)]
2525mod tests {
2526 use std::sync::{Arc, atomic::AtomicBool};
2527
2528 use ahash::AHashMap;
2529 use dashmap::DashMap;
2530 use nautilus_model::{
2531 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
2532 types::{Money, Quantity},
2533 };
2534 use nautilus_network::websocket::{AuthTracker, SubscriptionState};
2535 use rstest::rstest;
2536
2537 use super::{NautilusWsMessage, OKXWsFeedHandler};
2538 use crate::websocket::parse::OrderStateSnapshot;
2539
2540 const OKX_WS_TOPIC_DELIMITER: char = ':';
2541
2542 #[allow(clippy::type_complexity)]
2543 fn create_test_handler() -> (
2544 OKXWsFeedHandler,
2545 tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
2546 Arc<DashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>>,
2547 Arc<DashMap<ClientOrderId, ClientOrderId>>,
2548 ) {
2549 let account_id = AccountId::new("OKX-001");
2550 let signal = Arc::new(AtomicBool::new(false));
2551 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
2552 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
2553 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
2554 let active_client_orders = Arc::new(DashMap::new());
2555 let client_id_aliases = Arc::new(DashMap::new());
2556 let auth_tracker = AuthTracker::new();
2557 let subscriptions_state = SubscriptionState::new(OKX_WS_TOPIC_DELIMITER);
2558
2559 let handler = OKXWsFeedHandler::new(
2560 account_id,
2561 signal,
2562 cmd_rx,
2563 raw_rx,
2564 out_tx,
2565 active_client_orders.clone(),
2566 client_id_aliases.clone(),
2567 auth_tracker,
2568 subscriptions_state,
2569 );
2570
2571 (handler, out_rx, active_client_orders, client_id_aliases)
2572 }
2573
2574 #[rstest]
2575 fn test_is_post_only_rejection_detects_by_code() {
2576 assert!(super::is_post_only_rejection("51019", &[]));
2577 }
2578
2579 #[rstest]
2580 fn test_is_post_only_rejection_detects_by_inner_code() {
2581 let data = vec![serde_json::json!({
2582 "sCode": "51019"
2583 })];
2584 assert!(super::is_post_only_rejection("50000", &data));
2585 }
2586
2587 #[rstest]
2588 fn test_is_post_only_rejection_false_for_unrelated_error() {
2589 let data = vec![serde_json::json!({
2590 "sMsg": "Insufficient balance"
2591 })];
2592 assert!(!super::is_post_only_rejection("50000", &data));
2593 }
2594
2595 #[rstest]
2596 fn test_cleanup_alias_removes_canonical_entry() {
2597 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2598 let canonical = ClientOrderId::new("PARENT-001");
2599 aliases.insert(canonical, canonical);
2600
2601 aliases.remove(&canonical);
2602 aliases.retain(|_, v| *v != canonical);
2603
2604 assert!(!aliases.contains_key(&canonical));
2605 assert!(aliases.is_empty());
2606 }
2607
2608 #[rstest]
2609 fn test_cleanup_alias_removes_child_alias_pointing_to_canonical() {
2610 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2611 let canonical = ClientOrderId::new("PARENT-001");
2612 let child = ClientOrderId::new("CHILD-001");
2613 aliases.insert(canonical, canonical);
2614 aliases.insert(child, canonical);
2615
2616 aliases.remove(&canonical);
2617 aliases.retain(|_, v| *v != canonical);
2618
2619 assert!(!aliases.contains_key(&canonical));
2620 assert!(!aliases.contains_key(&child));
2621 assert!(aliases.is_empty());
2622 }
2623
2624 #[rstest]
2625 fn test_cleanup_alias_does_not_affect_unrelated_entries() {
2626 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2627 let canonical1 = ClientOrderId::new("PARENT-001");
2628 let child1 = ClientOrderId::new("CHILD-001");
2629 let canonical2 = ClientOrderId::new("PARENT-002");
2630 let child2 = ClientOrderId::new("CHILD-002");
2631 aliases.insert(canonical1, canonical1);
2632 aliases.insert(child1, canonical1);
2633 aliases.insert(canonical2, canonical2);
2634 aliases.insert(child2, canonical2);
2635
2636 aliases.remove(&canonical1);
2637 aliases.retain(|_, v| *v != canonical1);
2638
2639 assert!(!aliases.contains_key(&canonical1));
2640 assert!(!aliases.contains_key(&child1));
2641 assert!(aliases.contains_key(&canonical2));
2642 assert!(aliases.contains_key(&child2));
2643 assert_eq!(aliases.len(), 2);
2644 }
2645
2646 #[rstest]
2647 fn test_cleanup_alias_handles_multiple_children() {
2648 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2649 let canonical = ClientOrderId::new("PARENT-001");
2650 let child1 = ClientOrderId::new("CHILD-001");
2651 let child2 = ClientOrderId::new("CHILD-002");
2652 let child3 = ClientOrderId::new("CHILD-003");
2653 aliases.insert(canonical, canonical);
2654 aliases.insert(child1, canonical);
2655 aliases.insert(child2, canonical);
2656 aliases.insert(child3, canonical);
2657
2658 aliases.remove(&canonical);
2659 aliases.retain(|_, v| *v != canonical);
2660
2661 assert!(aliases.is_empty());
2662 }
2663
2664 #[rstest]
2665 fn test_cleanup_removes_from_all_caches() {
2666 let emitted_accepted: DashMap<VenueOrderId, ()> = DashMap::new();
2667 let order_state_cache: AHashMap<ClientOrderId, u32> = AHashMap::new();
2668 let active_orders: DashMap<ClientOrderId, ()> = DashMap::new();
2669 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2670 let fee_cache: AHashMap<ustr::Ustr, f64> = AHashMap::new();
2671 let filled_qty_cache: AHashMap<ustr::Ustr, f64> = AHashMap::new();
2672 let canonical = ClientOrderId::new("PARENT-001");
2673 let child = ClientOrderId::new("CHILD-001");
2674 let venue_id = VenueOrderId::new("VENUE-001");
2675
2676 emitted_accepted.insert(venue_id, ());
2677 let mut order_state = order_state_cache;
2678 order_state.insert(canonical, 1);
2679 active_orders.insert(canonical, ());
2680 aliases.insert(canonical, canonical);
2681 aliases.insert(child, canonical);
2682 let mut fees = fee_cache;
2683 fees.insert(venue_id.inner(), 0.001);
2684 let mut filled = filled_qty_cache;
2685 filled.insert(venue_id.inner(), 1.0);
2686
2687 emitted_accepted.remove(&venue_id);
2688 order_state.remove(&canonical);
2689 active_orders.remove(&canonical);
2690 aliases.remove(&canonical);
2691 aliases.retain(|_, v| *v != canonical);
2692 fees.remove(&venue_id.inner());
2693 filled.remove(&venue_id.inner());
2694
2695 assert!(emitted_accepted.is_empty());
2696 assert!(order_state.is_empty());
2697 assert!(active_orders.is_empty());
2698 assert!(aliases.is_empty());
2699 assert!(fees.is_empty());
2700 assert!(filled.is_empty());
2701 }
2702
2703 #[rstest]
2704 fn test_alias_registration_parent_with_child() {
2705 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2706 let parent = ClientOrderId::new("PARENT-001");
2707 let child = ClientOrderId::new("CHILD-001");
2708 aliases.insert(parent, parent);
2709 aliases.insert(child, parent);
2710
2711 assert_eq!(*aliases.get(&parent).unwrap(), parent);
2712 assert_eq!(*aliases.get(&child).unwrap(), parent);
2713 }
2714
2715 #[rstest]
2716 fn test_alias_registration_standalone_order() {
2717 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2718 let order_id = ClientOrderId::new("ORDER-001");
2719 aliases.insert(order_id, order_id);
2720
2721 assert_eq!(*aliases.get(&order_id).unwrap(), order_id);
2722 }
2723
2724 #[rstest]
2725 fn test_alias_lookup_returns_canonical() {
2726 let aliases: DashMap<ClientOrderId, ClientOrderId> = DashMap::new();
2727 let canonical = ClientOrderId::new("PARENT-001");
2728 let child = ClientOrderId::new("CHILD-001");
2729
2730 aliases.insert(canonical, canonical);
2731 aliases.insert(child, canonical);
2732
2733 let resolved = aliases.get(&child).map(|v| *v);
2734 assert_eq!(resolved, Some(canonical));
2735 }
2736
2737 #[rstest]
2738 fn test_handler_register_client_order_aliases_with_parent() {
2739 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2740
2741 let child = Some(ClientOrderId::new("CHILD-001"));
2742 let parent = Some(ClientOrderId::new("PARENT-001"));
2743
2744 let result = handler.register_client_order_aliases(&child, &parent);
2745
2746 assert_eq!(result, Some(ClientOrderId::new("PARENT-001")));
2747 assert!(client_id_aliases.contains_key(&ClientOrderId::new("PARENT-001")));
2748 assert!(client_id_aliases.contains_key(&ClientOrderId::new("CHILD-001")));
2749 assert_eq!(
2750 *client_id_aliases
2751 .get(&ClientOrderId::new("CHILD-001"))
2752 .unwrap(),
2753 ClientOrderId::new("PARENT-001")
2754 );
2755 }
2756
2757 #[rstest]
2758 fn test_handler_register_client_order_aliases_without_parent() {
2759 let (handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2760
2761 let child = Some(ClientOrderId::new("ORDER-001"));
2762 let parent: Option<ClientOrderId> = None;
2763
2764 let result = handler.register_client_order_aliases(&child, &parent);
2765
2766 assert_eq!(result, Some(ClientOrderId::new("ORDER-001")));
2767 assert!(client_id_aliases.contains_key(&ClientOrderId::new("ORDER-001")));
2768 assert_eq!(
2769 *client_id_aliases
2770 .get(&ClientOrderId::new("ORDER-001"))
2771 .unwrap(),
2772 ClientOrderId::new("ORDER-001")
2773 );
2774 }
2775
2776 #[rstest]
2777 fn test_handler_cleanup_terminal_order_removes_all_state() {
2778 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2779
2780 let canonical = ClientOrderId::new("PARENT-001");
2781 let child = ClientOrderId::new("CHILD-001");
2782 let venue_id = VenueOrderId::new("VENUE-001");
2783 let trader_id = TraderId::new("TRADER-001");
2784 let strategy_id = StrategyId::new("STRATEGY-001");
2785 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2786
2787 active_client_orders.insert(canonical, (trader_id, strategy_id, instrument_id));
2788 client_id_aliases.insert(canonical, canonical);
2789 client_id_aliases.insert(child, canonical);
2790 handler
2791 .fee_cache
2792 .insert(venue_id.inner(), Money::from("0.001 USDT"));
2793 handler
2794 .filled_qty_cache
2795 .insert(venue_id.inner(), Quantity::from("1.0"));
2796 handler.order_state_cache.insert(
2797 canonical,
2798 OrderStateSnapshot {
2799 venue_order_id: venue_id,
2800 quantity: Quantity::from("1.0"),
2801 price: None,
2802 },
2803 );
2804
2805 handler.cleanup_terminal_order(&canonical, &venue_id);
2806
2807 assert!(!active_client_orders.contains_key(&canonical));
2808 assert!(!client_id_aliases.contains_key(&canonical));
2809 assert!(!client_id_aliases.contains_key(&child));
2810 assert!(!handler.fee_cache.contains_key(&venue_id.inner()));
2811 assert!(!handler.filled_qty_cache.contains_key(&venue_id.inner()));
2812 assert!(!handler.order_state_cache.contains_key(&canonical));
2813 }
2814
2815 #[rstest]
2816 fn test_handler_cleanup_terminal_order_removes_multiple_children() {
2817 let (mut handler, _out_rx, _active, client_id_aliases) = create_test_handler();
2818
2819 let canonical = ClientOrderId::new("PARENT-001");
2820 let child1 = ClientOrderId::new("CHILD-001");
2821 let child2 = ClientOrderId::new("CHILD-002");
2822 let child3 = ClientOrderId::new("CHILD-003");
2823 let venue_id = VenueOrderId::new("VENUE-001");
2824
2825 client_id_aliases.insert(canonical, canonical);
2826 client_id_aliases.insert(child1, canonical);
2827 client_id_aliases.insert(child2, canonical);
2828 client_id_aliases.insert(child3, canonical);
2829
2830 handler.cleanup_terminal_order(&canonical, &venue_id);
2831
2832 assert!(!client_id_aliases.contains_key(&canonical));
2833 assert!(!client_id_aliases.contains_key(&child1));
2834 assert!(!client_id_aliases.contains_key(&child2));
2835 assert!(!client_id_aliases.contains_key(&child3));
2836 assert!(client_id_aliases.is_empty());
2837 }
2838
2839 #[rstest]
2840 fn test_handler_cleanup_does_not_affect_other_orders() {
2841 let (mut handler, _out_rx, active_client_orders, client_id_aliases) = create_test_handler();
2842
2843 let canonical1 = ClientOrderId::new("PARENT-001");
2844 let child1 = ClientOrderId::new("CHILD-001");
2845 let venue_id1 = VenueOrderId::new("VENUE-001");
2846
2847 let canonical2 = ClientOrderId::new("PARENT-002");
2848 let child2 = ClientOrderId::new("CHILD-002");
2849 let venue_id2 = VenueOrderId::new("VENUE-002");
2850
2851 let trader_id = TraderId::new("TRADER-001");
2852 let strategy_id = StrategyId::new("STRATEGY-001");
2853 let instrument_id = InstrumentId::from("ETH-USDT-PERP.OKX");
2854
2855 active_client_orders.insert(canonical1, (trader_id, strategy_id, instrument_id));
2856 active_client_orders.insert(canonical2, (trader_id, strategy_id, instrument_id));
2857 client_id_aliases.insert(canonical1, canonical1);
2858 client_id_aliases.insert(child1, canonical1);
2859 client_id_aliases.insert(canonical2, canonical2);
2860 client_id_aliases.insert(child2, canonical2);
2861 handler
2862 .fee_cache
2863 .insert(venue_id1.inner(), Money::from("0.001 USDT"));
2864 handler
2865 .fee_cache
2866 .insert(venue_id2.inner(), Money::from("0.002 USDT"));
2867
2868 handler.cleanup_terminal_order(&canonical1, &venue_id1);
2869
2870 assert!(!active_client_orders.contains_key(&canonical1));
2871 assert!(!client_id_aliases.contains_key(&canonical1));
2872 assert!(!client_id_aliases.contains_key(&child1));
2873 assert!(!handler.fee_cache.contains_key(&venue_id1.inner()));
2874
2875 assert!(active_client_orders.contains_key(&canonical2));
2876 assert!(client_id_aliases.contains_key(&canonical2));
2877 assert!(client_id_aliases.contains_key(&child2));
2878 assert!(handler.fee_cache.contains_key(&venue_id2.inner()));
2879 }
2880
2881 mod channel_routing {
2886 use nautilus_core::nanos::UnixNanos;
2887 use nautilus_model::{
2888 identifiers::{InstrumentId, Symbol},
2889 instruments::{CryptoPerpetual, CurrencyPair, Instrument, InstrumentAny},
2890 types::{Currency, Price, Quantity},
2891 };
2892 use rstest::rstest;
2893 use ustr::Ustr;
2894
2895 use super::*;
2896 use crate::{
2897 common::{enums::OKXBookAction, testing::load_test_json},
2898 websocket::{enums::OKXWsChannel, messages::OKXWsMessage},
2899 };
2900
2901 fn create_spot_instrument() -> InstrumentAny {
2902 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2903 InstrumentAny::CurrencyPair(CurrencyPair::new(
2904 instrument_id,
2905 Symbol::from("BTC-USDT"),
2906 Currency::BTC(),
2907 Currency::USDT(),
2908 2,
2909 8,
2910 Price::from("0.01"),
2911 Quantity::from("0.00000001"),
2912 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
2925 UnixNanos::default(),
2926 ))
2927 }
2928
2929 fn create_swap_instrument() -> InstrumentAny {
2930 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2931 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2932 instrument_id,
2933 Symbol::from("BTC-USDT-SWAP"),
2934 Currency::BTC(),
2935 Currency::USDT(),
2936 Currency::USDT(),
2937 false,
2938 2,
2939 8,
2940 Price::from("0.01"),
2941 Quantity::from("0.00000001"),
2942 None,
2943 None,
2944 None,
2945 None,
2946 None,
2947 None,
2948 None,
2949 None,
2950 None,
2951 None,
2952 None,
2953 None,
2954 UnixNanos::default(),
2955 UnixNanos::default(),
2956 ))
2957 }
2958
2959 fn create_handler_with_instruments(instruments: Vec<InstrumentAny>) -> OKXWsFeedHandler {
2960 let (mut handler, _, _, _) = create_test_handler();
2961 for inst in instruments {
2962 handler
2963 .instruments_cache
2964 .insert(inst.symbol().inner(), inst);
2965 }
2966 handler
2967 }
2968
2969 #[rstest]
2970 fn test_parse_raw_message_ticker_channel() {
2971 let json = load_test_json("ws_tickers.json");
2972 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2973
2974 match msg {
2975 OKXWsMessage::Data { arg, data } => {
2976 assert!(
2977 matches!(arg.channel, OKXWsChannel::Tickers),
2978 "Expected Tickers channel"
2979 );
2980 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
2981 assert!(data.is_array());
2982 }
2983 _ => panic!("Expected OKXWsMessage::Data variant"),
2984 }
2985 }
2986
2987 #[rstest]
2988 fn test_parse_raw_message_trades_channel() {
2989 let json = load_test_json("ws_trades.json");
2990 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
2991
2992 match msg {
2993 OKXWsMessage::Data { arg, data } => {
2994 assert!(
2995 matches!(arg.channel, OKXWsChannel::Trades),
2996 "Expected Trades channel"
2997 );
2998 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USD")));
2999 assert!(data.is_array());
3000 }
3001 _ => panic!("Expected OKXWsMessage::Data variant"),
3002 }
3003 }
3004
3005 #[rstest]
3006 fn test_parse_raw_message_books_channel() {
3007 let json = load_test_json("ws_books_snapshot.json");
3008 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3009
3010 match msg {
3011 OKXWsMessage::BookData { arg, action, data } => {
3012 assert!(
3013 matches!(arg.channel, OKXWsChannel::Books),
3014 "Expected Books channel"
3015 );
3016 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3017 assert!(
3018 matches!(action, OKXBookAction::Snapshot),
3019 "Expected snapshot action"
3020 );
3021 assert!(!data.is_empty());
3022 }
3023 _ => panic!("Expected OKXWsMessage::BookData variant"),
3024 }
3025 }
3026
3027 #[rstest]
3028 fn test_parse_raw_message_candle_channel() {
3029 let json = load_test_json("ws_candle.json");
3030 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3031
3032 match msg {
3033 OKXWsMessage::Data { arg, data } => {
3034 assert!(
3036 matches!(arg.channel, OKXWsChannel::Candle1Day),
3037 "Expected Candle1Day channel, got {:?}",
3038 arg.channel
3039 );
3040 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
3041 assert!(data.is_array());
3042 }
3043 _ => panic!("Expected OKXWsMessage::Data variant"),
3044 }
3045 }
3046
3047 #[rstest]
3048 fn test_parse_raw_message_funding_rate_channel() {
3049 let json = load_test_json("ws_funding_rate.json");
3050 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3051
3052 match msg {
3053 OKXWsMessage::Data { arg, data } => {
3054 assert!(
3055 matches!(arg.channel, OKXWsChannel::FundingRate),
3056 "Expected FundingRate channel"
3057 );
3058 assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT-SWAP")));
3059 assert!(data.is_array());
3060 }
3061 _ => panic!("Expected OKXWsMessage::Data variant"),
3062 }
3063 }
3064
3065 #[rstest]
3066 fn test_parse_raw_message_bbo_tbt_channel() {
3067 let json = load_test_json("ws_bbo_tbt.json");
3068 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3069
3070 match msg {
3071 OKXWsMessage::Data { arg, data } => {
3072 assert!(
3073 matches!(arg.channel, OKXWsChannel::BboTbt),
3074 "Expected BboTbt channel"
3075 );
3076 assert!(data.is_array());
3077 }
3078 _ => panic!("Expected OKXWsMessage::Data variant"),
3079 }
3080 }
3081
3082 #[rstest]
3083 fn test_handle_other_channel_data_tickers() {
3084 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3085 let json = load_test_json("ws_tickers.json");
3086 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3087
3088 let OKXWsMessage::Data { arg, data } = msg else {
3089 panic!("Expected OKXWsMessage::Data");
3090 };
3091
3092 let ts_init = UnixNanos::from(1_000_000_000u64);
3093 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3094
3095 assert!(result.is_some());
3096 match result.unwrap() {
3097 NautilusWsMessage::Data(payloads) => {
3098 assert!(!payloads.is_empty(), "Should produce data payloads");
3099 }
3100 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3101 }
3102 }
3103
3104 #[rstest]
3105 fn test_handle_other_channel_data_trades() {
3106 let instrument_id = InstrumentId::from("BTC-USD.OKX");
3108 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
3109 instrument_id,
3110 Symbol::from("BTC-USD"),
3111 Currency::BTC(),
3112 Currency::USD(),
3113 1,
3114 8,
3115 Price::from("0.1"),
3116 Quantity::from("0.00000001"),
3117 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
3130 UnixNanos::default(),
3131 ));
3132
3133 let mut handler = create_handler_with_instruments(vec![instrument]);
3134 let json = load_test_json("ws_trades.json");
3135 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3136
3137 let OKXWsMessage::Data { arg, data } = msg else {
3138 panic!("Expected OKXWsMessage::Data");
3139 };
3140
3141 let ts_init = UnixNanos::from(1_000_000_000u64);
3142 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3143
3144 assert!(result.is_some());
3145 match result.unwrap() {
3146 NautilusWsMessage::Data(payloads) => {
3147 assert!(!payloads.is_empty(), "Should produce trade data payloads");
3148 }
3149 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3150 }
3151 }
3152
3153 #[rstest]
3154 fn test_handle_book_data_snapshot() {
3155 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3156 let json = load_test_json("ws_books_snapshot.json");
3157 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3158
3159 let OKXWsMessage::BookData { arg, action, data } = msg else {
3160 panic!("Expected OKXWsMessage::BookData");
3161 };
3162
3163 let ts_init = UnixNanos::from(1_000_000_000u64);
3164 let result = handler.handle_book_data(arg, action, data, ts_init);
3165
3166 assert!(result.is_some());
3167 match result.unwrap() {
3168 NautilusWsMessage::Data(payloads) => {
3169 assert!(!payloads.is_empty(), "Should produce order book payloads");
3170 }
3171 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3172 }
3173 }
3174
3175 #[rstest]
3176 fn test_handle_book_data_update() {
3177 let handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3178 let json = load_test_json("ws_books_update.json");
3179 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3180
3181 let OKXWsMessage::BookData { arg, action, data } = msg else {
3182 panic!("Expected OKXWsMessage::BookData");
3183 };
3184
3185 let ts_init = UnixNanos::from(1_000_000_000u64);
3186 let result = handler.handle_book_data(arg, action, data, ts_init);
3187
3188 assert!(result.is_some());
3189 match result.unwrap() {
3190 NautilusWsMessage::Data(payloads) => {
3191 assert!(
3192 !payloads.is_empty(),
3193 "Should produce order book delta payloads"
3194 );
3195 }
3196 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3197 }
3198 }
3199
3200 #[rstest]
3201 fn test_handle_other_channel_data_candles() {
3202 let mut handler = create_handler_with_instruments(vec![create_spot_instrument()]);
3203 let json = load_test_json("ws_candle.json");
3204 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3205
3206 let OKXWsMessage::Data { arg, data } = msg else {
3207 panic!("Expected OKXWsMessage::Data");
3208 };
3209
3210 let ts_init = UnixNanos::from(1_000_000_000u64);
3211 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3212
3213 assert!(result.is_some());
3214 match result.unwrap() {
3215 NautilusWsMessage::Data(payloads) => {
3216 assert!(!payloads.is_empty(), "Should produce bar data payloads");
3217 }
3218 other => panic!("Expected NautilusWsMessage::Data, got {other:?}"),
3219 }
3220 }
3221
3222 #[rstest]
3223 fn test_handle_other_channel_data_funding_rate() {
3224 let mut handler = create_handler_with_instruments(vec![create_swap_instrument()]);
3225 let json = load_test_json("ws_funding_rate.json");
3226 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3227
3228 let OKXWsMessage::Data { arg, data } = msg else {
3229 panic!("Expected OKXWsMessage::Data");
3230 };
3231
3232 let ts_init = UnixNanos::from(1_000_000_000u64);
3233 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3234
3235 assert!(result.is_none() || matches!(result, Some(NautilusWsMessage::FundingRates(_))));
3237 }
3238
3239 #[rstest]
3240 fn test_handle_account_data_parses_successfully() {
3241 let mut handler = create_handler_with_instruments(vec![]);
3242 let json = load_test_json("ws_account.json");
3243 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3244
3245 let OKXWsMessage::Data { data, .. } = msg else {
3246 panic!("Expected OKXWsMessage::Data");
3247 };
3248
3249 let ts_init = UnixNanos::from(1_000_000_000u64);
3250 let result = handler.handle_account_data(data, ts_init);
3251
3252 assert!(result.is_some());
3253 match result.unwrap() {
3254 NautilusWsMessage::AccountUpdate(account_state) => {
3255 assert!(
3256 !account_state.balances.is_empty(),
3257 "Should have balance data"
3258 );
3259 }
3260 other => panic!("Expected NautilusWsMessage::AccountUpdate, got {other:?}"),
3261 }
3262 }
3263
3264 #[rstest]
3265 fn test_handle_other_channel_data_missing_instrument() {
3266 let mut handler = create_handler_with_instruments(vec![]);
3267 let json = load_test_json("ws_tickers.json");
3268 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3269
3270 let OKXWsMessage::Data { arg, data } = msg else {
3271 panic!("Expected OKXWsMessage::Data");
3272 };
3273
3274 let ts_init = UnixNanos::from(1_000_000_000u64);
3275 let result = handler.handle_other_channel_data(arg.channel, arg.inst_id, data, ts_init);
3276
3277 assert!(result.is_none());
3279 }
3280
3281 #[rstest]
3282 fn test_handle_book_data_missing_instrument() {
3283 let handler = create_handler_with_instruments(vec![]);
3284 let json = load_test_json("ws_books_snapshot.json");
3285 let msg: OKXWsMessage = serde_json::from_str(&json).unwrap();
3286
3287 let OKXWsMessage::BookData { arg, action, data } = msg else {
3288 panic!("Expected OKXWsMessage::BookData");
3289 };
3290
3291 let ts_init = UnixNanos::from(1_000_000_000u64);
3292 let result = handler.handle_book_data(arg, action, data, ts_init);
3293
3294 assert!(result.is_none());
3296 }
3297 }
3298}