nautilus_bitmex/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    live::{runner::get_exec_event_sender, runtime::get_runtime},
28    messages::{
29        ExecutionEvent, ExecutionReport,
30        execution::{
31            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34        },
35    },
36    msgbus,
37};
38use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
39use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::OmsType,
43    events::{AccountState, OrderEventAny, OrderRejected},
44    identifiers::{AccountId, ClientId, Venue, VenueOrderId},
45    instruments::Instrument,
46    orders::Order,
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51
52use crate::{
53    config::BitmexExecClientConfig,
54    execution::{
55        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
56        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
57    },
58    http::client::BitmexHttpClient,
59    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
60};
61
/// Live execution client for the BitMEX venue.
///
/// Combines an HTTP client, a WebSocket client, and submit/cancel broadcasters behind the
/// `ExecutionClient` trait.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution client core (identifiers, account lookup, event generation).
    core: ExecutionClientCore,
    /// Adapter configuration the client was constructed with.
    config: BitmexExecClientConfig,
    /// HTTP client used for instruments, account state, reports, and single-shot order requests.
    http_client: BitmexHttpClient,
    /// WebSocket client providing the execution event stream.
    ws_client: BitmexWebSocketClient,
    /// Broadcaster used for order submission when more than one submit try is requested.
    _submitter: SubmitBroadcaster,
    /// Broadcaster used for single, batch, and cancel-all requests.
    _canceller: CancelBroadcaster,
    /// Set by `start` and cleared by `stop`.
    started: bool,
    /// Set once `connect` completes; cleared by `stop` and `disconnect`.
    connected: bool,
    /// Whether instrument definitions have been fetched and cached.
    instruments_initialized: bool,
    /// Handle of the task draining the WebSocket stream, if it is running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks spawned for commands, kept so they can be aborted on shutdown.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
