1use std::{cell::Ref, future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25 clock::Clock,
26 messages::{
27 ExecutionEvent,
28 execution::{
29 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
30 GenerateOrderStatusReport, GeneratePositionReports,
31 },
32 },
33 msgbus,
34 runner::get_exec_event_sender,
35 runtime::get_runtime,
36};
37use nautilus_core::{MUTEX_POISONED, UnixNanos};
38use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
39use nautilus_live::execution::LiveExecutionClientExt;
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderType},
43 events::{AccountState, OrderEventAny},
44 identifiers::{AccountId, ClientId, InstrumentId, Venue},
45 orders::Order,
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 common::{
53 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
54 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
55 },
56 config::OKXExecClientConfig,
57 http::client::OKXHttpClient,
58 websocket::{
59 client::OKXWebSocketClient,
60 messages::{ExecutionReport, NautilusWsMessage},
61 },
62};
63
64#[derive(Debug)]
65pub struct OKXExecutionClient {
66 core: ExecutionClientCore,
67 config: OKXExecClientConfig,
68 http_client: OKXHttpClient,
69 ws_client: OKXWebSocketClient,
70 trade_mode: OKXTradeMode,
71 started: bool,
72 connected: bool,
73 instruments_initialized: bool,
74 ws_stream_handle: Option<JoinHandle<()>>,
75 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
76}
77
78impl OKXExecutionClient {
79 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
85 let http_client = if config.has_api_credentials() {
86 OKXHttpClient::with_credentials(
87 config.api_key.clone(),
88 config.api_secret.clone(),
89 config.api_passphrase.clone(),
90 config.base_url_http.clone(),
91 config.http_timeout_secs,
92 config.max_retries,
93 config.retry_delay_initial_ms,
94 config.retry_delay_max_ms,
95 config.is_demo,
96 )?
97 } else {
98 OKXHttpClient::new(
99 config.base_url_http.clone(),
100 config.http_timeout_secs,
101 config.max_retries,
102 config.retry_delay_initial_ms,
103 config.retry_delay_max_ms,
104 config.is_demo,
105 )?
106 };
107
108 let account_id = core.account_id;
109 let ws_client = OKXWebSocketClient::new(
110 Some(config.ws_private_url()),
111 config.api_key.clone(),
112 config.api_secret.clone(),
113 config.api_passphrase.clone(),
114 Some(account_id),
115 None,
116 )
117 .context("failed to construct OKX execution websocket client")?;
118
119 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
120
121 Ok(Self {
122 core,
123 config,
124 http_client,
125 ws_client,
126 trade_mode,
127 started: false,
128 connected: false,
129 instruments_initialized: false,
130 ws_stream_handle: None,
131 pending_tasks: Mutex::new(Vec::new()),
132 })
133 }
134
135 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
136 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
137
138 if account_type == AccountType::Cash {
139 if !config.use_spot_margin {
140 return OKXTradeMode::Cash;
141 }
142 return if is_cross_margin {
143 OKXTradeMode::Cross
144 } else {
145 OKXTradeMode::Isolated
146 };
147 }
148
149 if is_cross_margin {
150 OKXTradeMode::Cross
151 } else {
152 OKXTradeMode::Isolated
153 }
154 }
155
156 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
157 if self.config.instrument_types.is_empty() {
158 vec![OKXInstrumentType::Spot]
159 } else {
160 self.config.instrument_types.clone()
161 }
162 }
163
164 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
165 if self.instruments_initialized {
166 return Ok(());
167 }
168
169 let mut all_instruments = Vec::new();
170 for instrument_type in self.instrument_types() {
171 let instruments = self
172 .http_client
173 .request_instruments(instrument_type, None)
174 .await
175 .with_context(|| {
176 format!("failed to request OKX instruments for {instrument_type:?}")
177 })?;
178
179 if instruments.is_empty() {
180 tracing::warn!("No instruments returned for {instrument_type:?}");
181 continue;
182 }
183
184 self.http_client.add_instruments(instruments.clone());
185 all_instruments.extend(instruments);
186 }
187
188 if all_instruments.is_empty() {
189 tracing::warn!(
190 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
191 );
192 } else {
193 self.ws_client.initialize_instruments_cache(all_instruments);
194 }
195
196 self.instruments_initialized = true;
197 Ok(())
198 }
199
200 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
201 if self.instruments_initialized {
202 return Ok(());
203 }
204
205 let runtime = get_runtime();
206 runtime.block_on(self.ensure_instruments_initialized_async())
207 }
208
209 async fn refresh_account_state(&self) -> anyhow::Result<()> {
210 let account_state = self
211 .http_client
212 .request_account_state(self.core.account_id)
213 .await
214 .context("failed to request OKX account state")?;
215
216 self.core.generate_account_state(
217 account_state.balances.clone(),
218 account_state.margins.clone(),
219 account_state.is_reported,
220 account_state.ts_event,
221 )
222 }
223
224 fn update_account_state(&self) -> anyhow::Result<()> {
225 let runtime = get_runtime();
226 runtime.block_on(self.refresh_account_state())
227 }
228
229 fn is_conditional_order(&self, order_type: OrderType) -> bool {
230 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
231 }
232
233 fn submit_regular_order(
234 &self,
235 cmd: &nautilus_common::messages::execution::SubmitOrder,
236 ) -> anyhow::Result<()> {
237 let order = cmd.order.clone();
238 let ws_client = self.ws_client.clone();
239 let trade_mode = self.trade_mode;
240
241 self.spawn_task("submit_order", async move {
242 ws_client
243 .submit_order(
244 order.trader_id(),
245 order.strategy_id(),
246 order.instrument_id(),
247 trade_mode,
248 order.client_order_id(),
249 order.order_side(),
250 order.order_type(),
251 order.quantity(),
252 Some(order.time_in_force()),
253 order.price(),
254 order.trigger_price(),
255 Some(order.is_post_only()),
256 Some(order.is_reduce_only()),
257 Some(order.is_quote_quantity()),
258 None,
259 )
260 .await?;
261 Ok(())
262 });
263
264 Ok(())
265 }
266
267 fn submit_conditional_order(
268 &self,
269 cmd: &nautilus_common::messages::execution::SubmitOrder,
270 ) -> anyhow::Result<()> {
271 let order = cmd.order.clone();
272 let trigger_price = order
273 .trigger_price()
274 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
275 let http_client = self.http_client.clone();
276 let trade_mode = self.trade_mode;
277
278 self.spawn_task("submit_algo_order", async move {
279 http_client
280 .place_algo_order_with_domain_types(
281 order.instrument_id(),
282 trade_mode,
283 order.client_order_id(),
284 order.order_side(),
285 order.order_type(),
286 order.quantity(),
287 trigger_price,
288 order.trigger_type(),
289 order.price(),
290 Some(order.is_reduce_only()),
291 )
292 .await?;
293 Ok(())
294 });
295
296 Ok(())
297 }
298
299 fn cancel_ws_order(
300 &self,
301 cmd: &nautilus_common::messages::execution::CancelOrder,
302 ) -> anyhow::Result<()> {
303 let ws_client = self.ws_client.clone();
304 let command = cmd.clone();
305
306 self.spawn_task("cancel_order", async move {
307 ws_client
308 .cancel_order(
309 command.trader_id,
310 command.strategy_id,
311 command.instrument_id,
312 Some(command.client_order_id),
313 Some(command.venue_order_id),
314 )
315 .await?;
316 Ok(())
317 });
318
319 Ok(())
320 }
321
322 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
323 let ws_client = self.ws_client.clone();
324 self.spawn_task("mass_cancel_orders", async move {
325 ws_client.mass_cancel_orders(instrument_id).await?;
326 Ok(())
327 });
328 Ok(())
329 }
330
331 fn spawn_task<F>(&self, description: &'static str, fut: F)
332 where
333 F: Future<Output = anyhow::Result<()>> + Send + 'static,
334 {
335 let runtime = get_runtime();
336 let handle = runtime.spawn(async move {
337 if let Err(e) = fut.await {
338 tracing::warn!("{description} failed: {e:?}");
339 }
340 });
341
342 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
343 tasks.retain(|handle| !handle.is_finished());
344 tasks.push(handle);
345 }
346
347 fn abort_pending_tasks(&self) {
348 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
349 for handle in tasks.drain(..) {
350 handle.abort();
351 }
352 }
353}
354
355impl ExecutionClient for OKXExecutionClient {
356 fn is_connected(&self) -> bool {
357 self.connected
358 }
359
360 fn client_id(&self) -> ClientId {
361 self.core.client_id
362 }
363
364 fn account_id(&self) -> AccountId {
365 self.core.account_id
366 }
367
368 fn venue(&self) -> Venue {
369 *OKX_VENUE
370 }
371
372 fn oms_type(&self) -> OmsType {
373 self.core.oms_type
374 }
375
376 fn get_account(&self) -> Option<AccountAny> {
377 self.core.get_account()
378 }
379
380 fn generate_account_state(
381 &self,
382 balances: Vec<AccountBalance>,
383 margins: Vec<MarginBalance>,
384 reported: bool,
385 ts_event: UnixNanos,
386 ) -> anyhow::Result<()> {
387 self.core
388 .generate_account_state(balances, margins, reported, ts_event)
389 }
390
391 fn start(&mut self) -> anyhow::Result<()> {
392 if self.started {
393 return Ok(());
394 }
395
396 self.ensure_instruments_initialized()?;
397 self.started = true;
398 tracing::info!(
399 client_id = %self.core.client_id,
400 account_id = %self.core.account_id,
401 account_type = ?self.core.account_type,
402 trade_mode = ?self.trade_mode,
403 instrument_types = ?self.config.instrument_types,
404 use_fills_channel = self.config.use_fills_channel,
405 is_demo = self.config.is_demo,
406 "OKX execution client started"
407 );
408 Ok(())
409 }
410
411 fn stop(&mut self) -> anyhow::Result<()> {
412 if !self.started {
413 return Ok(());
414 }
415
416 self.started = false;
417 self.connected = false;
418 if let Some(handle) = self.ws_stream_handle.take() {
419 handle.abort();
420 }
421 self.abort_pending_tasks();
422 tracing::info!("OKX execution client {} stopped", self.core.client_id);
423 Ok(())
424 }
425
426 fn submit_order(
427 &self,
428 cmd: &nautilus_common::messages::execution::SubmitOrder,
429 ) -> anyhow::Result<()> {
430 let order = &cmd.order;
431
432 if order.is_closed() {
433 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
434 return Ok(());
435 }
436
437 self.core.generate_order_submitted(
438 order.strategy_id(),
439 order.instrument_id(),
440 order.client_order_id(),
441 cmd.ts_init,
442 );
443
444 let result = if self.is_conditional_order(order.order_type()) {
445 self.submit_conditional_order(cmd)
446 } else {
447 self.submit_regular_order(cmd)
448 };
449
450 if let Err(e) = result {
451 self.core.generate_order_rejected(
452 order.strategy_id(),
453 order.instrument_id(),
454 order.client_order_id(),
455 &format!("submit-order-error: {e}"),
456 cmd.ts_init,
457 false,
458 );
459 return Err(e);
460 }
461
462 Ok(())
463 }
464
465 fn submit_order_list(
466 &self,
467 cmd: &nautilus_common::messages::execution::SubmitOrderList,
468 ) -> anyhow::Result<()> {
469 tracing::warn!(
470 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
471 cmd.order_list.orders.len()
472 );
473 Ok(())
474 }
475
476 fn modify_order(
477 &self,
478 cmd: &nautilus_common::messages::execution::ModifyOrder,
479 ) -> anyhow::Result<()> {
480 let ws_client = self.ws_client.clone();
481 let command = cmd.clone();
482
483 self.spawn_task("modify_order", async move {
484 ws_client
485 .modify_order(
486 command.trader_id,
487 command.strategy_id,
488 command.instrument_id,
489 Some(command.client_order_id),
490 command.price,
491 command.quantity,
492 Some(command.venue_order_id),
493 )
494 .await?;
495 Ok(())
496 });
497
498 Ok(())
499 }
500
501 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
502 self.cancel_ws_order(cmd)
503 }
504
505 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
506 self.mass_cancel_instrument(cmd.instrument_id)
507 }
508
509 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
510 let mut payload = Vec::with_capacity(cmd.cancels.len());
511
512 for cancel in &cmd.cancels {
513 payload.push((
514 cancel.instrument_id,
515 Some(cancel.client_order_id),
516 Some(cancel.venue_order_id),
517 ));
518 }
519
520 let ws_client = self.ws_client.clone();
521 self.spawn_task("batch_cancel_orders", async move {
522 ws_client.batch_cancel_orders(payload).await?;
523 Ok(())
524 });
525
526 Ok(())
527 }
528
529 fn query_account(
530 &self,
531 _cmd: &nautilus_common::messages::execution::QueryAccount,
532 ) -> anyhow::Result<()> {
533 self.update_account_state()
534 }
535
536 fn query_order(
537 &self,
538 cmd: &nautilus_common::messages::execution::QueryOrder,
539 ) -> anyhow::Result<()> {
540 tracing::debug!(
541 "query_order not implemented for OKX execution client (client_order_id={})",
542 cmd.client_order_id
543 );
544 Ok(())
545 }
546}
547
548#[async_trait(?Send)]
549impl LiveExecutionClient for OKXExecutionClient {
550 async fn connect(&mut self) -> anyhow::Result<()> {
551 if self.connected {
552 return Ok(());
553 }
554
555 self.ensure_instruments_initialized_async().await?;
556
557 self.ws_client.connect().await?;
558 self.ws_client.wait_until_active(10.0).await?;
559
560 for inst_type in self.instrument_types() {
561 tracing::info!(
562 "Subscribing to orders channel for instrument type: {:?}",
563 inst_type
564 );
565 self.ws_client.subscribe_orders(inst_type).await?;
566
567 if inst_type != OKXInstrumentType::Option {
569 self.ws_client.subscribe_orders_algo(inst_type).await?;
570 }
571
572 if self.config.use_fills_channel
573 && let Err(e) = self.ws_client.subscribe_fills(inst_type).await
574 {
575 tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
576 }
577 }
578
579 self.ws_client.subscribe_account().await?;
580
581 self.start_ws_stream()?;
582 self.refresh_account_state().await?;
583
584 self.connected = true;
585 tracing::info!("OKX execution client {} connected", self.core.client_id);
586
587 Ok(())
588 }
589
590 async fn disconnect(&mut self) -> anyhow::Result<()> {
591 if !self.connected {
592 return Ok(());
593 }
594
595 self.http_client.cancel_all_requests();
596 if let Err(e) = self.ws_client.close().await {
597 tracing::warn!("Error while closing OKX websocket: {e:?}");
598 }
599
600 if let Some(handle) = self.ws_stream_handle.take() {
601 handle.abort();
602 }
603
604 self.abort_pending_tasks();
605
606 self.connected = false;
607 tracing::info!("OKX execution client {} disconnected", self.core.client_id);
608 Ok(())
609 }
610
611 async fn generate_order_status_report(
612 &self,
613 cmd: &GenerateOrderStatusReport,
614 ) -> anyhow::Result<Option<OrderStatusReport>> {
615 let Some(instrument_id) = cmd.instrument_id else {
616 tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
617 return Ok(None);
618 };
619
620 let mut reports = self
621 .http_client
622 .request_order_status_reports(
623 self.core.account_id,
624 None,
625 Some(instrument_id),
626 None,
627 None,
628 false,
629 None,
630 )
631 .await?;
632
633 if let Some(client_order_id) = cmd.client_order_id {
634 reports.retain(|report| report.client_order_id == Some(client_order_id));
635 }
636
637 if let Some(venue_order_id) = cmd.venue_order_id {
638 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
639 }
640
641 Ok(reports.into_iter().next())
642 }
643
644 async fn generate_order_status_reports(
645 &self,
646 cmd: &GenerateOrderStatusReport,
647 ) -> anyhow::Result<Vec<OrderStatusReport>> {
648 let mut reports = Vec::new();
649
650 if let Some(instrument_id) = cmd.instrument_id {
651 let mut fetched = self
652 .http_client
653 .request_order_status_reports(
654 self.core.account_id,
655 None,
656 Some(instrument_id),
657 None,
658 None,
659 false,
660 None,
661 )
662 .await?;
663 reports.append(&mut fetched);
664 } else {
665 for inst_type in self.instrument_types() {
666 let mut fetched = self
667 .http_client
668 .request_order_status_reports(
669 self.core.account_id,
670 Some(inst_type),
671 None,
672 None,
673 None,
674 false,
675 None,
676 )
677 .await?;
678 reports.append(&mut fetched);
679 }
680 }
681
682 if let Some(client_order_id) = cmd.client_order_id {
683 reports.retain(|report| report.client_order_id == Some(client_order_id));
684 }
685
686 if let Some(venue_order_id) = cmd.venue_order_id {
687 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
688 }
689
690 Ok(reports)
691 }
692
693 async fn generate_fill_reports(
694 &self,
695 cmd: GenerateFillReports,
696 ) -> anyhow::Result<Vec<FillReport>> {
697 let start_dt = nanos_to_datetime(cmd.start);
698 let end_dt = nanos_to_datetime(cmd.end);
699 let mut reports = Vec::new();
700
701 if let Some(instrument_id) = cmd.instrument_id {
702 let mut fetched = self
703 .http_client
704 .request_fill_reports(
705 self.core.account_id,
706 None,
707 Some(instrument_id),
708 start_dt,
709 end_dt,
710 None,
711 )
712 .await?;
713 reports.append(&mut fetched);
714 } else {
715 for inst_type in self.instrument_types() {
716 let mut fetched = self
717 .http_client
718 .request_fill_reports(
719 self.core.account_id,
720 Some(inst_type),
721 None,
722 start_dt,
723 end_dt,
724 None,
725 )
726 .await?;
727 reports.append(&mut fetched);
728 }
729 }
730
731 if let Some(venue_order_id) = cmd.venue_order_id {
732 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
733 }
734
735 Ok(reports)
736 }
737
738 async fn generate_position_status_reports(
739 &self,
740 cmd: &GeneratePositionReports,
741 ) -> anyhow::Result<Vec<PositionStatusReport>> {
742 let mut reports = Vec::new();
743
744 if let Some(instrument_id) = cmd.instrument_id {
745 let mut fetched = self
746 .http_client
747 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
748 .await?;
749 reports.append(&mut fetched);
750 } else {
751 for inst_type in self.instrument_types() {
752 let mut fetched = self
753 .http_client
754 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
755 .await?;
756 reports.append(&mut fetched);
757 }
758 }
759
760 let _ = nanos_to_datetime(cmd.start);
761 let _ = nanos_to_datetime(cmd.end);
762
763 Ok(reports)
764 }
765
766 async fn generate_mass_status(
767 &self,
768 lookback_mins: Option<u64>,
769 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
770 tracing::warn!(
771 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
772 );
773 Ok(None)
774 }
775}
776
777impl LiveExecutionClientExt for OKXExecutionClient {
778 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
779 get_exec_event_sender()
780 }
781
782 fn get_clock(&self) -> Ref<'_, dyn Clock> {
783 self.core.clock().borrow()
784 }
785}
786
787impl OKXExecutionClient {
788 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
789 if self.ws_stream_handle.is_some() {
790 return Ok(());
791 }
792
793 let stream = self.ws_client.stream();
794 let runtime = get_runtime();
795 let handle = runtime.spawn(async move {
796 pin_mut!(stream);
797 while let Some(message) = stream.next().await {
798 dispatch_ws_message(message);
799 }
800 });
801
802 self.ws_stream_handle = Some(handle);
803 Ok(())
804 }
805}
806
807fn dispatch_ws_message(message: NautilusWsMessage) {
808 match message {
809 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
810 NautilusWsMessage::ExecutionReports(reports) => {
811 for report in reports {
812 dispatch_execution_report(report);
813 }
814 }
815 NautilusWsMessage::OrderRejected(event) => {
816 dispatch_order_event(OrderEventAny::Rejected(event));
817 }
818 NautilusWsMessage::OrderCancelRejected(event) => {
819 dispatch_order_event(OrderEventAny::CancelRejected(event));
820 }
821 NautilusWsMessage::OrderModifyRejected(event) => {
822 dispatch_order_event(OrderEventAny::ModifyRejected(event));
823 }
824 NautilusWsMessage::Error(e) => {
825 tracing::warn!(
826 "OKX websocket error: code={} message={} conn_id={:?}",
827 e.code,
828 e.message,
829 e.conn_id
830 );
831 }
832 NautilusWsMessage::Reconnected => {
833 tracing::info!("OKX websocket reconnected");
834 }
835 NautilusWsMessage::Deltas(_)
836 | NautilusWsMessage::Raw(_)
837 | NautilusWsMessage::Data(_)
838 | NautilusWsMessage::FundingRates(_)
839 | NautilusWsMessage::Instrument(_) => {
840 tracing::debug!("Ignoring OKX websocket data message");
841 }
842 }
843}
844
845fn dispatch_account_state(state: AccountState) {
846 msgbus::send_any(
847 "Portfolio.update_account".into(),
848 &state as &dyn std::any::Any,
849 );
850}
851
852fn dispatch_execution_report(report: ExecutionReport) {
853 let sender = get_exec_event_sender();
854 match report {
855 ExecutionReport::Order(order_report) => {
856 let exec_report =
857 nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
858 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
859 tracing::warn!("Failed to send order status report: {e}");
860 }
861 }
862 ExecutionReport::Fill(fill_report) => {
863 let exec_report =
864 nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
865 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
866 tracing::warn!("Failed to send fill report: {e}");
867 }
868 }
869 }
870}
871
872fn dispatch_order_event(event: OrderEventAny) {
873 let sender = get_exec_event_sender();
874 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
875 tracing::warn!("Failed to send order event: {e}");
876 }
877}
878
879fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
880 value.map(|nanos| nanos.to_datetime_utc())
881}
882
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };

    /// Shape of the payload handed to the WebSocket batch cancel call.
    type CancelPayload = Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>;

    fn instrument() -> InstrumentId {
        InstrumentId::from("BTC-USDT.OKX")
    }

    /// Builds a single cancel command with the fixed test identifiers.
    fn cancel_for(client_order_id: ClientOrderId, venue_order_id: VenueOrderId) -> CancelOrder {
        CancelOrder {
            trader_id: TraderId::from("TRADER-001"),
            client_id: ClientId::from("OKX"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: instrument(),
            client_order_id,
            venue_order_id,
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        }
    }

    /// Builds a batch cancel command wrapping the given cancels.
    fn batch_of(cancels: Vec<CancelOrder>) -> BatchCancelOrders {
        BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: ClientId::from("OKX"),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: instrument(),
            cancels,
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
        }
    }

    /// Mirrors the payload construction used by `batch_cancel_orders`.
    fn build_payload(cmd: &BatchCancelOrders) -> CancelPayload {
        let mut payload = Vec::with_capacity(cmd.cancels.len());
        payload.extend(cmd.cancels.iter().map(|cancel| {
            (
                cancel.instrument_id,
                Some(cancel.client_order_id),
                Some(cancel.venue_order_id),
            )
        }));
        payload
    }

    #[test]
    fn test_batch_cancel_orders_builds_payload() {
        let instrument_id = instrument();
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        let cmd = batch_of(vec![
            cancel_for(client_order_id1, venue_order_id1),
            cancel_for(client_order_id2, venue_order_id2),
        ]);

        let payload = build_payload(&cmd);

        assert_eq!(payload.len(), 2);
        assert_eq!(payload[0].0, instrument_id);
        assert_eq!(payload[0].1, Some(client_order_id1));
        assert_eq!(payload[0].2, Some(venue_order_id1));
        assert_eq!(payload[1].0, instrument_id);
        assert_eq!(payload[1].1, Some(client_order_id2));
        assert_eq!(payload[1].2, Some(venue_order_id2));
    }

    #[test]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = batch_of(vec![]);

        let payload = build_payload(&cmd);
        assert_eq!(payload.len(), 0);
    }
}