nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25    messages::{
26        ExecutionEvent,
27        execution::{
28            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
29            GenerateOrderStatusReport, GeneratePositionReports,
30        },
31    },
32    msgbus,
33    runner::get_exec_event_sender,
34    runtime::get_runtime,
35};
36use nautilus_core::{MUTEX_POISONED, UnixNanos};
37use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
38use nautilus_live::execution::client::LiveExecutionClient;
39use nautilus_model::{
40    accounts::AccountAny,
41    enums::{AccountType, OmsType, OrderType},
42    events::{AccountState, OrderEventAny},
43    identifiers::{AccountId, ClientId, InstrumentId, Venue},
44    orders::Order,
45    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
46    types::{AccountBalance, MarginBalance},
47};
48use tokio::task::JoinHandle;
49
50use crate::{
51    common::{
52        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
53        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
54    },
55    config::OKXExecClientConfig,
56    http::client::OKXHttpClient,
57    websocket::{
58        client::OKXWebSocketClient,
59        messages::{ExecutionReport, NautilusWsMessage},
60    },
61};
62
63#[derive(Debug)]
64pub struct OKXExecutionClient {
65    core: ExecutionClientCore,
66    config: OKXExecClientConfig,
67    http_client: OKXHttpClient,
68    ws_private: OKXWebSocketClient,
69    ws_business: OKXWebSocketClient,
70    trade_mode: OKXTradeMode,
71    started: bool,
72    connected: bool,
73    instruments_initialized: bool,
74    ws_stream_handle: Option<JoinHandle<()>>,
75    ws_business_stream_handle: Option<JoinHandle<()>>,
76    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl OKXExecutionClient {
80    /// Creates a new [`OKXExecutionClient`].
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the client fails to initialize.
85    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
86        let http_client = if config.has_api_credentials() {
87            OKXHttpClient::with_credentials(
88                config.api_key.clone(),
89                config.api_secret.clone(),
90                config.api_passphrase.clone(),
91                config.base_url_http.clone(),
92                config.http_timeout_secs,
93                config.max_retries,
94                config.retry_delay_initial_ms,
95                config.retry_delay_max_ms,
96                config.is_demo,
97                config.http_proxy_url.clone(),
98            )?
99        } else {
100            OKXHttpClient::new(
101                config.base_url_http.clone(),
102                config.http_timeout_secs,
103                config.max_retries,
104                config.retry_delay_initial_ms,
105                config.retry_delay_max_ms,
106                config.is_demo,
107                config.http_proxy_url.clone(),
108            )?
109        };
110
111        let account_id = core.account_id;
112        let ws_private = OKXWebSocketClient::new(
113            Some(config.ws_private_url()),
114            config.api_key.clone(),
115            config.api_secret.clone(),
116            config.api_passphrase.clone(),
117            Some(account_id),
118            Some(20), // Heartbeat
119        )
120        .context("failed to construct OKX private websocket client")?;
121
122        let ws_business = OKXWebSocketClient::new(
123            Some(config.ws_business_url()),
124            config.api_key.clone(),
125            config.api_secret.clone(),
126            config.api_passphrase.clone(),
127            Some(account_id),
128            Some(20), // Heartbeat
129        )
130        .context("failed to construct OKX business websocket client")?;
131
132        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
133
134        Ok(Self {
135            core,
136            config,
137            http_client,
138            ws_private,
139            ws_business,
140            trade_mode,
141            started: false,
142            connected: false,
143            instruments_initialized: false,
144            ws_stream_handle: None,
145            ws_business_stream_handle: None,
146            pending_tasks: Mutex::new(Vec::new()),
147        })
148    }
149
150    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
151        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
152
153        if account_type == AccountType::Cash {
154            if !config.use_spot_margin {
155                return OKXTradeMode::Cash;
156            }
157            return if is_cross_margin {
158                OKXTradeMode::Cross
159            } else {
160                OKXTradeMode::Isolated
161            };
162        }
163
164        if is_cross_margin {
165            OKXTradeMode::Cross
166        } else {
167            OKXTradeMode::Isolated
168        }
169    }
170
171    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
172        if self.config.instrument_types.is_empty() {
173            vec![OKXInstrumentType::Spot]
174        } else {
175            self.config.instrument_types.clone()
176        }
177    }
178
179    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
180        if self.instruments_initialized {
181            return Ok(());
182        }
183
184        let mut all_instruments = Vec::new();
185        for instrument_type in self.instrument_types() {
186            let instruments = self
187                .http_client
188                .request_instruments(instrument_type, None)
189                .await
190                .with_context(|| {
191                    format!("failed to request OKX instruments for {instrument_type:?}")
192                })?;
193
194            if instruments.is_empty() {
195                tracing::warn!("No instruments returned for {instrument_type:?}");
196                continue;
197            }
198
199            self.http_client.cache_instruments(instruments.clone());
200            all_instruments.extend(instruments);
201        }
202
203        if all_instruments.is_empty() {
204            tracing::warn!(
205                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
206            );
207        } else {
208            self.ws_private.cache_instruments(all_instruments);
209        }
210
211        self.instruments_initialized = true;
212        Ok(())
213    }
214
215    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
216        if self.instruments_initialized {
217            return Ok(());
218        }
219
220        let runtime = get_runtime();
221        runtime.block_on(self.ensure_instruments_initialized_async())
222    }
223
224    async fn refresh_account_state(&self) -> anyhow::Result<()> {
225        let account_state = self
226            .http_client
227            .request_account_state(self.core.account_id)
228            .await
229            .context("failed to request OKX account state")?;
230
231        self.core.generate_account_state(
232            account_state.balances.clone(),
233            account_state.margins.clone(),
234            account_state.is_reported,
235            account_state.ts_event,
236        )
237    }
238
239    fn update_account_state(&self) -> anyhow::Result<()> {
240        let runtime = get_runtime();
241        runtime.block_on(self.refresh_account_state())
242    }
243
244    fn is_conditional_order(&self, order_type: OrderType) -> bool {
245        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
246    }
247
248    fn submit_regular_order(
249        &self,
250        cmd: &nautilus_common::messages::execution::SubmitOrder,
251    ) -> anyhow::Result<()> {
252        let order = cmd.order.clone();
253        let ws_private = self.ws_private.clone();
254        let trade_mode = self.trade_mode;
255
256        self.spawn_task("submit_order", async move {
257            ws_private
258                .submit_order(
259                    order.trader_id(),
260                    order.strategy_id(),
261                    order.instrument_id(),
262                    trade_mode,
263                    order.client_order_id(),
264                    order.order_side(),
265                    order.order_type(),
266                    order.quantity(),
267                    Some(order.time_in_force()),
268                    order.price(),
269                    order.trigger_price(),
270                    Some(order.is_post_only()),
271                    Some(order.is_reduce_only()),
272                    Some(order.is_quote_quantity()),
273                    None,
274                )
275                .await?;
276            Ok(())
277        });
278
279        Ok(())
280    }
281
282    fn submit_conditional_order(
283        &self,
284        cmd: &nautilus_common::messages::execution::SubmitOrder,
285    ) -> anyhow::Result<()> {
286        let order = cmd.order.clone();
287        let trigger_price = order
288            .trigger_price()
289            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
290        let http_client = self.http_client.clone();
291        let trade_mode = self.trade_mode;
292
293        self.spawn_task("submit_algo_order", async move {
294            http_client
295                .place_algo_order_with_domain_types(
296                    order.instrument_id(),
297                    trade_mode,
298                    order.client_order_id(),
299                    order.order_side(),
300                    order.order_type(),
301                    order.quantity(),
302                    trigger_price,
303                    order.trigger_type(),
304                    order.price(),
305                    Some(order.is_reduce_only()),
306                )
307                .await?;
308            Ok(())
309        });
310
311        Ok(())
312    }
313
314    fn cancel_ws_order(
315        &self,
316        cmd: &nautilus_common::messages::execution::CancelOrder,
317    ) -> anyhow::Result<()> {
318        let ws_private = self.ws_private.clone();
319        let command = cmd.clone();
320
321        self.spawn_task("cancel_order", async move {
322            ws_private
323                .cancel_order(
324                    command.trader_id,
325                    command.strategy_id,
326                    command.instrument_id,
327                    Some(command.client_order_id),
328                    Some(command.venue_order_id),
329                )
330                .await?;
331            Ok(())
332        });
333
334        Ok(())
335    }
336
337    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
338        let ws_private = self.ws_private.clone();
339        self.spawn_task("mass_cancel_orders", async move {
340            ws_private.mass_cancel_orders(instrument_id).await?;
341            Ok(())
342        });
343        Ok(())
344    }
345
346    fn spawn_task<F>(&self, description: &'static str, fut: F)
347    where
348        F: Future<Output = anyhow::Result<()>> + Send + 'static,
349    {
350        let runtime = get_runtime();
351        let handle = runtime.spawn(async move {
352            if let Err(e) = fut.await {
353                tracing::warn!("{description} failed: {e:?}");
354            }
355        });
356
357        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
358        tasks.retain(|handle| !handle.is_finished());
359        tasks.push(handle);
360    }
361
362    fn abort_pending_tasks(&self) {
363        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
364        for handle in tasks.drain(..) {
365            handle.abort();
366        }
367    }
368}
369
370impl ExecutionClient for OKXExecutionClient {
371    fn is_connected(&self) -> bool {
372        self.connected
373    }
374
375    fn client_id(&self) -> ClientId {
376        self.core.client_id
377    }
378
379    fn account_id(&self) -> AccountId {
380        self.core.account_id
381    }
382
383    fn venue(&self) -> Venue {
384        *OKX_VENUE
385    }
386
387    fn oms_type(&self) -> OmsType {
388        self.core.oms_type
389    }
390
391    fn get_account(&self) -> Option<AccountAny> {
392        self.core.get_account()
393    }
394
395    fn generate_account_state(
396        &self,
397        balances: Vec<AccountBalance>,
398        margins: Vec<MarginBalance>,
399        reported: bool,
400        ts_event: UnixNanos,
401    ) -> anyhow::Result<()> {
402        self.core
403            .generate_account_state(balances, margins, reported, ts_event)
404    }
405
406    fn start(&mut self) -> anyhow::Result<()> {
407        if self.started {
408            return Ok(());
409        }
410
411        self.ensure_instruments_initialized()?;
412        self.started = true;
413        tracing::info!(
414            client_id = %self.core.client_id,
415            account_id = %self.core.account_id,
416            account_type = ?self.core.account_type,
417            trade_mode = ?self.trade_mode,
418            instrument_types = ?self.config.instrument_types,
419            use_fills_channel = self.config.use_fills_channel,
420            is_demo = self.config.is_demo,
421            http_proxy_url = ?self.config.http_proxy_url,
422            ws_proxy_url = ?self.config.ws_proxy_url,
423            "OKX execution client started"
424        );
425        Ok(())
426    }
427
428    fn stop(&mut self) -> anyhow::Result<()> {
429        if !self.started {
430            return Ok(());
431        }
432
433        self.started = false;
434        self.connected = false;
435        if let Some(handle) = self.ws_stream_handle.take() {
436            handle.abort();
437        }
438        self.abort_pending_tasks();
439        tracing::info!("OKX execution client {} stopped", self.core.client_id);
440        Ok(())
441    }
442
443    fn submit_order(
444        &self,
445        cmd: &nautilus_common::messages::execution::SubmitOrder,
446    ) -> anyhow::Result<()> {
447        let order = &cmd.order;
448
449        if order.is_closed() {
450            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
451            return Ok(());
452        }
453
454        self.core.generate_order_submitted(
455            order.strategy_id(),
456            order.instrument_id(),
457            order.client_order_id(),
458            cmd.ts_init,
459        );
460
461        let result = if self.is_conditional_order(order.order_type()) {
462            self.submit_conditional_order(cmd)
463        } else {
464            self.submit_regular_order(cmd)
465        };
466
467        if let Err(e) = result {
468            self.core.generate_order_rejected(
469                order.strategy_id(),
470                order.instrument_id(),
471                order.client_order_id(),
472                &format!("submit-order-error: {e}"),
473                cmd.ts_init,
474                false,
475            );
476            return Err(e);
477        }
478
479        Ok(())
480    }
481
482    fn submit_order_list(
483        &self,
484        cmd: &nautilus_common::messages::execution::SubmitOrderList,
485    ) -> anyhow::Result<()> {
486        tracing::warn!(
487            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
488            cmd.order_list.orders.len()
489        );
490        Ok(())
491    }
492
493    fn modify_order(
494        &self,
495        cmd: &nautilus_common::messages::execution::ModifyOrder,
496    ) -> anyhow::Result<()> {
497        let ws_private = self.ws_private.clone();
498        let command = cmd.clone();
499
500        self.spawn_task("modify_order", async move {
501            ws_private
502                .modify_order(
503                    command.trader_id,
504                    command.strategy_id,
505                    command.instrument_id,
506                    Some(command.client_order_id),
507                    command.price,
508                    command.quantity,
509                    Some(command.venue_order_id),
510                )
511                .await?;
512            Ok(())
513        });
514
515        Ok(())
516    }
517
518    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
519        self.cancel_ws_order(cmd)
520    }
521
522    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
523        self.mass_cancel_instrument(cmd.instrument_id)
524    }
525
526    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
527        let mut payload = Vec::with_capacity(cmd.cancels.len());
528
529        for cancel in &cmd.cancels {
530            payload.push((
531                cancel.instrument_id,
532                Some(cancel.client_order_id),
533                Some(cancel.venue_order_id),
534            ));
535        }
536
537        let ws_private = self.ws_private.clone();
538        self.spawn_task("batch_cancel_orders", async move {
539            ws_private.batch_cancel_orders(payload).await?;
540            Ok(())
541        });
542
543        Ok(())
544    }
545
546    fn query_account(
547        &self,
548        _cmd: &nautilus_common::messages::execution::QueryAccount,
549    ) -> anyhow::Result<()> {
550        self.update_account_state()
551    }
552
553    fn query_order(
554        &self,
555        cmd: &nautilus_common::messages::execution::QueryOrder,
556    ) -> anyhow::Result<()> {
557        tracing::debug!(
558            "query_order not implemented for OKX execution client (client_order_id={})",
559            cmd.client_order_id
560        );
561        Ok(())
562    }
563}
564
565#[async_trait(?Send)]
566impl LiveExecutionClient for OKXExecutionClient {
567    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
568        get_exec_event_sender()
569    }
570
571    fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
572        self.core.clock().borrow()
573    }
574
575    async fn connect(&mut self) -> anyhow::Result<()> {
576        if self.connected {
577            return Ok(());
578        }
579
580        self.ensure_instruments_initialized_async().await?;
581
582        self.ws_private.connect().await?;
583        self.ws_private.wait_until_active(10.0).await?;
584
585        for inst_type in self.instrument_types() {
586            tracing::info!(
587                "Subscribing to channels for instrument type: {:?}",
588                inst_type
589            );
590            self.ws_private.subscribe_orders(inst_type).await?;
591
592            // OKX doesn't support algo orders channel for OPTIONS
593            if inst_type != OKXInstrumentType::Option {
594                self.ws_private.subscribe_orders_algo(inst_type).await?;
595            }
596
597            if self.config.use_fills_channel
598                && let Err(e) = self.ws_private.subscribe_fills(inst_type).await
599            {
600                tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
601            }
602        }
603
604        self.ws_private.subscribe_account().await?;
605
606        self.ws_business.connect().await?;
607        self.ws_business.wait_until_active(10.0).await?;
608
609        self.start_ws_stream()?;
610        self.start_ws_business_stream()?;
611        self.refresh_account_state().await?;
612
613        self.connected = true;
614        tracing::info!(client_id = %self.core.client_id, "Connected");
615
616        Ok(())
617    }
618
619    async fn disconnect(&mut self) -> anyhow::Result<()> {
620        if !self.connected {
621            return Ok(());
622        }
623
624        self.http_client.cancel_all_requests();
625        if let Err(e) = self.ws_private.close().await {
626            tracing::warn!("Error while closing OKX private websocket: {e:?}");
627        }
628
629        if let Err(e) = self.ws_business.close().await {
630            tracing::warn!("Error while closing OKX business websocket: {e:?}");
631        }
632
633        if let Some(handle) = self.ws_stream_handle.take() {
634            handle.abort();
635        }
636
637        if let Some(handle) = self.ws_business_stream_handle.take() {
638            handle.abort();
639        }
640
641        self.abort_pending_tasks();
642
643        self.connected = false;
644        tracing::info!(client_id = %self.core.client_id, "Disconnected");
645        Ok(())
646    }
647
648    async fn generate_order_status_report(
649        &self,
650        cmd: &GenerateOrderStatusReport,
651    ) -> anyhow::Result<Option<OrderStatusReport>> {
652        let Some(instrument_id) = cmd.instrument_id else {
653            tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
654            return Ok(None);
655        };
656
657        let mut reports = self
658            .http_client
659            .request_order_status_reports(
660                self.core.account_id,
661                None,
662                Some(instrument_id),
663                None,
664                None,
665                false,
666                None,
667            )
668            .await?;
669
670        if let Some(client_order_id) = cmd.client_order_id {
671            reports.retain(|report| report.client_order_id == Some(client_order_id));
672        }
673
674        if let Some(venue_order_id) = cmd.venue_order_id {
675            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
676        }
677
678        Ok(reports.into_iter().next())
679    }
680
681    async fn generate_order_status_reports(
682        &self,
683        cmd: &GenerateOrderStatusReport,
684    ) -> anyhow::Result<Vec<OrderStatusReport>> {
685        let mut reports = Vec::new();
686
687        if let Some(instrument_id) = cmd.instrument_id {
688            let mut fetched = self
689                .http_client
690                .request_order_status_reports(
691                    self.core.account_id,
692                    None,
693                    Some(instrument_id),
694                    None,
695                    None,
696                    false,
697                    None,
698                )
699                .await?;
700            reports.append(&mut fetched);
701        } else {
702            for inst_type in self.instrument_types() {
703                let mut fetched = self
704                    .http_client
705                    .request_order_status_reports(
706                        self.core.account_id,
707                        Some(inst_type),
708                        None,
709                        None,
710                        None,
711                        false,
712                        None,
713                    )
714                    .await?;
715                reports.append(&mut fetched);
716            }
717        }
718
719        if let Some(client_order_id) = cmd.client_order_id {
720            reports.retain(|report| report.client_order_id == Some(client_order_id));
721        }
722
723        if let Some(venue_order_id) = cmd.venue_order_id {
724            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
725        }
726
727        Ok(reports)
728    }
729
730    async fn generate_fill_reports(
731        &self,
732        cmd: GenerateFillReports,
733    ) -> anyhow::Result<Vec<FillReport>> {
734        let start_dt = nanos_to_datetime(cmd.start);
735        let end_dt = nanos_to_datetime(cmd.end);
736        let mut reports = Vec::new();
737
738        if let Some(instrument_id) = cmd.instrument_id {
739            let mut fetched = self
740                .http_client
741                .request_fill_reports(
742                    self.core.account_id,
743                    None,
744                    Some(instrument_id),
745                    start_dt,
746                    end_dt,
747                    None,
748                )
749                .await?;
750            reports.append(&mut fetched);
751        } else {
752            for inst_type in self.instrument_types() {
753                let mut fetched = self
754                    .http_client
755                    .request_fill_reports(
756                        self.core.account_id,
757                        Some(inst_type),
758                        None,
759                        start_dt,
760                        end_dt,
761                        None,
762                    )
763                    .await?;
764                reports.append(&mut fetched);
765            }
766        }
767
768        if let Some(venue_order_id) = cmd.venue_order_id {
769            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
770        }
771
772        Ok(reports)
773    }
774
775    async fn generate_position_status_reports(
776        &self,
777        cmd: &GeneratePositionReports,
778    ) -> anyhow::Result<Vec<PositionStatusReport>> {
779        let mut reports = Vec::new();
780
781        // Query derivative positions (SWAP/FUTURES/OPTION) from /api/v5/account/positions
782        if let Some(instrument_id) = cmd.instrument_id {
783            let mut fetched = self
784                .http_client
785                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
786                .await?;
787            reports.append(&mut fetched);
788        } else {
789            for inst_type in self.instrument_types() {
790                let mut fetched = self
791                    .http_client
792                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
793                    .await?;
794                reports.append(&mut fetched);
795            }
796        }
797
798        // Query spot margin positions from /api/v5/account/balance
799        // Spot margin positions appear as balance sheet items (liab/spotInUseAmt fields)
800        let mut margin_reports = self
801            .http_client
802            .request_spot_margin_position_reports(self.core.account_id)
803            .await?;
804
805        // Filter margin reports by instrument_id if specified
806        if let Some(instrument_id) = cmd.instrument_id {
807            margin_reports.retain(|report| report.instrument_id == instrument_id);
808        }
809
810        reports.append(&mut margin_reports);
811
812        let _ = nanos_to_datetime(cmd.start);
813        let _ = nanos_to_datetime(cmd.end);
814
815        Ok(reports)
816    }
817
818    async fn generate_mass_status(
819        &self,
820        lookback_mins: Option<u64>,
821    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
822        tracing::warn!(
823            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
824        );
825        Ok(None)
826    }
827}
828
829impl OKXExecutionClient {
830    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
831        if self.ws_stream_handle.is_some() {
832            return Ok(());
833        }
834
835        let stream = self.ws_private.stream();
836        let runtime = get_runtime();
837        let handle = runtime.spawn(async move {
838            pin_mut!(stream);
839            while let Some(message) = stream.next().await {
840                dispatch_ws_message(message);
841            }
842        });
843
844        self.ws_stream_handle = Some(handle);
845        Ok(())
846    }
847
848    fn start_ws_business_stream(&mut self) -> anyhow::Result<()> {
849        if self.ws_business_stream_handle.is_some() {
850            return Ok(());
851        }
852
853        let stream = self.ws_business.stream();
854        let runtime = get_runtime();
855        let handle = runtime.spawn(async move {
856            pin_mut!(stream);
857            while let Some(message) = stream.next().await {
858                dispatch_ws_message(message);
859            }
860        });
861
862        self.ws_business_stream_handle = Some(handle);
863        Ok(())
864    }
865}
866
867fn dispatch_ws_message(message: NautilusWsMessage) {
868    match message {
869        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
870        NautilusWsMessage::PositionUpdate(report) => dispatch_position_status_report(report),
871        NautilusWsMessage::ExecutionReports(reports) => {
872            for report in reports {
873                dispatch_execution_report(report);
874            }
875        }
876        NautilusWsMessage::OrderRejected(event) => {
877            dispatch_order_event(OrderEventAny::Rejected(event));
878        }
879        NautilusWsMessage::OrderCancelRejected(event) => {
880            dispatch_order_event(OrderEventAny::CancelRejected(event));
881        }
882        NautilusWsMessage::OrderModifyRejected(event) => {
883            dispatch_order_event(OrderEventAny::ModifyRejected(event));
884        }
885        NautilusWsMessage::Error(e) => {
886            tracing::warn!(
887                "OKX websocket error: code={} message={} conn_id={:?}",
888                e.code,
889                e.message,
890                e.conn_id
891            );
892        }
893        NautilusWsMessage::Reconnected => {
894            tracing::info!("OKX websocket reconnected");
895        }
896        NautilusWsMessage::Authenticated => {
897            tracing::debug!("OKX websocket authenticated");
898        }
899        NautilusWsMessage::Deltas(_)
900        | NautilusWsMessage::Raw(_)
901        | NautilusWsMessage::Data(_)
902        | NautilusWsMessage::FundingRates(_)
903        | NautilusWsMessage::Instrument(_) => {
904            tracing::debug!("Ignoring OKX websocket data message");
905        }
906    }
907}
908
909fn dispatch_account_state(state: AccountState) {
910    msgbus::send_any(
911        "Portfolio.update_account".into(),
912        &state as &dyn std::any::Any,
913    );
914}
915
916fn dispatch_position_status_report(report: PositionStatusReport) {
917    let sender = get_exec_event_sender();
918    let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
919    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
920        tracing::warn!("Failed to send position status report: {e}");
921    }
922}
923
924fn dispatch_execution_report(report: ExecutionReport) {
925    let sender = get_exec_event_sender();
926    match report {
927        ExecutionReport::Order(order_report) => {
928            let exec_report =
929                nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
930            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
931                tracing::warn!("Failed to send order status report: {e}");
932            }
933        }
934        ExecutionReport::Fill(fill_report) => {
935            let exec_report =
936                nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
937            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
938                tracing::warn!("Failed to send fill report: {e}");
939            }
940        }
941    }
942}
943
944fn dispatch_order_event(event: OrderEventAny) {
945    let sender = get_exec_event_sender();
946    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
947        tracing::warn!("Failed to send order event: {e}");
948    }
949}
950
951fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
952    value.map(|nanos| nanos.to_datetime_utc())
953}
954
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    /// Shape of one entry in the batch-cancel payload built by these tests.
    type CancelPayloadRow = (InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>);

    /// Builds a two-order batch cancel command and checks that each cancel
    /// maps to an (instrument, client order ID, venue order ID) payload row.
    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = ClientId::from("OKX");
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");

        // (client order ID, venue order ID) pairs, in the order they are cancelled.
        let order_ids = [
            (ClientOrderId::new("order1"), VenueOrderId::new("venue1")),
            (ClientOrderId::new("order2"), VenueOrderId::new("venue2")),
        ];

        let cancels: Vec<CancelOrder> = order_ids
            .iter()
            .map(|&(client_order_id, venue_order_id)| CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id,
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
            })
            .collect();

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels,
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        };

        // Verify we can build the payload structure
        let payload: Vec<CancelPayloadRow> = cmd
            .cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    Some(cancel.venue_order_id),
                )
            })
            .collect();

        assert_eq!(payload.len(), 2);
        for (row, &(client_order_id, venue_order_id)) in payload.iter().zip(order_ids.iter()) {
            assert_eq!(row.0, instrument_id);
            assert_eq!(row.1, Some(client_order_id));
            assert_eq!(row.2, Some(venue_order_id));
        }
    }

    /// A batch cancel command with no cancels yields an empty payload.
    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: ClientId::from("OKX"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: Vec::new(),
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        };

        let payload: Vec<CancelPayloadRow> = Vec::with_capacity(cmd.cancels.len());
        assert!(payload.is_empty());
    }
}