76
77impl BitmexExecutionClient {
78    /// Creates a new [`BitmexExecutionClient`].
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if either the HTTP or WebSocket client fail to construct.
83    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
84        if !config.has_api_credentials() {
85            anyhow::bail!("BitMEX execution client requires API key and secret");
86        }
87
88        let http_client = BitmexHttpClient::new(
89            Some(config.http_base_url()),
90            config.api_key.clone(),
91            config.api_secret.clone(),
92            config.use_testnet,
93            config.http_timeout_secs,
94            config.max_retries,
95            config.retry_delay_initial_ms,
96            config.retry_delay_max_ms,
97            config.recv_window_ms,
98            config.max_requests_per_second,
99            config.max_requests_per_minute,
100            config.http_proxy_url.clone(),
101        )
102        .context("failed to construct BitMEX HTTP client")?;
103
104        let account_id = config.account_id.unwrap_or(core.account_id);
105        let ws_client = BitmexWebSocketClient::new(
106            Some(config.ws_url()),
107            config.api_key.clone(),
108            config.api_secret.clone(),
109            Some(account_id),
110            config.heartbeat_interval_secs,
111        )
112        .context("failed to construct BitMEX execution websocket client")?;
113
114        let pool_size = config.submitter_pool_size.unwrap_or(1);
115        let submitter_proxy_urls = match &config.submitter_proxy_urls {
116            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
117            None => vec![config.http_proxy_url.clone(); pool_size],
118        };
119
120        let submitter_config = SubmitBroadcasterConfig {
121            pool_size,
122            api_key: config.api_key.clone(),
123            api_secret: config.api_secret.clone(),
124            base_url: config.base_url_http.clone(),
125            testnet: config.use_testnet,
126            timeout_secs: config.http_timeout_secs,
127            max_retries: config.max_retries,
128            retry_delay_ms: config.retry_delay_initial_ms,
129            retry_delay_max_ms: config.retry_delay_max_ms,
130            recv_window_ms: config.recv_window_ms,
131            max_requests_per_second: config.max_requests_per_second,
132            max_requests_per_minute: config.max_requests_per_minute,
133            proxy_urls: submitter_proxy_urls,
134            ..Default::default()
135        };
136
137        let _submitter = SubmitBroadcaster::new(submitter_config)
138            .context("failed to create SubmitBroadcaster")?;
139
140        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
141        let canceller_proxy_urls = match &config.canceller_proxy_urls {
142            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
143            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
144        };
145
146        let canceller_config = CancelBroadcasterConfig {
147            pool_size: canceller_pool_size,
148            api_key: config.api_key.clone(),
149            api_secret: config.api_secret.clone(),
150            base_url: config.base_url_http.clone(),
151            testnet: config.use_testnet,
152            timeout_secs: config.http_timeout_secs,
153            max_retries: config.max_retries,
154            retry_delay_ms: config.retry_delay_initial_ms,
155            retry_delay_max_ms: config.retry_delay_max_ms,
156            recv_window_ms: config.recv_window_ms,
157            max_requests_per_second: config.max_requests_per_second,
158            max_requests_per_minute: config.max_requests_per_minute,
159            proxy_urls: canceller_proxy_urls,
160            ..Default::default()
161        };
162
163        let _canceller = CancelBroadcaster::new(canceller_config)
164            .context("failed to create CancelBroadcaster")?;
165
166        Ok(Self {
167            core,
168            config,
169            http_client,
170            ws_client,
171            _submitter,
172            _canceller,
173            started: false,
174            connected: false,
175            instruments_initialized: false,
176            ws_stream_handle: None,
177            pending_tasks: Mutex::new(Vec::new()),
178        })
179    }
180
181    fn spawn_task<F>(&self, label: &'static str, fut: F)
182    where
183        F: Future<Output = anyhow::Result<()>> + Send + 'static,
184    {
185        let handle = get_runtime().spawn(async move {
186            if let Err(e) = fut.await {
187                tracing::error!("{label}: {e:?}");
188            }
189        });
190
191        self.pending_tasks
192            .lock()
193            .expect("pending task lock poisoned")
194            .push(handle);
195    }
196
197    fn abort_pending_tasks(&self) {
198        let mut guard = self
199            .pending_tasks
200            .lock()
201            .expect("pending task lock poisoned");
202        for handle in guard.drain(..) {
203            handle.abort();
204        }
205    }
206
207    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
208        if self.instruments_initialized {
209            return Ok(());
210        }
211
212        let http = self.http_client.clone();
213        let mut instruments = http
214            .request_instruments(self.config.active_only)
215            .await
216            .context("failed to request BitMEX instruments")?;
217
218        instruments.sort_by_key(|instrument| instrument.id());
219
220        for instrument in &instruments {
221            self.http_client.cache_instrument(instrument.clone());
222            self._submitter.cache_instrument(instrument.clone());
223            self._canceller.cache_instrument(instrument.clone());
224        }
225
226        self.ws_client.cache_instruments(instruments);
227
228        self.instruments_initialized = true;
229        Ok(())
230    }
231
232    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
233        if self.instruments_initialized {
234            return Ok(());
235        }
236
237        let runtime = get_runtime();
238        runtime.block_on(self.ensure_instruments_initialized_async())
239    }
240
241    async fn refresh_account_state(&self) -> anyhow::Result<()> {
242        let account_state = self
243            .http_client
244            .request_account_state(self.core.account_id)
245            .await
246            .context("failed to request BitMEX account state")?;
247
248        dispatch_account_state(account_state);
249        Ok(())
250    }
251
252    fn update_account_state(&self) -> anyhow::Result<()> {
253        let runtime = get_runtime();
254        runtime.block_on(self.refresh_account_state())
255    }
256
257    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
258        if self.ws_stream_handle.is_some() {
259            return Ok(());
260        }
261
262        let stream = self.ws_client.stream();
263
264        let handle = get_runtime().spawn(async move {
265            pin_mut!(stream);
266            while let Some(message) = stream.next().await {
267                dispatch_ws_message(message);
268            }
269        });
270
271        self.ws_stream_handle = Some(handle);
272        Ok(())
273    }
274}
275
276#[async_trait(?Send)]
277impl ExecutionClient for BitmexExecutionClient {
    /// Returns the client's own `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
281
    /// Returns the client identifier held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
285
    /// Returns the account identifier held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
289
    /// Returns the venue held by the core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
293
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
297
    /// Returns the account as reported by the core, if any.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
301
    /// Forwards the given balances, margins, and timestamps to the core's
    /// `generate_account_state`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core reports.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
312
313    fn start(&mut self) -> anyhow::Result<()> {
314        if self.started {
315            return Ok(());
316        }
317
318        self.ensure_instruments_initialized()?;
319        self.started = true;
320        tracing::info!(
321            client_id = %self.core.client_id,
322            account_id = %self.core.account_id,
323            use_testnet = self.config.use_testnet,
324            submitter_pool_size = ?self.config.submitter_pool_size,
325            canceller_pool_size = ?self.config.canceller_pool_size,
326            http_proxy_url = ?self.config.http_proxy_url,
327            ws_proxy_url = ?self.config.ws_proxy_url,
328            submitter_proxy_urls = ?self.config.submitter_proxy_urls,
329            canceller_proxy_urls = ?self.config.canceller_proxy_urls,
330            "BitMEX execution client started"
331        );
332        Ok(())
333    }
334
335    fn stop(&mut self) -> anyhow::Result<()> {
336        if !self.started {
337            return Ok(());
338        }
339
340        self.started = false;
341        self.connected = false;
342        if let Some(handle) = self.ws_stream_handle.take() {
343            handle.abort();
344        }
345        self.abort_pending_tasks();
346        tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
347        Ok(())
348    }
349
350    async fn connect(&mut self) -> anyhow::Result<()> {
351        if self.connected {
352            return Ok(());
353        }
354
355        self.ensure_instruments_initialized_async().await?;
356
357        self._submitter.start().await?;
358        self._canceller.start().await?;
359
360        self.ws_client.connect().await?;
361        self.ws_client.wait_until_active(10.0).await?;
362
363        self.ws_client.subscribe_orders().await?;
364        self.ws_client.subscribe_executions().await?;
365        self.ws_client.subscribe_positions().await?;
366        self.ws_client.subscribe_wallet().await?;
367        if let Err(e) = self.ws_client.subscribe_margin().await {
368            tracing::debug!("Margin subscription unavailable: {e:?}");
369        }
370
371        self.start_ws_stream()?;
372        self.refresh_account_state().await?;
373
374        self.connected = true;
375        self.core.set_connected(true);
376        tracing::info!(client_id = %self.core.client_id, "Connected");
377        Ok(())
378    }
379
380    async fn disconnect(&mut self) -> anyhow::Result<()> {
381        if !self.connected {
382            return Ok(());
383        }
384
385        self.http_client.cancel_all_requests();
386        self._submitter.stop().await;
387        self._canceller.stop().await;
388
389        if let Err(e) = self.ws_client.close().await {
390            tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
391        }
392
393        if let Some(handle) = self.ws_stream_handle.take() {
394            handle.abort();
395        }
396
397        self.abort_pending_tasks();
398        self.connected = false;
399        self.core.set_connected(false);
400        tracing::info!(client_id = %self.core.client_id, "Disconnected");
401        Ok(())
402    }
403
404    async fn generate_order_status_report(
405        &self,
406        cmd: &GenerateOrderStatusReport,
407    ) -> anyhow::Result<Option<OrderStatusReport>> {
408        let instrument_id = cmd
409            .instrument_id
410            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
411
412        self.http_client
413            .query_order(
414                instrument_id,
415                cmd.client_order_id,
416                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
417            )
418            .await
419            .context("failed to query BitMEX order status")
420    }
421
422    async fn generate_order_status_reports(
423        &self,
424        cmd: &GenerateOrderStatusReports,
425    ) -> anyhow::Result<Vec<OrderStatusReport>> {
426        let reports = self
427            .http_client
428            .request_order_status_reports(cmd.instrument_id, cmd.open_only, None)
429            .await
430            .context("failed to request BitMEX order status reports")?;
431        Ok(reports)
432    }
433
434    async fn generate_fill_reports(
435        &self,
436        cmd: GenerateFillReports,
437    ) -> anyhow::Result<Vec<FillReport>> {
438        let mut reports = self
439            .http_client
440            .request_fill_reports(cmd.instrument_id, None)
441            .await
442            .context("failed to request BitMEX fill reports")?;
443
444        if let Some(order_id) = cmd.venue_order_id {
445            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
446        }
447
448        Ok(reports)
449    }
450
451    async fn generate_position_status_reports(
452        &self,
453        cmd: &GeneratePositionStatusReports,
454    ) -> anyhow::Result<Vec<PositionStatusReport>> {
455        let mut reports = self
456            .http_client
457            .request_position_status_reports()
458            .await
459            .context("failed to request BitMEX position reports")?;
460
461        if let Some(instrument_id) = cmd.instrument_id {
462            reports.retain(|report| report.instrument_id == instrument_id);
463        }
464
465        Ok(reports)
466    }
467
    /// Placeholder: mass status generation is not implemented for this client.
    ///
    /// Logs a warning and returns `Ok(None)`; the lookback argument is ignored.
    async fn generate_mass_status(
        &self,
        _lookback_mins: Option<u64>,
    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
        tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
        Ok(None)
    }
475
    /// Handles an account query by delegating to `update_account_state`; the command's
    /// contents are not used.
    ///
    /// # Errors
    ///
    /// Returns whatever error `update_account_state` reports.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        self.update_account_state()
    }
