nautilus_okx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the OKX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use chrono::{DateTime, Utc};
29use futures_util::{StreamExt, pin_mut};
30use nautilus_common::{
31    live::{runner::get_exec_event_sender, runtime::get_runtime},
32    messages::{
33        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
34        execution::{
35            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
37            QueryOrder, SubmitOrder, SubmitOrderList,
38        },
39    },
40};
41use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
43use nautilus_live::execution::client::LiveExecutionClient;
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{AccountType, OmsType, OrderType},
47    events::{AccountState, OrderEventAny, OrderRejected, OrderSubmitted},
48    identifiers::{AccountId, ClientId, InstrumentId, Venue},
49    orders::Order,
50    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51    types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56    common::{
57        consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
58        enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
59    },
60    config::OKXExecClientConfig,
61    http::client::OKXHttpClient,
62    websocket::{
63        client::OKXWebSocketClient,
64        messages::{ExecutionReport, NautilusWsMessage},
65    },
66};
67
/// Live execution client for the OKX venue.
///
/// Owns the HTTP client, the private and business WebSocket clients, and the
/// bookkeeping needed to track lifecycle state and background tasks.
#[derive(Debug)]
pub struct OKXExecutionClient {
    /// Shared execution client core (identifiers, account type, cache access).
    core: ExecutionClientCore,
    /// Adapter configuration supplied at construction.
    config: OKXExecClientConfig,
    /// REST client used for instruments, account state, algo orders and reports.
    http_client: OKXHttpClient,
    /// Private WebSocket client used for order submission, modification and cancels.
    ws_private: OKXWebSocketClient,
    /// Business WebSocket client used for algo order subscriptions.
    ws_business: OKXWebSocketClient,
    /// Trade mode derived once from the account type and configuration.
    trade_mode: OKXTradeMode,
    /// Channel for emitting execution events; populated during `connect`.
    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
    /// Whether `start` has run without a subsequent `stop`.
    started: bool,
    /// Whether `connect` has completed without a subsequent `disconnect`/`stop`.
    connected: AtomicBool,
    /// Whether `connect` has already loaded and cached instruments.
    instruments_initialized: AtomicBool,
    /// Task forwarding private WebSocket messages to the event channel.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Task forwarding business WebSocket messages to the event channel.
    ws_business_stream_handle: Option<JoinHandle<()>>,
    /// Handles of fire-and-forget tasks created by `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
84
85impl OKXExecutionClient {
86    /// Creates a new [`OKXExecutionClient`].
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the client fails to initialize.
91    pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92        // Always use with_credentials which loads from env vars when config values are None
93        let http_client = OKXHttpClient::with_credentials(
94            config.api_key.clone(),
95            config.api_secret.clone(),
96            config.api_passphrase.clone(),
97            config.base_url_http.clone(),
98            config.http_timeout_secs,
99            config.max_retries,
100            config.retry_delay_initial_ms,
101            config.retry_delay_max_ms,
102            config.is_demo,
103            config.http_proxy_url.clone(),
104        )?;
105
106        let account_id = core.account_id;
107        let ws_private = OKXWebSocketClient::with_credentials(
108            Some(config.ws_private_url()),
109            config.api_key.clone(),
110            config.api_secret.clone(),
111            config.api_passphrase.clone(),
112            Some(account_id),
113            Some(20), // Heartbeat
114        )
115        .context("failed to construct OKX private websocket client")?;
116
117        let ws_business = OKXWebSocketClient::with_credentials(
118            Some(config.ws_business_url()),
119            config.api_key.clone(),
120            config.api_secret.clone(),
121            config.api_passphrase.clone(),
122            Some(account_id),
123            Some(20), // Heartbeat
124        )
125        .context("failed to construct OKX business websocket client")?;
126
127        let trade_mode = Self::derive_trade_mode(core.account_type, &config);
128
129        Ok(Self {
130            core,
131            config,
132            http_client,
133            ws_private,
134            ws_business,
135            trade_mode,
136            exec_event_sender: None,
137            started: false,
138            connected: AtomicBool::new(false),
139            instruments_initialized: AtomicBool::new(false),
140            ws_stream_handle: None,
141            ws_business_stream_handle: None,
142            pending_tasks: Mutex::new(Vec::new()),
143        })
144    }
145
146    fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
147        let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
148
149        if account_type == AccountType::Cash {
150            if !config.use_spot_margin {
151                return OKXTradeMode::Cash;
152            }
153            return if is_cross_margin {
154                OKXTradeMode::Cross
155            } else {
156                OKXTradeMode::Isolated
157            };
158        }
159
160        if is_cross_margin {
161            OKXTradeMode::Cross
162        } else {
163            OKXTradeMode::Isolated
164        }
165    }
166
167    fn instrument_types(&self) -> Vec<OKXInstrumentType> {
168        if self.config.instrument_types.is_empty() {
169            vec![OKXInstrumentType::Spot]
170        } else {
171            self.config.instrument_types.clone()
172        }
173    }
174
175    async fn refresh_account_state(&self) -> anyhow::Result<()> {
176        let account_state = self
177            .http_client
178            .request_account_state(self.core.account_id)
179            .await
180            .context("failed to request OKX account state")?;
181
182        self.core.generate_account_state(
183            account_state.balances.clone(),
184            account_state.margins.clone(),
185            account_state.is_reported,
186            account_state.ts_event,
187        )
188    }
189
190    fn update_account_state(&self) -> anyhow::Result<()> {
191        let runtime = get_runtime();
192        runtime.block_on(self.refresh_account_state())
193    }
194
    /// Returns whether `order_type` is listed in `OKX_CONDITIONAL_ORDER_TYPES`.
    ///
    /// `submit_order` uses this to route such orders through the HTTP algo order
    /// path instead of the private WebSocket.
    fn is_conditional_order(&self, order_type: OrderType) -> bool {
        OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
    }
