1use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25 messages::{
26 ExecutionEvent,
27 execution::{
28 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
29 GenerateOrderStatusReport, GeneratePositionReports,
30 },
31 },
32 msgbus,
33 runner::get_exec_event_sender,
34 runtime::get_runtime,
35};
36use nautilus_core::{MUTEX_POISONED, UnixNanos};
37use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
38use nautilus_live::execution::client::LiveExecutionClient;
39use nautilus_model::{
40 accounts::AccountAny,
41 enums::{AccountType, OmsType, OrderType},
42 events::{AccountState, OrderEventAny},
43 identifiers::{AccountId, ClientId, InstrumentId, Venue},
44 orders::Order,
45 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
46 types::{AccountBalance, MarginBalance},
47};
48use tokio::task::JoinHandle;
49
50use crate::{
51 common::{
52 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
53 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
54 },
55 config::OKXExecClientConfig,
56 http::client::OKXHttpClient,
57 websocket::{
58 client::OKXWebSocketClient,
59 messages::{ExecutionReport, NautilusWsMessage},
60 },
61};
62
63#[derive(Debug)]
64pub struct OKXExecutionClient {
65 core: ExecutionClientCore,
66 config: OKXExecClientConfig,
67 http_client: OKXHttpClient,
68 ws_private: OKXWebSocketClient,
69 ws_business: OKXWebSocketClient,
70 trade_mode: OKXTradeMode,
71 started: bool,
72 connected: bool,
73 instruments_initialized: bool,
74 ws_stream_handle: Option<JoinHandle<()>>,
75 ws_business_stream_handle: Option<JoinHandle<()>>,
76 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl OKXExecutionClient {
80 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
86 let http_client = if config.has_api_credentials() {
87 OKXHttpClient::with_credentials(
88 config.api_key.clone(),
89 config.api_secret.clone(),
90 config.api_passphrase.clone(),
91 config.base_url_http.clone(),
92 config.http_timeout_secs,
93 config.max_retries,
94 config.retry_delay_initial_ms,
95 config.retry_delay_max_ms,
96 config.is_demo,
97 config.http_proxy_url.clone(),
98 )?
99 } else {
100 OKXHttpClient::new(
101 config.base_url_http.clone(),
102 config.http_timeout_secs,
103 config.max_retries,
104 config.retry_delay_initial_ms,
105 config.retry_delay_max_ms,
106 config.is_demo,
107 config.http_proxy_url.clone(),
108 )?
109 };
110
111 let account_id = core.account_id;
112 let ws_private = OKXWebSocketClient::new(
113 Some(config.ws_private_url()),
114 config.api_key.clone(),
115 config.api_secret.clone(),
116 config.api_passphrase.clone(),
117 Some(account_id),
118 Some(20), )
120 .context("failed to construct OKX private websocket client")?;
121
122 let ws_business = OKXWebSocketClient::new(
123 Some(config.ws_business_url()),
124 config.api_key.clone(),
125 config.api_secret.clone(),
126 config.api_passphrase.clone(),
127 Some(account_id),
128 Some(20), )
130 .context("failed to construct OKX business websocket client")?;
131
132 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
133
134 Ok(Self {
135 core,
136 config,
137 http_client,
138 ws_private,
139 ws_business,
140 trade_mode,
141 started: false,
142 connected: false,
143 instruments_initialized: false,
144 ws_stream_handle: None,
145 ws_business_stream_handle: None,
146 pending_tasks: Mutex::new(Vec::new()),
147 })
148 }
149
150 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
151 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
152
153 if account_type == AccountType::Cash {
154 if !config.use_spot_margin {
155 return OKXTradeMode::Cash;
156 }
157 return if is_cross_margin {
158 OKXTradeMode::Cross
159 } else {
160 OKXTradeMode::Isolated
161 };
162 }
163
164 if is_cross_margin {
165 OKXTradeMode::Cross
166 } else {
167 OKXTradeMode::Isolated
168 }
169 }
170
171 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
172 if self.config.instrument_types.is_empty() {
173 vec![OKXInstrumentType::Spot]
174 } else {
175 self.config.instrument_types.clone()
176 }
177 }
178
179 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
180 if self.instruments_initialized {
181 return Ok(());
182 }
183
184 let mut all_instruments = Vec::new();
185 for instrument_type in self.instrument_types() {
186 let instruments = self
187 .http_client
188 .request_instruments(instrument_type, None)
189 .await
190 .with_context(|| {
191 format!("failed to request OKX instruments for {instrument_type:?}")
192 })?;
193
194 if instruments.is_empty() {
195 tracing::warn!("No instruments returned for {instrument_type:?}");
196 continue;
197 }
198
199 self.http_client.cache_instruments(instruments.clone());
200 all_instruments.extend(instruments);
201 }
202
203 if all_instruments.is_empty() {
204 tracing::warn!(
205 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
206 );
207 } else {
208 self.ws_private.cache_instruments(all_instruments);
209 }
210
211 self.instruments_initialized = true;
212 Ok(())
213 }
214
215 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
216 if self.instruments_initialized {
217 return Ok(());
218 }
219
220 let runtime = get_runtime();
221 runtime.block_on(self.ensure_instruments_initialized_async())
222 }
223
224 async fn refresh_account_state(&self) -> anyhow::Result<()> {
225 let account_state = self
226 .http_client
227 .request_account_state(self.core.account_id)
228 .await
229 .context("failed to request OKX account state")?;
230
231 self.core.generate_account_state(
232 account_state.balances.clone(),
233 account_state.margins.clone(),
234 account_state.is_reported,
235 account_state.ts_event,
236 )
237 }
238
239 fn update_account_state(&self) -> anyhow::Result<()> {
240 let runtime = get_runtime();
241 runtime.block_on(self.refresh_account_state())
242 }
243
244 fn is_conditional_order(&self, order_type: OrderType) -> bool {
245 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
246 }
247
248 fn submit_regular_order(
249 &self,
250 cmd: &nautilus_common::messages::execution::SubmitOrder,
251 ) -> anyhow::Result<()> {
252 let order = cmd.order.clone();
253 let ws_private = self.ws_private.clone();
254 let trade_mode = self.trade_mode;
255
256 self.spawn_task("submit_order", async move {
257 ws_private
258 .submit_order(
259 order.trader_id(),
260 order.strategy_id(),
261 order.instrument_id(),
262 trade_mode,
263 order.client_order_id(),
264 order.order_side(),
265 order.order_type(),
266 order.quantity(),
267 Some(order.time_in_force()),
268 order.price(),
269 order.trigger_price(),
270 Some(order.is_post_only()),
271 Some(order.is_reduce_only()),
272 Some(order.is_quote_quantity()),
273 None,
274 )
275 .await?;
276 Ok(())
277 });
278
279 Ok(())
280 }
281
282 fn submit_conditional_order(
283 &self,
284 cmd: &nautilus_common::messages::execution::SubmitOrder,
285 ) -> anyhow::Result<()> {
286 let order = cmd.order.clone();
287 let trigger_price = order
288 .trigger_price()
289 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
290 let http_client = self.http_client.clone();
291 let trade_mode = self.trade_mode;
292
293 self.spawn_task("submit_algo_order", async move {
294 http_client
295 .place_algo_order_with_domain_types(
296 order.instrument_id(),
297 trade_mode,
298 order.client_order_id(),
299 order.order_side(),
300 order.order_type(),
301 order.quantity(),
302 trigger_price,
303 order.trigger_type(),
304 order.price(),
305 Some(order.is_reduce_only()),
306 )
307 .await?;
308 Ok(())
309 });
310
311 Ok(())
312 }
313
314 fn cancel_ws_order(
315 &self,
316 cmd: &nautilus_common::messages::execution::CancelOrder,
317 ) -> anyhow::Result<()> {
318 let ws_private = self.ws_private.clone();
319 let command = cmd.clone();
320
321 self.spawn_task("cancel_order", async move {
322 ws_private
323 .cancel_order(
324 command.trader_id,
325 command.strategy_id,
326 command.instrument_id,
327 Some(command.client_order_id),
328 Some(command.venue_order_id),
329 )
330 .await?;
331 Ok(())
332 });
333
334 Ok(())
335 }
336
337 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
338 let ws_private = self.ws_private.clone();
339 self.spawn_task("mass_cancel_orders", async move {
340 ws_private.mass_cancel_orders(instrument_id).await?;
341 Ok(())
342 });
343 Ok(())
344 }
345
346 fn spawn_task<F>(&self, description: &'static str, fut: F)
347 where
348 F: Future<Output = anyhow::Result<()>> + Send + 'static,
349 {
350 let runtime = get_runtime();
351 let handle = runtime.spawn(async move {
352 if let Err(e) = fut.await {
353 tracing::warn!("{description} failed: {e:?}");
354 }
355 });
356
357 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
358 tasks.retain(|handle| !handle.is_finished());
359 tasks.push(handle);
360 }
361
362 fn abort_pending_tasks(&self) {
363 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
364 for handle in tasks.drain(..) {
365 handle.abort();
366 }
367 }
368}
369
370impl ExecutionClient for OKXExecutionClient {
371 fn is_connected(&self) -> bool {
372 self.connected
373 }
374
375 fn client_id(&self) -> ClientId {
376 self.core.client_id
377 }
378
379 fn account_id(&self) -> AccountId {
380 self.core.account_id
381 }
382
383 fn venue(&self) -> Venue {
384 *OKX_VENUE
385 }
386
387 fn oms_type(&self) -> OmsType {
388 self.core.oms_type
389 }
390
391 fn get_account(&self) -> Option<AccountAny> {
392 self.core.get_account()
393 }
394
395 fn generate_account_state(
396 &self,
397 balances: Vec<AccountBalance>,
398 margins: Vec<MarginBalance>,
399 reported: bool,
400 ts_event: UnixNanos,
401 ) -> anyhow::Result<()> {
402 self.core
403 .generate_account_state(balances, margins, reported, ts_event)
404 }
405
406 fn start(&mut self) -> anyhow::Result<()> {
407 if self.started {
408 return Ok(());
409 }
410
411 self.ensure_instruments_initialized()?;
412 self.started = true;
413 tracing::info!(
414 client_id = %self.core.client_id,
415 account_id = %self.core.account_id,
416 account_type = ?self.core.account_type,
417 trade_mode = ?self.trade_mode,
418 instrument_types = ?self.config.instrument_types,
419 use_fills_channel = self.config.use_fills_channel,
420 is_demo = self.config.is_demo,
421 http_proxy_url = ?self.config.http_proxy_url,
422 ws_proxy_url = ?self.config.ws_proxy_url,
423 "OKX execution client started"
424 );
425 Ok(())
426 }
427
428 fn stop(&mut self) -> anyhow::Result<()> {
429 if !self.started {
430 return Ok(());
431 }
432
433 self.started = false;
434 self.connected = false;
435 if let Some(handle) = self.ws_stream_handle.take() {
436 handle.abort();
437 }
438 self.abort_pending_tasks();
439 tracing::info!("OKX execution client {} stopped", self.core.client_id);
440 Ok(())
441 }
442
443 fn submit_order(
444 &self,
445 cmd: &nautilus_common::messages::execution::SubmitOrder,
446 ) -> anyhow::Result<()> {
447 let order = &cmd.order;
448
449 if order.is_closed() {
450 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
451 return Ok(());
452 }
453
454 self.core.generate_order_submitted(
455 order.strategy_id(),
456 order.instrument_id(),
457 order.client_order_id(),
458 cmd.ts_init,
459 );
460
461 let result = if self.is_conditional_order(order.order_type()) {
462 self.submit_conditional_order(cmd)
463 } else {
464 self.submit_regular_order(cmd)
465 };
466
467 if let Err(e) = result {
468 self.core.generate_order_rejected(
469 order.strategy_id(),
470 order.instrument_id(),
471 order.client_order_id(),
472 &format!("submit-order-error: {e}"),
473 cmd.ts_init,
474 false,
475 );
476 return Err(e);
477 }
478
479 Ok(())
480 }
481
482 fn submit_order_list(
483 &self,
484 cmd: &nautilus_common::messages::execution::SubmitOrderList,
485 ) -> anyhow::Result<()> {
486 tracing::warn!(
487 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
488 cmd.order_list.orders.len()
489 );
490 Ok(())
491 }
492
493 fn modify_order(
494 &self,
495 cmd: &nautilus_common::messages::execution::ModifyOrder,
496 ) -> anyhow::Result<()> {
497 let ws_private = self.ws_private.clone();
498 let command = cmd.clone();
499
500 self.spawn_task("modify_order", async move {
501 ws_private
502 .modify_order(
503 command.trader_id,
504 command.strategy_id,
505 command.instrument_id,
506 Some(command.client_order_id),
507 command.price,
508 command.quantity,
509 Some(command.venue_order_id),
510 )
511 .await?;
512 Ok(())
513 });
514
515 Ok(())
516 }
517
518 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
519 self.cancel_ws_order(cmd)
520 }
521
522 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
523 self.mass_cancel_instrument(cmd.instrument_id)
524 }
525
526 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
527 let mut payload = Vec::with_capacity(cmd.cancels.len());
528
529 for cancel in &cmd.cancels {
530 payload.push((
531 cancel.instrument_id,
532 Some(cancel.client_order_id),
533 Some(cancel.venue_order_id),
534 ));
535 }
536
537 let ws_private = self.ws_private.clone();
538 self.spawn_task("batch_cancel_orders", async move {
539 ws_private.batch_cancel_orders(payload).await?;
540 Ok(())
541 });
542
543 Ok(())
544 }
545
546 fn query_account(
547 &self,
548 _cmd: &nautilus_common::messages::execution::QueryAccount,
549 ) -> anyhow::Result<()> {
550 self.update_account_state()
551 }
552
553 fn query_order(
554 &self,
555 cmd: &nautilus_common::messages::execution::QueryOrder,
556 ) -> anyhow::Result<()> {
557 tracing::debug!(
558 "query_order not implemented for OKX execution client (client_order_id={})",
559 cmd.client_order_id
560 );
561 Ok(())
562 }
563}
564
565#[async_trait(?Send)]
566impl LiveExecutionClient for OKXExecutionClient {
567 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
568 get_exec_event_sender()
569 }
570
571 fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
572 self.core.clock().borrow()
573 }
574
575 async fn connect(&mut self) -> anyhow::Result<()> {
576 if self.connected {
577 return Ok(());
578 }
579
580 self.ensure_instruments_initialized_async().await?;
581
582 self.ws_private.connect().await?;
583 self.ws_private.wait_until_active(10.0).await?;
584
585 for inst_type in self.instrument_types() {
586 tracing::info!(
587 "Subscribing to channels for instrument type: {:?}",
588 inst_type
589 );
590 self.ws_private.subscribe_orders(inst_type).await?;
591
592 if inst_type != OKXInstrumentType::Option {
594 self.ws_private.subscribe_orders_algo(inst_type).await?;
595 }
596
597 if self.config.use_fills_channel
598 && let Err(e) = self.ws_private.subscribe_fills(inst_type).await
599 {
600 tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
601 }
602 }
603
604 self.ws_private.subscribe_account().await?;
605
606 self.ws_business.connect().await?;
607 self.ws_business.wait_until_active(10.0).await?;
608
609 self.start_ws_stream()?;
610 self.start_ws_business_stream()?;
611 self.refresh_account_state().await?;
612
613 self.connected = true;
614 tracing::info!(client_id = %self.core.client_id, "Connected");
615
616 Ok(())
617 }
618
619 async fn disconnect(&mut self) -> anyhow::Result<()> {
620 if !self.connected {
621 return Ok(());
622 }
623
624 self.http_client.cancel_all_requests();
625 if let Err(e) = self.ws_private.close().await {
626 tracing::warn!("Error while closing OKX private websocket: {e:?}");
627 }
628
629 if let Err(e) = self.ws_business.close().await {
630 tracing::warn!("Error while closing OKX business websocket: {e:?}");
631 }
632
633 if let Some(handle) = self.ws_stream_handle.take() {
634 handle.abort();
635 }
636
637 if let Some(handle) = self.ws_business_stream_handle.take() {
638 handle.abort();
639 }
640
641 self.abort_pending_tasks();
642
643 self.connected = false;
644 tracing::info!(client_id = %self.core.client_id, "Disconnected");
645 Ok(())
646 }
647
648 async fn generate_order_status_report(
649 &self,
650 cmd: &GenerateOrderStatusReport,
651 ) -> anyhow::Result<Option<OrderStatusReport>> {
652 let Some(instrument_id) = cmd.instrument_id else {
653 tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
654 return Ok(None);
655 };
656
657 let mut reports = self
658 .http_client
659 .request_order_status_reports(
660 self.core.account_id,
661 None,
662 Some(instrument_id),
663 None,
664 None,
665 false,
666 None,
667 )
668 .await?;
669
670 if let Some(client_order_id) = cmd.client_order_id {
671 reports.retain(|report| report.client_order_id == Some(client_order_id));
672 }
673
674 if let Some(venue_order_id) = cmd.venue_order_id {
675 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
676 }
677
678 Ok(reports.into_iter().next())
679 }
680
681 async fn generate_order_status_reports(
682 &self,
683 cmd: &GenerateOrderStatusReport,
684 ) -> anyhow::Result<Vec<OrderStatusReport>> {
685 let mut reports = Vec::new();
686
687 if let Some(instrument_id) = cmd.instrument_id {
688 let mut fetched = self
689 .http_client
690 .request_order_status_reports(
691 self.core.account_id,
692 None,
693 Some(instrument_id),
694 None,
695 None,
696 false,
697 None,
698 )
699 .await?;
700 reports.append(&mut fetched);
701 } else {
702 for inst_type in self.instrument_types() {
703 let mut fetched = self
704 .http_client
705 .request_order_status_reports(
706 self.core.account_id,
707 Some(inst_type),
708 None,
709 None,
710 None,
711 false,
712 None,
713 )
714 .await?;
715 reports.append(&mut fetched);
716 }
717 }
718
719 if let Some(client_order_id) = cmd.client_order_id {
720 reports.retain(|report| report.client_order_id == Some(client_order_id));
721 }
722
723 if let Some(venue_order_id) = cmd.venue_order_id {
724 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
725 }
726
727 Ok(reports)
728 }
729
730 async fn generate_fill_reports(
731 &self,
732 cmd: GenerateFillReports,
733 ) -> anyhow::Result<Vec<FillReport>> {
734 let start_dt = nanos_to_datetime(cmd.start);
735 let end_dt = nanos_to_datetime(cmd.end);
736 let mut reports = Vec::new();
737
738 if let Some(instrument_id) = cmd.instrument_id {
739 let mut fetched = self
740 .http_client
741 .request_fill_reports(
742 self.core.account_id,
743 None,
744 Some(instrument_id),
745 start_dt,
746 end_dt,
747 None,
748 )
749 .await?;
750 reports.append(&mut fetched);
751 } else {
752 for inst_type in self.instrument_types() {
753 let mut fetched = self
754 .http_client
755 .request_fill_reports(
756 self.core.account_id,
757 Some(inst_type),
758 None,
759 start_dt,
760 end_dt,
761 None,
762 )
763 .await?;
764 reports.append(&mut fetched);
765 }
766 }
767
768 if let Some(venue_order_id) = cmd.venue_order_id {
769 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
770 }
771
772 Ok(reports)
773 }
774
775 async fn generate_position_status_reports(
776 &self,
777 cmd: &GeneratePositionReports,
778 ) -> anyhow::Result<Vec<PositionStatusReport>> {
779 let mut reports = Vec::new();
780
781 if let Some(instrument_id) = cmd.instrument_id {
783 let mut fetched = self
784 .http_client
785 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
786 .await?;
787 reports.append(&mut fetched);
788 } else {
789 for inst_type in self.instrument_types() {
790 let mut fetched = self
791 .http_client
792 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
793 .await?;
794 reports.append(&mut fetched);
795 }
796 }
797
798 let mut margin_reports = self
801 .http_client
802 .request_spot_margin_position_reports(self.core.account_id)
803 .await?;
804
805 if let Some(instrument_id) = cmd.instrument_id {
807 margin_reports.retain(|report| report.instrument_id == instrument_id);
808 }
809
810 reports.append(&mut margin_reports);
811
812 let _ = nanos_to_datetime(cmd.start);
813 let _ = nanos_to_datetime(cmd.end);
814
815 Ok(reports)
816 }
817
818 async fn generate_mass_status(
819 &self,
820 lookback_mins: Option<u64>,
821 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
822 tracing::warn!(
823 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
824 );
825 Ok(None)
826 }
827}
828
829impl OKXExecutionClient {
830 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
831 if self.ws_stream_handle.is_some() {
832 return Ok(());
833 }
834
835 let stream = self.ws_private.stream();
836 let runtime = get_runtime();
837 let handle = runtime.spawn(async move {
838 pin_mut!(stream);
839 while let Some(message) = stream.next().await {
840 dispatch_ws_message(message);
841 }
842 });
843
844 self.ws_stream_handle = Some(handle);
845 Ok(())
846 }
847
848 fn start_ws_business_stream(&mut self) -> anyhow::Result<()> {
849 if self.ws_business_stream_handle.is_some() {
850 return Ok(());
851 }
852
853 let stream = self.ws_business.stream();
854 let runtime = get_runtime();
855 let handle = runtime.spawn(async move {
856 pin_mut!(stream);
857 while let Some(message) = stream.next().await {
858 dispatch_ws_message(message);
859 }
860 });
861
862 self.ws_business_stream_handle = Some(handle);
863 Ok(())
864 }
865}
866
867fn dispatch_ws_message(message: NautilusWsMessage) {
868 match message {
869 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
870 NautilusWsMessage::PositionUpdate(report) => dispatch_position_status_report(report),
871 NautilusWsMessage::ExecutionReports(reports) => {
872 for report in reports {
873 dispatch_execution_report(report);
874 }
875 }
876 NautilusWsMessage::OrderRejected(event) => {
877 dispatch_order_event(OrderEventAny::Rejected(event));
878 }
879 NautilusWsMessage::OrderCancelRejected(event) => {
880 dispatch_order_event(OrderEventAny::CancelRejected(event));
881 }
882 NautilusWsMessage::OrderModifyRejected(event) => {
883 dispatch_order_event(OrderEventAny::ModifyRejected(event));
884 }
885 NautilusWsMessage::Error(e) => {
886 tracing::warn!(
887 "OKX websocket error: code={} message={} conn_id={:?}",
888 e.code,
889 e.message,
890 e.conn_id
891 );
892 }
893 NautilusWsMessage::Reconnected => {
894 tracing::info!("OKX websocket reconnected");
895 }
896 NautilusWsMessage::Authenticated => {
897 tracing::debug!("OKX websocket authenticated");
898 }
899 NautilusWsMessage::Deltas(_)
900 | NautilusWsMessage::Raw(_)
901 | NautilusWsMessage::Data(_)
902 | NautilusWsMessage::FundingRates(_)
903 | NautilusWsMessage::Instrument(_) => {
904 tracing::debug!("Ignoring OKX websocket data message");
905 }
906 }
907}
908
909fn dispatch_account_state(state: AccountState) {
910 msgbus::send_any(
911 "Portfolio.update_account".into(),
912 &state as &dyn std::any::Any,
913 );
914}
915
916fn dispatch_position_status_report(report: PositionStatusReport) {
917 let sender = get_exec_event_sender();
918 let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
919 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
920 tracing::warn!("Failed to send position status report: {e}");
921 }
922}
923
924fn dispatch_execution_report(report: ExecutionReport) {
925 let sender = get_exec_event_sender();
926 match report {
927 ExecutionReport::Order(order_report) => {
928 let exec_report =
929 nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
930 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
931 tracing::warn!("Failed to send order status report: {e}");
932 }
933 }
934 ExecutionReport::Fill(fill_report) => {
935 let exec_report =
936 nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
937 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
938 tracing::warn!("Failed to send fill report: {e}");
939 }
940 }
941 }
942}
943
944fn dispatch_order_event(event: OrderEventAny) {
945 let sender = get_exec_event_sender();
946 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
947 tracing::warn!("Failed to send order event: {e}");
948 }
949}
950
951fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
952 value.map(|nanos| nanos.to_datetime_utc())
953}
954
955#[cfg(test)]
956mod tests {
957 use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
958 use nautilus_core::UnixNanos;
959 use nautilus_model::identifiers::{
960 ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
961 };
962 use rstest::rstest;
963
964 #[rstest]
965 fn test_batch_cancel_orders_builds_payload() {
966 use nautilus_model::identifiers::ClientId;
967
968 let trader_id = TraderId::from("TRADER-001");
969 let strategy_id = StrategyId::from("STRATEGY-001");
970 let client_id = ClientId::from("OKX");
971 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
972 let client_order_id1 = ClientOrderId::new("order1");
973 let client_order_id2 = ClientOrderId::new("order2");
974 let venue_order_id1 = VenueOrderId::new("venue1");
975 let venue_order_id2 = VenueOrderId::new("venue2");
976
977 let cmd = BatchCancelOrders {
978 trader_id,
979 client_id,
980 strategy_id,
981 instrument_id,
982 cancels: vec![
983 CancelOrder {
984 trader_id,
985 client_id,
986 strategy_id,
987 instrument_id,
988 client_order_id: client_order_id1,
989 venue_order_id: venue_order_id1,
990 command_id: Default::default(),
991 ts_init: UnixNanos::default(),
992 },
993 CancelOrder {
994 trader_id,
995 client_id,
996 strategy_id,
997 instrument_id,
998 client_order_id: client_order_id2,
999 venue_order_id: venue_order_id2,
1000 command_id: Default::default(),
1001 ts_init: UnixNanos::default(),
1002 },
1003 ],
1004 command_id: Default::default(),
1005 ts_init: UnixNanos::default(),
1006 };
1007
1008 let mut payload = Vec::with_capacity(cmd.cancels.len());
1010 for cancel in &cmd.cancels {
1011 payload.push((
1012 cancel.instrument_id,
1013 Some(cancel.client_order_id),
1014 Some(cancel.venue_order_id),
1015 ));
1016 }
1017
1018 assert_eq!(payload.len(), 2);
1019 assert_eq!(payload[0].0, instrument_id);
1020 assert_eq!(payload[0].1, Some(client_order_id1));
1021 assert_eq!(payload[0].2, Some(venue_order_id1));
1022 assert_eq!(payload[1].0, instrument_id);
1023 assert_eq!(payload[1].1, Some(client_order_id2));
1024 assert_eq!(payload[1].2, Some(venue_order_id2));
1025 }
1026
1027 #[rstest]
1028 fn test_batch_cancel_orders_with_empty_cancels() {
1029 use nautilus_model::identifiers::ClientId;
1030
1031 let cmd = BatchCancelOrders {
1032 trader_id: TraderId::from("TRADER-001"),
1033 client_id: ClientId::from("OKX"),
1034 strategy_id: StrategyId::from("STRATEGY-001"),
1035 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
1036 cancels: vec![],
1037 command_id: Default::default(),
1038 ts_init: UnixNanos::default(),
1039 };
1040
1041 let payload: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)> =
1042 Vec::with_capacity(cmd.cancels.len());
1043 assert_eq!(payload.len(), 0);
1044 }
1045}