479
480    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
481        let http_client = self.http_client.clone();
482        let instrument_id = cmd.instrument_id;
483        let client_order_id = Some(cmd.client_order_id);
484        let venue_order_id = cmd.venue_order_id;
485
486        self.spawn_task("query_order", async move {
487            match http_client
488                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
489                .await
490            {
491                Ok(report) => dispatch_order_status_report(report),
492                Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
493            }
494            Ok(())
495        });
496
497        Ok(())
498    }
499
500    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
501        let order = cmd.order.clone();
502
503        if order.is_closed() {
504            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
505            return Ok(());
506        }
507
508        self.core.generate_order_submitted(
509            order.strategy_id(),
510            order.instrument_id(),
511            order.client_order_id(),
512            cmd.ts_init,
513        );
514
515        let submit_tries = cmd
516            .params
517            .as_ref()
518            .and_then(|params| params.get("submit_tries"))
519            .and_then(|s| s.parse::<usize>().ok())
520            .filter(|&n| n > 0);
521
522        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
523
524        let http_client = self.http_client.clone();
525        let submitter = self._submitter.clone_for_async();
526        let trader_id = self.core.trader_id;
527        let strategy_id = order.strategy_id();
528        let instrument_id = order.instrument_id();
529        let account_id = self.core.account_id;
530        let client_order_id = order.client_order_id();
531        let order_side = order.order_side();
532        let order_type = order.order_type();
533        let quantity = order.quantity();
534        let time_in_force = order.time_in_force();
535        let price = order.price();
536        let trigger_price = order.trigger_price();
537        let trigger_type = order.trigger_type();
538        let display_qty = order.display_qty();
539        let post_only = order.is_post_only();
540        let reduce_only = order.is_reduce_only();
541        let order_list_id = order.order_list_id();
542        let contingency_type = order.contingency_type();
543        let ts_event = cmd.ts_init;
544
545        self.spawn_task("submit_order", async move {
546            let result = if use_broadcaster {
547                submitter
548                    .broadcast_submit(
549                        instrument_id,
550                        client_order_id,
551                        order_side,
552                        order_type,
553                        quantity,
554                        time_in_force,
555                        price,
556                        trigger_price,
557                        trigger_type,
558                        display_qty,
559                        post_only,
560                        reduce_only,
561                        order_list_id,
562                        contingency_type,
563                        submit_tries,
564                    )
565                    .await
566            } else {
567                http_client
568                    .submit_order(
569                        instrument_id,
570                        client_order_id,
571                        order_side,
572                        order_type,
573                        quantity,
574                        time_in_force,
575                        price,
576                        trigger_price,
577                        trigger_type,
578                        display_qty,
579                        post_only,
580                        reduce_only,
581                        order_list_id,
582                        contingency_type,
583                    )
584                    .await
585            };
586
587            match result {
588                Ok(report) => dispatch_order_status_report(report),
589                Err(e) => {
590                    let event = OrderRejected::new(
591                        trader_id,
592                        strategy_id,
593                        instrument_id,
594                        client_order_id,
595                        account_id,
596                        format!("submit-order-error: {e}").into(),
597                        UUID4::new(),
598                        ts_event,
599                        get_atomic_clock_realtime().get_time_ns(),
600                        false,
601                        post_only,
602                    );
603                    dispatch_order_event(OrderEventAny::Rejected(event));
604                }
605            }
606            Ok(())
607        });
608
609        Ok(())
610    }
611
612    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
613        tracing::warn!(
614            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
615            cmd.order_list.orders.len()
616        );
617        Ok(())
618    }
619
620    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
621        let http_client = self.http_client.clone();
622        let instrument_id = cmd.instrument_id;
623        let client_order_id = Some(cmd.client_order_id);
624        let venue_order_id = cmd.venue_order_id;
625        let quantity = cmd.quantity;
626        let price = cmd.price;
627        let trigger_price = cmd.trigger_price;
628
629        self.spawn_task("modify_order", async move {
630            match http_client
631                .modify_order(
632                    instrument_id,
633                    client_order_id,
634                    venue_order_id,
635                    quantity,
636                    price,
637                    trigger_price,
638                )
639                .await
640            {
641                Ok(report) => dispatch_order_status_report(report),
642                Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
643            }
644            Ok(())
645        });
646
647        Ok(())
648    }
649
650    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
651        let canceller = self._canceller.clone_for_async();
652        let instrument_id = cmd.instrument_id;
653        let client_order_id = Some(cmd.client_order_id);
654        let venue_order_id = cmd.venue_order_id;
655
656        self.spawn_task("cancel_order", async move {
657            match canceller
658                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
659                .await
660            {
661                Ok(Some(report)) => dispatch_order_status_report(report),
662                Ok(None) => {
663                    // Idempotent success - order already cancelled
664                    tracing::debug!("Order already cancelled: {:?}", client_order_id);
665                }
666                Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
667            }
668            Ok(())
669        });
670
671        Ok(())
672    }
673
674    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
675        let canceller = self._canceller.clone_for_async();
676        let instrument_id = cmd.instrument_id;
677        let order_side = Some(cmd.order_side);
678
679        self.spawn_task("cancel_all_orders", async move {
680            match canceller
681                .broadcast_cancel_all(instrument_id, order_side)
682                .await
683            {
684                Ok(reports) => {
685                    for report in reports {
686                        dispatch_order_status_report(report);
687                    }
688                }
689                Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
690            }
691            Ok(())
692        });
693
694        Ok(())
695    }
696
697    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
698        let canceller = self._canceller.clone_for_async();
699        let instrument_id = cmd.instrument_id;
700        let venue_ids: Vec<VenueOrderId> = cmd
701            .cancels
702            .iter()
703            .filter_map(|cancel| cancel.venue_order_id)
704            .collect();
705
706        self.spawn_task("batch_cancel_orders", async move {
707            match canceller
708                .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
709                .await
710            {
711                Ok(reports) => {
712                    for report in reports {
713                        dispatch_order_status_report(report);
714                    }
715                }
716                Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
717            }
718            Ok(())
719        });
720
721        Ok(())
722    }
723}
724
725fn dispatch_ws_message(message: NautilusWsMessage) {
726    match message {
727        NautilusWsMessage::OrderStatusReports(reports) => {
728            for report in reports {
729                dispatch_order_status_report(report);
730            }
731        }
732        NautilusWsMessage::FillReports(reports) => {
733            for report in reports {
734                dispatch_fill_report(report);
735            }
736        }
737        NautilusWsMessage::PositionStatusReport(report) => {
738            dispatch_position_status_report(report);
739        }
740        NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
741        NautilusWsMessage::OrderUpdated(event) => {
742            dispatch_order_event(OrderEventAny::Updated(event));
743        }
744        NautilusWsMessage::Data(_)
745        | NautilusWsMessage::Instruments(_)
746        | NautilusWsMessage::FundingRateUpdates(_) => {
747            tracing::debug!("Ignoring BitMEX data message on execution stream");
748        }
749        NautilusWsMessage::Reconnected => {
750            tracing::info!("BitMEX execution websocket reconnected");
751        }
752        NautilusWsMessage::Authenticated => {
753            tracing::debug!("BitMEX execution websocket authenticated");
754        }
755    }
756}
757
758fn dispatch_account_state(state: AccountState) {
759    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
760}
761
762fn dispatch_order_status_report(report: OrderStatusReport) {
763    let sender = get_exec_event_sender();
764    let exec_report = ExecutionReport::OrderStatus(Box::new(report));
765    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
766        tracing::warn!("Failed to send order status report: {e}");
767    }
768}
769
770fn dispatch_fill_report(report: FillReport) {
771    let sender = get_exec_event_sender();
772    let exec_report = ExecutionReport::Fill(Box::new(report));
773    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
774        tracing::warn!("Failed to send fill report: {e}");
775    }
776}
777
778fn dispatch_position_status_report(report: PositionStatusReport) {
779    let sender = get_exec_event_sender();
780    let exec_report = ExecutionReport::Position(Box::new(report));
781    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
782        tracing::warn!("Failed to send position status report: {e}");
783    }
784}
785
786fn dispatch_order_event(event: OrderEventAny) {
787    let sender = get_exec_event_sender();
788    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
789        tracing::warn!("Failed to send order event: {e}");
790    }
791}