nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{cell::Ref, future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25    clock::Clock, messages::ExecutionEvent, msgbus, runner::get_exec_event_sender,
26    runtime::get_runtime,
27};
28use nautilus_core::UnixNanos;
29use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
30use nautilus_live::execution::LiveExecutionClientExt;
31use nautilus_model::{
32    enums::{AccountType, OrderType},
33    events::{AccountState, OrderEventAny},
34    identifiers::InstrumentId,
35    orders::Order,
36    reports::ExecutionMassStatus,
37};
38use tokio::task::JoinHandle;
39
40use crate::{
41    common::{
42        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
43        enums::{OKXInstrumentType, OKXTradeMode},
44    },
45    config::OKXExecClientConfig,
46    http::client::OKXHttpClient,
47    websocket::{
48        client::OKXWebSocketClient,
49        messages::{ExecutionReport, NautilusWsMessage},
50    },
51};
52
53#[derive(Debug)]
54pub struct OKXExecutionClient {
55    core: ExecutionClientCore,
56    config: OKXExecClientConfig,
57    http_client: OKXHttpClient,
58    ws_client: OKXWebSocketClient,
59    trade_mode: OKXTradeMode,
60    started: bool,
61    connected: bool,
62    instruments_initialized: bool,
63    ws_stream_handle: Option<JoinHandle<()>>,
64    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
65}
66
67impl OKXExecutionClient {
68    /// Creates a new [`OKXExecutionClient`].
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the client fails to initialize.
73    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
74        let http_client = if config.has_api_credentials() {
75            OKXHttpClient::with_credentials(
76                config.api_key.clone(),
77                config.api_secret.clone(),
78                config.api_passphrase.clone(),
79                config.base_url_http.clone(),
80                config.http_timeout_secs,
81                config.max_retries,
82                config.retry_delay_initial_ms,
83                config.retry_delay_max_ms,
84                config.is_demo,
85            )?
86        } else {
87            OKXHttpClient::new(
88                config.base_url_http.clone(),
89                config.http_timeout_secs,
90                config.max_retries,
91                config.retry_delay_initial_ms,
92                config.retry_delay_max_ms,
93                config.is_demo,
94            )?
95        };
96
97        let account_id = core.account_id;
98        let ws_client = OKXWebSocketClient::new(
99            Some(config.ws_private_url()),
100            config.api_key.clone(),
101            config.api_secret.clone(),
102            config.api_passphrase.clone(),
103            Some(account_id),
104            None,
105        )
106        .context("failed to construct OKX execution websocket client")?;
107
108        let trade_mode = match core.account_type {
109            AccountType::Cash => OKXTradeMode::Cash,
110            _ => OKXTradeMode::Isolated,
111        };
112
113        Ok(Self {
114            core,
115            config,
116            http_client,
117            ws_client,
118            trade_mode,
119            started: false,
120            connected: false,
121            instruments_initialized: false,
122            ws_stream_handle: None,
123            pending_tasks: Mutex::new(Vec::new()),
124        })
125    }
126
127    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
128        if self.config.instrument_types.is_empty() {
129            vec![OKXInstrumentType::Spot]
130        } else {
131            self.config.instrument_types.clone()
132        }
133    }
134
135    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
136        if self.instruments_initialized {
137            return Ok(());
138        }
139
140        let mut all_instruments = Vec::new();
141        for instrument_type in self.instrument_types() {
142            let instruments = self
143                .http_client
144                .request_instruments(instrument_type, None)
145                .await
146                .with_context(|| {
147                    format!("failed to request OKX instruments for {instrument_type:?}")
148                })?;
149
150            if instruments.is_empty() {
151                tracing::warn!("No instruments returned for {instrument_type:?}");
152                continue;
153            }
154
155            self.http_client.add_instruments(instruments.clone());
156            all_instruments.extend(instruments);
157        }
158
159        if all_instruments.is_empty() {
160            tracing::warn!(
161                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
162            );
163        } else {
164            self.ws_client.initialize_instruments_cache(all_instruments);
165        }
166
167        self.instruments_initialized = true;
168        Ok(())
169    }
170
171    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
172        if self.instruments_initialized {
173            return Ok(());
174        }
175
176        let runtime = get_runtime();
177        runtime.block_on(self.ensure_instruments_initialized_async())
178    }
179
180    async fn refresh_account_state(&self) -> anyhow::Result<()> {
181        let account_state = self
182            .http_client
183            .request_account_state(self.core.account_id)
184            .await
185            .context("failed to request OKX account state")?;
186
187        self.core.generate_account_state(
188            account_state.balances.clone(),
189            account_state.margins.clone(),
190            account_state.is_reported,
191            account_state.ts_event,
192        )
193    }
194
195    fn update_account_state(&self) -> anyhow::Result<()> {
196        let runtime = get_runtime();
197        runtime.block_on(self.refresh_account_state())
198    }
199
200    fn is_conditional_order(&self, order_type: OrderType) -> bool {
201        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
202    }
203
204    fn submit_regular_order(
205        &self,
206        cmd: &nautilus_common::messages::execution::SubmitOrder,
207    ) -> anyhow::Result<()> {
208        let order = cmd.order.clone();
209        let ws_client = self.ws_client.clone();
210        let trade_mode = self.trade_mode;
211
212        self.spawn_task("submit_order", async move {
213            ws_client
214                .submit_order(
215                    order.trader_id(),
216                    order.strategy_id(),
217                    order.instrument_id(),
218                    trade_mode,
219                    order.client_order_id(),
220                    order.order_side(),
221                    order.order_type(),
222                    order.quantity(),
223                    Some(order.time_in_force()),
224                    order.price(),
225                    order.trigger_price(),
226                    Some(order.is_post_only()),
227                    Some(order.is_reduce_only()),
228                    Some(order.is_quote_quantity()),
229                    None,
230                )
231                .await?;
232            Ok(())
233        });
234
235        Ok(())
236    }
237
238    fn submit_conditional_order(
239        &self,
240        cmd: &nautilus_common::messages::execution::SubmitOrder,
241    ) -> anyhow::Result<()> {
242        let order = cmd.order.clone();
243        let trigger_price = order
244            .trigger_price()
245            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
246        let http_client = self.http_client.clone();
247        let trade_mode = self.trade_mode;
248
249        self.spawn_task("submit_algo_order", async move {
250            http_client
251                .place_algo_order_with_domain_types(
252                    order.instrument_id(),
253                    trade_mode,
254                    order.client_order_id(),
255                    order.order_side(),
256                    order.order_type(),
257                    order.quantity(),
258                    trigger_price,
259                    order.trigger_type(),
260                    order.price(),
261                    Some(order.is_reduce_only()),
262                )
263                .await?;
264            Ok(())
265        });
266
267        Ok(())
268    }
269
270    fn cancel_ws_order(
271        &self,
272        cmd: &nautilus_common::messages::execution::CancelOrder,
273    ) -> anyhow::Result<()> {
274        let ws_client = self.ws_client.clone();
275        let command = cmd.clone();
276
277        self.spawn_task("cancel_order", async move {
278            ws_client
279                .cancel_order(
280                    command.trader_id,
281                    command.strategy_id,
282                    command.instrument_id,
283                    Some(command.client_order_id),
284                    Some(command.venue_order_id),
285                )
286                .await?;
287            Ok(())
288        });
289
290        Ok(())
291    }
292
293    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
294        let ws_client = self.ws_client.clone();
295        self.spawn_task("mass_cancel_orders", async move {
296            ws_client.mass_cancel_orders(instrument_id).await?;
297            Ok(())
298        });
299        Ok(())
300    }
301
302    fn spawn_task<F>(&self, description: &'static str, fut: F)
303    where
304        F: Future<Output = anyhow::Result<()>> + Send + 'static,
305    {
306        let runtime = get_runtime();
307        let handle = runtime.spawn(async move {
308            if let Err(err) = fut.await {
309                tracing::warn!("{description} failed: {err:?}");
310            }
311        });
312
313        let mut tasks = self.pending_tasks.lock().unwrap();
314        tasks.retain(|handle| !handle.is_finished());
315        tasks.push(handle);
316    }
317
318    fn abort_pending_tasks(&self) {
319        let mut tasks = self.pending_tasks.lock().unwrap();
320        for handle in tasks.drain(..) {
321            handle.abort();
322        }
323    }
324}
325
326impl ExecutionClient for OKXExecutionClient {
327    fn is_connected(&self) -> bool {
328        self.connected
329    }
330
331    fn client_id(&self) -> nautilus_model::identifiers::ClientId {
332        self.core.client_id
333    }
334
335    fn account_id(&self) -> nautilus_model::identifiers::AccountId {
336        self.core.account_id
337    }
338
339    fn venue(&self) -> nautilus_model::identifiers::Venue {
340        *OKX_VENUE
341    }
342
343    fn oms_type(&self) -> nautilus_model::enums::OmsType {
344        self.core.oms_type
345    }
346
347    fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
348        self.core.get_account()
349    }
350
351    fn generate_account_state(
352        &self,
353        balances: Vec<nautilus_model::types::AccountBalance>,
354        margins: Vec<nautilus_model::types::MarginBalance>,
355        reported: bool,
356        ts_event: nautilus_core::UnixNanos,
357    ) -> anyhow::Result<()> {
358        self.core
359            .generate_account_state(balances, margins, reported, ts_event)
360    }
361
362    fn start(&mut self) -> anyhow::Result<()> {
363        if self.started {
364            return Ok(());
365        }
366
367        self.ensure_instruments_initialized()?;
368        self.started = true;
369        tracing::info!(
370            client_id = %self.core.client_id,
371            account_id = %self.core.account_id,
372            account_type = ?self.core.account_type,
373            trade_mode = ?self.trade_mode,
374            instrument_types = ?self.config.instrument_types,
375            use_fills_channel = self.config.use_fills_channel,
376            is_demo = self.config.is_demo,
377            "OKX execution client started"
378        );
379        Ok(())
380    }
381
382    fn stop(&mut self) -> anyhow::Result<()> {
383        if !self.started {
384            return Ok(());
385        }
386
387        self.started = false;
388        self.connected = false;
389        if let Some(handle) = self.ws_stream_handle.take() {
390            handle.abort();
391        }
392        self.abort_pending_tasks();
393        tracing::info!("OKX execution client {} stopped", self.core.client_id);
394        Ok(())
395    }
396
397    fn submit_order(
398        &self,
399        cmd: &nautilus_common::messages::execution::SubmitOrder,
400    ) -> anyhow::Result<()> {
401        let order = &cmd.order;
402
403        if order.is_closed() {
404            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
405            return Ok(());
406        }
407
408        self.core.generate_order_submitted(
409            order.strategy_id(),
410            order.instrument_id(),
411            order.client_order_id(),
412            cmd.ts_init,
413        );
414
415        let result = if self.is_conditional_order(order.order_type()) {
416            self.submit_conditional_order(cmd)
417        } else {
418            self.submit_regular_order(cmd)
419        };
420
421        if let Err(err) = result {
422            self.core.generate_order_rejected(
423                order.strategy_id(),
424                order.instrument_id(),
425                order.client_order_id(),
426                &format!("submit-order-error: {err}"),
427                cmd.ts_init,
428                false,
429            );
430            return Err(err);
431        }
432
433        Ok(())
434    }
435
436    fn submit_order_list(
437        &self,
438        cmd: &nautilus_common::messages::execution::SubmitOrderList,
439    ) -> anyhow::Result<()> {
440        tracing::warn!(
441            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
442            cmd.order_list.orders.len()
443        );
444        Ok(())
445    }
446
447    fn modify_order(
448        &self,
449        cmd: &nautilus_common::messages::execution::ModifyOrder,
450    ) -> anyhow::Result<()> {
451        let ws_client = self.ws_client.clone();
452        let command = cmd.clone();
453
454        self.spawn_task("modify_order", async move {
455            ws_client
456                .modify_order(
457                    command.trader_id,
458                    command.strategy_id,
459                    command.instrument_id,
460                    Some(command.client_order_id),
461                    command.price,
462                    command.quantity,
463                    Some(command.venue_order_id),
464                )
465                .await?;
466            Ok(())
467        });
468
469        Ok(())
470    }
471
472    fn cancel_order(
473        &self,
474        cmd: &nautilus_common::messages::execution::CancelOrder,
475    ) -> anyhow::Result<()> {
476        self.cancel_ws_order(cmd)
477    }
478
479    fn cancel_all_orders(
480        &self,
481        cmd: &nautilus_common::messages::execution::CancelAllOrders,
482    ) -> anyhow::Result<()> {
483        self.mass_cancel_instrument(cmd.instrument_id)
484    }
485
486    fn batch_cancel_orders(
487        &self,
488        cmd: &nautilus_common::messages::execution::BatchCancelOrders,
489    ) -> anyhow::Result<()> {
490        let mut payload = Vec::with_capacity(cmd.cancels.len());
491
492        for cancel in &cmd.cancels {
493            payload.push((
494                OKXInstrumentType::Spot, // TODO: derive real instrument type
495                cancel.instrument_id,
496                Some(cancel.client_order_id),
497                Some(cancel.venue_order_id.as_str().to_string()),
498            ));
499        }
500
501        let ws_client = self.ws_client.clone();
502        self.spawn_task("batch_cancel_orders", async move {
503            ws_client.batch_cancel_orders(payload).await?;
504            Ok(())
505        });
506
507        Ok(())
508    }
509
510    fn query_account(
511        &self,
512        _cmd: &nautilus_common::messages::execution::QueryAccount,
513    ) -> anyhow::Result<()> {
514        self.update_account_state()
515    }
516
517    fn query_order(
518        &self,
519        cmd: &nautilus_common::messages::execution::QueryOrder,
520    ) -> anyhow::Result<()> {
521        tracing::debug!(
522            "query_order not implemented for OKX execution client (client_order_id={})",
523            cmd.client_order_id
524        );
525        Ok(())
526    }
527}
528
529#[async_trait(?Send)]
530impl LiveExecutionClient for OKXExecutionClient {
531    async fn connect(&mut self) -> anyhow::Result<()> {
532        if self.connected {
533            return Ok(());
534        }
535
536        self.ensure_instruments_initialized_async().await?;
537
538        // Query VIP level and update websocket client
539        if let Ok(Some(vip_level)) = self.http_client.request_vip_level().await {
540            self.ws_client.set_vip_level(vip_level);
541        }
542
543        self.ws_client.connect().await?;
544        self.ws_client.wait_until_active(10.0).await?;
545
546        for inst_type in self.instrument_types() {
547            tracing::info!(
548                "Subscribing to orders channel for instrument type: {:?}",
549                inst_type
550            );
551            self.ws_client.subscribe_orders(inst_type).await?;
552
553            // OKX doesn't support algo orders channel for OPTIONS
554            if inst_type != OKXInstrumentType::Option {
555                self.ws_client.subscribe_orders_algo(inst_type).await?;
556            }
557
558            if self.config.use_fills_channel
559                && let Err(err) = self.ws_client.subscribe_fills(inst_type).await
560            {
561                tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {err}");
562            }
563        }
564
565        self.ws_client.subscribe_account().await?;
566
567        self.start_ws_stream()?;
568        self.refresh_account_state().await?;
569
570        self.connected = true;
571        tracing::info!("OKX execution client {} connected", self.core.client_id);
572        Ok(())
573    }
574
575    async fn disconnect(&mut self) -> anyhow::Result<()> {
576        if !self.connected {
577            return Ok(());
578        }
579
580        self.http_client.cancel_all_requests();
581        if let Err(err) = self.ws_client.close().await {
582            tracing::warn!("Error while closing OKX websocket: {err:?}");
583        }
584
585        if let Some(handle) = self.ws_stream_handle.take() {
586            handle.abort();
587        }
588
589        self.abort_pending_tasks();
590
591        self.connected = false;
592        tracing::info!("OKX execution client {} disconnected", self.core.client_id);
593        Ok(())
594    }
595
596    async fn generate_order_status_report(
597        &self,
598        cmd: &nautilus_common::messages::execution::GenerateOrderStatusReport,
599    ) -> anyhow::Result<Option<nautilus_model::reports::OrderStatusReport>> {
600        let Some(instrument_id) = cmd.instrument_id else {
601            tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
602            return Ok(None);
603        };
604
605        let mut reports = self
606            .http_client
607            .request_order_status_reports(
608                self.core.account_id,
609                None,
610                Some(instrument_id),
611                None,
612                None,
613                false,
614                None,
615            )
616            .await?;
617
618        if let Some(client_order_id) = cmd.client_order_id {
619            reports.retain(|report| report.client_order_id == Some(client_order_id));
620        }
621
622        if let Some(venue_order_id) = cmd.venue_order_id {
623            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
624        }
625
626        Ok(reports.into_iter().next())
627    }
628
629    async fn generate_order_status_reports(
630        &self,
631        cmd: &nautilus_common::messages::execution::GenerateOrderStatusReport,
632    ) -> anyhow::Result<Vec<nautilus_model::reports::OrderStatusReport>> {
633        let mut reports = Vec::new();
634
635        if let Some(instrument_id) = cmd.instrument_id {
636            let mut fetched = self
637                .http_client
638                .request_order_status_reports(
639                    self.core.account_id,
640                    None,
641                    Some(instrument_id),
642                    None,
643                    None,
644                    false,
645                    None,
646                )
647                .await?;
648            reports.append(&mut fetched);
649        } else {
650            for inst_type in self.instrument_types() {
651                let mut fetched = self
652                    .http_client
653                    .request_order_status_reports(
654                        self.core.account_id,
655                        Some(inst_type),
656                        None,
657                        None,
658                        None,
659                        false,
660                        None,
661                    )
662                    .await?;
663                reports.append(&mut fetched);
664            }
665        }
666
667        if let Some(client_order_id) = cmd.client_order_id {
668            reports.retain(|report| report.client_order_id == Some(client_order_id));
669        }
670
671        if let Some(venue_order_id) = cmd.venue_order_id {
672            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
673        }
674
675        Ok(reports)
676    }
677
678    async fn generate_fill_reports(
679        &self,
680        cmd: nautilus_common::messages::execution::GenerateFillReports,
681    ) -> anyhow::Result<Vec<nautilus_model::reports::FillReport>> {
682        let start_dt = nanos_to_datetime(cmd.start);
683        let end_dt = nanos_to_datetime(cmd.end);
684        let mut reports = Vec::new();
685
686        if let Some(instrument_id) = cmd.instrument_id {
687            let mut fetched = self
688                .http_client
689                .request_fill_reports(
690                    self.core.account_id,
691                    None,
692                    Some(instrument_id),
693                    start_dt,
694                    end_dt,
695                    None,
696                )
697                .await?;
698            reports.append(&mut fetched);
699        } else {
700            for inst_type in self.instrument_types() {
701                let mut fetched = self
702                    .http_client
703                    .request_fill_reports(
704                        self.core.account_id,
705                        Some(inst_type),
706                        None,
707                        start_dt,
708                        end_dt,
709                        None,
710                    )
711                    .await?;
712                reports.append(&mut fetched);
713            }
714        }
715
716        if let Some(venue_order_id) = cmd.venue_order_id {
717            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
718        }
719
720        Ok(reports)
721    }
722
723    async fn generate_position_status_reports(
724        &self,
725        cmd: &nautilus_common::messages::execution::GeneratePositionReports,
726    ) -> anyhow::Result<Vec<nautilus_model::reports::PositionStatusReport>> {
727        let mut reports = Vec::new();
728
729        if let Some(instrument_id) = cmd.instrument_id {
730            let mut fetched = self
731                .http_client
732                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
733                .await?;
734            reports.append(&mut fetched);
735        } else {
736            for inst_type in self.instrument_types() {
737                let mut fetched = self
738                    .http_client
739                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
740                    .await?;
741                reports.append(&mut fetched);
742            }
743        }
744
745        let _ = nanos_to_datetime(cmd.start);
746        let _ = nanos_to_datetime(cmd.end);
747
748        Ok(reports)
749    }
750
751    async fn generate_mass_status(
752        &self,
753        lookback_mins: Option<u64>,
754    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
755        tracing::warn!(
756            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
757        );
758        Ok(None)
759    }
760}
761
762impl LiveExecutionClientExt for OKXExecutionClient {
763    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
764        get_exec_event_sender()
765    }
766
767    fn get_clock(&self) -> Ref<'_, dyn Clock> {
768        self.core.clock().borrow()
769    }
770}
771
772impl OKXExecutionClient {
773    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
774        if self.ws_stream_handle.is_some() {
775            return Ok(());
776        }
777
778        let stream = self.ws_client.stream();
779        let runtime = get_runtime();
780        let handle = runtime.spawn(async move {
781            pin_mut!(stream);
782            while let Some(message) = stream.next().await {
783                dispatch_ws_message(message);
784            }
785        });
786
787        self.ws_stream_handle = Some(handle);
788        Ok(())
789    }
790}
791
792fn dispatch_ws_message(message: NautilusWsMessage) {
793    match message {
794        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
795        NautilusWsMessage::ExecutionReports(reports) => {
796            for report in reports {
797                dispatch_execution_report(report);
798            }
799        }
800        NautilusWsMessage::OrderRejected(event) => {
801            dispatch_order_event(OrderEventAny::Rejected(event));
802        }
803        NautilusWsMessage::OrderCancelRejected(event) => {
804            dispatch_order_event(OrderEventAny::CancelRejected(event));
805        }
806        NautilusWsMessage::OrderModifyRejected(event) => {
807            dispatch_order_event(OrderEventAny::ModifyRejected(event));
808        }
809        NautilusWsMessage::Error(err) => {
810            tracing::warn!(
811                "OKX websocket error: code={} message={} conn_id={:?}",
812                err.code,
813                err.message,
814                err.conn_id
815            );
816        }
817        NautilusWsMessage::Reconnected => {
818            tracing::info!("OKX websocket reconnected");
819        }
820        NautilusWsMessage::Deltas(_)
821        | NautilusWsMessage::Raw(_)
822        | NautilusWsMessage::Data(_)
823        | NautilusWsMessage::FundingRates(_)
824        | NautilusWsMessage::Instrument(_) => {
825            tracing::debug!("Ignoring OKX websocket data message");
826        }
827    }
828}
829
830fn dispatch_account_state(state: AccountState) {
831    msgbus::send_any(
832        "Portfolio.update_account".into(),
833        &state as &dyn std::any::Any,
834    );
835}
836
837fn dispatch_execution_report(report: ExecutionReport) {
838    let sender = get_exec_event_sender();
839    match report {
840        ExecutionReport::Order(order_report) => {
841            let exec_report =
842                nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
843            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
844                tracing::warn!("Failed to send order status report: {e}");
845            }
846        }
847        ExecutionReport::Fill(fill_report) => {
848            let exec_report =
849                nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
850            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
851                tracing::warn!("Failed to send fill report: {e}");
852            }
853        }
854    }
855}
856
857fn dispatch_order_event(event: OrderEventAny) {
858    let sender = get_exec_event_sender();
859    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
860        tracing::warn!("Failed to send order event: {e}");
861    }
862}
863
864fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
865    value.map(|nanos| nanos.to_datetime_utc())
866}