198
199    fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200        let order = cmd.order.clone();
201        let ws_private = self.ws_private.clone();
202        let trade_mode = self.trade_mode;
203
204        self.spawn_task("submit_order", async move {
205            ws_private
206                .submit_order(
207                    order.trader_id(),
208                    order.strategy_id(),
209                    order.instrument_id(),
210                    trade_mode,
211                    order.client_order_id(),
212                    order.order_side(),
213                    order.order_type(),
214                    order.quantity(),
215                    Some(order.time_in_force()),
216                    order.price(),
217                    order.trigger_price(),
218                    Some(order.is_post_only()),
219                    Some(order.is_reduce_only()),
220                    Some(order.is_quote_quantity()),
221                    None,
222                )
223                .await?;
224            Ok(())
225        });
226
227        Ok(())
228    }
229
230    fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
231        let order = cmd.order.clone();
232        let trigger_price = order
233            .trigger_price()
234            .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
235        let http_client = self.http_client.clone();
236        let trade_mode = self.trade_mode;
237
238        self.spawn_task("submit_algo_order", async move {
239            http_client
240                .place_algo_order_with_domain_types(
241                    order.instrument_id(),
242                    trade_mode,
243                    order.client_order_id(),
244                    order.order_side(),
245                    order.order_type(),
246                    order.quantity(),
247                    trigger_price,
248                    order.trigger_type(),
249                    order.price(),
250                    Some(order.is_reduce_only()),
251                )
252                .await?;
253            Ok(())
254        });
255
256        Ok(())
257    }
258
259    fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
260        let ws_private = self.ws_private.clone();
261        let command = cmd.clone();
262
263        self.spawn_task("cancel_order", async move {
264            ws_private
265                .cancel_order(
266                    command.trader_id,
267                    command.strategy_id,
268                    command.instrument_id,
269                    Some(command.client_order_id),
270                    Some(command.venue_order_id),
271                )
272                .await?;
273            Ok(())
274        });
275
276        Ok(())
277    }
278
279    fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
280        let ws_private = self.ws_private.clone();
281        self.spawn_task("mass_cancel_orders", async move {
282            ws_private.mass_cancel_orders(instrument_id).await?;
283            Ok(())
284        });
285        Ok(())
286    }
287
288    fn spawn_task<F>(&self, description: &'static str, fut: F)
289    where
290        F: Future<Output = anyhow::Result<()>> + Send + 'static,
291    {
292        let runtime = get_runtime();
293        let handle = runtime.spawn(async move {
294            if let Err(e) = fut.await {
295                tracing::warn!("{description} failed: {e:?}");
296            }
297        });
298
299        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
300        tasks.retain(|handle| !handle.is_finished());
301        tasks.push(handle);
302    }
303
304    fn abort_pending_tasks(&self) {
305        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306        for handle in tasks.drain(..) {
307            handle.abort();
308        }
309    }
310}
311
312#[async_trait(?Send)]
313impl ExecutionClient for OKXExecutionClient {
    /// Returns the current value of the `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
317
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
321
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
325
    /// Returns the venue defined by the `OKX_VENUE` constant.
    fn venue(&self) -> Venue {
        *OKX_VENUE
    }
329
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
333
    /// Returns the account reported by the core, if any.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
337
338    async fn connect(&mut self) -> anyhow::Result<()> {
339        if self.connected.load(Ordering::Acquire) {
340            return Ok(());
341        }
342
343        // Initialize exec event sender (must be done in async context after runner is set up)
344        if self.exec_event_sender.is_none() {
345            self.exec_event_sender = Some(get_exec_event_sender());
346        }
347
348        let instrument_types = self.instrument_types();
349
350        if !self.instruments_initialized.load(Ordering::Acquire) {
351            let mut all_instruments = Vec::new();
352            for instrument_type in &instrument_types {
353                let instruments = self
354                    .http_client
355                    .request_instruments(*instrument_type, None)
356                    .await
357                    .with_context(|| {
358                        format!("failed to request OKX instruments for {instrument_type:?}")
359                    })?;
360
361                if instruments.is_empty() {
362                    tracing::warn!("No instruments returned for {instrument_type:?}");
363                    continue;
364                }
365
366                self.http_client.cache_instruments(instruments.clone());
367                all_instruments.extend(instruments);
368            }
369
370            if !all_instruments.is_empty() {
371                self.ws_private.cache_instruments(all_instruments);
372            }
373            self.instruments_initialized.store(true, Ordering::Release);
374        }
375
376        let Some(sender) = self.exec_event_sender.as_ref() else {
377            tracing::error!("Execution event sender not initialized");
378            anyhow::bail!("Execution event sender not initialized");
379        };
380
381        self.ws_private.connect().await?;
382        self.ws_private.wait_until_active(10.0).await?;
383
384        if self.ws_stream_handle.is_none() {
385            let stream = self.ws_private.stream();
386            let sender = sender.clone();
387            let handle = tokio::spawn(async move {
388                pin_mut!(stream);
389                while let Some(message) = stream.next().await {
390                    dispatch_ws_message(message, &sender);
391                }
392            });
393            self.ws_stream_handle = Some(handle);
394        }
395
396        self.ws_business.connect().await?;
397        self.ws_business.wait_until_active(10.0).await?;
398
399        if self.ws_business_stream_handle.is_none() {
400            let stream = self.ws_business.stream();
401            let sender = sender.clone();
402            let handle = tokio::spawn(async move {
403                pin_mut!(stream);
404                while let Some(message) = stream.next().await {
405                    dispatch_ws_message(message, &sender);
406                }
407            });
408            self.ws_business_stream_handle = Some(handle);
409        }
410
411        for inst_type in &instrument_types {
412            tracing::debug!(
413                "Subscribing to channels for instrument type: {:?}",
414                inst_type
415            );
416            self.ws_private.subscribe_orders(*inst_type).await?;
417
418            if self.config.use_fills_channel
419                && let Err(e) = self.ws_private.subscribe_fills(*inst_type).await
420            {
421                tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
422            }
423        }
424
425        self.ws_private.subscribe_account().await?;
426
427        // Subscribe to algo orders on business WebSocket (OKX requires this endpoint)
428        for inst_type in &instrument_types {
429            if *inst_type != OKXInstrumentType::Option {
430                self.ws_business.subscribe_orders_algo(*inst_type).await?;
431            }
432        }
433
434        let account_state = self
435            .http_client
436            .request_account_state(self.core.account_id)
437            .await
438            .context("failed to request OKX account state")?;
439
440        dispatch_account_state(account_state, sender);
441
442        self.connected.store(true, Ordering::Release);
443        tracing::info!(client_id = %self.core.client_id, "Connected");
444        Ok(())
445    }
446
447    async fn disconnect(&mut self) -> anyhow::Result<()> {
448        if !self.connected.load(Ordering::Acquire) {
449            return Ok(());
450        }
451
452        self.abort_pending_tasks();
453        self.http_client.cancel_all_requests();
454
455        if let Err(e) = self.ws_private.close().await {
456            tracing::warn!("Error closing private websocket: {e:?}");
457        }
458
459        if let Err(e) = self.ws_business.close().await {
460            tracing::warn!("Error closing business websocket: {e:?}");
461        }
462
463        if let Some(handle) = self.ws_stream_handle.take() {
464            handle.abort();
465        }
466
467        if let Some(handle) = self.ws_business_stream_handle.take() {
468            handle.abort();
469        }
470
471        self.connected.store(false, Ordering::Release);
472        tracing::info!(client_id = %self.core.client_id, "Disconnected");
473        Ok(())
474    }
475
    /// Handles an account query by refreshing the account state.
    ///
    /// The command itself is not inspected.
    ///
    /// # Errors
    ///
    /// Returns whatever error `update_account_state` reports.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        self.update_account_state()
    }
