nautilus_bitmex/execution/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Live execution client implementation for the BitMEX adapter.
17
18pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    live::{runner::get_exec_event_sender, runtime::get_runtime},
28    messages::{
29        ExecutionEvent, ExecutionReport,
30        execution::{
31            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
33            QueryOrder, SubmitOrder, SubmitOrderList,
34        },
35    },
36    msgbus,
37};
38use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
39use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
40use nautilus_live::execution::client::LiveExecutionClient;
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::OmsType,
44    events::{AccountState, OrderEventAny, OrderRejected},
45    identifiers::{AccountId, ClientId, Venue, VenueOrderId},
46    instruments::Instrument,
47    orders::Order,
48    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49    types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52
53use crate::{
54    config::BitmexExecClientConfig,
55    execution::{
56        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58    },
59    http::client::BitmexHttpClient,
60    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[derive(Debug)]
64pub struct BitmexExecutionClient {
65    core: ExecutionClientCore,
66    config: BitmexExecClientConfig,
67    http_client: BitmexHttpClient,
68    ws_client: BitmexWebSocketClient,
69    _submitter: SubmitBroadcaster,
70    _canceller: CancelBroadcaster,
71    started: bool,
72    connected: bool,
73    instruments_initialized: bool,
74    ws_stream_handle: Option<JoinHandle<()>>,
75    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
76}
77
78impl BitmexExecutionClient {
79    /// Creates a new [`BitmexExecutionClient`].
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if either the HTTP or WebSocket client fail to construct.
84    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
85        if !config.has_api_credentials() {
86            anyhow::bail!("BitMEX execution client requires API key and secret");
87        }
88
89        let http_client = BitmexHttpClient::new(
90            Some(config.http_base_url()),
91            config.api_key.clone(),
92            config.api_secret.clone(),
93            config.use_testnet,
94            config.http_timeout_secs,
95            config.max_retries,
96            config.retry_delay_initial_ms,
97            config.retry_delay_max_ms,
98            config.recv_window_ms,
99            config.max_requests_per_second,
100            config.max_requests_per_minute,
101            config.http_proxy_url.clone(),
102        )
103        .context("failed to construct BitMEX HTTP client")?;
104
105        let account_id = config.account_id.unwrap_or(core.account_id);
106        let ws_client = BitmexWebSocketClient::new(
107            Some(config.ws_url()),
108            config.api_key.clone(),
109            config.api_secret.clone(),
110            Some(account_id),
111            config.heartbeat_interval_secs,
112        )
113        .context("failed to construct BitMEX execution websocket client")?;
114
115        let pool_size = config.submitter_pool_size.unwrap_or(1);
116        let submitter_proxy_urls = match &config.submitter_proxy_urls {
117            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
118            None => vec![config.http_proxy_url.clone(); pool_size],
119        };
120
121        let submitter_config = SubmitBroadcasterConfig {
122            pool_size,
123            api_key: config.api_key.clone(),
124            api_secret: config.api_secret.clone(),
125            base_url: config.base_url_http.clone(),
126            testnet: config.use_testnet,
127            timeout_secs: config.http_timeout_secs,
128            max_retries: config.max_retries,
129            retry_delay_ms: config.retry_delay_initial_ms,
130            retry_delay_max_ms: config.retry_delay_max_ms,
131            recv_window_ms: config.recv_window_ms,
132            max_requests_per_second: config.max_requests_per_second,
133            max_requests_per_minute: config.max_requests_per_minute,
134            proxy_urls: submitter_proxy_urls,
135            ..Default::default()
136        };
137
138        let _submitter = SubmitBroadcaster::new(submitter_config)
139            .context("failed to create SubmitBroadcaster")?;
140
141        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
142        let canceller_proxy_urls = match &config.canceller_proxy_urls {
143            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
144            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
145        };
146
147        let canceller_config = CancelBroadcasterConfig {
148            pool_size: canceller_pool_size,
149            api_key: config.api_key.clone(),
150            api_secret: config.api_secret.clone(),
151            base_url: config.base_url_http.clone(),
152            testnet: config.use_testnet,
153            timeout_secs: config.http_timeout_secs,
154            max_retries: config.max_retries,
155            retry_delay_ms: config.retry_delay_initial_ms,
156            retry_delay_max_ms: config.retry_delay_max_ms,
157            recv_window_ms: config.recv_window_ms,
158            max_requests_per_second: config.max_requests_per_second,
159            max_requests_per_minute: config.max_requests_per_minute,
160            proxy_urls: canceller_proxy_urls,
161            ..Default::default()
162        };
163
164        let _canceller = CancelBroadcaster::new(canceller_config)
165            .context("failed to create CancelBroadcaster")?;
166
167        Ok(Self {
168            core,
169            config,
170            http_client,
171            ws_client,
172            _submitter,
173            _canceller,
174            started: false,
175            connected: false,
176            instruments_initialized: false,
177            ws_stream_handle: None,
178            pending_tasks: Mutex::new(Vec::new()),
179        })
180    }
181
182    fn spawn_task<F>(&self, label: &'static str, fut: F)
183    where
184        F: Future<Output = anyhow::Result<()>> + Send + 'static,
185    {
186        let handle = tokio::spawn(async move {
187            if let Err(e) = fut.await {
188                tracing::error!("{label}: {e:?}");
189            }
190        });
191
192        self.pending_tasks
193            .lock()
194            .expect("pending task lock poisoned")
195            .push(handle);
196    }
197
198    fn abort_pending_tasks(&self) {
199        let mut guard = self
200            .pending_tasks
201            .lock()
202            .expect("pending task lock poisoned");
203        for handle in guard.drain(..) {
204            handle.abort();
205        }
206    }
207
208    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209        if self.instruments_initialized {
210            return Ok(());
211        }
212
213        let http = self.http_client.clone();
214        let mut instruments = http
215            .request_instruments(self.config.active_only)
216            .await
217            .context("failed to request BitMEX instruments")?;
218
219        instruments.sort_by_key(|instrument| instrument.id());
220
221        for instrument in &instruments {
222            self.http_client.cache_instrument(instrument.clone());
223            self._submitter.cache_instrument(instrument.clone());
224            self._canceller.cache_instrument(instrument.clone());
225        }
226
227        self.ws_client.cache_instruments(instruments);
228
229        self.instruments_initialized = true;
230        Ok(())
231    }
232
233    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
234        if self.instruments_initialized {
235            return Ok(());
236        }
237
238        let runtime = get_runtime();
239        runtime.block_on(self.ensure_instruments_initialized_async())
240    }
241
242    async fn refresh_account_state(&self) -> anyhow::Result<()> {
243        let account_state = self
244            .http_client
245            .request_account_state(self.core.account_id)
246            .await
247            .context("failed to request BitMEX account state")?;
248
249        dispatch_account_state(account_state);
250        Ok(())
251    }
252
253    fn update_account_state(&self) -> anyhow::Result<()> {
254        let runtime = get_runtime();
255        runtime.block_on(self.refresh_account_state())
256    }
257
258    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
259        if self.ws_stream_handle.is_some() {
260            return Ok(());
261        }
262
263        let stream = self.ws_client.stream();
264
265        let handle = tokio::spawn(async move {
266            pin_mut!(stream);
267            while let Some(message) = stream.next().await {
268                dispatch_ws_message(message);
269            }
270        });
271
272        self.ws_stream_handle = Some(handle);
273        Ok(())
274    }
275}
276
277#[async_trait(?Send)]
278impl ExecutionClient for BitmexExecutionClient {
279    fn is_connected(&self) -> bool {
280        self.connected
281    }
282
283    fn client_id(&self) -> ClientId {
284        self.core.client_id
285    }
286
287    fn account_id(&self) -> AccountId {
288        self.core.account_id
289    }
290
291    fn venue(&self) -> Venue {
292        self.core.venue
293    }
294
295    fn oms_type(&self) -> OmsType {
296        self.core.oms_type
297    }
298
299    fn get_account(&self) -> Option<AccountAny> {
300        self.core.get_account()
301    }
302
303    fn generate_account_state(
304        &self,
305        balances: Vec<AccountBalance>,
306        margins: Vec<MarginBalance>,
307        reported: bool,
308        ts_event: UnixNanos,
309    ) -> anyhow::Result<()> {
310        self.core
311            .generate_account_state(balances, margins, reported, ts_event)
312    }
313
314    fn start(&mut self) -> anyhow::Result<()> {
315        if self.started {
316            return Ok(());
317        }
318
319        self.ensure_instruments_initialized()?;
320        self.started = true;
321        tracing::info!(
322            client_id = %self.core.client_id,
323            account_id = %self.core.account_id,
324            use_testnet = self.config.use_testnet,
325            submitter_pool_size = ?self.config.submitter_pool_size,
326            canceller_pool_size = ?self.config.canceller_pool_size,
327            http_proxy_url = ?self.config.http_proxy_url,
328            ws_proxy_url = ?self.config.ws_proxy_url,
329            submitter_proxy_urls = ?self.config.submitter_proxy_urls,
330            canceller_proxy_urls = ?self.config.canceller_proxy_urls,
331            "BitMEX execution client started"
332        );
333        Ok(())
334    }
335
336    fn stop(&mut self) -> anyhow::Result<()> {
337        if !self.started {
338            return Ok(());
339        }
340
341        self.started = false;
342        self.connected = false;
343        if let Some(handle) = self.ws_stream_handle.take() {
344            handle.abort();
345        }
346        self.abort_pending_tasks();
347        tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
348        Ok(())
349    }
350
351    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
352        let order = cmd.order.clone();
353
354        if order.is_closed() {
355            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
356            return Ok(());
357        }
358
359        self.core.generate_order_submitted(
360            order.strategy_id(),
361            order.instrument_id(),
362            order.client_order_id(),
363            cmd.ts_init,
364        );
365
366        let submit_tries = cmd
367            .params
368            .as_ref()
369            .and_then(|params| params.get("submit_tries"))
370            .and_then(|s| s.parse::<usize>().ok())
371            .filter(|&n| n > 0);
372
373        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
374
375        let http_client = self.http_client.clone();
376        let submitter = self._submitter.clone_for_async();
377        let trader_id = self.core.trader_id;
378        let strategy_id = order.strategy_id();
379        let instrument_id = order.instrument_id();
380        let account_id = self.core.account_id;
381        let client_order_id = order.client_order_id();
382        let order_side = order.order_side();
383        let order_type = order.order_type();
384        let quantity = order.quantity();
385        let time_in_force = order.time_in_force();
386        let price = order.price();
387        let trigger_price = order.trigger_price();
388        let trigger_type = order.trigger_type();
389        let display_qty = order.display_qty();
390        let post_only = order.is_post_only();
391        let reduce_only = order.is_reduce_only();
392        let order_list_id = order.order_list_id();
393        let contingency_type = order.contingency_type();
394        let ts_event = cmd.ts_init;
395
396        self.spawn_task("submit_order", async move {
397            let result = if use_broadcaster {
398                submitter
399                    .broadcast_submit(
400                        instrument_id,
401                        client_order_id,
402                        order_side,
403                        order_type,
404                        quantity,
405                        time_in_force,
406                        price,
407                        trigger_price,
408                        trigger_type,
409                        display_qty,
410                        post_only,
411                        reduce_only,
412                        order_list_id,
413                        contingency_type,
414                        submit_tries,
415                    )
416                    .await
417            } else {
418                http_client
419                    .submit_order(
420                        instrument_id,
421                        client_order_id,
422                        order_side,
423                        order_type,
424                        quantity,
425                        time_in_force,
426                        price,
427                        trigger_price,
428                        trigger_type,
429                        display_qty,
430                        post_only,
431                        reduce_only,
432                        order_list_id,
433                        contingency_type,
434                    )
435                    .await
436            };
437
438            match result {
439                Ok(report) => dispatch_order_status_report(report),
440                Err(e) => {
441                    let event = OrderRejected::new(
442                        trader_id,
443                        strategy_id,
444                        instrument_id,
445                        client_order_id,
446                        account_id,
447                        format!("submit-order-error: {e}").into(),
448                        UUID4::new(),
449                        ts_event,
450                        get_atomic_clock_realtime().get_time_ns(),
451                        false,
452                        post_only,
453                    );
454                    dispatch_order_event(OrderEventAny::Rejected(event));
455                }
456            }
457            Ok(())
458        });
459
460        Ok(())
461    }
462
463    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
464        tracing::warn!(
465            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
466            cmd.order_list.orders.len()
467        );
468        Ok(())
469    }
470
471    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
472        let http_client = self.http_client.clone();
473        let instrument_id = cmd.instrument_id;
474        let client_order_id = Some(cmd.client_order_id);
475        let venue_order_id = Some(cmd.venue_order_id);
476        let quantity = cmd.quantity;
477        let price = cmd.price;
478        let trigger_price = cmd.trigger_price;
479
480        self.spawn_task("modify_order", async move {
481            match http_client
482                .modify_order(
483                    instrument_id,
484                    client_order_id,
485                    venue_order_id,
486                    quantity,
487                    price,
488                    trigger_price,
489                )
490                .await
491            {
492                Ok(report) => dispatch_order_status_report(report),
493                Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
494            }
495            Ok(())
496        });
497
498        Ok(())
499    }
500
501    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
502        let canceller = self._canceller.clone_for_async();
503        let instrument_id = cmd.instrument_id;
504        let client_order_id = Some(cmd.client_order_id);
505        let venue_order_id = Some(cmd.venue_order_id);
506
507        self.spawn_task("cancel_order", async move {
508            match canceller
509                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
510                .await
511            {
512                Ok(Some(report)) => dispatch_order_status_report(report),
513                Ok(None) => {
514                    // Idempotent success - order already cancelled
515                    tracing::debug!("Order already cancelled: {:?}", client_order_id);
516                }
517                Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
518            }
519            Ok(())
520        });
521
522        Ok(())
523    }
524
525    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
526        let canceller = self._canceller.clone_for_async();
527        let instrument_id = cmd.instrument_id;
528        let order_side = Some(cmd.order_side);
529
530        self.spawn_task("cancel_all_orders", async move {
531            match canceller
532                .broadcast_cancel_all(instrument_id, order_side)
533                .await
534            {
535                Ok(reports) => {
536                    for report in reports {
537                        dispatch_order_status_report(report);
538                    }
539                }
540                Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
541            }
542            Ok(())
543        });
544
545        Ok(())
546    }
547
548    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
549        let canceller = self._canceller.clone_for_async();
550        let instrument_id = cmd.instrument_id;
551        let venue_ids: Vec<VenueOrderId> = cmd
552            .cancels
553            .iter()
554            .map(|cancel| cancel.venue_order_id)
555            .collect();
556
557        self.spawn_task("batch_cancel_orders", async move {
558            match canceller
559                .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
560                .await
561            {
562                Ok(reports) => {
563                    for report in reports {
564                        dispatch_order_status_report(report);
565                    }
566                }
567                Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
568            }
569            Ok(())
570        });
571
572        Ok(())
573    }
574
575    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
576        self.update_account_state()
577    }
578
579    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
580        let http_client = self.http_client.clone();
581        let instrument_id = cmd.instrument_id;
582        let client_order_id = Some(cmd.client_order_id);
583        let venue_order_id = Some(cmd.venue_order_id);
584
585        self.spawn_task("query_order", async move {
586            match http_client
587                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
588                .await
589            {
590                Ok(report) => dispatch_order_status_report(report),
591                Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
592            }
593            Ok(())
594        });
595
596        Ok(())
597    }
598
599    async fn connect(&mut self) -> anyhow::Result<()> {
600        if self.connected {
601            return Ok(());
602        }
603
604        self.ensure_instruments_initialized_async().await?;
605
606        self._submitter.start().await?;
607        self._canceller.start().await?;
608
609        self.ws_client.connect().await?;
610        self.ws_client.wait_until_active(10.0).await?;
611
612        self.ws_client.subscribe_orders().await?;
613        self.ws_client.subscribe_executions().await?;
614        self.ws_client.subscribe_positions().await?;
615        self.ws_client.subscribe_wallet().await?;
616        if let Err(e) = self.ws_client.subscribe_margin().await {
617            tracing::debug!("Margin subscription unavailable: {e:?}");
618        }
619
620        self.start_ws_stream()?;
621        self.refresh_account_state().await?;
622
623        self.connected = true;
624        self.core.set_connected(true);
625        tracing::info!(client_id = %self.core.client_id, "Connected");
626        Ok(())
627    }
628
629    async fn disconnect(&mut self) -> anyhow::Result<()> {
630        if !self.connected {
631            return Ok(());
632        }
633
634        self.http_client.cancel_all_requests();
635        self._submitter.stop().await;
636        self._canceller.stop().await;
637
638        if let Err(e) = self.ws_client.close().await {
639            tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
640        }
641
642        if let Some(handle) = self.ws_stream_handle.take() {
643            handle.abort();
644        }
645
646        self.abort_pending_tasks();
647        self.connected = false;
648        self.core.set_connected(false);
649        tracing::info!(client_id = %self.core.client_id, "Disconnected");
650        Ok(())
651    }
652}
653
654#[async_trait(?Send)]
655impl LiveExecutionClient for BitmexExecutionClient {
656    async fn generate_order_status_report(
657        &self,
658        cmd: &GenerateOrderStatusReport,
659    ) -> anyhow::Result<Option<OrderStatusReport>> {
660        let instrument_id = cmd
661            .instrument_id
662            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
663
664        self.http_client
665            .query_order(
666                instrument_id,
667                cmd.client_order_id,
668                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
669            )
670            .await
671            .context("failed to query BitMEX order status")
672    }
673
674    async fn generate_order_status_reports(
675        &self,
676        cmd: &GenerateOrderStatusReport,
677    ) -> anyhow::Result<Vec<OrderStatusReport>> {
678        let reports = self
679            .http_client
680            .request_order_status_reports(cmd.instrument_id, false, None)
681            .await
682            .context("failed to request BitMEX order status reports")?;
683        Ok(reports)
684    }
685
686    async fn generate_fill_reports(
687        &self,
688        cmd: GenerateFillReports,
689    ) -> anyhow::Result<Vec<FillReport>> {
690        let mut reports = self
691            .http_client
692            .request_fill_reports(cmd.instrument_id, None)
693            .await
694            .context("failed to request BitMEX fill reports")?;
695
696        if let Some(order_id) = cmd.venue_order_id {
697            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
698        }
699
700        Ok(reports)
701    }
702
703    async fn generate_position_status_reports(
704        &self,
705        cmd: &GeneratePositionReports,
706    ) -> anyhow::Result<Vec<PositionStatusReport>> {
707        let mut reports = self
708            .http_client
709            .request_position_status_reports()
710            .await
711            .context("failed to request BitMEX position reports")?;
712
713        if let Some(instrument_id) = cmd.instrument_id {
714            reports.retain(|report| report.instrument_id == instrument_id);
715        }
716
717        Ok(reports)
718    }
719
720    async fn generate_mass_status(
721        &self,
722        _lookback_mins: Option<u64>,
723    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
724        tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
725        Ok(None)
726    }
727}
728
729fn dispatch_ws_message(message: NautilusWsMessage) {
730    match message {
731        NautilusWsMessage::OrderStatusReports(reports) => {
732            for report in reports {
733                dispatch_order_status_report(report);
734            }
735        }
736        NautilusWsMessage::FillReports(reports) => {
737            for report in reports {
738                dispatch_fill_report(report);
739            }
740        }
741        NautilusWsMessage::PositionStatusReport(report) => {
742            dispatch_position_status_report(report);
743        }
744        NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
745        NautilusWsMessage::OrderUpdated(event) => {
746            dispatch_order_event(OrderEventAny::Updated(event));
747        }
748        NautilusWsMessage::Data(_)
749        | NautilusWsMessage::Instruments(_)
750        | NautilusWsMessage::FundingRateUpdates(_) => {
751            tracing::debug!("Ignoring BitMEX data message on execution stream");
752        }
753        NautilusWsMessage::Reconnected => {
754            tracing::info!("BitMEX execution websocket reconnected");
755        }
756        NautilusWsMessage::Authenticated => {
757            tracing::debug!("BitMEX execution websocket authenticated");
758        }
759    }
760}
761
762fn dispatch_account_state(state: AccountState) {
763    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
764}
765
766fn dispatch_order_status_report(report: OrderStatusReport) {
767    let sender = get_exec_event_sender();
768    let exec_report = ExecutionReport::OrderStatus(Box::new(report));
769    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
770        tracing::warn!("Failed to send order status report: {e}");
771    }
772}
773
774fn dispatch_fill_report(report: FillReport) {
775    let sender = get_exec_event_sender();
776    let exec_report = ExecutionReport::Fill(Box::new(report));
777    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
778        tracing::warn!("Failed to send fill report: {e}");
779    }
780}
781
782fn dispatch_position_status_report(report: PositionStatusReport) {
783    let sender = get_exec_event_sender();
784    let exec_report = ExecutionReport::Position(Box::new(report));
785    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
786        tracing::warn!("Failed to send position status report: {e}");
787    }
788}
789
790fn dispatch_order_event(event: OrderEventAny) {
791    let sender = get_exec_event_sender();
792    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
793        tracing::warn!("Failed to send order event: {e}");
794    }
795}