nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32    live::{runner::get_exec_event_sender, runtime::get_runtime},
33    messages::{
34        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
35        execution::{
36            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
38            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39        },
40    },
41};
42use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
43use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{AccountType, OmsType, OrderType},
47    events::{AccountState, OrderEventAny, OrderRejected, OrderSubmitted},
48    identifiers::{AccountId, ClientId, InstrumentId, Venue},
49    orders::Order,
50    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51    types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56    common::{
57        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
58        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
59    },
60    config::OKXExecClientConfig,
61    http::client::OKXHttpClient,
62    websocket::{
63        client::OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage},
65    },
66};
67
68#[derive(Debug)]
69pub struct OKXExecutionClient {
70    core: ExecutionClientCore,
71    config: OKXExecClientConfig,
72    http_client: OKXHttpClient,
73    ws_private: OKXWebSocketClient,
74    ws_business: OKXWebSocketClient,
75    trade_mode: OKXTradeMode,
76    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
77    started: bool,
78    connected: AtomicBool,
79    instruments_initialized: AtomicBool,
80    ws_stream_handle: Option<JoinHandle<()>>,
81    ws_business_stream_handle: Option<JoinHandle<()>>,
82    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
83}
84
85impl OKXExecutionClient {
86    /// Creates a new [`OKXExecutionClient`].
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the client fails to initialize.
91    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92        // Always use with_credentials which loads from env vars when config values are None
93        let http_client = OKXHttpClient::with_credentials(
94            config.api_key.clone(),
95            config.api_secret.clone(),
96            config.api_passphrase.clone(),
97            config.base_url_http.clone(),
98            config.http_timeout_secs,
99            config.max_retries,
100            config.retry_delay_initial_ms,
101            config.retry_delay_max_ms,
102            config.is_demo,
103            config.http_proxy_url.clone(),
104        )?;
105
106        let account_id = core.account_id;
107        let ws_private = OKXWebSocketClient::with_credentials(
108            Some(config.ws_private_url()),
109            config.api_key.clone(),
110            config.api_secret.clone(),
111            config.api_passphrase.clone(),
112            Some(account_id),
113            Some(20), // Heartbeat
114        )
115        .context("failed to construct OKX private websocket client")?;
116
117        let ws_business = OKXWebSocketClient::with_credentials(
118            Some(config.ws_business_url()),
119            config.api_key.clone(),
120            config.api_secret.clone(),
121            config.api_passphrase.clone(),
122            Some(account_id),
123            Some(20), // Heartbeat
124        )
125        .context("failed to construct OKX business websocket client")?;
126
127        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
128
129        Ok(Self {
130            core,
131            config,
132            http_client,
133            ws_private,
134            ws_business,
135            trade_mode,
136            exec_event_sender: None,
137            started: false,
138            connected: AtomicBool::new(false),
139            instruments_initialized: AtomicBool::new(false),
140            ws_stream_handle: None,
141            ws_business_stream_handle: None,
142            pending_tasks: Mutex::new(Vec::new()),
143        })
144    }
145
146    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
147        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
148
149        if account_type == AccountType::Cash {
150            if !config.use_spot_margin {
151                return OKXTradeMode::Cash;
152            }
153            return if is_cross_margin {
154                OKXTradeMode::Cross
155            } else {
156                OKXTradeMode::Isolated
157            };
158        }
159
160        if is_cross_margin {
161            OKXTradeMode::Cross
162        } else {
163            OKXTradeMode::Isolated
164        }
165    }
166
167    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
168        if self.config.instrument_types.is_empty() {
169            vec![OKXInstrumentType::Spot]
170        } else {
171            self.config.instrument_types.clone()
172        }
173    }
174
175    async fn refresh_account_state(&self) -> anyhow::Result<()> {
176        let account_state = self
177            .http_client
178            .request_account_state(self.core.account_id)
179            .await
180            .context("failed to request OKX account state")?;
181
182        self.core.generate_account_state(
183            account_state.balances.clone(),
184            account_state.margins.clone(),
185            account_state.is_reported,
186            account_state.ts_event,
187        )
188    }
189
190    fn update_account_state(&self) -> anyhow::Result<()> {
191        let runtime = get_runtime();
192        runtime.block_on(self.refresh_account_state())
193    }
194
195    fn is_conditional_order(&self, order_type: OrderType) -> bool {
196        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
197    }
198
199    fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200        let order = cmd.order.clone();
201        let ws_private = self.ws_private.clone();
202        let trade_mode = self.trade_mode;
203
204        self.spawn_task("submit_order", async move {
205            ws_private
206                .submit_order(
207                    order.trader_id(),
208                    order.strategy_id(),
209                    order.instrument_id(),
210                    trade_mode,
211                    order.client_order_id(),
212                    order.order_side(),
213                    order.order_type(),
214                    order.quantity(),
215                    Some(order.time_in_force()),
216                    order.price(),
217                    order.trigger_price(),
218                    Some(order.is_post_only()),
219                    Some(order.is_reduce_only()),
220                    Some(order.is_quote_quantity()),
221                    None,
222                )
223                .await?;
224            Ok(())
225        });
226
227        Ok(())
228    }
229
230    fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
231        let order = cmd.order.clone();
232        let trigger_price = order
233            .trigger_price()
234            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
235        let http_client = self.http_client.clone();
236        let trade_mode = self.trade_mode;
237
238        self.spawn_task("submit_algo_order", async move {
239            http_client
240                .place_algo_order_with_domain_types(
241                    order.instrument_id(),
242                    trade_mode,
243                    order.client_order_id(),
244                    order.order_side(),
245                    order.order_type(),
246                    order.quantity(),
247                    trigger_price,
248                    order.trigger_type(),
249                    order.price(),
250                    Some(order.is_reduce_only()),
251                )
252                .await?;
253            Ok(())
254        });
255
256        Ok(())
257    }
258
259    fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
260        let ws_private = self.ws_private.clone();
261        let command = cmd.clone();
262
263        self.spawn_task("cancel_order", async move {
264            ws_private
265                .cancel_order(
266                    command.trader_id,
267                    command.strategy_id,
268                    command.instrument_id,
269                    Some(command.client_order_id),
270                    command.venue_order_id,
271                )
272                .await?;
273            Ok(())
274        });
275
276        Ok(())
277    }
278
279    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
280        let ws_private = self.ws_private.clone();
281        self.spawn_task("mass_cancel_orders", async move {
282            ws_private.mass_cancel_orders(instrument_id).await?;
283            Ok(())
284        });
285        Ok(())
286    }
287
288    fn spawn_task<F>(&self, description: &'static str, fut: F)
289    where
290        F: Future<Output = anyhow::Result<()>> + Send + 'static,
291    {
292        let runtime = get_runtime();
293        let handle = runtime.spawn(async move {
294            if let Err(e) = fut.await {
295                tracing::warn!("{description} failed: {e:?}");
296            }
297        });
298
299        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
300        tasks.retain(|handle| !handle.is_finished());
301        tasks.push(handle);
302    }
303
304    fn abort_pending_tasks(&self) {
305        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306        for handle in tasks.drain(..) {
307            handle.abort();
308        }
309    }
310
311    /// Polls the cache until the account is registered or timeout is reached.
312    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
313        let account_id = self.core.account_id;
314
315        if self.core.cache().borrow().account(&account_id).is_some() {
316            tracing::info!("Account {account_id} registered");
317            return Ok(());
318        }
319
320        let start = Instant::now();
321        let timeout = Duration::from_secs_f64(timeout_secs);
322        let interval = Duration::from_millis(10);
323
324        loop {
325            tokio::time::sleep(interval).await;
326
327            if self.core.cache().borrow().account(&account_id).is_some() {
328                tracing::info!("Account {account_id} registered");
329                return Ok(());
330            }
331
332            if start.elapsed() >= timeout {
333                anyhow::bail!(
334                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
335                );
336            }
337        }
338    }
339}
340
341#[async_trait(?Send)]
342impl ExecutionClient for OKXExecutionClient {
343    fn is_connected(&self) -> bool {
344        self.connected.load(Ordering::Acquire)
345    }
346
347    fn client_id(&self) -> ClientId {
348        self.core.client_id
349    }
350
351    fn account_id(&self) -> AccountId {
352        self.core.account_id
353    }
354
355    fn venue(&self) -> Venue {
356        *OKX_VENUE
357    }
358
359    fn oms_type(&self) -> OmsType {
360        self.core.oms_type
361    }
362
363    fn get_account(&self) -> Option<AccountAny> {
364        self.core.get_account()
365    }
366
367    async fn connect(&mut self) -> anyhow::Result<()> {
368        if self.connected.load(Ordering::Acquire) {
369            return Ok(());
370        }
371
372        // Initialize exec event sender (must be done in async context after runner is set up)
373        if self.exec_event_sender.is_none() {
374            self.exec_event_sender = Some(get_exec_event_sender());
375        }
376
377        let instrument_types = self.instrument_types();
378
379        if !self.instruments_initialized.load(Ordering::Acquire) {
380            let mut all_instruments = Vec::new();
381            for instrument_type in &instrument_types {
382                let instruments = self
383                    .http_client
384                    .request_instruments(*instrument_type, None)
385                    .await
386                    .with_context(|| {
387                        format!("failed to request OKX instruments for {instrument_type:?}")
388                    })?;
389
390                if instruments.is_empty() {
391                    tracing::warn!("No instruments returned for {instrument_type:?}");
392                    continue;
393                }
394
395                tracing::info!(
396                    "Loaded {} {instrument_type:?} instruments",
397                    instruments.len()
398                );
399
400                self.http_client.cache_instruments(instruments.clone());
401                all_instruments.extend(instruments);
402            }
403
404            // Add instruments to Nautilus Cache for reconciliation
405            {
406                let mut cache = self.core.cache().borrow_mut();
407                for instrument in &all_instruments {
408                    if let Err(e) = cache.add_instrument(instrument.clone()) {
409                        tracing::debug!("Instrument already in cache: {e}");
410                    }
411                }
412            }
413
414            if !all_instruments.is_empty() {
415                self.ws_private.cache_instruments(all_instruments);
416            }
417            self.instruments_initialized.store(true, Ordering::Release);
418        }
419
420        let Some(sender) = self.exec_event_sender.as_ref() else {
421            tracing::error!("Execution event sender not initialized");
422            anyhow::bail!("Execution event sender not initialized");
423        };
424
425        self.ws_private.connect().await?;
426        self.ws_private.wait_until_active(10.0).await?;
427        tracing::info!("Connected to private WebSocket");
428
429        if self.ws_stream_handle.is_none() {
430            let stream = self.ws_private.stream();
431            let sender = sender.clone();
432            let handle = get_runtime().spawn(async move {
433                pin_mut!(stream);
434                while let Some(message) = stream.next().await {
435                    dispatch_ws_message(message, &sender);
436                }
437            });
438            self.ws_stream_handle = Some(handle);
439        }
440
441        self.ws_business.connect().await?;
442        self.ws_business.wait_until_active(10.0).await?;
443        tracing::info!("Connected to business WebSocket");
444
445        if self.ws_business_stream_handle.is_none() {
446            let stream = self.ws_business.stream();
447            let sender = sender.clone();
448            let handle = get_runtime().spawn(async move {
449                pin_mut!(stream);
450                while let Some(message) = stream.next().await {
451                    dispatch_ws_message(message, &sender);
452                }
453            });
454            self.ws_business_stream_handle = Some(handle);
455        }
456
457        for inst_type in &instrument_types {
458            tracing::info!("Subscribing to orders channel for {inst_type:?}");
459            self.ws_private.subscribe_orders(*inst_type).await?;
460
461            if self.config.use_fills_channel {
462                tracing::info!("Subscribing to fills channel for {inst_type:?}");
463                if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
464                    tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
465                }
466            }
467        }
468
469        self.ws_private.subscribe_account().await?;
470
471        // Subscribe to algo orders on business WebSocket (OKX requires this endpoint)
472        for inst_type in &instrument_types {
473            if *inst_type != OKXInstrumentType::Option {
474                self.ws_business.subscribe_orders_algo(*inst_type).await?;
475            }
476        }
477
478        let account_state = self
479            .http_client
480            .request_account_state(self.core.account_id)
481            .await
482            .context("failed to request OKX account state")?;
483
484        if !account_state.balances.is_empty() {
485            tracing::info!(
486                "Received account state with {} balance(s)",
487                account_state.balances.len()
488            );
489        }
490        dispatch_account_state(account_state, sender);
491
492        // Wait for account to be registered in cache before completing connect
493        self.await_account_registered(30.0).await?;
494
495        self.connected.store(true, Ordering::Release);
496        tracing::info!(client_id = %self.core.client_id, "Connected");
497        Ok(())
498    }
499
500    async fn disconnect(&mut self) -> anyhow::Result<()> {
501        if !self.connected.load(Ordering::Acquire) {
502            return Ok(());
503        }
504
505        self.abort_pending_tasks();
506        self.http_client.cancel_all_requests();
507
508        if let Err(e) = self.ws_private.close().await {
509            tracing::warn!("Error closing private websocket: {e:?}");
510        }
511
512        if let Err(e) = self.ws_business.close().await {
513            tracing::warn!("Error closing business websocket: {e:?}");
514        }
515
516        if let Some(handle) = self.ws_stream_handle.take() {
517            handle.abort();
518        }
519
520        if let Some(handle) = self.ws_business_stream_handle.take() {
521            handle.abort();
522        }
523
524        self.connected.store(false, Ordering::Release);
525        tracing::info!(client_id = %self.core.client_id, "Disconnected");
526        Ok(())
527    }
528
529    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
530        self.update_account_state()
531    }
532
533    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
534        tracing::debug!(
535            "query_order not implemented for OKX execution client (client_order_id={})",
536            cmd.client_order_id
537        );
538        Ok(())
539    }
540
541    fn generate_account_state(
542        &self,
543        balances: Vec<AccountBalance>,
544        margins: Vec<MarginBalance>,
545        reported: bool,
546        ts_event: UnixNanos,
547    ) -> anyhow::Result<()> {
548        self.core
549            .generate_account_state(balances, margins, reported, ts_event)
550    }
551
552    fn start(&mut self) -> anyhow::Result<()> {
553        if self.started {
554            return Ok(());
555        }
556
557        self.started = true;
558
559        // Spawn instrument bootstrap task
560        let http_client = self.http_client.clone();
561        let ws_private = self.ws_private.clone();
562        let instrument_types = self.config.instrument_types.clone();
563
564        get_runtime().spawn(async move {
565            let mut all_instruments = Vec::new();
566            for instrument_type in instrument_types {
567                match http_client.request_instruments(instrument_type, None).await {
568                    Ok(instruments) => {
569                        if instruments.is_empty() {
570                            tracing::warn!("No instruments returned for {instrument_type:?}");
571                            continue;
572                        }
573                        http_client.cache_instruments(instruments.clone());
574                        all_instruments.extend(instruments);
575                    }
576                    Err(e) => {
577                        tracing::error!(
578                            "Failed to request instruments for {instrument_type:?}: {e}"
579                        );
580                    }
581                }
582            }
583
584            if all_instruments.is_empty() {
585                tracing::warn!(
586                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
587                );
588            } else {
589                ws_private.cache_instruments(all_instruments);
590                tracing::info!("Instruments initialized");
591            }
592        });
593
594        tracing::info!(
595            client_id = %self.core.client_id,
596            account_id = %self.core.account_id,
597            account_type = ?self.core.account_type,
598            trade_mode = ?self.trade_mode,
599            instrument_types = ?self.config.instrument_types,
600            use_fills_channel = self.config.use_fills_channel,
601            is_demo = self.config.is_demo,
602            http_proxy_url = ?self.config.http_proxy_url,
603            ws_proxy_url = ?self.config.ws_proxy_url,
604            "Started"
605        );
606        Ok(())
607    }
608
609    fn stop(&mut self) -> anyhow::Result<()> {
610        if !self.started {
611            return Ok(());
612        }
613
614        self.started = false;
615        self.connected.store(false, Ordering::Release);
616        if let Some(handle) = self.ws_stream_handle.take() {
617            handle.abort();
618        }
619        self.abort_pending_tasks();
620        tracing::info!(client_id = %self.core.client_id, "Stopped");
621        Ok(())
622    }
623
624    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
625        let order = &cmd.order;
626
627        if order.is_closed() {
628            let client_order_id = order.client_order_id();
629            tracing::warn!("Cannot submit closed order {client_order_id}");
630            return Ok(());
631        }
632
633        let event = OrderSubmitted::new(
634            self.core.trader_id,
635            order.strategy_id(),
636            order.instrument_id(),
637            order.client_order_id(),
638            self.core.account_id,
639            UUID4::new(),
640            cmd.ts_init,
641            get_atomic_clock_realtime().get_time_ns(),
642        );
643        if let Some(sender) = &self.exec_event_sender {
644            tracing::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
645            if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
646                tracing::warn!("Failed to send OrderSubmitted event: {e}");
647            }
648        } else {
649            tracing::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
650        }
651
652        let result = if self.is_conditional_order(order.order_type()) {
653            self.submit_conditional_order(cmd)
654        } else {
655            self.submit_regular_order(cmd)
656        };
657
658        if let Err(e) = result {
659            let rejected_event = OrderRejected::new(
660                self.core.trader_id,
661                order.strategy_id(),
662                order.instrument_id(),
663                order.client_order_id(),
664                self.core.account_id,
665                format!("submit-order-error: {e}").into(),
666                UUID4::new(),
667                cmd.ts_init,
668                get_atomic_clock_realtime().get_time_ns(),
669                false,
670                false,
671            );
672            if let Some(sender) = &self.exec_event_sender {
673                if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(
674                    rejected_event,
675                ))) {
676                    tracing::warn!("Failed to send OrderRejected event: {e}");
677                }
678            } else {
679                tracing::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
680            }
681            return Err(e);
682        }
683
684        Ok(())
685    }
686
687    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
688        tracing::warn!(
689            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
690            cmd.order_list.orders.len()
691        );
692        Ok(())
693    }
694
695    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
696        let ws_private = self.ws_private.clone();
697        let command = cmd.clone();
698
699        self.spawn_task("modify_order", async move {
700            ws_private
701                .modify_order(
702                    command.trader_id,
703                    command.strategy_id,
704                    command.instrument_id,
705                    Some(command.client_order_id),
706                    command.price,
707                    command.quantity,
708                    command.venue_order_id,
709                )
710                .await?;
711            Ok(())
712        });
713
714        Ok(())
715    }
716
717    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
718        self.cancel_ws_order(cmd)
719    }
720
721    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
722        if self.config.use_mm_mass_cancel {
723            // Use OKX's mass-cancel endpoint (requires market maker permissions)
724            self.mass_cancel_instrument(cmd.instrument_id)
725        } else {
726            // Cancel orders individually via batch cancel (works for all users)
727            let cache = self.core.cache().borrow();
728            let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
729
730            if open_orders.is_empty() {
731                tracing::debug!("No open orders to cancel for {}", cmd.instrument_id);
732                return Ok(());
733            }
734
735            let mut payload = Vec::with_capacity(open_orders.len());
736            for order in open_orders {
737                payload.push((
738                    order.instrument_id(),
739                    Some(order.client_order_id()),
740                    order.venue_order_id(),
741                ));
742            }
743            drop(cache);
744
745            tracing::debug!(
746                "Canceling {} open orders for {} via batch cancel",
747                payload.len(),
748                cmd.instrument_id
749            );
750
751            let ws_private = self.ws_private.clone();
752            self.spawn_task("batch_cancel_orders", async move {
753                ws_private.batch_cancel_orders(payload).await?;
754                Ok(())
755            });
756
757            Ok(())
758        }
759    }
760
761    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
762        let mut payload = Vec::with_capacity(cmd.cancels.len());
763
764        for cancel in &cmd.cancels {
765            payload.push((
766                cancel.instrument_id,
767                Some(cancel.client_order_id),
768                cancel.venue_order_id,
769            ));
770        }
771
772        let ws_private = self.ws_private.clone();
773        self.spawn_task("batch_cancel_orders", async move {
774            ws_private.batch_cancel_orders(payload).await?;
775            Ok(())
776        });
777
778        Ok(())
779    }
780
781    async fn generate_order_status_report(
782        &self,
783        cmd: &GenerateOrderStatusReport,
784    ) -> anyhow::Result<Option<OrderStatusReport>> {
785        let Some(instrument_id) = cmd.instrument_id else {
786            tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
787            return Ok(None);
788        };
789
790        let mut reports = self
791            .http_client
792            .request_order_status_reports(
793                self.core.account_id,
794                None,
795                Some(instrument_id),
796                None,
797                None,
798                false,
799                None,
800            )
801            .await?;
802
803        if let Some(client_order_id) = cmd.client_order_id {
804            reports.retain(|report| report.client_order_id == Some(client_order_id));
805        }
806
807        if let Some(venue_order_id) = cmd.venue_order_id {
808            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
809        }
810
811        Ok(reports.into_iter().next())
812    }
813
814    async fn generate_order_status_reports(
815        &self,
816        cmd: &GenerateOrderStatusReports,
817    ) -> anyhow::Result<Vec<OrderStatusReport>> {
818        let mut reports = Vec::new();
819
820        if let Some(instrument_id) = cmd.instrument_id {
821            let mut fetched = self
822                .http_client
823                .request_order_status_reports(
824                    self.core.account_id,
825                    None,
826                    Some(instrument_id),
827                    None,
828                    None,
829                    false,
830                    None,
831                )
832                .await?;
833            reports.append(&mut fetched);
834        } else {
835            for inst_type in self.instrument_types() {
836                let mut fetched = self
837                    .http_client
838                    .request_order_status_reports(
839                        self.core.account_id,
840                        Some(inst_type),
841                        None,
842                        None,
843                        None,
844                        false,
845                        None,
846                    )
847                    .await?;
848                reports.append(&mut fetched);
849            }
850        }
851
852        // Filter by open_only if specified
853        if cmd.open_only {
854            reports.retain(|r| r.order_status.is_open());
855        }
856
857        // Filter by time range if specified
858        if let Some(start) = cmd.start {
859            reports.retain(|r| r.ts_last >= start);
860        }
861        if let Some(end) = cmd.end {
862            reports.retain(|r| r.ts_last <= end);
863        }
864
865        Ok(reports)
866    }
867
868    async fn generate_fill_reports(
869        &self,
870        cmd: GenerateFillReports,
871    ) -> anyhow::Result<Vec<FillReport>> {
872        let start_dt = nanos_to_datetime(cmd.start);
873        let end_dt = nanos_to_datetime(cmd.end);
874        let mut reports = Vec::new();
875
876        if let Some(instrument_id) = cmd.instrument_id {
877            let mut fetched = self
878                .http_client
879                .request_fill_reports(
880                    self.core.account_id,
881                    None,
882                    Some(instrument_id),
883                    start_dt,
884                    end_dt,
885                    None,
886                )
887                .await?;
888            reports.append(&mut fetched);
889        } else {
890            for inst_type in self.instrument_types() {
891                let mut fetched = self
892                    .http_client
893                    .request_fill_reports(
894                        self.core.account_id,
895                        Some(inst_type),
896                        None,
897                        start_dt,
898                        end_dt,
899                        None,
900                    )
901                    .await?;
902                reports.append(&mut fetched);
903            }
904        }
905
906        if let Some(venue_order_id) = cmd.venue_order_id {
907            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
908        }
909
910        Ok(reports)
911    }
912
913    async fn generate_position_status_reports(
914        &self,
915        cmd: &GeneratePositionStatusReports,
916    ) -> anyhow::Result<Vec<PositionStatusReport>> {
917        let mut reports = Vec::new();
918
919        // Query derivative positions (SWAP/FUTURES/OPTION) from /api/v5/account/positions
920        // Note: The positions endpoint does not support Spot or Margin - those are handled separately
921        if let Some(instrument_id) = cmd.instrument_id {
922            let mut fetched = self
923                .http_client
924                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
925                .await?;
926            reports.append(&mut fetched);
927        } else {
928            for inst_type in self.instrument_types() {
929                // Skip Spot and Margin - positions API only supports derivatives
930                if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
931                    continue;
932                }
933                let mut fetched = self
934                    .http_client
935                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
936                    .await?;
937                reports.append(&mut fetched);
938            }
939        }
940
941        // Query spot margin positions from /api/v5/account/balance
942        // Spot margin positions appear as balance sheet items (liab/spotInUseAmt fields)
943        let mut margin_reports = self
944            .http_client
945            .request_spot_margin_position_reports(self.core.account_id)
946            .await?;
947
948        if let Some(instrument_id) = cmd.instrument_id {
949            margin_reports.retain(|report| report.instrument_id == instrument_id);
950        }
951
952        reports.append(&mut margin_reports);
953
954        let _ = nanos_to_datetime(cmd.start);
955        let _ = nanos_to_datetime(cmd.end);
956
957        Ok(reports)
958    }
959
960    async fn generate_mass_status(
961        &self,
962        lookback_mins: Option<u64>,
963    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
964        tracing::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
965
966        let ts_now = get_atomic_clock_realtime().get_time_ns();
967
968        let start = lookback_mins.map(|mins| {
969            let lookback_ns = mins * 60 * 1_000_000_000;
970            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
971        });
972
973        let order_cmd = GenerateOrderStatusReports::new(
974            UUID4::new(),
975            ts_now,
976            false, // open_only - get all orders for mass status
977            None,  // instrument_id
978            start, // start
979            None,  // end
980            None,  // params
981            None,  // correlation_id
982        );
983
984        let fill_cmd = GenerateFillReports::new(
985            UUID4::new(),
986            ts_now,
987            None, // instrument_id
988            None, // venue_order_id
989            start,
990            None, // end
991            None, // params
992            None, // correlation_id
993        );
994
995        let position_cmd = GeneratePositionStatusReports::new(
996            UUID4::new(),
997            ts_now,
998            None, // instrument_id
999            start,
1000            None, // end
1001            None, // params
1002            None, // correlation_id
1003        );
1004
1005        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1006            self.generate_order_status_reports(&order_cmd),
1007            self.generate_fill_reports(fill_cmd),
1008            self.generate_position_status_reports(&position_cmd),
1009        )?;
1010
1011        tracing::info!("Received {} OrderStatusReports", order_reports.len());
1012        tracing::info!("Received {} FillReports", fill_reports.len());
1013        tracing::info!("Received {} PositionReports", position_reports.len());
1014
1015        let mut mass_status = ExecutionMassStatus::new(
1016            self.core.client_id,
1017            self.core.account_id,
1018            *OKX_VENUE,
1019            ts_now,
1020            None,
1021        );
1022
1023        mass_status.add_order_reports(order_reports);
1024        mass_status.add_fill_reports(fill_reports);
1025        mass_status.add_position_reports(position_reports);
1026
1027        Ok(Some(mass_status))
1028    }
1029}
1030
1031fn dispatch_ws_message(
1032    message: NautilusWsMessage,
1033    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1034) {
1035    match message {
1036        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
1037        NautilusWsMessage::PositionUpdate(report) => {
1038            dispatch_position_status_report(report, sender);
1039        }
1040        NautilusWsMessage::ExecutionReports(reports) => {
1041            tracing::debug!("Processing {} execution report(s)", reports.len());
1042            for report in reports {
1043                dispatch_execution_report(report, sender);
1044            }
1045        }
1046        NautilusWsMessage::OrderAccepted(event) => {
1047            dispatch_order_event(OrderEventAny::Accepted(event), sender);
1048        }
1049        NautilusWsMessage::OrderCanceled(event) => {
1050            dispatch_order_event(OrderEventAny::Canceled(event), sender);
1051        }
1052        NautilusWsMessage::OrderExpired(event) => {
1053            dispatch_order_event(OrderEventAny::Expired(event), sender);
1054        }
1055        NautilusWsMessage::OrderRejected(event) => {
1056            dispatch_order_event(OrderEventAny::Rejected(event), sender);
1057        }
1058        NautilusWsMessage::OrderCancelRejected(event) => {
1059            dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
1060        }
1061        NautilusWsMessage::OrderModifyRejected(event) => {
1062            dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
1063        }
1064        NautilusWsMessage::OrderTriggered(event) => {
1065            dispatch_order_event(OrderEventAny::Triggered(event), sender);
1066        }
1067        NautilusWsMessage::OrderUpdated(event) => {
1068            dispatch_order_event(OrderEventAny::Updated(event), sender);
1069        }
1070        NautilusWsMessage::Error(e) => {
1071            tracing::warn!(
1072                "Websocket error: code={} message={} conn_id={:?}",
1073                e.code,
1074                e.message,
1075                e.conn_id
1076            );
1077        }
1078        NautilusWsMessage::Reconnected => {
1079            tracing::info!("Websocket reconnected");
1080        }
1081        NautilusWsMessage::Authenticated => {
1082            tracing::debug!("Websocket authenticated");
1083        }
1084        NautilusWsMessage::Deltas(_)
1085        | NautilusWsMessage::Raw(_)
1086        | NautilusWsMessage::Data(_)
1087        | NautilusWsMessage::FundingRates(_)
1088        | NautilusWsMessage::Instrument(_) => {
1089            tracing::debug!("Ignoring websocket data message");
1090        }
1091    }
1092}
1093
1094fn dispatch_account_state(
1095    state: AccountState,
1096    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1097) {
1098    if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
1099        tracing::warn!("Failed to send account state: {e}");
1100    }
1101}
1102
1103fn dispatch_position_status_report(
1104    report: PositionStatusReport,
1105    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1106) {
1107    let exec_report = NautilusExecutionReport::Position(Box::new(report));
1108    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1109        tracing::warn!("Failed to send position status report: {e}");
1110    }
1111}
1112
1113fn dispatch_execution_report(
1114    report: ExecutionReport,
1115    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1116) {
1117    match report {
1118        ExecutionReport::Order(order_report) => {
1119            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1120            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1121                tracing::warn!("Failed to send order status report: {e}");
1122            }
1123        }
1124        ExecutionReport::Fill(fill_report) => {
1125            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1126            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1127                tracing::warn!("Failed to send fill report: {e}");
1128            }
1129        }
1130    }
1131}
1132
1133fn dispatch_order_event(
1134    event: OrderEventAny,
1135    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1136) {
1137    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1138        tracing::warn!("Failed to send order event: {e}");
1139    }
1140}
1141
1142fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1143    value.map(|nanos| nanos.to_datetime_utc())
1144}
1145
1146#[cfg(test)]
1147mod tests {
1148    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
1149    use nautilus_core::UnixNanos;
1150    use nautilus_model::identifiers::{
1151        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
1152    };
1153    use rstest::rstest;
1154
1155    #[rstest]
1156    fn test_batch_cancel_orders_builds_payload() {
1157        let trader_id = TraderId::from("TRADER-001");
1158        let strategy_id = StrategyId::from("STRATEGY-001");
1159        let client_id = Some(ClientId::from("OKX"));
1160        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1161        let client_order_id1 = ClientOrderId::new("order1");
1162        let client_order_id2 = ClientOrderId::new("order2");
1163        let venue_order_id1 = VenueOrderId::new("venue1");
1164        let venue_order_id2 = VenueOrderId::new("venue2");
1165
1166        let cmd = BatchCancelOrders {
1167            trader_id,
1168            client_id,
1169            strategy_id,
1170            instrument_id,
1171            cancels: vec![
1172                CancelOrder {
1173                    trader_id,
1174                    client_id,
1175                    strategy_id,
1176                    instrument_id,
1177                    client_order_id: client_order_id1,
1178                    venue_order_id: Some(venue_order_id1),
1179                    command_id: Default::default(),
1180                    ts_init: UnixNanos::default(),
1181                    params: None,
1182                },
1183                CancelOrder {
1184                    trader_id,
1185                    client_id,
1186                    strategy_id,
1187                    instrument_id,
1188                    client_order_id: client_order_id2,
1189                    venue_order_id: Some(venue_order_id2),
1190                    command_id: Default::default(),
1191                    ts_init: UnixNanos::default(),
1192                    params: None,
1193                },
1194            ],
1195            command_id: Default::default(),
1196            ts_init: UnixNanos::default(),
1197            params: None,
1198        };
1199
1200        // Verify we can build the payload structure
1201        let mut payload = Vec::with_capacity(cmd.cancels.len());
1202        for cancel in &cmd.cancels {
1203            payload.push((
1204                cancel.instrument_id,
1205                Some(cancel.client_order_id),
1206                cancel.venue_order_id,
1207            ));
1208        }
1209
1210        assert_eq!(payload.len(), 2);
1211        assert_eq!(payload[0].0, instrument_id);
1212        assert_eq!(payload[0].1, Some(client_order_id1));
1213        assert_eq!(payload[0].2, Some(venue_order_id1));
1214        assert_eq!(payload[1].0, instrument_id);
1215        assert_eq!(payload[1].1, Some(client_order_id2));
1216        assert_eq!(payload[1].2, Some(venue_order_id2));
1217    }
1218
1219    #[rstest]
1220    fn test_batch_cancel_orders_with_empty_cancels() {
1221        let cmd = BatchCancelOrders {
1222            trader_id: TraderId::from("TRADER-001"),
1223            client_id: Some(ClientId::from("OKX")),
1224            strategy_id: StrategyId::from("STRATEGY-001"),
1225            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
1226            cancels: vec![],
1227            command_id: Default::default(),
1228            ts_init: UnixNanos::default(),
1229            params: None,
1230        };
1231
1232        let payload: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)> =
1233            Vec::with_capacity(cmd.cancels.len());
1234        assert_eq!(payload.len(), 0);
1235    }
1236}