nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32    clients::ExecutionClient,
33    live::{runner::get_exec_event_sender, runtime::get_runtime},
34    messages::{
35        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
36        execution::{
37            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
38            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
39            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
40        },
41    },
42};
43use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
44use nautilus_live::ExecutionClientCore;
45use nautilus_model::{
46    accounts::AccountAny,
47    enums::{AccountType, OmsType, OrderType},
48    events::{
49        AccountState, OrderCancelRejected, OrderEventAny, OrderModifyRejected, OrderRejected,
50        OrderSubmitted,
51    },
52    identifiers::{AccountId, ClientId, InstrumentId, Venue},
53    orders::Order,
54    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
55    types::{AccountBalance, MarginBalance},
56};
57use tokio::task::JoinHandle;
58
59use crate::{
60    common::{
61        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
62        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
63    },
64    config::OKXExecClientConfig,
65    http::client::OKXHttpClient,
66    websocket::{
67        client::OKXWebSocketClient,
68        messages::{ExecutionReport, NautilusWsMessage},
69    },
70};
71
72#[derive(Debug)]
73pub struct OKXExecutionClient {
74    core: ExecutionClientCore,
75    config: OKXExecClientConfig,
76    http_client: OKXHttpClient,
77    ws_private: OKXWebSocketClient,
78    ws_business: OKXWebSocketClient,
79    trade_mode: OKXTradeMode,
80    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
81    started: bool,
82    connected: AtomicBool,
83    instruments_initialized: AtomicBool,
84    ws_stream_handle: Option<JoinHandle<()>>,
85    ws_business_stream_handle: Option<JoinHandle<()>>,
86    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
87}
88
89impl OKXExecutionClient {
90    /// Creates a new [`OKXExecutionClient`].
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the client fails to initialize.
95    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
96        // Always use with_credentials which loads from env vars when config values are None
97        let http_client = OKXHttpClient::with_credentials(
98            config.api_key.clone(),
99            config.api_secret.clone(),
100            config.api_passphrase.clone(),
101            config.base_url_http.clone(),
102            config.http_timeout_secs,
103            config.max_retries,
104            config.retry_delay_initial_ms,
105            config.retry_delay_max_ms,
106            config.is_demo,
107            config.http_proxy_url.clone(),
108        )?;
109
110        let account_id = core.account_id;
111        let ws_private = OKXWebSocketClient::with_credentials(
112            Some(config.ws_private_url()),
113            config.api_key.clone(),
114            config.api_secret.clone(),
115            config.api_passphrase.clone(),
116            Some(account_id),
117            Some(20), // Heartbeat
118        )
119        .context("failed to construct OKX private websocket client")?;
120
121        let ws_business = OKXWebSocketClient::with_credentials(
122            Some(config.ws_business_url()),
123            config.api_key.clone(),
124            config.api_secret.clone(),
125            config.api_passphrase.clone(),
126            Some(account_id),
127            Some(20), // Heartbeat
128        )
129        .context("failed to construct OKX business websocket client")?;
130
131        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
132
133        Ok(Self {
134            core,
135            config,
136            http_client,
137            ws_private,
138            ws_business,
139            trade_mode,
140            exec_event_sender: None,
141            started: false,
142            connected: AtomicBool::new(false),
143            instruments_initialized: AtomicBool::new(false),
144            ws_stream_handle: None,
145            ws_business_stream_handle: None,
146            pending_tasks: Mutex::new(Vec::new()),
147        })
148    }
149
150    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
151        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
152
153        if account_type == AccountType::Cash {
154            if !config.use_spot_margin {
155                return OKXTradeMode::Cash;
156            }
157            return if is_cross_margin {
158                OKXTradeMode::Cross
159            } else {
160                OKXTradeMode::Isolated
161            };
162        }
163
164        if is_cross_margin {
165            OKXTradeMode::Cross
166        } else {
167            OKXTradeMode::Isolated
168        }
169    }
170
171    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
172        if self.config.instrument_types.is_empty() {
173            vec![OKXInstrumentType::Spot]
174        } else {
175            self.config.instrument_types.clone()
176        }
177    }
178
179    async fn refresh_account_state(&self) -> anyhow::Result<()> {
180        let account_state = self
181            .http_client
182            .request_account_state(self.core.account_id)
183            .await
184            .context("failed to request OKX account state")?;
185
186        self.core.generate_account_state(
187            account_state.balances.clone(),
188            account_state.margins.clone(),
189            account_state.is_reported,
190            account_state.ts_event,
191        )
192    }
193
194    fn update_account_state(&self) -> anyhow::Result<()> {
195        let runtime = get_runtime();
196        runtime.block_on(self.refresh_account_state())
197    }
198
199    fn is_conditional_order(&self, order_type: OrderType) -> bool {
200        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
201    }
202
203    fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
204        let order = cmd.order.clone();
205        let ws_private = self.ws_private.clone();
206        let trade_mode = self.trade_mode;
207
208        let exec_event_sender = self.exec_event_sender.clone();
209        let trader_id = self.core.trader_id;
210        let account_id = self.core.account_id;
211        let ts_init = cmd.ts_init;
212        let client_order_id = order.client_order_id();
213        let strategy_id = order.strategy_id();
214        let instrument_id = order.instrument_id();
215        let order_side = order.order_side();
216        let order_type = order.order_type();
217        let quantity = order.quantity();
218        let time_in_force = order.time_in_force();
219        let price = order.price();
220        let trigger_price = order.trigger_price();
221        let is_post_only = order.is_post_only();
222        let is_reduce_only = order.is_reduce_only();
223        let is_quote_quantity = order.is_quote_quantity();
224
225        self.spawn_task("submit_order", async move {
226            let result = ws_private
227                .submit_order(
228                    trader_id,
229                    strategy_id,
230                    instrument_id,
231                    trade_mode,
232                    client_order_id,
233                    order_side,
234                    order_type,
235                    quantity,
236                    Some(time_in_force),
237                    price,
238                    trigger_price,
239                    Some(is_post_only),
240                    Some(is_reduce_only),
241                    Some(is_quote_quantity),
242                    None,
243                )
244                .await
245                .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
246
247            if let Err(e) = result {
248                let rejected_event = OrderRejected::new(
249                    trader_id,
250                    strategy_id,
251                    instrument_id,
252                    client_order_id,
253                    account_id,
254                    format!("submit-order-error: {e}").into(),
255                    UUID4::new(),
256                    ts_init,
257                    get_atomic_clock_realtime().get_time_ns(),
258                    false,
259                    false,
260                );
261
262                if let Some(sender) = &exec_event_sender {
263                    if let Err(send_err) = sender.send(ExecutionEvent::Order(
264                        OrderEventAny::Rejected(rejected_event),
265                    )) {
266                        log::warn!("Failed to send OrderRejected event: {send_err}");
267                    }
268                } else {
269                    log::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
270                }
271
272                return Err(e);
273            }
274
275            Ok(())
276        });
277
278        Ok(())
279    }
280
281    fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
282        let order = cmd.order.clone();
283        let trigger_price = order
284            .trigger_price()
285            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
286        let http_client = self.http_client.clone();
287        let trade_mode = self.trade_mode;
288
289        let exec_event_sender = self.exec_event_sender.clone();
290        let trader_id = self.core.trader_id;
291        let account_id = self.core.account_id;
292        let ts_init = cmd.ts_init;
293        let client_order_id = order.client_order_id();
294        let strategy_id = order.strategy_id();
295        let instrument_id = order.instrument_id();
296        let order_side = order.order_side();
297        let order_type = order.order_type();
298        let quantity = order.quantity();
299        let trigger_type = order.trigger_type();
300        let price = order.price();
301        let is_reduce_only = order.is_reduce_only();
302
303        self.spawn_task("submit_algo_order", async move {
304            let result = http_client
305                .place_algo_order_with_domain_types(
306                    instrument_id,
307                    trade_mode,
308                    client_order_id,
309                    order_side,
310                    order_type,
311                    quantity,
312                    trigger_price,
313                    trigger_type,
314                    price,
315                    Some(is_reduce_only),
316                )
317                .await
318                .map_err(|e| anyhow::anyhow!("Submit algo order failed: {e}"));
319
320            if let Err(e) = result {
321                let rejected_event = OrderRejected::new(
322                    trader_id,
323                    strategy_id,
324                    instrument_id,
325                    client_order_id,
326                    account_id,
327                    format!("submit-order-error: {e}").into(),
328                    UUID4::new(),
329                    ts_init,
330                    get_atomic_clock_realtime().get_time_ns(),
331                    false,
332                    false,
333                );
334
335                if let Some(sender) = &exec_event_sender {
336                    if let Err(send_err) = sender.send(ExecutionEvent::Order(
337                        OrderEventAny::Rejected(rejected_event),
338                    )) {
339                        log::warn!("Failed to send OrderRejected event: {send_err}");
340                    }
341                } else {
342                    log::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
343                }
344
345                return Err(e);
346            }
347
348            Ok(())
349        });
350
351        Ok(())
352    }
353
354    fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
355        let ws_private = self.ws_private.clone();
356        let command = cmd.clone();
357
358        let exec_event_sender = self.exec_event_sender.clone();
359        let trader_id = self.core.trader_id;
360        let account_id = self.core.account_id;
361        let ts_init = cmd.ts_init;
362
363        self.spawn_task("cancel_order", async move {
364            let result = ws_private
365                .cancel_order(
366                    command.trader_id,
367                    command.strategy_id,
368                    command.instrument_id,
369                    Some(command.client_order_id),
370                    command.venue_order_id,
371                )
372                .await
373                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
374
375            if let Err(e) = result {
376                let rejected_event = OrderCancelRejected::new(
377                    trader_id,
378                    command.strategy_id,
379                    command.instrument_id,
380                    command.client_order_id,
381                    format!("cancel-order-error: {e}").into(),
382                    UUID4::new(),
383                    get_atomic_clock_realtime().get_time_ns(),
384                    ts_init,
385                    false,
386                    command.venue_order_id,
387                    Some(account_id),
388                );
389
390                if let Some(sender) = &exec_event_sender {
391                    if let Err(send_err) = sender.send(ExecutionEvent::Order(
392                        OrderEventAny::CancelRejected(rejected_event),
393                    )) {
394                        log::warn!("Failed to send OrderCancelRejected event: {send_err}");
395                    }
396                } else {
397                    log::warn!(
398                        "Cannot send OrderCancelRejected: exec_event_sender not initialized"
399                    );
400                }
401
402                return Err(e);
403            }
404
405            Ok(())
406        });
407
408        Ok(())
409    }
410
411    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
412        let ws_private = self.ws_private.clone();
413        self.spawn_task("mass_cancel_orders", async move {
414            ws_private.mass_cancel_orders(instrument_id).await?;
415            Ok(())
416        });
417        Ok(())
418    }
419
420    fn spawn_task<F>(&self, description: &'static str, fut: F)
421    where
422        F: Future<Output = anyhow::Result<()>> + Send + 'static,
423    {
424        let runtime = get_runtime();
425        let handle = runtime.spawn(async move {
426            if let Err(e) = fut.await {
427                log::warn!("{description} failed: {e:?}");
428            }
429        });
430
431        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
432        tasks.retain(|handle| !handle.is_finished());
433        tasks.push(handle);
434    }
435
436    fn abort_pending_tasks(&self) {
437        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
438        for handle in tasks.drain(..) {
439            handle.abort();
440        }
441    }
442
443    /// Polls the cache until the account is registered or timeout is reached.
444    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
445        let account_id = self.core.account_id;
446
447        if self.core.cache().borrow().account(&account_id).is_some() {
448            log::info!("Account {account_id} registered");
449            return Ok(());
450        }
451
452        let start = Instant::now();
453        let timeout = Duration::from_secs_f64(timeout_secs);
454        let interval = Duration::from_millis(10);
455
456        loop {
457            tokio::time::sleep(interval).await;
458
459            if self.core.cache().borrow().account(&account_id).is_some() {
460                log::info!("Account {account_id} registered");
461                return Ok(());
462            }
463
464            if start.elapsed() >= timeout {
465                anyhow::bail!(
466                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
467                );
468            }
469        }
470    }
471}
472
473#[async_trait(?Send)]
474impl ExecutionClient for OKXExecutionClient {
475    fn is_connected(&self) -> bool {
476        self.connected.load(Ordering::Acquire)
477    }
478
479    fn client_id(&self) -> ClientId {
480        self.core.client_id
481    }
482
483    fn account_id(&self) -> AccountId {
484        self.core.account_id
485    }
486
487    fn venue(&self) -> Venue {
488        *OKX_VENUE
489    }
490
491    fn oms_type(&self) -> OmsType {
492        self.core.oms_type
493    }
494
495    fn get_account(&self) -> Option<AccountAny> {
496        self.core.get_account()
497    }
498
499    async fn connect(&mut self) -> anyhow::Result<()> {
500        if self.connected.load(Ordering::Acquire) {
501            return Ok(());
502        }
503
504        // Initialize exec event sender (must be done in async context after runner is set up)
505        if self.exec_event_sender.is_none() {
506            self.exec_event_sender = Some(get_exec_event_sender());
507        }
508
509        let instrument_types = self.instrument_types();
510
511        if !self.instruments_initialized.load(Ordering::Acquire) {
512            let mut all_instruments = Vec::new();
513            for instrument_type in &instrument_types {
514                let instruments = self
515                    .http_client
516                    .request_instruments(*instrument_type, None)
517                    .await
518                    .with_context(|| {
519                        format!("failed to request OKX instruments for {instrument_type:?}")
520                    })?;
521
522                if instruments.is_empty() {
523                    log::warn!("No instruments returned for {instrument_type:?}");
524                    continue;
525                }
526
527                log::info!(
528                    "Loaded {} {instrument_type:?} instruments",
529                    instruments.len()
530                );
531
532                self.http_client.cache_instruments(instruments.clone());
533                all_instruments.extend(instruments);
534            }
535
536            // Add instruments to Nautilus Cache for reconciliation
537            {
538                let mut cache = self.core.cache().borrow_mut();
539                for instrument in &all_instruments {
540                    if let Err(e) = cache.add_instrument(instrument.clone()) {
541                        log::debug!("Instrument already in cache: {e}");
542                    }
543                }
544            }
545
546            if !all_instruments.is_empty() {
547                self.ws_private.cache_instruments(all_instruments);
548            }
549            self.instruments_initialized.store(true, Ordering::Release);
550        }
551
552        let Some(sender) = self.exec_event_sender.as_ref() else {
553            log::error!("Execution event sender not initialized");
554            anyhow::bail!("Execution event sender not initialized");
555        };
556
557        self.ws_private.connect().await?;
558        self.ws_private.wait_until_active(10.0).await?;
559        log::info!("Connected to private WebSocket");
560
561        if self.ws_stream_handle.is_none() {
562            let stream = self.ws_private.stream();
563            let sender = sender.clone();
564            let handle = get_runtime().spawn(async move {
565                pin_mut!(stream);
566                while let Some(message) = stream.next().await {
567                    dispatch_ws_message(message, &sender);
568                }
569            });
570            self.ws_stream_handle = Some(handle);
571        }
572
573        self.ws_business.connect().await?;
574        self.ws_business.wait_until_active(10.0).await?;
575        log::info!("Connected to business WebSocket");
576
577        if self.ws_business_stream_handle.is_none() {
578            let stream = self.ws_business.stream();
579            let sender = sender.clone();
580            let handle = get_runtime().spawn(async move {
581                pin_mut!(stream);
582                while let Some(message) = stream.next().await {
583                    dispatch_ws_message(message, &sender);
584                }
585            });
586            self.ws_business_stream_handle = Some(handle);
587        }
588
589        for inst_type in &instrument_types {
590            log::info!("Subscribing to orders channel for {inst_type:?}");
591            self.ws_private.subscribe_orders(*inst_type).await?;
592
593            if self.config.use_fills_channel {
594                log::info!("Subscribing to fills channel for {inst_type:?}");
595                if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
596                    log::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
597                }
598            }
599        }
600
601        self.ws_private.subscribe_account().await?;
602
603        // Subscribe to algo orders on business WebSocket (OKX requires this endpoint)
604        for inst_type in &instrument_types {
605            if *inst_type != OKXInstrumentType::Option {
606                self.ws_business.subscribe_orders_algo(*inst_type).await?;
607            }
608        }
609
610        let account_state = self
611            .http_client
612            .request_account_state(self.core.account_id)
613            .await
614            .context("failed to request OKX account state")?;
615
616        if !account_state.balances.is_empty() {
617            log::info!(
618                "Received account state with {} balance(s)",
619                account_state.balances.len()
620            );
621        }
622        dispatch_account_state(account_state, sender);
623
624        // Wait for account to be registered in cache before completing connect
625        self.await_account_registered(30.0).await?;
626
627        self.connected.store(true, Ordering::Release);
628        log::info!("Connected: client_id={}", self.core.client_id);
629        Ok(())
630    }
631
632    async fn disconnect(&mut self) -> anyhow::Result<()> {
633        if !self.connected.load(Ordering::Acquire) {
634            return Ok(());
635        }
636
637        self.abort_pending_tasks();
638        self.http_client.cancel_all_requests();
639
640        if let Err(e) = self.ws_private.close().await {
641            log::warn!("Error closing private websocket: {e:?}");
642        }
643
644        if let Err(e) = self.ws_business.close().await {
645            log::warn!("Error closing business websocket: {e:?}");
646        }
647
648        if let Some(handle) = self.ws_stream_handle.take() {
649            handle.abort();
650        }
651
652        if let Some(handle) = self.ws_business_stream_handle.take() {
653            handle.abort();
654        }
655
656        self.connected.store(false, Ordering::Release);
657        log::info!("Disconnected: client_id={}", self.core.client_id);
658        Ok(())
659    }
660
661    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
662        self.update_account_state()
663    }
664
665    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
666        log::debug!(
667            "query_order not implemented for OKX execution client (client_order_id={})",
668            cmd.client_order_id
669        );
670        Ok(())
671    }
672
673    fn generate_account_state(
674        &self,
675        balances: Vec<AccountBalance>,
676        margins: Vec<MarginBalance>,
677        reported: bool,
678        ts_event: UnixNanos,
679    ) -> anyhow::Result<()> {
680        self.core
681            .generate_account_state(balances, margins, reported, ts_event)
682    }
683
684    fn start(&mut self) -> anyhow::Result<()> {
685        if self.started {
686            return Ok(());
687        }
688
689        self.started = true;
690
691        // Spawn instrument bootstrap task
692        let http_client = self.http_client.clone();
693        let ws_private = self.ws_private.clone();
694        let instrument_types = self.config.instrument_types.clone();
695
696        get_runtime().spawn(async move {
697            let mut all_instruments = Vec::new();
698            for instrument_type in instrument_types {
699                match http_client.request_instruments(instrument_type, None).await {
700                    Ok(instruments) => {
701                        if instruments.is_empty() {
702                            log::warn!("No instruments returned for {instrument_type:?}");
703                            continue;
704                        }
705                        http_client.cache_instruments(instruments.clone());
706                        all_instruments.extend(instruments);
707                    }
708                    Err(e) => {
709                        log::error!("Failed to request instruments for {instrument_type:?}: {e}");
710                    }
711                }
712            }
713
714            if all_instruments.is_empty() {
715                log::warn!(
716                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
717                );
718            } else {
719                ws_private.cache_instruments(all_instruments);
720                log::info!("Instruments initialized");
721            }
722        });
723
724        log::info!(
725            "Started: client_id={}, account_id={}, account_type={:?}, trade_mode={:?}, instrument_types={:?}, use_fills_channel={}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
726            self.core.client_id,
727            self.core.account_id,
728            self.core.account_type,
729            self.trade_mode,
730            self.config.instrument_types,
731            self.config.use_fills_channel,
732            self.config.is_demo,
733            self.config.http_proxy_url,
734            self.config.ws_proxy_url,
735        );
736        Ok(())
737    }
738
739    fn stop(&mut self) -> anyhow::Result<()> {
740        if !self.started {
741            return Ok(());
742        }
743
744        self.started = false;
745        self.connected.store(false, Ordering::Release);
746        if let Some(handle) = self.ws_stream_handle.take() {
747            handle.abort();
748        }
749        self.abort_pending_tasks();
750        log::info!("Stopped: client_id={}", self.core.client_id);
751        Ok(())
752    }
753
754    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
755        let order = &cmd.order;
756
757        if order.is_closed() {
758            let client_order_id = order.client_order_id();
759            log::warn!("Cannot submit closed order {client_order_id}");
760            return Ok(());
761        }
762
763        let event = OrderSubmitted::new(
764            self.core.trader_id,
765            order.strategy_id(),
766            order.instrument_id(),
767            order.client_order_id(),
768            self.core.account_id,
769            UUID4::new(),
770            cmd.ts_init,
771            get_atomic_clock_realtime().get_time_ns(),
772        );
773        if let Some(sender) = &self.exec_event_sender {
774            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
775            if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
776                log::warn!("Failed to send OrderSubmitted event: {e}");
777            }
778        } else {
779            log::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
780        }
781
782        if self.is_conditional_order(order.order_type()) {
783            self.submit_conditional_order(cmd)
784        } else {
785            self.submit_regular_order(cmd)
786        }
787    }
788
789    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
790        log::warn!(
791            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
792            cmd.order_list.orders.len()
793        );
794        Ok(())
795    }
796
797    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
798        let ws_private = self.ws_private.clone();
799        let command = cmd.clone();
800
801        // Capture for error handling
802        let exec_event_sender = self.exec_event_sender.clone();
803        let trader_id = self.core.trader_id;
804        let account_id = self.core.account_id;
805        let ts_init = cmd.ts_init;
806
807        self.spawn_task("modify_order", async move {
808            let result = ws_private
809                .modify_order(
810                    command.trader_id,
811                    command.strategy_id,
812                    command.instrument_id,
813                    Some(command.client_order_id),
814                    command.price,
815                    command.quantity,
816                    command.venue_order_id,
817                )
818                .await
819                .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
820
821            if let Err(e) = result {
822                let rejected_event = OrderModifyRejected::new(
823                    trader_id,
824                    command.strategy_id,
825                    command.instrument_id,
826                    command.client_order_id,
827                    format!("modify-order-error: {e}").into(),
828                    UUID4::new(),
829                    get_atomic_clock_realtime().get_time_ns(),
830                    ts_init,
831                    false,
832                    command.venue_order_id,
833                    Some(account_id),
834                );
835
836                if let Some(sender) = &exec_event_sender {
837                    if let Err(send_err) = sender.send(ExecutionEvent::Order(
838                        OrderEventAny::ModifyRejected(rejected_event),
839                    )) {
840                        log::warn!("Failed to send OrderModifyRejected event: {send_err}");
841                    }
842                } else {
843                    log::warn!(
844                        "Cannot send OrderModifyRejected: exec_event_sender not initialized"
845                    );
846                }
847
848                return Err(e);
849            }
850
851            Ok(())
852        });
853
854        Ok(())
855    }
856
857    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
858        self.cancel_ws_order(cmd)
859    }
860
861    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
862        if self.config.use_mm_mass_cancel {
863            // Use OKX's mass-cancel endpoint (requires market maker permissions)
864            self.mass_cancel_instrument(cmd.instrument_id)
865        } else {
866            // Cancel orders individually via batch cancel (works for all users)
867            let cache = self.core.cache().borrow();
868            let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
869
870            if open_orders.is_empty() {
871                log::debug!("No open orders to cancel for {}", cmd.instrument_id);
872                return Ok(());
873            }
874
875            let mut payload = Vec::with_capacity(open_orders.len());
876            for order in open_orders {
877                payload.push((
878                    order.instrument_id(),
879                    Some(order.client_order_id()),
880                    order.venue_order_id(),
881                ));
882            }
883            drop(cache);
884
885            log::debug!(
886                "Canceling {} open orders for {} via batch cancel",
887                payload.len(),
888                cmd.instrument_id
889            );
890
891            let ws_private = self.ws_private.clone();
892            self.spawn_task("batch_cancel_orders", async move {
893                ws_private.batch_cancel_orders(payload).await?;
894                Ok(())
895            });
896
897            Ok(())
898        }
899    }
900
901    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
902        let mut payload = Vec::with_capacity(cmd.cancels.len());
903
904        for cancel in &cmd.cancels {
905            payload.push((
906                cancel.instrument_id,
907                Some(cancel.client_order_id),
908                cancel.venue_order_id,
909            ));
910        }
911
912        let ws_private = self.ws_private.clone();
913        self.spawn_task("batch_cancel_orders", async move {
914            ws_private.batch_cancel_orders(payload).await?;
915            Ok(())
916        });
917
918        Ok(())
919    }
920
921    async fn generate_order_status_report(
922        &self,
923        cmd: &GenerateOrderStatusReport,
924    ) -> anyhow::Result<Option<OrderStatusReport>> {
925        let Some(instrument_id) = cmd.instrument_id else {
926            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
927            return Ok(None);
928        };
929
930        let mut reports = self
931            .http_client
932            .request_order_status_reports(
933                self.core.account_id,
934                None,
935                Some(instrument_id),
936                None,
937                None,
938                false,
939                None,
940            )
941            .await?;
942
943        if let Some(client_order_id) = cmd.client_order_id {
944            reports.retain(|report| report.client_order_id == Some(client_order_id));
945        }
946
947        if let Some(venue_order_id) = cmd.venue_order_id {
948            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
949        }
950
951        Ok(reports.into_iter().next())
952    }
953
954    async fn generate_order_status_reports(
955        &self,
956        cmd: &GenerateOrderStatusReports,
957    ) -> anyhow::Result<Vec<OrderStatusReport>> {
958        let mut reports = Vec::new();
959
960        if let Some(instrument_id) = cmd.instrument_id {
961            let mut fetched = self
962                .http_client
963                .request_order_status_reports(
964                    self.core.account_id,
965                    None,
966                    Some(instrument_id),
967                    None,
968                    None,
969                    false,
970                    None,
971                )
972                .await?;
973            reports.append(&mut fetched);
974        } else {
975            for inst_type in self.instrument_types() {
976                let mut fetched = self
977                    .http_client
978                    .request_order_status_reports(
979                        self.core.account_id,
980                        Some(inst_type),
981                        None,
982                        None,
983                        None,
984                        false,
985                        None,
986                    )
987                    .await?;
988                reports.append(&mut fetched);
989            }
990        }
991
992        // Filter by open_only if specified
993        if cmd.open_only {
994            reports.retain(|r| r.order_status.is_open());
995        }
996
997        // Filter by time range if specified
998        if let Some(start) = cmd.start {
999            reports.retain(|r| r.ts_last >= start);
1000        }
1001        if let Some(end) = cmd.end {
1002            reports.retain(|r| r.ts_last <= end);
1003        }
1004
1005        Ok(reports)
1006    }
1007
1008    async fn generate_fill_reports(
1009        &self,
1010        cmd: GenerateFillReports,
1011    ) -> anyhow::Result<Vec<FillReport>> {
1012        let start_dt = nanos_to_datetime(cmd.start);
1013        let end_dt = nanos_to_datetime(cmd.end);
1014        let mut reports = Vec::new();
1015
1016        if let Some(instrument_id) = cmd.instrument_id {
1017            let mut fetched = self
1018                .http_client
1019                .request_fill_reports(
1020                    self.core.account_id,
1021                    None,
1022                    Some(instrument_id),
1023                    start_dt,
1024                    end_dt,
1025                    None,
1026                )
1027                .await?;
1028            reports.append(&mut fetched);
1029        } else {
1030            for inst_type in self.instrument_types() {
1031                let mut fetched = self
1032                    .http_client
1033                    .request_fill_reports(
1034                        self.core.account_id,
1035                        Some(inst_type),
1036                        None,
1037                        start_dt,
1038                        end_dt,
1039                        None,
1040                    )
1041                    .await?;
1042                reports.append(&mut fetched);
1043            }
1044        }
1045
1046        if let Some(venue_order_id) = cmd.venue_order_id {
1047            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1048        }
1049
1050        Ok(reports)
1051    }
1052
1053    async fn generate_position_status_reports(
1054        &self,
1055        cmd: &GeneratePositionStatusReports,
1056    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1057        let mut reports = Vec::new();
1058
1059        // Query derivative positions (SWAP/FUTURES/OPTION) from /api/v5/account/positions
1060        // Note: The positions endpoint does not support Spot or Margin - those are handled separately
1061        if let Some(instrument_id) = cmd.instrument_id {
1062            let mut fetched = self
1063                .http_client
1064                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
1065                .await?;
1066            reports.append(&mut fetched);
1067        } else {
1068            for inst_type in self.instrument_types() {
1069                // Skip Spot and Margin - positions API only supports derivatives
1070                if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
1071                    continue;
1072                }
1073                let mut fetched = self
1074                    .http_client
1075                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
1076                    .await?;
1077                reports.append(&mut fetched);
1078            }
1079        }
1080
1081        // Query spot margin positions from /api/v5/account/balance
1082        // Spot margin positions appear as balance sheet items (liab/spotInUseAmt fields)
1083        let mut margin_reports = self
1084            .http_client
1085            .request_spot_margin_position_reports(self.core.account_id)
1086            .await?;
1087
1088        if let Some(instrument_id) = cmd.instrument_id {
1089            margin_reports.retain(|report| report.instrument_id == instrument_id);
1090        }
1091
1092        reports.append(&mut margin_reports);
1093
1094        let _ = nanos_to_datetime(cmd.start);
1095        let _ = nanos_to_datetime(cmd.end);
1096
1097        Ok(reports)
1098    }
1099
1100    async fn generate_mass_status(
1101        &self,
1102        lookback_mins: Option<u64>,
1103    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1104        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1105
1106        let ts_now = get_atomic_clock_realtime().get_time_ns();
1107
1108        let start = lookback_mins.map(|mins| {
1109            let lookback_ns = mins * 60 * 1_000_000_000;
1110            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1111        });
1112
1113        let order_cmd = GenerateOrderStatusReports::new(
1114            UUID4::new(),
1115            ts_now,
1116            false, // open_only - get all orders for mass status
1117            None,  // instrument_id
1118            start, // start
1119            None,  // end
1120            None,  // params
1121            None,  // correlation_id
1122        );
1123
1124        let fill_cmd = GenerateFillReports::new(
1125            UUID4::new(),
1126            ts_now,
1127            None, // instrument_id
1128            None, // venue_order_id
1129            start,
1130            None, // end
1131            None, // params
1132            None, // correlation_id
1133        );
1134
1135        let position_cmd = GeneratePositionStatusReports::new(
1136            UUID4::new(),
1137            ts_now,
1138            None, // instrument_id
1139            start,
1140            None, // end
1141            None, // params
1142            None, // correlation_id
1143        );
1144
1145        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1146            self.generate_order_status_reports(&order_cmd),
1147            self.generate_fill_reports(fill_cmd),
1148            self.generate_position_status_reports(&position_cmd),
1149        )?;
1150
1151        log::info!("Received {} OrderStatusReports", order_reports.len());
1152        log::info!("Received {} FillReports", fill_reports.len());
1153        log::info!("Received {} PositionReports", position_reports.len());
1154
1155        let mut mass_status = ExecutionMassStatus::new(
1156            self.core.client_id,
1157            self.core.account_id,
1158            *OKX_VENUE,
1159            ts_now,
1160            None,
1161        );
1162
1163        mass_status.add_order_reports(order_reports);
1164        mass_status.add_fill_reports(fill_reports);
1165        mass_status.add_position_reports(position_reports);
1166
1167        Ok(Some(mass_status))
1168    }
1169}
1170
1171fn dispatch_ws_message(
1172    message: NautilusWsMessage,
1173    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1174) {
1175    match message {
1176        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
1177        NautilusWsMessage::PositionUpdate(report) => {
1178            dispatch_position_status_report(report, sender);
1179        }
1180        NautilusWsMessage::ExecutionReports(reports) => {
1181            log::debug!("Processing {} execution report(s)", reports.len());
1182            for report in reports {
1183                dispatch_execution_report(report, sender);
1184            }
1185        }
1186        NautilusWsMessage::OrderAccepted(event) => {
1187            dispatch_order_event(OrderEventAny::Accepted(event), sender);
1188        }
1189        NautilusWsMessage::OrderCanceled(event) => {
1190            dispatch_order_event(OrderEventAny::Canceled(event), sender);
1191        }
1192        NautilusWsMessage::OrderExpired(event) => {
1193            dispatch_order_event(OrderEventAny::Expired(event), sender);
1194        }
1195        NautilusWsMessage::OrderRejected(event) => {
1196            dispatch_order_event(OrderEventAny::Rejected(event), sender);
1197        }
1198        NautilusWsMessage::OrderCancelRejected(event) => {
1199            dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
1200        }
1201        NautilusWsMessage::OrderModifyRejected(event) => {
1202            dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
1203        }
1204        NautilusWsMessage::OrderTriggered(event) => {
1205            dispatch_order_event(OrderEventAny::Triggered(event), sender);
1206        }
1207        NautilusWsMessage::OrderUpdated(event) => {
1208            dispatch_order_event(OrderEventAny::Updated(event), sender);
1209        }
1210        NautilusWsMessage::Error(e) => {
1211            log::warn!(
1212                "Websocket error: code={} message={} conn_id={:?}",
1213                e.code,
1214                e.message,
1215                e.conn_id
1216            );
1217        }
1218        NautilusWsMessage::Reconnected => {
1219            log::info!("Websocket reconnected");
1220        }
1221        NautilusWsMessage::Authenticated => {
1222            log::debug!("Websocket authenticated");
1223        }
1224        NautilusWsMessage::Deltas(_)
1225        | NautilusWsMessage::Raw(_)
1226        | NautilusWsMessage::Data(_)
1227        | NautilusWsMessage::FundingRates(_)
1228        | NautilusWsMessage::Instrument(_) => {
1229            log::debug!("Ignoring websocket data message");
1230        }
1231    }
1232}
1233
1234fn dispatch_account_state(
1235    state: AccountState,
1236    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1237) {
1238    if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
1239        log::warn!("Failed to send account state: {e}");
1240    }
1241}
1242
1243fn dispatch_position_status_report(
1244    report: PositionStatusReport,
1245    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1246) {
1247    let exec_report = NautilusExecutionReport::Position(Box::new(report));
1248    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1249        log::warn!("Failed to send position status report: {e}");
1250    }
1251}
1252
1253fn dispatch_execution_report(
1254    report: ExecutionReport,
1255    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1256) {
1257    match report {
1258        ExecutionReport::Order(order_report) => {
1259            let exec_report = NautilusExecutionReport::Order(Box::new(order_report));
1260            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1261                log::warn!("Failed to send order status report: {e}");
1262            }
1263        }
1264        ExecutionReport::Fill(fill_report) => {
1265            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1266            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1267                log::warn!("Failed to send fill report: {e}");
1268            }
1269        }
1270    }
1271}
1272
1273fn dispatch_order_event(
1274    event: OrderEventAny,
1275    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1276) {
1277    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1278        log::warn!("Failed to send order event: {e}");
1279    }
1280}
1281
1282fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1283    value.map(|nanos| nanos.to_datetime_utc())
1284}
1285
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    type CancelPayload = Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>;

    /// Builds the same tuple payload that the client's `batch_cancel_orders` derives from
    /// a command.
    fn build_payload(cmd: &BatchCancelOrders) -> CancelPayload {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    cancel.venue_order_id,
                )
            })
            .collect()
    }

    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = Some(ClientId::from("OKX"));
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        // Both cancels differ only in their order identifiers
        let make_cancel = |client_order_id: ClientOrderId, venue_order_id: VenueOrderId| {
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id: Some(venue_order_id),
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
                params: None,
            }
        };

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels: vec![
                make_cancel(client_order_id1, venue_order_id1),
                make_cancel(client_order_id2, venue_order_id2),
            ],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        // Verify we can build the payload structure
        let payload = build_payload(&cmd);

        let expected: CancelPayload = vec![
            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
        ];
        assert_eq!(payload, expected);
    }

    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: Some(ClientId::from("OKX")),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = build_payload(&cmd);
        assert!(payload.is_empty());
    }
}