1pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 messages::{
28 ExecutionEvent,
29 execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
32 QueryOrder, SubmitOrder, SubmitOrderList,
33 },
34 },
35 msgbus,
36 runner::get_exec_event_sender,
37 runtime::get_runtime,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
41use nautilus_live::execution::client::LiveExecutionClient;
42use nautilus_model::{
43 events::{AccountState, OrderEventAny, OrderRejected},
44 identifiers::{AccountId, VenueOrderId},
45 instruments::Instrument,
46 orders::Order,
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 config::BitmexExecClientConfig,
53 execution::{
54 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
55 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
56 },
57 http::client::BitmexHttpClient,
58 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
59};
60
61#[derive(Debug)]
62pub struct BitmexExecutionClient {
63 core: ExecutionClientCore,
64 config: BitmexExecClientConfig,
65 http_client: BitmexHttpClient,
66 ws_client: BitmexWebSocketClient,
67 _submitter: SubmitBroadcaster,
68 _canceller: CancelBroadcaster,
69 started: bool,
70 connected: bool,
71 instruments_initialized: bool,
72 ws_stream_handle: Option<JoinHandle<()>>,
73 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
74}
75
76impl BitmexExecutionClient {
77 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
83 if !config.has_api_credentials() {
84 anyhow::bail!("BitMEX execution client requires API key and secret");
85 }
86
87 let http_client = BitmexHttpClient::new(
88 Some(config.http_base_url()),
89 config.api_key.clone(),
90 config.api_secret.clone(),
91 config.use_testnet,
92 config.http_timeout_secs,
93 config.max_retries,
94 config.retry_delay_initial_ms,
95 config.retry_delay_max_ms,
96 config.recv_window_ms,
97 config.max_requests_per_second,
98 config.max_requests_per_minute,
99 config.http_proxy_url.clone(),
100 )
101 .context("failed to construct BitMEX HTTP client")?;
102
103 let account_id = config.account_id.unwrap_or(core.account_id);
104 let ws_client = BitmexWebSocketClient::new(
105 Some(config.ws_url()),
106 config.api_key.clone(),
107 config.api_secret.clone(),
108 Some(account_id),
109 config.heartbeat_interval_secs,
110 )
111 .context("failed to construct BitMEX execution websocket client")?;
112
113 let pool_size = config.submitter_pool_size.unwrap_or(1);
114 let submitter_proxy_urls = match &config.submitter_proxy_urls {
115 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
116 None => vec![config.http_proxy_url.clone(); pool_size],
117 };
118
119 let submitter_config = SubmitBroadcasterConfig {
120 pool_size,
121 api_key: config.api_key.clone(),
122 api_secret: config.api_secret.clone(),
123 base_url: config.base_url_http.clone(),
124 testnet: config.use_testnet,
125 timeout_secs: config.http_timeout_secs,
126 max_retries: config.max_retries,
127 retry_delay_ms: config.retry_delay_initial_ms,
128 retry_delay_max_ms: config.retry_delay_max_ms,
129 recv_window_ms: config.recv_window_ms,
130 max_requests_per_second: config.max_requests_per_second,
131 max_requests_per_minute: config.max_requests_per_minute,
132 proxy_urls: submitter_proxy_urls,
133 ..Default::default()
134 };
135
136 let _submitter = SubmitBroadcaster::new(submitter_config)
137 .context("failed to create SubmitBroadcaster")?;
138
139 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
140 let canceller_proxy_urls = match &config.canceller_proxy_urls {
141 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
142 None => vec![config.http_proxy_url.clone(); canceller_pool_size],
143 };
144
145 let canceller_config = CancelBroadcasterConfig {
146 pool_size: canceller_pool_size,
147 api_key: config.api_key.clone(),
148 api_secret: config.api_secret.clone(),
149 base_url: config.base_url_http.clone(),
150 testnet: config.use_testnet,
151 timeout_secs: config.http_timeout_secs,
152 max_retries: config.max_retries,
153 retry_delay_ms: config.retry_delay_initial_ms,
154 retry_delay_max_ms: config.retry_delay_max_ms,
155 recv_window_ms: config.recv_window_ms,
156 max_requests_per_second: config.max_requests_per_second,
157 max_requests_per_minute: config.max_requests_per_minute,
158 proxy_urls: canceller_proxy_urls,
159 ..Default::default()
160 };
161
162 let _canceller = CancelBroadcaster::new(canceller_config)
163 .context("failed to create CancelBroadcaster")?;
164
165 Ok(Self {
166 core,
167 config,
168 http_client,
169 ws_client,
170 _submitter,
171 _canceller,
172 started: false,
173 connected: false,
174 instruments_initialized: false,
175 ws_stream_handle: None,
176 pending_tasks: Mutex::new(Vec::new()),
177 })
178 }
179
180 fn spawn_task<F>(&self, label: &'static str, fut: F)
181 where
182 F: Future<Output = anyhow::Result<()>> + Send + 'static,
183 {
184 let handle = tokio::spawn(async move {
185 if let Err(e) = fut.await {
186 tracing::error!("{label}: {e:?}");
187 }
188 });
189
190 self.pending_tasks
191 .lock()
192 .expect("pending task lock poisoned")
193 .push(handle);
194 }
195
196 fn abort_pending_tasks(&self) {
197 let mut guard = self
198 .pending_tasks
199 .lock()
200 .expect("pending task lock poisoned");
201 for handle in guard.drain(..) {
202 handle.abort();
203 }
204 }
205
206 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
207 if self.instruments_initialized {
208 return Ok(());
209 }
210
211 let http = self.http_client.clone();
212 let mut instruments = http
213 .request_instruments(self.config.active_only)
214 .await
215 .context("failed to request BitMEX instruments")?;
216
217 instruments.sort_by_key(|instrument| instrument.id());
218
219 for instrument in &instruments {
220 self.http_client.cache_instrument(instrument.clone());
221 self._submitter.cache_instrument(instrument.clone());
222 self._canceller.cache_instrument(instrument.clone());
223 }
224
225 self.ws_client.cache_instruments(instruments);
226
227 self.instruments_initialized = true;
228 Ok(())
229 }
230
231 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
232 if self.instruments_initialized {
233 return Ok(());
234 }
235
236 let runtime = get_runtime();
237 runtime.block_on(self.ensure_instruments_initialized_async())
238 }
239
240 async fn refresh_account_state(&self) -> anyhow::Result<()> {
241 let account_state = self
242 .http_client
243 .request_account_state(self.core.account_id)
244 .await
245 .context("failed to request BitMEX account state")?;
246
247 dispatch_account_state(account_state);
248 Ok(())
249 }
250
251 fn update_account_state(&self) -> anyhow::Result<()> {
252 let runtime = get_runtime();
253 runtime.block_on(self.refresh_account_state())
254 }
255
256 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
257 if self.ws_stream_handle.is_some() {
258 return Ok(());
259 }
260
261 let stream = self.ws_client.stream();
262
263 let handle = tokio::spawn(async move {
264 pin_mut!(stream);
265 while let Some(message) = stream.next().await {
266 dispatch_ws_message(message);
267 }
268 });
269
270 self.ws_stream_handle = Some(handle);
271 Ok(())
272 }
273}
274
275impl ExecutionClient for BitmexExecutionClient {
276 fn is_connected(&self) -> bool {
277 self.connected
278 }
279
280 fn client_id(&self) -> nautilus_model::identifiers::ClientId {
281 self.core.client_id
282 }
283
284 fn account_id(&self) -> AccountId {
285 self.core.account_id
286 }
287
288 fn venue(&self) -> nautilus_model::identifiers::Venue {
289 self.core.venue
290 }
291
292 fn oms_type(&self) -> nautilus_model::enums::OmsType {
293 self.core.oms_type
294 }
295
296 fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
297 self.core.get_account()
298 }
299
300 fn generate_account_state(
301 &self,
302 balances: Vec<nautilus_model::types::AccountBalance>,
303 margins: Vec<nautilus_model::types::MarginBalance>,
304 reported: bool,
305 ts_event: UnixNanos,
306 ) -> anyhow::Result<()> {
307 self.core
308 .generate_account_state(balances, margins, reported, ts_event)
309 }
310
311 fn start(&mut self) -> anyhow::Result<()> {
312 if self.started {
313 return Ok(());
314 }
315
316 self.ensure_instruments_initialized()?;
317 self.started = true;
318 tracing::info!(
319 client_id = %self.core.client_id,
320 account_id = %self.core.account_id,
321 use_testnet = self.config.use_testnet,
322 submitter_pool_size = ?self.config.submitter_pool_size,
323 canceller_pool_size = ?self.config.canceller_pool_size,
324 http_proxy_url = ?self.config.http_proxy_url,
325 ws_proxy_url = ?self.config.ws_proxy_url,
326 submitter_proxy_urls = ?self.config.submitter_proxy_urls,
327 canceller_proxy_urls = ?self.config.canceller_proxy_urls,
328 "BitMEX execution client started"
329 );
330 Ok(())
331 }
332
333 fn stop(&mut self) -> anyhow::Result<()> {
334 if !self.started {
335 return Ok(());
336 }
337
338 self.started = false;
339 self.connected = false;
340 if let Some(handle) = self.ws_stream_handle.take() {
341 handle.abort();
342 }
343 self.abort_pending_tasks();
344 tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
345 Ok(())
346 }
347
348 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
349 let order = cmd.order.clone();
350
351 if order.is_closed() {
352 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
353 return Ok(());
354 }
355
356 self.core.generate_order_submitted(
357 order.strategy_id(),
358 order.instrument_id(),
359 order.client_order_id(),
360 cmd.ts_init,
361 );
362
363 let submit_tries = cmd
364 .params
365 .as_ref()
366 .and_then(|params| params.get("submit_tries"))
367 .and_then(|s| s.parse::<usize>().ok())
368 .filter(|&n| n > 0);
369
370 let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
371
372 let http_client = self.http_client.clone();
373 let submitter = self._submitter.clone_for_async();
374 let trader_id = self.core.trader_id;
375 let strategy_id = order.strategy_id();
376 let instrument_id = order.instrument_id();
377 let account_id = self.core.account_id;
378 let client_order_id = order.client_order_id();
379 let order_side = order.order_side();
380 let order_type = order.order_type();
381 let quantity = order.quantity();
382 let time_in_force = order.time_in_force();
383 let price = order.price();
384 let trigger_price = order.trigger_price();
385 let trigger_type = order.trigger_type();
386 let display_qty = order.display_qty();
387 let post_only = order.is_post_only();
388 let reduce_only = order.is_reduce_only();
389 let order_list_id = order.order_list_id();
390 let contingency_type = order.contingency_type();
391 let ts_event = cmd.ts_init;
392
393 self.spawn_task("submit_order", async move {
394 let result = if use_broadcaster {
395 submitter
396 .broadcast_submit(
397 instrument_id,
398 client_order_id,
399 order_side,
400 order_type,
401 quantity,
402 time_in_force,
403 price,
404 trigger_price,
405 trigger_type,
406 display_qty,
407 post_only,
408 reduce_only,
409 order_list_id,
410 contingency_type,
411 submit_tries,
412 )
413 .await
414 } else {
415 http_client
416 .submit_order(
417 instrument_id,
418 client_order_id,
419 order_side,
420 order_type,
421 quantity,
422 time_in_force,
423 price,
424 trigger_price,
425 trigger_type,
426 display_qty,
427 post_only,
428 reduce_only,
429 order_list_id,
430 contingency_type,
431 )
432 .await
433 };
434
435 match result {
436 Ok(report) => dispatch_order_status_report(report),
437 Err(e) => {
438 let event = OrderRejected::new(
439 trader_id,
440 strategy_id,
441 instrument_id,
442 client_order_id,
443 account_id,
444 format!("submit-order-error: {e}").into(),
445 UUID4::new(),
446 ts_event,
447 get_atomic_clock_realtime().get_time_ns(),
448 false,
449 post_only,
450 );
451 dispatch_order_event(OrderEventAny::Rejected(event));
452 }
453 }
454 Ok(())
455 });
456
457 Ok(())
458 }
459
460 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
461 tracing::warn!(
462 "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
463 cmd.order_list.orders.len()
464 );
465 Ok(())
466 }
467
468 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
469 let http_client = self.http_client.clone();
470 let instrument_id = cmd.instrument_id;
471 let client_order_id = Some(cmd.client_order_id);
472 let venue_order_id = Some(cmd.venue_order_id);
473 let quantity = cmd.quantity;
474 let price = cmd.price;
475 let trigger_price = cmd.trigger_price;
476
477 self.spawn_task("modify_order", async move {
478 match http_client
479 .modify_order(
480 instrument_id,
481 client_order_id,
482 venue_order_id,
483 quantity,
484 price,
485 trigger_price,
486 )
487 .await
488 {
489 Ok(report) => dispatch_order_status_report(report),
490 Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
491 }
492 Ok(())
493 });
494
495 Ok(())
496 }
497
498 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
499 let canceller = self._canceller.clone_for_async();
500 let instrument_id = cmd.instrument_id;
501 let client_order_id = Some(cmd.client_order_id);
502 let venue_order_id = Some(cmd.venue_order_id);
503
504 self.spawn_task("cancel_order", async move {
505 match canceller
506 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
507 .await
508 {
509 Ok(Some(report)) => dispatch_order_status_report(report),
510 Ok(None) => {
511 tracing::debug!("Order already cancelled: {:?}", client_order_id);
513 }
514 Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
515 }
516 Ok(())
517 });
518
519 Ok(())
520 }
521
522 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
523 let canceller = self._canceller.clone_for_async();
524 let instrument_id = cmd.instrument_id;
525 let order_side = Some(cmd.order_side);
526
527 self.spawn_task("cancel_all_orders", async move {
528 match canceller
529 .broadcast_cancel_all(instrument_id, order_side)
530 .await
531 {
532 Ok(reports) => {
533 for report in reports {
534 dispatch_order_status_report(report);
535 }
536 }
537 Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
538 }
539 Ok(())
540 });
541
542 Ok(())
543 }
544
545 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
546 let canceller = self._canceller.clone_for_async();
547 let instrument_id = cmd.instrument_id;
548 let venue_ids: Vec<VenueOrderId> = cmd
549 .cancels
550 .iter()
551 .map(|cancel| cancel.venue_order_id)
552 .collect();
553
554 self.spawn_task("batch_cancel_orders", async move {
555 match canceller
556 .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
557 .await
558 {
559 Ok(reports) => {
560 for report in reports {
561 dispatch_order_status_report(report);
562 }
563 }
564 Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
565 }
566 Ok(())
567 });
568
569 Ok(())
570 }
571
572 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
573 self.update_account_state()
574 }
575
576 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
577 let http_client = self.http_client.clone();
578 let instrument_id = cmd.instrument_id;
579 let client_order_id = Some(cmd.client_order_id);
580 let venue_order_id = Some(cmd.venue_order_id);
581
582 self.spawn_task("query_order", async move {
583 match http_client
584 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
585 .await
586 {
587 Ok(report) => dispatch_order_status_report(report),
588 Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
589 }
590 Ok(())
591 });
592
593 Ok(())
594 }
595}
596
597#[async_trait(?Send)]
598impl LiveExecutionClient for BitmexExecutionClient {
599 async fn connect(&mut self) -> anyhow::Result<()> {
600 if self.connected {
601 return Ok(());
602 }
603
604 self.ensure_instruments_initialized_async().await?;
605
606 self._submitter.start().await?;
607 self._canceller.start().await?;
608
609 self.ws_client.connect().await?;
610 self.ws_client.wait_until_active(10.0).await?;
611
612 self.ws_client.subscribe_orders().await?;
613 self.ws_client.subscribe_executions().await?;
614 self.ws_client.subscribe_positions().await?;
615 self.ws_client.subscribe_wallet().await?;
616 if let Err(e) = self.ws_client.subscribe_margin().await {
617 tracing::debug!("Margin subscription unavailable: {e:?}");
618 }
619
620 self.start_ws_stream()?;
621 self.refresh_account_state().await?;
622
623 self.connected = true;
624 self.core.set_connected(true);
625 tracing::info!(client_id = %self.core.client_id, "Connected");
626 Ok(())
627 }
628
629 async fn disconnect(&mut self) -> anyhow::Result<()> {
630 if !self.connected {
631 return Ok(());
632 }
633
634 self.http_client.cancel_all_requests();
635 self._submitter.stop().await;
636 self._canceller.stop().await;
637
638 if let Err(e) = self.ws_client.close().await {
639 tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
640 }
641
642 if let Some(handle) = self.ws_stream_handle.take() {
643 handle.abort();
644 }
645
646 self.abort_pending_tasks();
647 self.connected = false;
648 self.core.set_connected(false);
649 tracing::info!(client_id = %self.core.client_id, "Disconnected");
650 Ok(())
651 }
652
653 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
654 get_exec_event_sender()
655 }
656
657 fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
658 self.core.clock().borrow()
659 }
660
661 async fn generate_order_status_report(
662 &self,
663 cmd: &GenerateOrderStatusReport,
664 ) -> anyhow::Result<Option<OrderStatusReport>> {
665 let instrument_id = cmd
666 .instrument_id
667 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
668
669 self.http_client
670 .query_order(
671 instrument_id,
672 cmd.client_order_id,
673 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
674 )
675 .await
676 .context("failed to query BitMEX order status")
677 }
678
679 async fn generate_order_status_reports(
680 &self,
681 cmd: &GenerateOrderStatusReport,
682 ) -> anyhow::Result<Vec<OrderStatusReport>> {
683 let reports = self
684 .http_client
685 .request_order_status_reports(cmd.instrument_id, false, None)
686 .await
687 .context("failed to request BitMEX order status reports")?;
688 Ok(reports)
689 }
690
691 async fn generate_fill_reports(
692 &self,
693 cmd: GenerateFillReports,
694 ) -> anyhow::Result<Vec<FillReport>> {
695 let mut reports = self
696 .http_client
697 .request_fill_reports(cmd.instrument_id, None)
698 .await
699 .context("failed to request BitMEX fill reports")?;
700
701 if let Some(order_id) = cmd.venue_order_id {
702 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
703 }
704
705 Ok(reports)
706 }
707
708 async fn generate_position_status_reports(
709 &self,
710 cmd: &GeneratePositionReports,
711 ) -> anyhow::Result<Vec<PositionStatusReport>> {
712 let mut reports = self
713 .http_client
714 .request_position_status_reports()
715 .await
716 .context("failed to request BitMEX position reports")?;
717
718 if let Some(instrument_id) = cmd.instrument_id {
719 reports.retain(|report| report.instrument_id == instrument_id);
720 }
721
722 Ok(reports)
723 }
724
725 async fn generate_mass_status(
726 &self,
727 _lookback_mins: Option<u64>,
728 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
729 tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
730 Ok(None)
731 }
732}
733
734fn dispatch_ws_message(message: NautilusWsMessage) {
735 match message {
736 NautilusWsMessage::OrderStatusReports(reports) => {
737 for report in reports {
738 dispatch_order_status_report(report);
739 }
740 }
741 NautilusWsMessage::FillReports(reports) => {
742 for report in reports {
743 dispatch_fill_report(report);
744 }
745 }
746 NautilusWsMessage::PositionStatusReport(report) => {
747 dispatch_position_status_report(report);
748 }
749 NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
750 NautilusWsMessage::OrderUpdated(event) => {
751 dispatch_order_event(OrderEventAny::Updated(event));
752 }
753 NautilusWsMessage::Data(_)
754 | NautilusWsMessage::Instruments(_)
755 | NautilusWsMessage::FundingRateUpdates(_) => {
756 tracing::debug!("Ignoring BitMEX data message on execution stream");
757 }
758 NautilusWsMessage::Reconnected => {
759 tracing::info!("BitMEX execution websocket reconnected");
760 }
761 NautilusWsMessage::Authenticated => {
762 tracing::debug!("BitMEX execution websocket authenticated");
763 }
764 }
765}
766
767fn dispatch_account_state(state: AccountState) {
768 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
769}
770
771fn dispatch_order_status_report(report: OrderStatusReport) {
772 let sender = get_exec_event_sender();
773 let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(report));
774 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
775 tracing::warn!("Failed to send order status report: {e}");
776 }
777}
778
779fn dispatch_fill_report(report: FillReport) {
780 let sender = get_exec_event_sender();
781 let exec_report = nautilus_common::messages::ExecutionReport::Fill(Box::new(report));
782 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
783 tracing::warn!("Failed to send fill report: {e}");
784 }
785}
786
787fn dispatch_position_status_report(report: PositionStatusReport) {
788 let sender = get_exec_event_sender();
789 let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
790 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
791 tracing::warn!("Failed to send position status report: {e}");
792 }
793}
794
795fn dispatch_order_event(event: OrderEventAny) {
796 let sender = get_exec_event_sender();
797 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
798 tracing::warn!("Failed to send order event: {e}");
799 }
800}