1use std::{cell::Ref, future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use chrono::{DateTime, Utc};
23use futures_util::{StreamExt, pin_mut};
24use nautilus_common::{
25 clock::Clock, messages::ExecutionEvent, msgbus, runner::get_exec_event_sender,
26 runtime::get_runtime,
27};
28use nautilus_core::UnixNanos;
29use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
30use nautilus_live::execution::LiveExecutionClientExt;
31use nautilus_model::{
32 enums::{AccountType, OrderType},
33 events::{AccountState, OrderEventAny},
34 identifiers::InstrumentId,
35 orders::Order,
36 reports::ExecutionMassStatus,
37};
38use tokio::task::JoinHandle;
39
40use crate::{
41 common::{
42 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
43 enums::{OKXInstrumentType, OKXTradeMode},
44 },
45 config::OKXExecClientConfig,
46 http::client::OKXHttpClient,
47 websocket::{
48 client::OKXWebSocketClient,
49 messages::{ExecutionReport, NautilusWsMessage},
50 },
51};
52
53#[derive(Debug)]
54pub struct OKXExecutionClient {
55 core: ExecutionClientCore,
56 config: OKXExecClientConfig,
57 http_client: OKXHttpClient,
58 ws_client: OKXWebSocketClient,
59 trade_mode: OKXTradeMode,
60 started: bool,
61 connected: bool,
62 instruments_initialized: bool,
63 ws_stream_handle: Option<JoinHandle<()>>,
64 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
65}
66
67impl OKXExecutionClient {
68 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
74 let http_client = if config.has_api_credentials() {
75 OKXHttpClient::with_credentials(
76 config.api_key.clone(),
77 config.api_secret.clone(),
78 config.api_passphrase.clone(),
79 config.base_url_http.clone(),
80 config.http_timeout_secs,
81 config.max_retries,
82 config.retry_delay_initial_ms,
83 config.retry_delay_max_ms,
84 config.is_demo,
85 )?
86 } else {
87 OKXHttpClient::new(
88 config.base_url_http.clone(),
89 config.http_timeout_secs,
90 config.max_retries,
91 config.retry_delay_initial_ms,
92 config.retry_delay_max_ms,
93 config.is_demo,
94 )?
95 };
96
97 let account_id = core.account_id;
98 let ws_client = OKXWebSocketClient::new(
99 Some(config.ws_private_url()),
100 config.api_key.clone(),
101 config.api_secret.clone(),
102 config.api_passphrase.clone(),
103 Some(account_id),
104 None,
105 )
106 .context("failed to construct OKX execution websocket client")?;
107
108 let trade_mode = match core.account_type {
109 AccountType::Cash => OKXTradeMode::Cash,
110 _ => OKXTradeMode::Isolated,
111 };
112
113 Ok(Self {
114 core,
115 config,
116 http_client,
117 ws_client,
118 trade_mode,
119 started: false,
120 connected: false,
121 instruments_initialized: false,
122 ws_stream_handle: None,
123 pending_tasks: Mutex::new(Vec::new()),
124 })
125 }
126
127 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
128 if self.config.instrument_types.is_empty() {
129 vec![OKXInstrumentType::Spot]
130 } else {
131 self.config.instrument_types.clone()
132 }
133 }
134
135 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
136 if self.instruments_initialized {
137 return Ok(());
138 }
139
140 let mut all_instruments = Vec::new();
141 for instrument_type in self.instrument_types() {
142 let instruments = self
143 .http_client
144 .request_instruments(instrument_type, None)
145 .await
146 .with_context(|| {
147 format!("failed to request OKX instruments for {instrument_type:?}")
148 })?;
149
150 if instruments.is_empty() {
151 tracing::warn!("No instruments returned for {instrument_type:?}");
152 continue;
153 }
154
155 self.http_client.add_instruments(instruments.clone());
156 all_instruments.extend(instruments);
157 }
158
159 if all_instruments.is_empty() {
160 tracing::warn!(
161 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
162 );
163 } else {
164 self.ws_client.initialize_instruments_cache(all_instruments);
165 }
166
167 self.instruments_initialized = true;
168 Ok(())
169 }
170
171 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
172 if self.instruments_initialized {
173 return Ok(());
174 }
175
176 let runtime = get_runtime();
177 runtime.block_on(self.ensure_instruments_initialized_async())
178 }
179
180 async fn refresh_account_state(&self) -> anyhow::Result<()> {
181 let account_state = self
182 .http_client
183 .request_account_state(self.core.account_id)
184 .await
185 .context("failed to request OKX account state")?;
186
187 self.core.generate_account_state(
188 account_state.balances.clone(),
189 account_state.margins.clone(),
190 account_state.is_reported,
191 account_state.ts_event,
192 )
193 }
194
195 fn update_account_state(&self) -> anyhow::Result<()> {
196 let runtime = get_runtime();
197 runtime.block_on(self.refresh_account_state())
198 }
199
200 fn is_conditional_order(&self, order_type: OrderType) -> bool {
201 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
202 }
203
204 fn submit_regular_order(
205 &self,
206 cmd: &nautilus_common::messages::execution::SubmitOrder,
207 ) -> anyhow::Result<()> {
208 let order = cmd.order.clone();
209 let ws_client = self.ws_client.clone();
210 let trade_mode = self.trade_mode;
211
212 self.spawn_task("submit_order", async move {
213 ws_client
214 .submit_order(
215 order.trader_id(),
216 order.strategy_id(),
217 order.instrument_id(),
218 trade_mode,
219 order.client_order_id(),
220 order.order_side(),
221 order.order_type(),
222 order.quantity(),
223 Some(order.time_in_force()),
224 order.price(),
225 order.trigger_price(),
226 Some(order.is_post_only()),
227 Some(order.is_reduce_only()),
228 Some(order.is_quote_quantity()),
229 None,
230 )
231 .await?;
232 Ok(())
233 });
234
235 Ok(())
236 }
237
238 fn submit_conditional_order(
239 &self,
240 cmd: &nautilus_common::messages::execution::SubmitOrder,
241 ) -> anyhow::Result<()> {
242 let order = cmd.order.clone();
243 let trigger_price = order
244 .trigger_price()
245 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
246 let http_client = self.http_client.clone();
247 let trade_mode = self.trade_mode;
248
249 self.spawn_task("submit_algo_order", async move {
250 http_client
251 .place_algo_order_with_domain_types(
252 order.instrument_id(),
253 trade_mode,
254 order.client_order_id(),
255 order.order_side(),
256 order.order_type(),
257 order.quantity(),
258 trigger_price,
259 order.trigger_type(),
260 order.price(),
261 Some(order.is_reduce_only()),
262 )
263 .await?;
264 Ok(())
265 });
266
267 Ok(())
268 }
269
270 fn cancel_ws_order(
271 &self,
272 cmd: &nautilus_common::messages::execution::CancelOrder,
273 ) -> anyhow::Result<()> {
274 let ws_client = self.ws_client.clone();
275 let command = cmd.clone();
276
277 self.spawn_task("cancel_order", async move {
278 ws_client
279 .cancel_order(
280 command.trader_id,
281 command.strategy_id,
282 command.instrument_id,
283 Some(command.client_order_id),
284 Some(command.venue_order_id),
285 )
286 .await?;
287 Ok(())
288 });
289
290 Ok(())
291 }
292
293 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
294 let ws_client = self.ws_client.clone();
295 self.spawn_task("mass_cancel_orders", async move {
296 ws_client.mass_cancel_orders(instrument_id).await?;
297 Ok(())
298 });
299 Ok(())
300 }
301
302 fn spawn_task<F>(&self, description: &'static str, fut: F)
303 where
304 F: Future<Output = anyhow::Result<()>> + Send + 'static,
305 {
306 let runtime = get_runtime();
307 let handle = runtime.spawn(async move {
308 if let Err(err) = fut.await {
309 tracing::warn!("{description} failed: {err:?}");
310 }
311 });
312
313 let mut tasks = self.pending_tasks.lock().unwrap();
314 tasks.retain(|handle| !handle.is_finished());
315 tasks.push(handle);
316 }
317
318 fn abort_pending_tasks(&self) {
319 let mut tasks = self.pending_tasks.lock().unwrap();
320 for handle in tasks.drain(..) {
321 handle.abort();
322 }
323 }
324}
325
326impl ExecutionClient for OKXExecutionClient {
327 fn is_connected(&self) -> bool {
328 self.connected
329 }
330
331 fn client_id(&self) -> nautilus_model::identifiers::ClientId {
332 self.core.client_id
333 }
334
335 fn account_id(&self) -> nautilus_model::identifiers::AccountId {
336 self.core.account_id
337 }
338
339 fn venue(&self) -> nautilus_model::identifiers::Venue {
340 *OKX_VENUE
341 }
342
343 fn oms_type(&self) -> nautilus_model::enums::OmsType {
344 self.core.oms_type
345 }
346
347 fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
348 self.core.get_account()
349 }
350
351 fn generate_account_state(
352 &self,
353 balances: Vec<nautilus_model::types::AccountBalance>,
354 margins: Vec<nautilus_model::types::MarginBalance>,
355 reported: bool,
356 ts_event: nautilus_core::UnixNanos,
357 ) -> anyhow::Result<()> {
358 self.core
359 .generate_account_state(balances, margins, reported, ts_event)
360 }
361
362 fn start(&mut self) -> anyhow::Result<()> {
363 if self.started {
364 return Ok(());
365 }
366
367 self.ensure_instruments_initialized()?;
368 self.started = true;
369 tracing::info!(
370 client_id = %self.core.client_id,
371 account_id = %self.core.account_id,
372 account_type = ?self.core.account_type,
373 trade_mode = ?self.trade_mode,
374 instrument_types = ?self.config.instrument_types,
375 use_fills_channel = self.config.use_fills_channel,
376 is_demo = self.config.is_demo,
377 "OKX execution client started"
378 );
379 Ok(())
380 }
381
382 fn stop(&mut self) -> anyhow::Result<()> {
383 if !self.started {
384 return Ok(());
385 }
386
387 self.started = false;
388 self.connected = false;
389 if let Some(handle) = self.ws_stream_handle.take() {
390 handle.abort();
391 }
392 self.abort_pending_tasks();
393 tracing::info!("OKX execution client {} stopped", self.core.client_id);
394 Ok(())
395 }
396
397 fn submit_order(
398 &self,
399 cmd: &nautilus_common::messages::execution::SubmitOrder,
400 ) -> anyhow::Result<()> {
401 let order = &cmd.order;
402
403 if order.is_closed() {
404 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
405 return Ok(());
406 }
407
408 self.core.generate_order_submitted(
409 order.strategy_id(),
410 order.instrument_id(),
411 order.client_order_id(),
412 cmd.ts_init,
413 );
414
415 let result = if self.is_conditional_order(order.order_type()) {
416 self.submit_conditional_order(cmd)
417 } else {
418 self.submit_regular_order(cmd)
419 };
420
421 if let Err(err) = result {
422 self.core.generate_order_rejected(
423 order.strategy_id(),
424 order.instrument_id(),
425 order.client_order_id(),
426 &format!("submit-order-error: {err}"),
427 cmd.ts_init,
428 false,
429 );
430 return Err(err);
431 }
432
433 Ok(())
434 }
435
436 fn submit_order_list(
437 &self,
438 cmd: &nautilus_common::messages::execution::SubmitOrderList,
439 ) -> anyhow::Result<()> {
440 tracing::warn!(
441 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
442 cmd.order_list.orders.len()
443 );
444 Ok(())
445 }
446
447 fn modify_order(
448 &self,
449 cmd: &nautilus_common::messages::execution::ModifyOrder,
450 ) -> anyhow::Result<()> {
451 let ws_client = self.ws_client.clone();
452 let command = cmd.clone();
453
454 self.spawn_task("modify_order", async move {
455 ws_client
456 .modify_order(
457 command.trader_id,
458 command.strategy_id,
459 command.instrument_id,
460 Some(command.client_order_id),
461 command.price,
462 command.quantity,
463 Some(command.venue_order_id),
464 )
465 .await?;
466 Ok(())
467 });
468
469 Ok(())
470 }
471
472 fn cancel_order(
473 &self,
474 cmd: &nautilus_common::messages::execution::CancelOrder,
475 ) -> anyhow::Result<()> {
476 self.cancel_ws_order(cmd)
477 }
478
479 fn cancel_all_orders(
480 &self,
481 cmd: &nautilus_common::messages::execution::CancelAllOrders,
482 ) -> anyhow::Result<()> {
483 self.mass_cancel_instrument(cmd.instrument_id)
484 }
485
486 fn batch_cancel_orders(
487 &self,
488 cmd: &nautilus_common::messages::execution::BatchCancelOrders,
489 ) -> anyhow::Result<()> {
490 let mut payload = Vec::with_capacity(cmd.cancels.len());
491
492 for cancel in &cmd.cancels {
493 payload.push((
494 OKXInstrumentType::Spot, cancel.instrument_id,
496 Some(cancel.client_order_id),
497 Some(cancel.venue_order_id.as_str().to_string()),
498 ));
499 }
500
501 let ws_client = self.ws_client.clone();
502 self.spawn_task("batch_cancel_orders", async move {
503 ws_client.batch_cancel_orders(payload).await?;
504 Ok(())
505 });
506
507 Ok(())
508 }
509
510 fn query_account(
511 &self,
512 _cmd: &nautilus_common::messages::execution::QueryAccount,
513 ) -> anyhow::Result<()> {
514 self.update_account_state()
515 }
516
517 fn query_order(
518 &self,
519 cmd: &nautilus_common::messages::execution::QueryOrder,
520 ) -> anyhow::Result<()> {
521 tracing::debug!(
522 "query_order not implemented for OKX execution client (client_order_id={})",
523 cmd.client_order_id
524 );
525 Ok(())
526 }
527}
528
529#[async_trait(?Send)]
530impl LiveExecutionClient for OKXExecutionClient {
531 async fn connect(&mut self) -> anyhow::Result<()> {
532 if self.connected {
533 return Ok(());
534 }
535
536 self.ensure_instruments_initialized_async().await?;
537
538 if let Ok(Some(vip_level)) = self.http_client.request_vip_level().await {
540 self.ws_client.set_vip_level(vip_level);
541 }
542
543 self.ws_client.connect().await?;
544 self.ws_client.wait_until_active(10.0).await?;
545
546 for inst_type in self.instrument_types() {
547 tracing::info!(
548 "Subscribing to orders channel for instrument type: {:?}",
549 inst_type
550 );
551 self.ws_client.subscribe_orders(inst_type).await?;
552
553 if inst_type != OKXInstrumentType::Option {
555 self.ws_client.subscribe_orders_algo(inst_type).await?;
556 }
557
558 if self.config.use_fills_channel
559 && let Err(err) = self.ws_client.subscribe_fills(inst_type).await
560 {
561 tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {err}");
562 }
563 }
564
565 self.ws_client.subscribe_account().await?;
566
567 self.start_ws_stream()?;
568 self.refresh_account_state().await?;
569
570 self.connected = true;
571 tracing::info!("OKX execution client {} connected", self.core.client_id);
572 Ok(())
573 }
574
575 async fn disconnect(&mut self) -> anyhow::Result<()> {
576 if !self.connected {
577 return Ok(());
578 }
579
580 self.http_client.cancel_all_requests();
581 if let Err(err) = self.ws_client.close().await {
582 tracing::warn!("Error while closing OKX websocket: {err:?}");
583 }
584
585 if let Some(handle) = self.ws_stream_handle.take() {
586 handle.abort();
587 }
588
589 self.abort_pending_tasks();
590
591 self.connected = false;
592 tracing::info!("OKX execution client {} disconnected", self.core.client_id);
593 Ok(())
594 }
595
596 async fn generate_order_status_report(
597 &self,
598 cmd: &nautilus_common::messages::execution::GenerateOrderStatusReport,
599 ) -> anyhow::Result<Option<nautilus_model::reports::OrderStatusReport>> {
600 let Some(instrument_id) = cmd.instrument_id else {
601 tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
602 return Ok(None);
603 };
604
605 let mut reports = self
606 .http_client
607 .request_order_status_reports(
608 self.core.account_id,
609 None,
610 Some(instrument_id),
611 None,
612 None,
613 false,
614 None,
615 )
616 .await?;
617
618 if let Some(client_order_id) = cmd.client_order_id {
619 reports.retain(|report| report.client_order_id == Some(client_order_id));
620 }
621
622 if let Some(venue_order_id) = cmd.venue_order_id {
623 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
624 }
625
626 Ok(reports.into_iter().next())
627 }
628
629 async fn generate_order_status_reports(
630 &self,
631 cmd: &nautilus_common::messages::execution::GenerateOrderStatusReport,
632 ) -> anyhow::Result<Vec<nautilus_model::reports::OrderStatusReport>> {
633 let mut reports = Vec::new();
634
635 if let Some(instrument_id) = cmd.instrument_id {
636 let mut fetched = self
637 .http_client
638 .request_order_status_reports(
639 self.core.account_id,
640 None,
641 Some(instrument_id),
642 None,
643 None,
644 false,
645 None,
646 )
647 .await?;
648 reports.append(&mut fetched);
649 } else {
650 for inst_type in self.instrument_types() {
651 let mut fetched = self
652 .http_client
653 .request_order_status_reports(
654 self.core.account_id,
655 Some(inst_type),
656 None,
657 None,
658 None,
659 false,
660 None,
661 )
662 .await?;
663 reports.append(&mut fetched);
664 }
665 }
666
667 if let Some(client_order_id) = cmd.client_order_id {
668 reports.retain(|report| report.client_order_id == Some(client_order_id));
669 }
670
671 if let Some(venue_order_id) = cmd.venue_order_id {
672 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
673 }
674
675 Ok(reports)
676 }
677
678 async fn generate_fill_reports(
679 &self,
680 cmd: nautilus_common::messages::execution::GenerateFillReports,
681 ) -> anyhow::Result<Vec<nautilus_model::reports::FillReport>> {
682 let start_dt = nanos_to_datetime(cmd.start);
683 let end_dt = nanos_to_datetime(cmd.end);
684 let mut reports = Vec::new();
685
686 if let Some(instrument_id) = cmd.instrument_id {
687 let mut fetched = self
688 .http_client
689 .request_fill_reports(
690 self.core.account_id,
691 None,
692 Some(instrument_id),
693 start_dt,
694 end_dt,
695 None,
696 )
697 .await?;
698 reports.append(&mut fetched);
699 } else {
700 for inst_type in self.instrument_types() {
701 let mut fetched = self
702 .http_client
703 .request_fill_reports(
704 self.core.account_id,
705 Some(inst_type),
706 None,
707 start_dt,
708 end_dt,
709 None,
710 )
711 .await?;
712 reports.append(&mut fetched);
713 }
714 }
715
716 if let Some(venue_order_id) = cmd.venue_order_id {
717 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
718 }
719
720 Ok(reports)
721 }
722
723 async fn generate_position_status_reports(
724 &self,
725 cmd: &nautilus_common::messages::execution::GeneratePositionReports,
726 ) -> anyhow::Result<Vec<nautilus_model::reports::PositionStatusReport>> {
727 let mut reports = Vec::new();
728
729 if let Some(instrument_id) = cmd.instrument_id {
730 let mut fetched = self
731 .http_client
732 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
733 .await?;
734 reports.append(&mut fetched);
735 } else {
736 for inst_type in self.instrument_types() {
737 let mut fetched = self
738 .http_client
739 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
740 .await?;
741 reports.append(&mut fetched);
742 }
743 }
744
745 let _ = nanos_to_datetime(cmd.start);
746 let _ = nanos_to_datetime(cmd.end);
747
748 Ok(reports)
749 }
750
751 async fn generate_mass_status(
752 &self,
753 lookback_mins: Option<u64>,
754 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
755 tracing::warn!(
756 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
757 );
758 Ok(None)
759 }
760}
761
762impl LiveExecutionClientExt for OKXExecutionClient {
763 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
764 get_exec_event_sender()
765 }
766
767 fn get_clock(&self) -> Ref<'_, dyn Clock> {
768 self.core.clock().borrow()
769 }
770}
771
772impl OKXExecutionClient {
773 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
774 if self.ws_stream_handle.is_some() {
775 return Ok(());
776 }
777
778 let stream = self.ws_client.stream();
779 let runtime = get_runtime();
780 let handle = runtime.spawn(async move {
781 pin_mut!(stream);
782 while let Some(message) = stream.next().await {
783 dispatch_ws_message(message);
784 }
785 });
786
787 self.ws_stream_handle = Some(handle);
788 Ok(())
789 }
790}
791
792fn dispatch_ws_message(message: NautilusWsMessage) {
793 match message {
794 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state),
795 NautilusWsMessage::ExecutionReports(reports) => {
796 for report in reports {
797 dispatch_execution_report(report);
798 }
799 }
800 NautilusWsMessage::OrderRejected(event) => {
801 dispatch_order_event(OrderEventAny::Rejected(event));
802 }
803 NautilusWsMessage::OrderCancelRejected(event) => {
804 dispatch_order_event(OrderEventAny::CancelRejected(event));
805 }
806 NautilusWsMessage::OrderModifyRejected(event) => {
807 dispatch_order_event(OrderEventAny::ModifyRejected(event));
808 }
809 NautilusWsMessage::Error(err) => {
810 tracing::warn!(
811 "OKX websocket error: code={} message={} conn_id={:?}",
812 err.code,
813 err.message,
814 err.conn_id
815 );
816 }
817 NautilusWsMessage::Reconnected => {
818 tracing::info!("OKX websocket reconnected");
819 }
820 NautilusWsMessage::Deltas(_)
821 | NautilusWsMessage::Raw(_)
822 | NautilusWsMessage::Data(_)
823 | NautilusWsMessage::FundingRates(_)
824 | NautilusWsMessage::Instrument(_) => {
825 tracing::debug!("Ignoring OKX websocket data message");
826 }
827 }
828}
829
830fn dispatch_account_state(state: AccountState) {
831 msgbus::send_any(
832 "Portfolio.update_account".into(),
833 &state as &dyn std::any::Any,
834 );
835}
836
837fn dispatch_execution_report(report: ExecutionReport) {
838 let sender = get_exec_event_sender();
839 match report {
840 ExecutionReport::Order(order_report) => {
841 let exec_report =
842 nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(order_report));
843 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
844 tracing::warn!("Failed to send order status report: {e}");
845 }
846 }
847 ExecutionReport::Fill(fill_report) => {
848 let exec_report =
849 nautilus_common::messages::ExecutionReport::Fill(Box::new(fill_report));
850 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
851 tracing::warn!("Failed to send fill report: {e}");
852 }
853 }
854 }
855}
856
857fn dispatch_order_event(event: OrderEventAny) {
858 let sender = get_exec_event_sender();
859 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
860 tracing::warn!("Failed to send order event: {e}");
861 }
862}
863
864fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
865 value.map(|nanos| nanos.to_datetime_utc())
866}