479
    /// Placeholder for order queries: logs at debug level that the operation is
    /// not implemented and returns `Ok` without contacting the venue.
    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
        tracing::debug!(
            "query_order not implemented for OKX execution client (client_order_id={})",
            cmd.client_order_id
        );
        Ok(())
    }
487
    /// Forwards the given balances, margins, reported flag and event timestamp to
    /// the core's `generate_account_state`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core reports.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
498
499    fn start(&mut self) -> anyhow::Result<()> {
500        if self.started {
501            return Ok(());
502        }
503
504        self.started = true;
505
506        // Spawn instrument bootstrap task
507        let http_client = self.http_client.clone();
508        let ws_private = self.ws_private.clone();
509        let instrument_types = self.config.instrument_types.clone();
510
511        get_runtime().spawn(async move {
512            let mut all_instruments = Vec::new();
513            for instrument_type in instrument_types {
514                match http_client.request_instruments(instrument_type, None).await {
515                    Ok(instruments) => {
516                        if instruments.is_empty() {
517                            tracing::warn!("No instruments returned for {instrument_type:?}");
518                            continue;
519                        }
520                        http_client.cache_instruments(instruments.clone());
521                        all_instruments.extend(instruments);
522                    }
523                    Err(e) => {
524                        tracing::error!(
525                            "Failed to request instruments for {instrument_type:?}: {e}"
526                        );
527                    }
528                }
529            }
530
531            if all_instruments.is_empty() {
532                tracing::warn!(
533                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
534                );
535            } else {
536                ws_private.cache_instruments(all_instruments);
537                tracing::info!("Instruments initialized");
538            }
539        });
540
541        tracing::info!(
542            client_id = %self.core.client_id,
543            account_id = %self.core.account_id,
544            account_type = ?self.core.account_type,
545            trade_mode = ?self.trade_mode,
546            instrument_types = ?self.config.instrument_types,
547            use_fills_channel = self.config.use_fills_channel,
548            is_demo = self.config.is_demo,
549            http_proxy_url = ?self.config.http_proxy_url,
550            ws_proxy_url = ?self.config.ws_proxy_url,
551            "Started"
552        );
553        Ok(())
554    }
555
556    fn stop(&mut self) -> anyhow::Result<()> {
557        if !self.started {
558            return Ok(());
559        }
560
561        self.started = false;
562        self.connected.store(false, Ordering::Release);
563        if let Some(handle) = self.ws_stream_handle.take() {
564            handle.abort();
565        }
566        self.abort_pending_tasks();
567        tracing::info!(client_id = %self.core.client_id, "Stopped");
568        Ok(())
569    }
570
571    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
572        let order = &cmd.order;
573
574        if order.is_closed() {
575            let client_order_id = order.client_order_id();
576            tracing::warn!("Cannot submit closed order {client_order_id}");
577            return Ok(());
578        }
579
580        let event = OrderSubmitted::new(
581            self.core.trader_id,
582            order.strategy_id(),
583            order.instrument_id(),
584            order.client_order_id(),
585            self.core.account_id,
586            UUID4::new(),
587            cmd.ts_init,
588            get_atomic_clock_realtime().get_time_ns(),
589        );
590        if let Some(sender) = &self.exec_event_sender {
591            tracing::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
592            if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
593                tracing::warn!("Failed to send OrderSubmitted event: {e}");
594            }
595        } else {
596            tracing::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
597        }
598
599        let result = if self.is_conditional_order(order.order_type()) {
600            self.submit_conditional_order(cmd)
601        } else {
602            self.submit_regular_order(cmd)
603        };
604
605        if let Err(e) = result {
606            let rejected_event = OrderRejected::new(
607                self.core.trader_id,
608                order.strategy_id(),
609                order.instrument_id(),
610                order.client_order_id(),
611                self.core.account_id,
612                format!("submit-order-error: {e}").into(),
613                UUID4::new(),
614                cmd.ts_init,
615                get_atomic_clock_realtime().get_time_ns(),
616                false,
617                false,
618            );
619            if let Some(sender) = &self.exec_event_sender {
620                if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(
621                    rejected_event,
622                ))) {
623                    tracing::warn!("Failed to send OrderRejected event: {e}");
624                }
625            } else {
626                tracing::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
627            }
628            return Err(e);
629        }
630
631        Ok(())
632    }
633
    /// Placeholder for order list submission: logs a warning with the number of
    /// orders received and returns `Ok` without submitting anything.
    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
        tracing::warn!(
            "submit_order_list not yet implemented for OKX execution client (got {} orders)",
            cmd.order_list.orders.len()
        );
        Ok(())
    }
