nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{cell::Ref, future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25    clock::Clock,
26    messages::{
27        ExecutionEvent,
28        execution::{
29            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
30            GenerateOrderStatusReport, GeneratePositionReports,
31        },
32    },
33    msgbus,
34    runner::get_exec_event_sender,
35    runtime::get_runtime,
36};
37use nautilus_core::{MUTEX_POISONED, UnixNanos};
38use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
39use nautilus_live::execution::LiveExecutionClientExt;
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderType},
43    events::{AccountState, OrderEventAny},
44    identifiers::{AccountId, ClientId, InstrumentId, Venue},
45    orders::Order,
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    common::{
53        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
54        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
55    },
56    config::OKXExecClientConfig,
57    http::client::OKXHttpClient,
58    websocket::{
59        client::OKXWebSocketClient,
60        messages::{ExecutionReport, NautilusWsMessage},
61    },
62};
63
/// Live execution client for the OKX venue.
///
/// Bundles the shared [`ExecutionClientCore`], the adapter configuration, the HTTP and
/// WebSocket clients, and the lifecycle flags and task handles managed by this client.
#[derive(Debug)]
pub struct OKXExecutionClient {
    /// Shared execution client core (identifiers, account access, event generation).
    core: ExecutionClientCore,
    /// Adapter configuration supplied at construction.
    config: OKXExecClientConfig,
    /// HTTP client used for instrument, account state, report and algo order requests.
    http_client: OKXHttpClient,
    /// WebSocket client used for order submission, modification, cancellation and streaming.
    ws_client: OKXWebSocketClient,
    /// Trade mode derived once at construction from the account type and configuration.
    trade_mode: OKXTradeMode,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Set by `connect`, cleared by `disconnect` and `stop`.
    connected: bool,
    /// Set once the instrument bootstrap has completed so it is not repeated.
    instruments_initialized: bool,
    /// Handle of the task draining the WebSocket message stream, if one was spawned.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks spawned for commands; aborted on `stop` and `disconnect`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
77
78impl OKXExecutionClient {
79    /// Creates a new [`OKXExecutionClient`].
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the client fails to initialize.
84    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
85        let http_client = if config.has_api_credentials() {
86            OKXHttpClient::with_credentials(
87                config.api_key.clone(),
88                config.api_secret.clone(),
89                config.api_passphrase.clone(),
90                config.base_url_http.clone(),
91                config.http_timeout_secs,
92                config.max_retries,
93                config.retry_delay_initial_ms,
94                config.retry_delay_max_ms,
95                config.is_demo,
96            )?
97        } else {
98            OKXHttpClient::new(
99                config.base_url_http.clone(),
100                config.http_timeout_secs,
101                config.max_retries,
102                config.retry_delay_initial_ms,
103                config.retry_delay_max_ms,
104                config.is_demo,
105            )?
106        };
107
108        let account_id = core.account_id;
109        let ws_client = OKXWebSocketClient::new(
110            Some(config.ws_private_url()),
111            config.api_key.clone(),
112            config.api_secret.clone(),
113            config.api_passphrase.clone(),
114            Some(account_id),
115            None,
116        )
117        .context("failed to construct OKX execution websocket client")?;
118
119        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
120
121        Ok(Self {
122            core,
123            config,
124            http_client,
125            ws_client,
126            trade_mode,
127            started: false,
128            connected: false,
129            instruments_initialized: false,
130            ws_stream_handle: None,
131            pending_tasks: Mutex::new(Vec::new()),
132        })
133    }
134
135    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
136        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
137
138        if account_type == AccountType::Cash {
139            if !config.use_spot_margin {
140                return OKXTradeMode::Cash;
141            }
142            return if is_cross_margin {
143                OKXTradeMode::Cross
144            } else {
145                OKXTradeMode::Isolated
146            };
147        }
148
149        if is_cross_margin {
150            OKXTradeMode::Cross
151        } else {
152            OKXTradeMode::Isolated
153        }
154    }
155
156    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
157        if self.config.instrument_types.is_empty() {
158            vec![OKXInstrumentType::Spot]
159        } else {
160            self.config.instrument_types.clone()
161        }
162    }
163
164    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
165        if self.instruments_initialized {
166            return Ok(());
167        }
168
169        let mut all_instruments = Vec::new();
170        for instrument_type in self.instrument_types() {
171            let instruments = self
172                .http_client
173                .request_instruments(instrument_type, None)
174                .await
175                .with_context(|| {
176                    format!("failed to request OKX instruments for {instrument_type:?}")
177                })?;
178
179            if instruments.is_empty() {
180                tracing::warn!("No instruments returned for {instrument_type:?}");
181                continue;
182            }
183
184            self.http_client.add_instruments(instruments.clone());
185            all_instruments.extend(instruments);
186        }
187
188        if all_instruments.is_empty() {
189            tracing::warn!(
190                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
191            );
192        } else {
193            self.ws_client.initialize_instruments_cache(all_instruments);
194        }
195
196        self.instruments_initialized = true;
197        Ok(())
198    }
199
200    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
201        if self.instruments_initialized {
202            return Ok(());
203        }
204
205        let runtime = get_runtime();
206        runtime.block_on(self.ensure_instruments_initialized_async())
207    }
208
209    async fn refresh_account_state(&self) -> anyhow::Result<()> {
210        let account_state = self
211            .http_client
212            .request_account_state(self.core.account_id)
213            .await
214            .context("failed to request OKX account state")?;
215
216        self.core.generate_account_state(
217            account_state.balances.clone(),
218            account_state.margins.clone(),
219            account_state.is_reported,
220            account_state.ts_event,
221        )
222    }
223
224    fn update_account_state(&self) -> anyhow::Result<()> {
225        let runtime = get_runtime();
226        runtime.block_on(self.refresh_account_state())
227    }
228
229    fn is_conditional_order(&self, order_type: OrderType) -> bool {
230        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
231    }
232
233    fn submit_regular_order(
234        &self,
235        cmd: &nautilus_common::messages::execution::SubmitOrder,
236    ) -> anyhow::Result<()> {
237        let order = cmd.order.clone();
238        let ws_client = self.ws_client.clone();
239        let trade_mode = self.trade_mode;
240
241        self.spawn_task("submit_order", async move {
242            ws_client
243                .submit_order(
244                    order.trader_id(),
245                    order.strategy_id(),
246                    order.instrument_id(),
247                    trade_mode,
248                    order.client_order_id(),
249                    order.order_side(),
250                    order.order_type(),
251                    order.quantity(),
252                    Some(order.time_in_force()),
253                    order.price(),
254                    order.trigger_price(),
255                    Some(order.is_post_only()),
256                    Some(order.is_reduce_only()),
257                    Some(order.is_quote_quantity()),
258                    None,
259                )
260                .await?;
261            Ok(())
262        });
263
264        Ok(())
265    }
266
267    fn submit_conditional_order(
268        &self,
269        cmd: &nautilus_common::messages::execution::SubmitOrder,
270    ) -> anyhow::Result<()> {
271        let order = cmd.order.clone();
272        let trigger_price = order
273            .trigger_price()
274            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
275        let http_client = self.http_client.clone();
276        let trade_mode = self.trade_mode;
277
278        self.spawn_task("submit_algo_order", async move {
279            http_client
280                .place_algo_order_with_domain_types(
281                    order.instrument_id(),
282                    trade_mode,
283                    order.client_order_id(),
284                    order.order_side(),
285                    order.order_type(),
286                    order.quantity(),
287                    trigger_price,
288                    order.trigger_type(),
289                    order.price(),
290                    Some(order.is_reduce_only()),
291                )
292                .await?;
293            Ok(())
294        });
295
296        Ok(())
297    }
298
299    fn cancel_ws_order(
300        &self,
301        cmd: &nautilus_common::messages::execution::CancelOrder,
302    ) -> anyhow::Result<()> {
303        let ws_client = self.ws_client.clone();
304        let command = cmd.clone();
305
306        self.spawn_task("cancel_order", async move {
307            ws_client
308                .cancel_order(
309                    command.trader_id,
310                    command.strategy_id,
311                    command.instrument_id,
312                    Some(command.client_order_id),
313                    Some(command.venue_order_id),
314                )
315                .await?;
316            Ok(())
317        });
318
319        Ok(())
320    }
321
322    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
323        let ws_client = self.ws_client.clone();
324        self.spawn_task("mass_cancel_orders", async move {
325            ws_client.mass_cancel_orders(instrument_id).await?;
326            Ok(())
327        });
328        Ok(())
329    }
330
331    fn spawn_task<F>(&self, description: &'static str, fut: F)
332    where
333        F: Future<Output = anyhow::Result<()>> + Send + 'static,
334    {
335        let runtime = get_runtime();
336        let handle = runtime.spawn(async move {
337            if let Err(e) = fut.await {
338                tracing::warn!("{description} failed: {e:?}");
339            }
340        });
341
342        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
343        tasks.retain(|handle| !handle.is_finished());
344        tasks.push(handle);
345    }
346
347    fn abort_pending_tasks(&self) {
348        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
349        for handle in tasks.drain(..) {
350            handle.abort();
351        }
352    }
353}
354
355impl ExecutionClient for OKXExecutionClient {
356    fn is_connected(&self) -> bool {
357        self.connected
358    }
359
360    fn client_id(&self) -> ClientId {
361        self.core.client_id
362    }
363
364    fn account_id(&self) -> AccountId {
365        self.core.account_id
366    }
367
368    fn venue(&self) -> Venue {
369        *OKX_VENUE
370    }
371
372    fn oms_type(&self) -> OmsType {
373        self.core.oms_type
374    }
375
376    fn get_account(&self) -> Option<AccountAny> {
377        self.core.get_account()
378    }
379
380    fn generate_account_state(
381        &self,
382        balances: Vec<AccountBalance>,
383        margins: Vec<MarginBalance>,
384        reported: bool,
385        ts_event: UnixNanos,
386    ) -> anyhow::Result<()> {
387        self.core
388            .generate_account_state(balances, margins, reported, ts_event)
389    }
390
391    fn start(&mut self) -> anyhow::Result<()> {
392        if self.started {
393            return Ok(());
394        }
395
396        self.ensure_instruments_initialized()?;
397        self.started = true;
398        tracing::info!(
399            client_id = %self.core.client_id,
400            account_id = %self.core.account_id,
401            account_type = ?self.core.account_type,
402            trade_mode = ?self.trade_mode,
403            instrument_types = ?self.config.instrument_types,
404            use_fills_channel = self.config.use_fills_channel,
405            is_demo = self.config.is_demo,
406            "OKX execution client started"
407        );
408        Ok(())
409    }
410
411    fn stop(&mut self) -> anyhow::Result<()> {
412        if !self.started {
413            return Ok(());
414        }
415
416        self.started = false;
417        self.connected = false;
418        if let Some(handle) = self.ws_stream_handle.take() {
419            handle.abort();
420        }
421        self.abort_pending_tasks();
422        tracing::info!("OKX execution client {} stopped", self.core.client_id);
423        Ok(())
424    }
425
426    fn submit_order(
427        &self,
428        cmd: &nautilus_common::messages::execution::SubmitOrder,
429    ) -> anyhow::Result<()> {
430        let order = &cmd.order;
431
432        if order.is_closed() {
433            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
434            return Ok(());
435        }
436
437        self.core.generate_order_submitted(
438            order.strategy_id(),
439            order.instrument_id(),
440            order.client_order_id(),
441            cmd.ts_init,
442        );
443
444        let result = if self.is_conditional_order(order.order_type()) {
445            self.submit_conditional_order(cmd)
446        } else {
447            self.submit_regular_order(cmd)
448        };
449
450        if let Err(e) = result {
451            self.core.generate_order_rejected(
452                order.strategy_id(),
453                order.instrument_id(),
454                order.client_order_id(),
455                &format!("submit-order-error: {e}"),
456                cmd.ts_init,
457                false,
458            );
459            return Err(e);
460        }
461
462        Ok(())
463    }
464
465    fn submit_order_list(
466        &self,
467        cmd: &nautilus_common::messages::execution::SubmitOrderList,
468    ) -> anyhow::Result<()> {
469        tracing::warn!(
470            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
471            cmd.order_list.orders.len()
472        );
473        Ok(())
474    }
475
476    fn modify_order(
477        &self,
478        cmd: &nautilus_common::messages::execution::ModifyOrder,
479    ) -> anyhow::Result<()> {
480        let ws_client = self.ws_client.clone();
481        let command = cmd.clone();
482
483        self.spawn_task("modify_order", async move {
484            ws_client
485                .modify_order(
486                    command.trader_id,
487                    command.strategy_id,
488                    command.instrument_id,
489                    Some(command.client_order_id),
490                    command.price,
491                    command.quantity,
492                    Some(command.venue_order_id),
493                )
494                .await?;
495            Ok(())
496        });
497
498        Ok(())
499    }
500
501    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
502        self.cancel_ws_order(cmd)
503    }
504
505    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
506        self.mass_cancel_instrument(cmd.instrument_id)
507    }
508
509    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
510        let mut payload = Vec::with_capacity(cmd.cancels.len());
511
512        for cancel in &cmd.cancels {
513            payload.push((
514                cancel.instrument_id,
515                Some(cancel.client_order_id),
516                Some(cancel.venue_order_id),
517            ));
518        }
519
520        let ws_client = self.ws_client.clone();
521        self.spawn_task("batch_cancel_orders", async move {
522            ws_client.batch_cancel_orders(payload).await?;
523            Ok(())
524        });
525
526        Ok(())
527    }
528
529    fn query_account(
530        &self,
531        _cmd: &nautilus_common::messages::execution::QueryAccount,
532    ) -> anyhow::Result<()> {
533        self.update_account_state()
534    }
535
536    fn query_order(
537        &self,
538        cmd: &nautilus_common::messages::execution::QueryOrder,
539    ) -> anyhow::Result<()> {
540        tracing::debug!(
541            "query_order not implemented for OKX execution client (client_order_id={})",
542            cmd.client_order_id
543        );
544        Ok(())
545    }
546}
547
548#[async_trait(?Send)]
549impl LiveExecutionClient for OKXExecutionClient {
550    async fn connect(&mut self) -> anyhow::Result<()> {
551        if self.connected {
552            return Ok(());
553        }
554
555        self.ensure_instruments_initialized_async().await?;
556
557        self.ws_client.connect().await?;
558        self.ws_client.wait_until_active(10.0).await?;
559
560        for inst_type in self.instrument_types() {
561            tracing::info!(
562                "Subscribing to orders channel for instrument type: {:?}",
563                inst_type
564            );
565            self.ws_client.subscribe_orders(inst_type).await?;
566
567            // OKX doesn't support algo orders channel for OPTIONS
568            if inst_type != OKXInstrumentType::Option {
569                self.ws_client.subscribe_orders_algo(inst_type).await?;
570            }
571
572            if self.config.use_fills_channel
573                && let Err(e) = self.ws_client.subscribe_fills(inst_type).await
574            {
575                tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
576            }
577        }
578
579        self.ws_client.subscribe_account().await?;
580
581        self.start_ws_stream()?;
582        self.refresh_account_state().await?;
583
584        self.connected = true;
585        tracing::info!("OKX execution client {} connected", self.core.client_id);
586
587        Ok(())
588    }
589
590    async fn disconnect(&mut self) -> anyhow::Result<()> {
591        if !self.connected {
592            return Ok(());
593        }
594
595        self.http_client.cancel_all_requests();
596        if let Err(e) = self.ws_client.close().await {
597            tracing::warn!("Error while closing OKX websocket: {e:?}");
598        }
599
600        if let Some(handle) = self.ws_stream_handle.take() {
601            handle.abort();
602        }
603
604        self.abort_pending_tasks();
605
606        self.connected = false;
607        tracing::info!("OKX execution client {} disconnected", self.core.client_id);
608        Ok(())
609    }
610
611    async fn generate_order_status_report(
612        &self,
613        cmd: &GenerateOrderStatusReport,
614    ) -> anyhow::Result<Option<OrderStatusReport>> {
615        let Some(instrument_id) = cmd.instrument_id else {
616            tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
617            return Ok(None);
618        };
619
620        let mut reports = self
621            .http_client
622            .request_order_status_reports(
623                self.core.account_id,
624                None,
625                Some(instrument_id),
626                None,
627                None,
628                false,
629                None,
630            )
631            .await?;
632
633        if let Some(client_order_id) = cmd.client_order_id {
634            reports.retain(|report| report.client_order_id == Some(client_order_id));
635        }
636
637        if let Some(venue_order_id) = cmd.venue_order_id {
638            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
639        }
640
641        Ok(reports.into_iter().next())
642    }
643
644    async fn generate_order_status_reports(
645        &self,
646        cmd: &GenerateOrderStatusReport,
647    ) -> anyhow::Result<Vec<OrderStatusReport>> {
648        let mut reports = Vec::new();
649
650        if let Some(instrument_id) = cmd.instrument_id {
651            let mut fetched = self
652                .http_client
653                .request_order_status_reports(
654                    self.core.account_id,
655                    None,
656                    Some(instrument_id),
657                    None,
658                    None,
659                    false,
660                    None,
661                )
662                .await?;
663            reports.append(&mut fetched);
664        } else {
665            for inst_type in self.instrument_types() {
666                let mut fetched = self
667                    .http_client
668                    .request_order_status_reports(
669                        self.core.account_id,
670                        Some(inst_type),
671                        None,
672                        None,
673                        None,
674                        false,
675                        None,
676                    )
677                    .await?;
678                reports.append(&mut fetched);
679            }
680        }
681
682        if let Some(client_order_id) = cmd.client_order_id {
683            reports.retain(|report| report.client_order_id == Some(client_order_id));
684        }
685
686        if let Some(venue_order_id) = cmd.venue_order_id {
687            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
688        }
689
690        Ok(reports)
691    }
692
693    async fn generate_fill_reports(
694        &self,
695        cmd: GenerateFillReports,
696    ) -> anyhow::Result<Vec<FillReport>> {
697        let start_dt = nanos_to_datetime(cmd.start);
698        let end_dt = nanos_to_datetime(cmd.end);
699        let mut reports = Vec::new();
700
701        if let Some(instrument_id) = cmd.instrument_id {
702            let mut fetched = self
703                .http_client
704                .request_fill_reports(
705                    self.core.account_id,
706                    None,
707                    Some(instrument_id),
708                    start_dt,
709                    end_dt,
710                    None,
711                )
712                .await?;
713            reports.append(&mut fetched);
714        } else {
715            for inst_type in self.instrument_types() {
716                let mut fetched = self
717                    .http_client
718                    .request_fill_reports(
719                        self.core.account_id,
720                        Some(inst_type),
721                        None,
722                        start_dt,
723                        end_dt,
724                        None,
725                    )
726                    .await?;
727                reports.append(&mut fetched);
728            }
729        }
730
731        if let Some(venue_order_id) = cmd.venue_order_id {
732            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
733        }
734
735        Ok(reports)
736    }
737
738    async fn generate_position_status_reports(
739        &self,
740        cmd: &GeneratePositionReports,
741    ) -> anyhow::Result<Vec<PositionStatusReport>> {
742        let mut reports = Vec::new();
743
744        if let Some(instrument_id) = cmd.instrument_id {
745            let mut fetched = self
746                .http_client
747                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
748                .await?;
749            reports.append(&mut fetched);
750        } else {
751            for inst_type in self.instrument_types() {
752                let mut fetched = self
753                    .http_client
754                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
755                    .await?;
756                reports.append(&mut fetched);
757            }
758        }
759
760        let _ = nanos_to_datetime(cmd.start);
761        let _ = nanos_to_datetime(cmd.end);
762
763        Ok(reports)
764    }
765
766    async fn generate_mass_status(
767        &self,
768        lookback_mins: Option<u64>,
769    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
770        tracing::warn!(
771            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
772        );
773        Ok(None)
774    }
775}
776
777impl LiveExecutionClientExt for OKXExecutionClient {
778    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
779        get_exec_event_sender()
780    }
781
782    fn get_clock(&self) -> Ref<'_, dyn Clock> {
783        self.core.clock().borrow()
784    }
785}
786
787impl OKXExecutionClient {
788    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
789        if self.ws_stream_handle.is_some() {
790            return Ok(());
791        }
792
793        let stream = self.ws_client.stream();
794        let runtime = get_runtime();
795        let handle = runtime.spawn(async move {
796            pin_mut!(stream);
797            while let Some(message) = stream.next().await {
798                dispatch_ws_message(message);
799            }
800        });
801
802        self.ws_stream_handle = Some(handle);
803        Ok(())
804    }
805}
806
807fn dispatch_ws_message(message: NautilusWsMessage) {
808    match message {
809        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
810        NautilusWsMessage::ExecutionReports(reports) => {
811            for report in reports {
812                dispatch_execution_report(report);
813            }
814        }
815        NautilusWsMessage::OrderRejected(event) => {
816            dispatch_order_event(OrderEventAny::Rejected(event));
817        }
818        NautilusWsMessage::OrderCancelRejected(event) => {
819            dispatch_order_event(OrderEventAny::CancelRejected(event));
820        }
821        NautilusWsMessage::OrderModifyRejected(event) => {
822            dispatch_order_event(OrderEventAny::ModifyRejected(event));
823        }
824        NautilusWsMessage::Error(e) => {
825            tracing::warn!(
826                "OKX websocket error: code={} message={} conn_id={:?}",
827                e.code,
828                e.message,
829                e.conn_id
830            );
831        }
832        NautilusWsMessage::Reconnected => {
833            tracing::info!("OKX websocket reconnected");
834        }
835        NautilusWsMessage::Deltas(_)
836        | NautilusWsMessage::Raw(_)
837        | NautilusWsMessage::Data(_)
838        | NautilusWsMessage::FundingRates(_)
839        | NautilusWsMessage::Instrument(_) => {
840            tracing::debug!("Ignoring OKX websocket data message");
841        }
842    }
843}
844
845fn dispatch_account_state(state: AccountState) {
846    msgbus::send_any(
847        "Portfolio.update_account".into(),
848        &state as &dyn std::any::Any,
849    );
850}
851
852fn dispatch_execution_report(report: ExecutionReport) {
853    let sender = get_exec_event_sender();
854    match report {
855        ExecutionReport::Order(order_report) => {
856            let exec_report =
857                nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
858            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
859                tracing::warn!("Failed to send order status report: {e}");
860            }
861        }
862        ExecutionReport::Fill(fill_report) => {
863            let exec_report =
864                nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
865            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
866                tracing::warn!("Failed to send fill report: {e}");
867            }
868        }
869    }
870}
871
872fn dispatch_order_event(event: OrderEventAny) {
873    let sender = get_exec_event_sender();
874    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
875        tracing::warn!("Failed to send order event: {e}");
876    }
877}
878
879fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
880    value.map(|nanos| nanos.to_datetime_utc())
881}
882
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };

    /// Tuple shape asserted on by the batch cancel payload tests.
    type CancelPayload = Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>;

    /// Builds one `(instrument, client order, venue order)` tuple per cancel in `cmd`.
    fn build_payload(cmd: &BatchCancelOrders) -> CancelPayload {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    Some(cancel.venue_order_id),
                )
            })
            .collect()
    }

    #[test]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = ClientId::from("OKX");
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        let cancels = vec![
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id: client_order_id1,
                venue_order_id: venue_order_id1,
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
            },
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id: client_order_id2,
                venue_order_id: venue_order_id2,
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
            },
        ];

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels,
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        };

        // Verify we can build the payload structure
        let payload = build_payload(&cmd);

        let expected: CancelPayload = vec![
            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
        ];
        assert_eq!(payload.len(), 2);
        assert_eq!(payload, expected);
    }

    #[test]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: ClientId::from("OKX"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        };

        let payload = build_payload(&cmd);
        assert_eq!(payload.len(), 0);
    }
}