Skip to main content

nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use futures_util::{StreamExt, pin_mut};
28use nautilus_common::{
29    clients::ExecutionClient,
30    live::{get_runtime, runner::get_exec_event_sender},
31    messages::execution::{
32        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
34        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
35        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
36        SubmitOrderList,
37    },
38};
39use nautilus_core::{
40    MUTEX_POISONED, UnixNanos,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{AccountType, OmsType, OrderType},
47    events::OrderEventAny,
48    identifiers::{
49        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, Venue, VenueOrderId,
50    },
51    orders::Order,
52    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53    types::{AccountBalance, MarginBalance},
54};
55use tokio::task::JoinHandle;
56
57use crate::{
58    common::{
59        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
60        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
61    },
62    config::OKXExecClientConfig,
63    http::{client::OKXHttpClient, models::OKXCancelAlgoOrderRequest},
64    websocket::{
65        client::OKXWebSocketClient,
66        messages::{ExecutionReport, NautilusWsMessage},
67    },
68};
69
70#[derive(Debug)]
71pub struct OKXExecutionClient {
72    core: ExecutionClientCore,
73    clock: &'static AtomicTime,
74    config: OKXExecClientConfig,
75    emitter: ExecutionEventEmitter,
76    http_client: OKXHttpClient,
77    ws_private: OKXWebSocketClient,
78    ws_business: OKXWebSocketClient,
79    trade_mode: OKXTradeMode,
80    ws_stream_handle: Option<JoinHandle<()>>,
81    ws_business_stream_handle: Option<JoinHandle<()>>,
82    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
83}
84
85impl OKXExecutionClient {
86    /// Creates a new [`OKXExecutionClient`].
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the client fails to initialize.
91    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92        // Always use with_credentials which loads from env vars when config values are None
93        let http_client = OKXHttpClient::with_credentials(
94            config.api_key.clone(),
95            config.api_secret.clone(),
96            config.api_passphrase.clone(),
97            config.base_url_http.clone(),
98            config.http_timeout_secs,
99            config.max_retries,
100            config.retry_delay_initial_ms,
101            config.retry_delay_max_ms,
102            config.is_demo,
103            config.http_proxy_url.clone(),
104        )?;
105
106        let account_id = core.account_id;
107
108        let ws_private = OKXWebSocketClient::with_credentials(
109            Some(config.ws_private_url()),
110            config.api_key.clone(),
111            config.api_secret.clone(),
112            config.api_passphrase.clone(),
113            Some(account_id),
114            Some(20), // Heartbeat
115        )
116        .context("failed to construct OKX private websocket client")?;
117
118        let ws_business = OKXWebSocketClient::with_credentials(
119            Some(config.ws_business_url()),
120            config.api_key.clone(),
121            config.api_secret.clone(),
122            config.api_passphrase.clone(),
123            Some(account_id),
124            Some(20), // Heartbeat
125        )
126        .context("failed to construct OKX business websocket client")?;
127
128        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
129        let clock = get_atomic_clock_realtime();
130        let emitter = ExecutionEventEmitter::new(
131            clock,
132            core.trader_id,
133            core.account_id,
134            core.account_type,
135            None,
136        );
137
138        Ok(Self {
139            core,
140            clock,
141            config,
142            emitter,
143            http_client,
144            ws_private,
145            ws_business,
146            trade_mode,
147            ws_stream_handle: None,
148            ws_business_stream_handle: None,
149            pending_tasks: Mutex::new(Vec::new()),
150        })
151    }
152
153    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
154        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
155
156        if account_type == AccountType::Cash {
157            if !config.use_spot_margin {
158                return OKXTradeMode::Cash;
159            }
160            return if is_cross_margin {
161                OKXTradeMode::Cross
162            } else {
163                OKXTradeMode::Isolated
164            };
165        }
166
167        if is_cross_margin {
168            OKXTradeMode::Cross
169        } else {
170            OKXTradeMode::Isolated
171        }
172    }
173
174    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
175        if self.config.instrument_types.is_empty() {
176            vec![OKXInstrumentType::Spot]
177        } else {
178            self.config.instrument_types.clone()
179        }
180    }
181
182    async fn refresh_account_state(&self) -> anyhow::Result<()> {
183        let account_state = self
184            .http_client
185            .request_account_state(self.core.account_id)
186            .await
187            .context("failed to request OKX account state")?;
188
189        self.emitter.send_account_state(account_state);
190        Ok(())
191    }
192
193    fn update_account_state(&self) -> anyhow::Result<()> {
194        let runtime = get_runtime();
195        runtime.block_on(self.refresh_account_state())
196    }
197
198    fn is_conditional_order(&self, order_type: OrderType) -> bool {
199        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
200    }
201
202    fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
203        let order = {
204            let cache = self.core.cache();
205            cache
206                .order(&cmd.client_order_id)
207                .cloned()
208                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?
209        };
210        let ws_private = self.ws_private.clone();
211        let trade_mode = self.trade_mode;
212
213        let emitter = self.emitter.clone();
214        let clock = self.clock;
215        let trader_id = self.core.trader_id;
216        let client_order_id = order.client_order_id();
217        let strategy_id = order.strategy_id();
218        let instrument_id = order.instrument_id();
219        let order_side = order.order_side();
220        let order_type = order.order_type();
221        let quantity = order.quantity();
222        let time_in_force = order.time_in_force();
223        let price = order.price();
224        let trigger_price = order.trigger_price();
225        let is_post_only = order.is_post_only();
226        let is_reduce_only = order.is_reduce_only();
227        let is_quote_quantity = order.is_quote_quantity();
228
229        self.spawn_task("submit_order", async move {
230            let result = ws_private
231                .submit_order(
232                    trader_id,
233                    strategy_id,
234                    instrument_id,
235                    trade_mode,
236                    client_order_id,
237                    order_side,
238                    order_type,
239                    quantity,
240                    Some(time_in_force),
241                    price,
242                    trigger_price,
243                    Some(is_post_only),
244                    Some(is_reduce_only),
245                    Some(is_quote_quantity),
246                    None,
247                )
248                .await
249                .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
250
251            if let Err(e) = result {
252                let ts_event = clock.get_time_ns();
253                emitter.emit_order_rejected_event(
254                    strategy_id,
255                    instrument_id,
256                    client_order_id,
257                    &format!("submit-order-error: {e}"),
258                    ts_event,
259                    false,
260                );
261                return Err(e);
262            }
263
264            Ok(())
265        });
266
267        Ok(())
268    }
269
270    fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
271        let order = {
272            let cache = self.core.cache();
273            cache
274                .order(&cmd.client_order_id)
275                .cloned()
276                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?
277        };
278        let trigger_price = order
279            .trigger_price()
280            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
281        let http_client = self.http_client.clone();
282        let trade_mode = self.trade_mode;
283
284        let emitter = self.emitter.clone();
285        let clock = self.clock;
286        let client_order_id = order.client_order_id();
287        let strategy_id = order.strategy_id();
288        let instrument_id = order.instrument_id();
289        let order_side = order.order_side();
290        let order_type = order.order_type();
291        let quantity = order.quantity();
292        let trigger_type = order.trigger_type();
293        let price = order.price();
294        let is_reduce_only = order.is_reduce_only();
295
296        self.spawn_task("submit_algo_order", async move {
297            let result = http_client
298                .place_algo_order_with_domain_types(
299                    instrument_id,
300                    trade_mode,
301                    client_order_id,
302                    order_side,
303                    order_type,
304                    quantity,
305                    trigger_price,
306                    trigger_type,
307                    price,
308                    Some(is_reduce_only),
309                )
310                .await
311                .map_err(|e| anyhow::anyhow!("Submit algo order failed: {e}"));
312
313            if let Err(e) = result {
314                let ts_event = clock.get_time_ns();
315                emitter.emit_order_rejected_event(
316                    strategy_id,
317                    instrument_id,
318                    client_order_id,
319                    &format!("submit-order-error: {e}"),
320                    ts_event,
321                    false,
322                );
323                return Err(e);
324            }
325
326            Ok(())
327        });
328
329        Ok(())
330    }
331
332    fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
333        let ws_private = self.ws_private.clone();
334        let command = cmd.clone();
335
336        let emitter = self.emitter.clone();
337        let clock = self.clock;
338
339        self.spawn_task("cancel_order", async move {
340            let result = ws_private
341                .cancel_order(
342                    command.trader_id,
343                    command.strategy_id,
344                    command.instrument_id,
345                    Some(command.client_order_id),
346                    command.venue_order_id,
347                )
348                .await
349                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
350
351            if let Err(e) = result {
352                let ts_event = clock.get_time_ns();
353                emitter.emit_order_cancel_rejected_event(
354                    command.strategy_id,
355                    command.instrument_id,
356                    command.client_order_id,
357                    command.venue_order_id,
358                    &format!("cancel-order-error: {e}"),
359                    ts_event,
360                );
361                return Err(e);
362            }
363
364            Ok(())
365        });
366
367        Ok(())
368    }
369
370    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
371        let ws_private = self.ws_private.clone();
372        self.spawn_task("mass_cancel_orders", async move {
373            ws_private.mass_cancel_orders(instrument_id).await?;
374            Ok(())
375        });
376        Ok(())
377    }
378
379    fn spawn_task<F>(&self, description: &'static str, fut: F)
380    where
381        F: Future<Output = anyhow::Result<()>> + Send + 'static,
382    {
383        let runtime = get_runtime();
384        let handle = runtime.spawn(async move {
385            if let Err(e) = fut.await {
386                log::warn!("{description} failed: {e:?}");
387            }
388        });
389
390        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
391        tasks.retain(|handle| !handle.is_finished());
392        tasks.push(handle);
393    }
394
395    fn abort_pending_tasks(&self) {
396        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
397        for handle in tasks.drain(..) {
398            handle.abort();
399        }
400    }
401
402    /// Polls the cache until the account is registered or timeout is reached.
403    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
404        let account_id = self.core.account_id;
405
406        if self.core.cache().account(&account_id).is_some() {
407            log::info!("Account {account_id} registered");
408            return Ok(());
409        }
410
411        let start = Instant::now();
412        let timeout = Duration::from_secs_f64(timeout_secs);
413        let interval = Duration::from_millis(10);
414
415        loop {
416            tokio::time::sleep(interval).await;
417
418            if self.core.cache().account(&account_id).is_some() {
419                log::info!("Account {account_id} registered");
420                return Ok(());
421            }
422
423            if start.elapsed() >= timeout {
424                anyhow::bail!(
425                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
426                );
427            }
428        }
429    }
430}
431
432#[async_trait(?Send)]
433impl ExecutionClient for OKXExecutionClient {
434    fn is_connected(&self) -> bool {
435        self.core.is_connected()
436    }
437
438    fn client_id(&self) -> ClientId {
439        self.core.client_id
440    }
441
442    fn account_id(&self) -> AccountId {
443        self.core.account_id
444    }
445
446    fn venue(&self) -> Venue {
447        *OKX_VENUE
448    }
449
450    fn oms_type(&self) -> OmsType {
451        self.core.oms_type
452    }
453
454    fn get_account(&self) -> Option<AccountAny> {
455        self.core.cache().account(&self.core.account_id).cloned()
456    }
457
458    async fn connect(&mut self) -> anyhow::Result<()> {
459        if self.core.is_connected() {
460            return Ok(());
461        }
462
463        let instrument_types = self.instrument_types();
464
465        if !self.core.instruments_initialized() {
466            let mut all_instruments = Vec::new();
467            let mut all_inst_id_codes = Vec::new();
468
469            for instrument_type in &instrument_types {
470                let (instruments, inst_id_codes) = self
471                    .http_client
472                    .request_instruments(*instrument_type, None)
473                    .await
474                    .with_context(|| {
475                        format!("failed to request OKX instruments for {instrument_type:?}")
476                    })?;
477
478                if instruments.is_empty() {
479                    log::warn!("No instruments returned for {instrument_type:?}");
480                    continue;
481                }
482
483                log::info!(
484                    "Loaded {} {instrument_type:?} instruments",
485                    instruments.len()
486                );
487
488                self.http_client.cache_instruments(instruments.clone());
489                all_instruments.extend(instruments);
490                all_inst_id_codes.extend(inst_id_codes);
491            }
492
493            if !all_instruments.is_empty() {
494                self.ws_private.cache_instruments(all_instruments);
495                self.ws_private.cache_inst_id_codes(all_inst_id_codes);
496            }
497            self.core.set_instruments_initialized();
498        }
499
500        self.ws_private.connect().await?;
501        self.ws_private.wait_until_active(10.0).await?;
502        log::info!("Connected to private WebSocket");
503
504        if self.ws_stream_handle.is_none() {
505            let stream = self.ws_private.stream();
506            let emitter = self.emitter.clone();
507            let handle = get_runtime().spawn(async move {
508                pin_mut!(stream);
509                while let Some(message) = stream.next().await {
510                    dispatch_ws_message(message, &emitter);
511                }
512            });
513            self.ws_stream_handle = Some(handle);
514        }
515
516        self.ws_business.connect().await?;
517        self.ws_business.wait_until_active(10.0).await?;
518        log::info!("Connected to business WebSocket");
519
520        if self.ws_business_stream_handle.is_none() {
521            let stream = self.ws_business.stream();
522            let emitter = self.emitter.clone();
523            let handle = get_runtime().spawn(async move {
524                pin_mut!(stream);
525                while let Some(message) = stream.next().await {
526                    dispatch_ws_message(message, &emitter);
527                }
528            });
529            self.ws_business_stream_handle = Some(handle);
530        }
531
532        for inst_type in &instrument_types {
533            log::info!("Subscribing to orders channel for {inst_type:?}");
534            self.ws_private.subscribe_orders(*inst_type).await?;
535
536            if self.config.use_fills_channel {
537                log::info!("Subscribing to fills channel for {inst_type:?}");
538                if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
539                    log::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
540                }
541            }
542        }
543
544        self.ws_private.subscribe_account().await?;
545
546        // Subscribe to algo orders on business WebSocket (OKX requires this endpoint)
547        for inst_type in &instrument_types {
548            if *inst_type != OKXInstrumentType::Option {
549                self.ws_business.subscribe_orders_algo(*inst_type).await?;
550            }
551        }
552
553        let account_state = self
554            .http_client
555            .request_account_state(self.core.account_id)
556            .await
557            .context("failed to request OKX account state")?;
558
559        if !account_state.balances.is_empty() {
560            log::info!(
561                "Received account state with {} balance(s)",
562                account_state.balances.len()
563            );
564        }
565        self.emitter.send_account_state(account_state);
566
567        // Wait for account to be registered in cache before completing connect
568        self.await_account_registered(30.0).await?;
569
570        self.core.set_connected();
571        log::info!("Connected: client_id={}", self.core.client_id);
572        Ok(())
573    }
574
575    async fn disconnect(&mut self) -> anyhow::Result<()> {
576        if self.core.is_disconnected() {
577            return Ok(());
578        }
579
580        self.abort_pending_tasks();
581        self.http_client.cancel_all_requests();
582
583        if let Err(e) = self.ws_private.close().await {
584            log::warn!("Error closing private websocket: {e:?}");
585        }
586
587        if let Err(e) = self.ws_business.close().await {
588            log::warn!("Error closing business websocket: {e:?}");
589        }
590
591        if let Some(handle) = self.ws_stream_handle.take() {
592            handle.abort();
593        }
594
595        if let Some(handle) = self.ws_business_stream_handle.take() {
596            handle.abort();
597        }
598
599        self.core.set_disconnected();
600        log::info!("Disconnected: client_id={}", self.core.client_id);
601        Ok(())
602    }
603
604    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
605        self.update_account_state()
606    }
607
608    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
609        log::debug!(
610            "query_order not implemented for OKX execution client (client_order_id={})",
611            cmd.client_order_id
612        );
613        Ok(())
614    }
615
616    fn generate_account_state(
617        &self,
618        balances: Vec<AccountBalance>,
619        margins: Vec<MarginBalance>,
620        reported: bool,
621        ts_event: UnixNanos,
622    ) -> anyhow::Result<()> {
623        self.emitter
624            .emit_account_state(balances, margins, reported, ts_event);
625        Ok(())
626    }
627
628    fn start(&mut self) -> anyhow::Result<()> {
629        if self.core.is_started() {
630            return Ok(());
631        }
632
633        let sender = get_exec_event_sender();
634        self.emitter.set_sender(sender);
635        self.core.set_started();
636
637        // Spawn instrument bootstrap task
638        let http_client = self.http_client.clone();
639        let ws_private = self.ws_private.clone();
640        let instrument_types = self.config.instrument_types.clone();
641
642        get_runtime().spawn(async move {
643            let mut all_instruments = Vec::new();
644            let mut all_inst_id_codes = Vec::new();
645
646            for instrument_type in instrument_types {
647                match http_client.request_instruments(instrument_type, None).await {
648                    Ok((instruments, inst_id_codes)) => {
649                        if instruments.is_empty() {
650                            log::warn!("No instruments returned for {instrument_type:?}");
651                            continue;
652                        }
653                        http_client.cache_instruments(instruments.clone());
654                        all_instruments.extend(instruments);
655                        all_inst_id_codes.extend(inst_id_codes);
656                    }
657                    Err(e) => {
658                        log::error!("Failed to request instruments for {instrument_type:?}: {e}");
659                    }
660                }
661            }
662
663            if all_instruments.is_empty() {
664                log::warn!(
665                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
666                );
667            } else {
668                ws_private.cache_instruments(all_instruments);
669                ws_private.cache_inst_id_codes(all_inst_id_codes);
670                log::info!("Instruments initialized");
671            }
672        });
673
674        log::info!(
675            "Started: client_id={}, account_id={}, account_type={:?}, trade_mode={:?}, instrument_types={:?}, use_fills_channel={}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
676            self.core.client_id,
677            self.core.account_id,
678            self.core.account_type,
679            self.trade_mode,
680            self.config.instrument_types,
681            self.config.use_fills_channel,
682            self.config.is_demo,
683            self.config.http_proxy_url,
684            self.config.ws_proxy_url,
685        );
686        Ok(())
687    }
688
689    fn stop(&mut self) -> anyhow::Result<()> {
690        if self.core.is_stopped() {
691            return Ok(());
692        }
693
694        self.core.set_stopped();
695        self.core.set_disconnected();
696        if let Some(handle) = self.ws_stream_handle.take() {
697            handle.abort();
698        }
699        self.abort_pending_tasks();
700        log::info!("Stopped: client_id={}", self.core.client_id);
701        Ok(())
702    }
703
704    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
705        let order_type = {
706            let cache = self.core.cache();
707            let order = cache
708                .order(&cmd.client_order_id)
709                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
710
711            if order.is_closed() {
712                log::warn!("Cannot submit closed order {}", order.client_order_id());
713                return Ok(());
714            }
715
716            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
717            self.emitter.emit_order_submitted(order);
718
719            order.order_type()
720        };
721
722        if self.is_conditional_order(order_type) {
723            self.submit_conditional_order(cmd)
724        } else {
725            self.submit_regular_order(cmd)
726        }
727    }
728
729    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
730        log::warn!(
731            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
732            cmd.order_list.client_order_ids.len()
733        );
734        Ok(())
735    }
736
737    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
738        let ws_private = self.ws_private.clone();
739        let command = cmd.clone();
740
741        let emitter = self.emitter.clone();
742        let clock = self.clock;
743
744        self.spawn_task("modify_order", async move {
745            let result = ws_private
746                .modify_order(
747                    command.trader_id,
748                    command.strategy_id,
749                    command.instrument_id,
750                    Some(command.client_order_id),
751                    command.price,
752                    command.quantity,
753                    command.venue_order_id,
754                )
755                .await
756                .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
757
758            if let Err(e) = result {
759                let ts_event = clock.get_time_ns();
760                emitter.emit_order_modify_rejected_event(
761                    command.strategy_id,
762                    command.instrument_id,
763                    command.client_order_id,
764                    command.venue_order_id,
765                    &format!("modify-order-error: {e}"),
766                    ts_event,
767                );
768                return Err(e);
769            }
770
771            Ok(())
772        });
773
774        Ok(())
775    }
776
777    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
778        self.cancel_ws_order(cmd)
779    }
780
781    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
782        if self.config.use_mm_mass_cancel {
783            // Use OKX's mass-cancel endpoint (requires market maker permissions)
784            self.mass_cancel_instrument(cmd.instrument_id)
785        } else {
786            // Cancel orders via batch cancel (works for all users)
787            let cache = self.core.cache();
788            let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None, None);
789
790            if open_orders.is_empty() {
791                log::debug!("No open orders to cancel for {}", cmd.instrument_id);
792                return Ok(());
793            }
794
795            let mut regular_payload = Vec::new();
796            let mut algo_orders: Vec<(
797                InstrumentId,
798                ClientOrderId,
799                Option<VenueOrderId>,
800                TraderId,
801                StrategyId,
802            )> = Vec::new();
803
804            for order in &open_orders {
805                // Triggered stop orders become regular orders on OKX
806                let is_pending_algo = self.is_conditional_order(order.order_type())
807                    && order.is_triggered() != Some(true);
808
809                if is_pending_algo {
810                    algo_orders.push((
811                        order.instrument_id(),
812                        order.client_order_id(),
813                        order.venue_order_id(),
814                        order.trader_id(),
815                        order.strategy_id(),
816                    ));
817                } else {
818                    regular_payload.push((
819                        order.instrument_id(),
820                        Some(order.client_order_id()),
821                        order.venue_order_id(),
822                    ));
823                }
824            }
825            drop(cache);
826
827            log::debug!(
828                "Canceling {} regular orders and {} algo orders for {}",
829                regular_payload.len(),
830                algo_orders.len(),
831                cmd.instrument_id
832            );
833
834            if !regular_payload.is_empty() {
835                let ws_private = self.ws_private.clone();
836                self.spawn_task("batch_cancel_orders", async move {
837                    ws_private.batch_cancel_orders(regular_payload).await?;
838                    Ok(())
839                });
840            }
841
842            // OKX doesn't support algo cancel via private WebSocket, must use HTTP
843            if !algo_orders.is_empty() {
844                let http_client = self.http_client.clone();
845                let requests: Vec<OKXCancelAlgoOrderRequest> = algo_orders
846                    .into_iter()
847                    .map(
848                        |(
849                            instrument_id,
850                            client_order_id,
851                            venue_order_id,
852                            _trader_id,
853                            _strategy_id,
854                        )| {
855                            OKXCancelAlgoOrderRequest {
856                                inst_id: instrument_id.symbol.to_string(),
857                                inst_id_code: None,
858                                algo_id: venue_order_id.map(|id| id.to_string()),
859                                algo_cl_ord_id: if venue_order_id.is_none() {
860                                    Some(client_order_id.to_string())
861                                } else {
862                                    None
863                                },
864                            }
865                        },
866                    )
867                    .collect();
868
869                self.spawn_task("cancel_algo_orders", async move {
870                    http_client.cancel_algo_orders(requests).await?;
871                    Ok(())
872                });
873            }
874
875            Ok(())
876        }
877    }
878
879    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
880        let cache = self.core.cache();
881
882        let mut regular_payload = Vec::new();
883        let mut algo_orders = Vec::new();
884
885        for cancel in &cmd.cancels {
886            // Triggered stop orders become regular orders on OKX
887            let is_pending_algo = cache.order(&cancel.client_order_id).is_some_and(|o| {
888                self.is_conditional_order(o.order_type()) && o.is_triggered() != Some(true)
889            });
890
891            if is_pending_algo {
892                algo_orders.push(cancel.clone());
893            } else {
894                regular_payload.push((
895                    cancel.instrument_id,
896                    Some(cancel.client_order_id),
897                    cancel.venue_order_id,
898                ));
899            }
900        }
901        drop(cache);
902
903        if !regular_payload.is_empty() {
904            let ws_private = self.ws_private.clone();
905            self.spawn_task("batch_cancel_orders", async move {
906                ws_private.batch_cancel_orders(regular_payload).await?;
907                Ok(())
908            });
909        }
910
911        // OKX doesn't support algo cancel via private WebSocket, must use HTTP
912        if !algo_orders.is_empty() {
913            let http_client = self.http_client.clone();
914            let requests: Vec<OKXCancelAlgoOrderRequest> = algo_orders
915                .into_iter()
916                .map(|cancel| OKXCancelAlgoOrderRequest {
917                    inst_id: cancel.instrument_id.symbol.to_string(),
918                    inst_id_code: None,
919                    algo_id: cancel.venue_order_id.map(|id| id.to_string()),
920                    algo_cl_ord_id: if cancel.venue_order_id.is_none() {
921                        Some(cancel.client_order_id.to_string())
922                    } else {
923                        None
924                    },
925                })
926                .collect();
927
928            self.spawn_task("cancel_algo_orders", async move {
929                http_client.cancel_algo_orders(requests).await?;
930                Ok(())
931            });
932        }
933
934        Ok(())
935    }
936
937    async fn generate_order_status_report(
938        &self,
939        cmd: &GenerateOrderStatusReport,
940    ) -> anyhow::Result<Option<OrderStatusReport>> {
941        let Some(instrument_id) = cmd.instrument_id else {
942            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
943            return Ok(None);
944        };
945
946        let mut reports = self
947            .http_client
948            .request_order_status_reports(
949                self.core.account_id,
950                None,
951                Some(instrument_id),
952                None,
953                None,
954                false,
955                None,
956            )
957            .await?;
958
959        if let Some(client_order_id) = cmd.client_order_id {
960            reports.retain(|report| report.client_order_id == Some(client_order_id));
961        }
962
963        if let Some(venue_order_id) = cmd.venue_order_id {
964            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
965        }
966
967        Ok(reports.into_iter().next())
968    }
969
970    async fn generate_order_status_reports(
971        &self,
972        cmd: &GenerateOrderStatusReports,
973    ) -> anyhow::Result<Vec<OrderStatusReport>> {
974        let mut reports = Vec::new();
975
976        if let Some(instrument_id) = cmd.instrument_id {
977            let mut fetched = self
978                .http_client
979                .request_order_status_reports(
980                    self.core.account_id,
981                    None,
982                    Some(instrument_id),
983                    None,
984                    None,
985                    false,
986                    None,
987                )
988                .await?;
989            reports.append(&mut fetched);
990        } else {
991            for inst_type in self.instrument_types() {
992                let mut fetched = self
993                    .http_client
994                    .request_order_status_reports(
995                        self.core.account_id,
996                        Some(inst_type),
997                        None,
998                        None,
999                        None,
1000                        false,
1001                        None,
1002                    )
1003                    .await?;
1004                reports.append(&mut fetched);
1005            }
1006        }
1007
1008        // Filter by open_only if specified
1009        if cmd.open_only {
1010            reports.retain(|r| r.order_status.is_open());
1011        }
1012
1013        // Filter by time range if specified
1014        if let Some(start) = cmd.start {
1015            reports.retain(|r| r.ts_last >= start);
1016        }
1017        if let Some(end) = cmd.end {
1018            reports.retain(|r| r.ts_last <= end);
1019        }
1020
1021        Ok(reports)
1022    }
1023
1024    async fn generate_fill_reports(
1025        &self,
1026        cmd: GenerateFillReports,
1027    ) -> anyhow::Result<Vec<FillReport>> {
1028        let start_dt = nanos_to_datetime(cmd.start);
1029        let end_dt = nanos_to_datetime(cmd.end);
1030        let mut reports = Vec::new();
1031
1032        if let Some(instrument_id) = cmd.instrument_id {
1033            let mut fetched = self
1034                .http_client
1035                .request_fill_reports(
1036                    self.core.account_id,
1037                    None,
1038                    Some(instrument_id),
1039                    start_dt,
1040                    end_dt,
1041                    None,
1042                )
1043                .await?;
1044            reports.append(&mut fetched);
1045        } else {
1046            for inst_type in self.instrument_types() {
1047                let mut fetched = self
1048                    .http_client
1049                    .request_fill_reports(
1050                        self.core.account_id,
1051                        Some(inst_type),
1052                        None,
1053                        start_dt,
1054                        end_dt,
1055                        None,
1056                    )
1057                    .await?;
1058                reports.append(&mut fetched);
1059            }
1060        }
1061
1062        if let Some(venue_order_id) = cmd.venue_order_id {
1063            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1064        }
1065
1066        Ok(reports)
1067    }
1068
1069    async fn generate_position_status_reports(
1070        &self,
1071        cmd: &GeneratePositionStatusReports,
1072    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1073        let mut reports = Vec::new();
1074
1075        // Query derivative positions (SWAP/FUTURES/OPTION) from /api/v5/account/positions
1076        // Note: The positions endpoint does not support Spot or Margin - those are handled separately
1077        if let Some(instrument_id) = cmd.instrument_id {
1078            let mut fetched = self
1079                .http_client
1080                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
1081                .await?;
1082            reports.append(&mut fetched);
1083        } else {
1084            for inst_type in self.instrument_types() {
1085                // Skip Spot and Margin - positions API only supports derivatives
1086                if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
1087                    continue;
1088                }
1089                let mut fetched = self
1090                    .http_client
1091                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
1092                    .await?;
1093                reports.append(&mut fetched);
1094            }
1095        }
1096
1097        // Query spot margin positions from /api/v5/account/balance
1098        // Spot margin positions appear as balance sheet items (liab/spotInUseAmt fields)
1099        let mut margin_reports = self
1100            .http_client
1101            .request_spot_margin_position_reports(self.core.account_id)
1102            .await?;
1103
1104        if let Some(instrument_id) = cmd.instrument_id {
1105            margin_reports.retain(|report| report.instrument_id == instrument_id);
1106        }
1107
1108        reports.append(&mut margin_reports);
1109
1110        let _ = nanos_to_datetime(cmd.start);
1111        let _ = nanos_to_datetime(cmd.end);
1112
1113        Ok(reports)
1114    }
1115
1116    async fn generate_mass_status(
1117        &self,
1118        lookback_mins: Option<u64>,
1119    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1120        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1121
1122        let ts_now = self.clock.get_time_ns();
1123
1124        let start = lookback_mins.map(|mins| {
1125            let lookback_ns = mins * 60 * 1_000_000_000;
1126            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1127        });
1128
1129        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1130            .ts_init(ts_now)
1131            .open_only(false) // get all orders for mass status
1132            .start(start)
1133            .build()
1134            .map_err(|e| anyhow::anyhow!("{e}"))?;
1135
1136        let fill_cmd = GenerateFillReportsBuilder::default()
1137            .ts_init(ts_now)
1138            .start(start)
1139            .build()
1140            .map_err(|e| anyhow::anyhow!("{e}"))?;
1141
1142        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1143            .ts_init(ts_now)
1144            .start(start)
1145            .build()
1146            .map_err(|e| anyhow::anyhow!("{e}"))?;
1147
1148        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1149            self.generate_order_status_reports(&order_cmd),
1150            self.generate_fill_reports(fill_cmd),
1151            self.generate_position_status_reports(&position_cmd),
1152        )?;
1153
1154        log::info!("Received {} OrderStatusReports", order_reports.len());
1155        log::info!("Received {} FillReports", fill_reports.len());
1156        log::info!("Received {} PositionReports", position_reports.len());
1157
1158        let mut mass_status = ExecutionMassStatus::new(
1159            self.core.client_id,
1160            self.core.account_id,
1161            *OKX_VENUE,
1162            ts_now,
1163            None,
1164        );
1165
1166        mass_status.add_order_reports(order_reports);
1167        mass_status.add_fill_reports(fill_reports);
1168        mass_status.add_position_reports(position_reports);
1169
1170        Ok(Some(mass_status))
1171    }
1172}
1173
1174/// Dispatches a WebSocket message using the event emitter.
1175fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1176    match message {
1177        NautilusWsMessage::AccountUpdate(state) => {
1178            emitter.send_account_state(state);
1179        }
1180        NautilusWsMessage::PositionUpdate(report) => {
1181            emitter.send_position_report(report);
1182        }
1183        NautilusWsMessage::ExecutionReports(reports) => {
1184            log::debug!("Processing {} execution report(s)", reports.len());
1185            for report in reports {
1186                match report {
1187                    ExecutionReport::Order(order_report) => {
1188                        emitter.send_order_status_report(order_report);
1189                    }
1190                    ExecutionReport::Fill(fill_report) => {
1191                        emitter.send_fill_report(fill_report);
1192                    }
1193                }
1194            }
1195        }
1196        NautilusWsMessage::OrderAccepted(event) => {
1197            emitter.send_order_event(OrderEventAny::Accepted(event));
1198        }
1199        NautilusWsMessage::OrderCanceled(event) => {
1200            emitter.send_order_event(OrderEventAny::Canceled(event));
1201        }
1202        NautilusWsMessage::OrderExpired(event) => {
1203            emitter.send_order_event(OrderEventAny::Expired(event));
1204        }
1205        NautilusWsMessage::OrderRejected(event) => {
1206            emitter.send_order_event(OrderEventAny::Rejected(event));
1207        }
1208        NautilusWsMessage::OrderCancelRejected(event) => {
1209            emitter.send_order_event(OrderEventAny::CancelRejected(event));
1210        }
1211        NautilusWsMessage::OrderModifyRejected(event) => {
1212            emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1213        }
1214        NautilusWsMessage::OrderTriggered(event) => {
1215            emitter.send_order_event(OrderEventAny::Triggered(event));
1216        }
1217        NautilusWsMessage::OrderUpdated(event) => {
1218            emitter.send_order_event(OrderEventAny::Updated(event));
1219        }
1220        NautilusWsMessage::Error(e) => {
1221            log::warn!(
1222                "Websocket error: code={} message={} conn_id={:?}",
1223                e.code,
1224                e.message,
1225                e.conn_id
1226            );
1227        }
1228        NautilusWsMessage::Reconnected => {
1229            log::info!("Websocket reconnected");
1230        }
1231        NautilusWsMessage::Authenticated => {
1232            log::debug!("Websocket authenticated");
1233        }
1234        NautilusWsMessage::Deltas(_)
1235        | NautilusWsMessage::Raw(_)
1236        | NautilusWsMessage::Data(_)
1237        | NautilusWsMessage::FundingRates(_)
1238        | NautilusWsMessage::Instrument(_) => {
1239            log::debug!("Ignoring websocket data message");
1240        }
1241    }
1242}
1243
1244fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1245    value.map(|nanos| nanos.to_datetime_utc())
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
1251    use nautilus_core::UnixNanos;
1252    use nautilus_model::identifiers::{
1253        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
1254    };
1255    use rstest::rstest;
1256
1257    #[rstest]
1258    fn test_batch_cancel_orders_builds_payload() {
1259        let trader_id = TraderId::from("TRADER-001");
1260        let strategy_id = StrategyId::from("STRATEGY-001");
1261        let client_id = Some(ClientId::from("OKX"));
1262        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1263        let client_order_id1 = ClientOrderId::new("order1");
1264        let client_order_id2 = ClientOrderId::new("order2");
1265        let venue_order_id1 = VenueOrderId::new("venue1");
1266        let venue_order_id2 = VenueOrderId::new("venue2");
1267
1268        let cmd = BatchCancelOrders {
1269            trader_id,
1270            client_id,
1271            strategy_id,
1272            instrument_id,
1273            cancels: vec![
1274                CancelOrder {
1275                    trader_id,
1276                    client_id,
1277                    strategy_id,
1278                    instrument_id,
1279                    client_order_id: client_order_id1,
1280                    venue_order_id: Some(venue_order_id1),
1281                    command_id: Default::default(),
1282                    ts_init: UnixNanos::default(),
1283                    params: None,
1284                },
1285                CancelOrder {
1286                    trader_id,
1287                    client_id,
1288                    strategy_id,
1289                    instrument_id,
1290                    client_order_id: client_order_id2,
1291                    venue_order_id: Some(venue_order_id2),
1292                    command_id: Default::default(),
1293                    ts_init: UnixNanos::default(),
1294                    params: None,
1295                },
1296            ],
1297            command_id: Default::default(),
1298            ts_init: UnixNanos::default(),
1299            params: None,
1300        };
1301
1302        // Verify we can build the payload structure
1303        let mut payload = Vec::with_capacity(cmd.cancels.len());
1304        for cancel in &cmd.cancels {
1305            payload.push((
1306                cancel.instrument_id,
1307                Some(cancel.client_order_id),
1308                cancel.venue_order_id,
1309            ));
1310        }
1311
1312        assert_eq!(payload.len(), 2);
1313        assert_eq!(payload[0].0, instrument_id);
1314        assert_eq!(payload[0].1, Some(client_order_id1));
1315        assert_eq!(payload[0].2, Some(venue_order_id1));
1316        assert_eq!(payload[1].0, instrument_id);
1317        assert_eq!(payload[1].1, Some(client_order_id2));
1318        assert_eq!(payload[1].2, Some(venue_order_id2));
1319    }
1320
1321    #[rstest]
1322    fn test_batch_cancel_orders_with_empty_cancels() {
1323        let cmd = BatchCancelOrders {
1324            trader_id: TraderId::from("TRADER-001"),
1325            client_id: Some(ClientId::from("OKX")),
1326            strategy_id: StrategyId::from("STRATEGY-001"),
1327            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
1328            cancels: vec![],
1329            command_id: Default::default(),
1330            ts_init: UnixNanos::default(),
1331            params: None,
1332        };
1333
1334        let payload: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)> =
1335            Vec::with_capacity(cmd.cancels.len());
1336        assert_eq!(payload.len(), 0);
1337    }
1338}