641
642    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
643        let ws_private = self.ws_private.clone();
644        let command = cmd.clone();
645
646        self.spawn_task("modify_order", async move {
647            ws_private
648                .modify_order(
649                    command.trader_id,
650                    command.strategy_id,
651                    command.instrument_id,
652                    Some(command.client_order_id),
653                    command.price,
654                    command.quantity,
655                    Some(command.venue_order_id),
656                )
657                .await?;
658            Ok(())
659        });
660
661        Ok(())
662    }
663
    /// Cancels a single order by delegating to `cancel_ws_order`.
    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
        self.cancel_ws_order(cmd)
    }
667
668    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
669        if self.config.use_mm_mass_cancel {
670            // Use OKX's mass-cancel endpoint (requires market maker permissions)
671            self.mass_cancel_instrument(cmd.instrument_id)
672        } else {
673            // Cancel orders individually via batch cancel (works for all users)
674            let cache = self.core.cache().borrow();
675            let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
676
677            if open_orders.is_empty() {
678                tracing::debug!("No open orders to cancel for {}", cmd.instrument_id);
679                return Ok(());
680            }
681
682            let mut payload = Vec::with_capacity(open_orders.len());
683            for order in open_orders {
684                payload.push((
685                    order.instrument_id(),
686                    Some(order.client_order_id()),
687                    order.venue_order_id(),
688                ));
689            }
690            drop(cache);
691
692            tracing::debug!(
693                "Canceling {} open orders for {} via batch cancel",
694                payload.len(),
695                cmd.instrument_id
696            );
697
698            let ws_private = self.ws_private.clone();
699            self.spawn_task("batch_cancel_orders", async move {
700                ws_private.batch_cancel_orders(payload).await?;
701                Ok(())
702            });
703
704            Ok(())
705        }
706    }
707
708    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
709        let mut payload = Vec::with_capacity(cmd.cancels.len());
710
711        for cancel in &cmd.cancels {
712            payload.push((
713                cancel.instrument_id,
714                Some(cancel.client_order_id),
715                Some(cancel.venue_order_id),
716            ));
717        }
718
719        let ws_private = self.ws_private.clone();
720        self.spawn_task("batch_cancel_orders", async move {
721            ws_private.batch_cancel_orders(payload).await?;
722            Ok(())
723        });
724
725        Ok(())
726    }
727}
728
729#[async_trait(?Send)]
730impl LiveExecutionClient for OKXExecutionClient {
731    async fn generate_order_status_report(
732        &self,
733        cmd: &GenerateOrderStatusReport,
734    ) -> anyhow::Result<Option<OrderStatusReport>> {
735        let Some(instrument_id) = cmd.instrument_id else {
736            tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
737            return Ok(None);
738        };
739
740        let mut reports = self
741            .http_client
742            .request_order_status_reports(
743                self.core.account_id,
744                None,
745                Some(instrument_id),
746                None,
747                None,
748                false,
749                None,
750            )
751            .await?;
752
753        if let Some(client_order_id) = cmd.client_order_id {
754            reports.retain(|report| report.client_order_id == Some(client_order_id));
755        }
756
757        if let Some(venue_order_id) = cmd.venue_order_id {
758            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
759        }
760
761        Ok(reports.into_iter().next())
762    }
763
764    async fn generate_order_status_reports(
765        &self,
766        cmd: &GenerateOrderStatusReport,
767    ) -> anyhow::Result<Vec<OrderStatusReport>> {
768        let mut reports = Vec::new();
769
770        if let Some(instrument_id) = cmd.instrument_id {
771            let mut fetched = self
772                .http_client
773                .request_order_status_reports(
774                    self.core.account_id,
775                    None,
776                    Some(instrument_id),
777                    None,
778                    None,
779                    false,
780                    None,
781                )
782                .await?;
783            reports.append(&mut fetched);
784        } else {
785            for inst_type in self.instrument_types() {
786                let mut fetched = self
787                    .http_client
788                    .request_order_status_reports(
789                        self.core.account_id,
790                        Some(inst_type),
791                        None,
792                        None,
793                        None,
794                        false,
795                        None,
796                    )
797                    .await?;
798                reports.append(&mut fetched);
799            }
800        }
801
802        if let Some(client_order_id) = cmd.client_order_id {
803            reports.retain(|report| report.client_order_id == Some(client_order_id));
804        }
805
806        if let Some(venue_order_id) = cmd.venue_order_id {
807            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
808        }
809
810        Ok(reports)
811    }
812
813    async fn generate_fill_reports(
814        &self,
815        cmd: GenerateFillReports,
816    ) -> anyhow::Result<Vec<FillReport>> {
817        let start_dt = nanos_to_datetime(cmd.start);
818        let end_dt = nanos_to_datetime(cmd.end);
819        let mut reports = Vec::new();
820
821        if let Some(instrument_id) = cmd.instrument_id {
822            let mut fetched = self
823                .http_client
824                .request_fill_reports(
825                    self.core.account_id,
826                    None,
827                    Some(instrument_id),
828                    start_dt,
829                    end_dt,
830                    None,
831                )
832                .await?;
833            reports.append(&mut fetched);
834        } else {
835            for inst_type in self.instrument_types() {
836                let mut fetched = self
837                    .http_client
838                    .request_fill_reports(
839                        self.core.account_id,
840                        Some(inst_type),
841                        None,
842                        start_dt,
843                        end_dt,
844                        None,
845                    )
846                    .await?;
847                reports.append(&mut fetched);
848            }
849        }
850
851        if let Some(venue_order_id) = cmd.venue_order_id {
852            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
853        }
854
855        Ok(reports)
856    }
857
858    async fn generate_position_status_reports(
859        &self,
860        cmd: &GeneratePositionReports,
861    ) -> anyhow::Result<Vec<PositionStatusReport>> {
862        let mut reports = Vec::new();
863
864        // Query derivative positions (SWAP/FUTURES/OPTION) from /api/v5/account/positions
865        if let Some(instrument_id) = cmd.instrument_id {
866            let mut fetched = self
867                .http_client
868                .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
869                .await?;
870            reports.append(&mut fetched);
871        } else {
872            for inst_type in self.instrument_types() {
873                let mut fetched = self
874                    .http_client
875                    .request_position_status_reports(self.core.account_id, Some(inst_type), None)
876                    .await?;
877                reports.append(&mut fetched);
878            }
879        }
880
881        // Query spot margin positions from /api/v5/account/balance
882        // Spot margin positions appear as balance sheet items (liab/spotInUseAmt fields)
883        let mut margin_reports = self
884            .http_client
885            .request_spot_margin_position_reports(self.core.account_id)
886            .await?;
887
888        if let Some(instrument_id) = cmd.instrument_id {
889            margin_reports.retain(|report| report.instrument_id == instrument_id);
890        }
891
892        reports.append(&mut margin_reports);
893
894        let _ = nanos_to_datetime(cmd.start);
895        let _ = nanos_to_datetime(cmd.end);
896
897        Ok(reports)
898    }
899
900    async fn generate_mass_status(
901        &self,
902        lookback_mins: Option<u64>,
903    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
904        tracing::warn!(
905            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
906        );
907        Ok(None)
908    }
909}
910
911fn dispatch_ws_message(
912    message: NautilusWsMessage,
913    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
914) {
915    match message {
916        NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
917        NautilusWsMessage::PositionUpdate(report) => {
918            dispatch_position_status_report(report, sender);
919        }
920        NautilusWsMessage::ExecutionReports(reports) => {
921            tracing::debug!("Processing {} execution report(s)", reports.len());
922            for report in reports {
923                dispatch_execution_report(report, sender);
924            }
925        }
926        NautilusWsMessage::OrderAccepted(event) => {
927            dispatch_order_event(OrderEventAny::Accepted(event), sender);
928        }
929        NautilusWsMessage::OrderCanceled(event) => {
930            dispatch_order_event(OrderEventAny::Canceled(event), sender);
931        }
932        NautilusWsMessage::OrderExpired(event) => {
933            dispatch_order_event(OrderEventAny::Expired(event), sender);
934        }
935        NautilusWsMessage::OrderRejected(event) => {
936            dispatch_order_event(OrderEventAny::Rejected(event), sender);
937        }
938        NautilusWsMessage::OrderCancelRejected(event) => {
939            dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
940        }
941        NautilusWsMessage::OrderModifyRejected(event) => {
942            dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
943        }
944        NautilusWsMessage::OrderTriggered(event) => {
945            dispatch_order_event(OrderEventAny::Triggered(event), sender);
946        }
947        NautilusWsMessage::OrderUpdated(event) => {
948            dispatch_order_event(OrderEventAny::Updated(event), sender);
949        }
950        NautilusWsMessage::Error(e) => {
951            tracing::warn!(
952                "Websocket error: code={} message={} conn_id={:?}",
953                e.code,
954                e.message,
955                e.conn_id
956            );
957        }
958        NautilusWsMessage::Reconnected => {
959            tracing::info!("Websocket reconnected");
960        }
961        NautilusWsMessage::Authenticated => {
962            tracing::debug!("Websocket authenticated");
963        }
964        NautilusWsMessage::Deltas(_)
965        | NautilusWsMessage::Raw(_)
966        | NautilusWsMessage::Data(_)
967        | NautilusWsMessage::FundingRates(_)
968        | NautilusWsMessage::Instrument(_) => {
969            tracing::debug!("Ignoring websocket data message");
970        }
971    }
972}
973
974fn dispatch_account_state(
975    state: AccountState,
976    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
977) {
978    if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
979        tracing::warn!("Failed to send account state: {e}");
980    }
981}
982
983fn dispatch_position_status_report(
984    report: PositionStatusReport,
985    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
986) {
987    let exec_report = NautilusExecutionReport::Position(Box::new(report));
988    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
989        tracing::warn!("Failed to send position status report: {e}");
990    }
991}
992
993fn dispatch_execution_report(
994    report: ExecutionReport,
995    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
996) {
997    match report {
998        ExecutionReport::Order(order_report) => {
999            let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1000            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1001                tracing::warn!("Failed to send order status report: {e}");
1002            }
1003        }
1004        ExecutionReport::Fill(fill_report) => {
1005            let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1006            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1007                tracing::warn!("Failed to send fill report: {e}");
1008            }
1009        }
1010    }
1011}
1012
1013fn dispatch_order_event(
1014    event: OrderEventAny,
1015    sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1016) {
1017    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1018        tracing::warn!("Failed to send order event: {e}");
1019    }
1020}
1021
1022fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1023    value.map(|nanos| nanos.to_datetime_utc())
1024}
1025
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    /// One cancel request flattened to (instrument, client order ID, venue order ID).
    type CancelEntry = (InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>);

    /// Flattens the cancels of a batch command into payload entries, in command order.
    fn cancel_payload(cmd: &BatchCancelOrders) -> Vec<CancelEntry> {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    Some(cancel.venue_order_id),
                )
            })
            .collect()
    }

    /// A batch with two cancels yields two payload entries carrying the original IDs.
    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = ClientId::from("OKX");
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        // Both cancels share everything except their order identifiers.
        let make_cancel = |client_order_id: ClientOrderId, venue_order_id: VenueOrderId| {
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id,
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
                params: None,
            }
        };

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels: vec![
                make_cancel(client_order_id1, venue_order_id1),
                make_cancel(client_order_id2, venue_order_id2),
            ],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        // Verify we can build the payload structure
        let payload = cancel_payload(&cmd);

        let expected: Vec<CancelEntry> = vec![
            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
        ];

        assert_eq!(payload.len(), 2);
        assert_eq!(payload, expected);
    }

    /// A batch without cancels yields an empty payload.
    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: ClientId::from("OKX"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = cancel_payload(&cmd);
        assert_eq!(payload.len(), 0);
    }
}