nautilus_bitmex/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::ExecutionClient,
28    live::{runner::get_exec_event_sender, runtime::get_runtime},
29    messages::{
30        ExecutionEvent, ExecutionReport,
31        execution::{
32            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
34            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
35        },
36    },
37    msgbus,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_live::ExecutionClientCore;
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::OmsType,
44    events::{AccountState, OrderEventAny, OrderRejected},
45    identifiers::{AccountId, ClientId, Venue, VenueOrderId},
46    instruments::Instrument,
47    orders::Order,
48    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49    types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52
53use crate::{
54    config::BitmexExecClientConfig,
55    execution::{
56        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58    },
59    http::client::BitmexHttpClient,
60    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[derive(Debug)]
64pub struct BitmexExecutionClient {
65    core: ExecutionClientCore,
66    config: BitmexExecClientConfig,
67    http_client: BitmexHttpClient,
68    ws_client: BitmexWebSocketClient,
69    _submitter: SubmitBroadcaster,
70    _canceller: CancelBroadcaster,
71    started: bool,
72    connected: bool,
73    instruments_initialized: bool,
74    ws_stream_handle: Option<JoinHandle<()>>,
75    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
76}
77
78impl BitmexExecutionClient {
79    /// Creates a new [`BitmexExecutionClient`].
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if either the HTTP or WebSocket client fail to construct.
84    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
85        if !config.has_api_credentials() {
86            anyhow::bail!("BitMEX execution client requires API key and secret");
87        }
88
89        let http_client = BitmexHttpClient::new(
90            Some(config.http_base_url()),
91            config.api_key.clone(),
92            config.api_secret.clone(),
93            config.use_testnet,
94            config.http_timeout_secs,
95            config.max_retries,
96            config.retry_delay_initial_ms,
97            config.retry_delay_max_ms,
98            config.recv_window_ms,
99            config.max_requests_per_second,
100            config.max_requests_per_minute,
101            config.http_proxy_url.clone(),
102        )
103        .context("failed to construct BitMEX HTTP client")?;
104
105        let account_id = config.account_id.unwrap_or(core.account_id);
106        let ws_client = BitmexWebSocketClient::new(
107            Some(config.ws_url()),
108            config.api_key.clone(),
109            config.api_secret.clone(),
110            Some(account_id),
111            config.heartbeat_interval_secs,
112        )
113        .context("failed to construct BitMEX execution websocket client")?;
114
115        let pool_size = config.submitter_pool_size.unwrap_or(1);
116        let submitter_proxy_urls = match &config.submitter_proxy_urls {
117            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
118            None => vec![config.http_proxy_url.clone(); pool_size],
119        };
120
121        let submitter_config = SubmitBroadcasterConfig {
122            pool_size,
123            api_key: config.api_key.clone(),
124            api_secret: config.api_secret.clone(),
125            base_url: config.base_url_http.clone(),
126            testnet: config.use_testnet,
127            timeout_secs: config.http_timeout_secs,
128            max_retries: config.max_retries,
129            retry_delay_ms: config.retry_delay_initial_ms,
130            retry_delay_max_ms: config.retry_delay_max_ms,
131            recv_window_ms: config.recv_window_ms,
132            max_requests_per_second: config.max_requests_per_second,
133            max_requests_per_minute: config.max_requests_per_minute,
134            proxy_urls: submitter_proxy_urls,
135            ..Default::default()
136        };
137
138        let _submitter = SubmitBroadcaster::new(submitter_config)
139            .context("failed to create SubmitBroadcaster")?;
140
141        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
142        let canceller_proxy_urls = match &config.canceller_proxy_urls {
143            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
144            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
145        };
146
147        let canceller_config = CancelBroadcasterConfig {
148            pool_size: canceller_pool_size,
149            api_key: config.api_key.clone(),
150            api_secret: config.api_secret.clone(),
151            base_url: config.base_url_http.clone(),
152            testnet: config.use_testnet,
153            timeout_secs: config.http_timeout_secs,
154            max_retries: config.max_retries,
155            retry_delay_ms: config.retry_delay_initial_ms,
156            retry_delay_max_ms: config.retry_delay_max_ms,
157            recv_window_ms: config.recv_window_ms,
158            max_requests_per_second: config.max_requests_per_second,
159            max_requests_per_minute: config.max_requests_per_minute,
160            proxy_urls: canceller_proxy_urls,
161            ..Default::default()
162        };
163
164        let _canceller = CancelBroadcaster::new(canceller_config)
165            .context("failed to create CancelBroadcaster")?;
166
167        Ok(Self {
168            core,
169            config,
170            http_client,
171            ws_client,
172            _submitter,
173            _canceller,
174            started: false,
175            connected: false,
176            instruments_initialized: false,
177            ws_stream_handle: None,
178            pending_tasks: Mutex::new(Vec::new()),
179        })
180    }
181
182    fn spawn_task<F>(&self, label: &'static str, fut: F)
183    where
184        F: Future<Output = anyhow::Result<()>> + Send + 'static,
185    {
186        let handle = get_runtime().spawn(async move {
187            if let Err(e) = fut.await {
188                log::error!("{label}: {e:?}");
189            }
190        });
191
192        self.pending_tasks
193            .lock()
194            .expect("pending task lock poisoned")
195            .push(handle);
196    }
197
198    fn abort_pending_tasks(&self) {
199        let mut guard = self
200            .pending_tasks
201            .lock()
202            .expect("pending task lock poisoned");
203        for handle in guard.drain(..) {
204            handle.abort();
205        }
206    }
207
208    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209        if self.instruments_initialized {
210            return Ok(());
211        }
212
213        let http = self.http_client.clone();
214        let mut instruments = http
215            .request_instruments(self.config.active_only)
216            .await
217            .context("failed to request BitMEX instruments")?;
218
219        instruments.sort_by_key(|instrument| instrument.id());
220
221        for instrument in &instruments {
222            self.http_client.cache_instrument(instrument.clone());
223            self._submitter.cache_instrument(instrument.clone());
224            self._canceller.cache_instrument(instrument.clone());
225        }
226
227        self.ws_client.cache_instruments(instruments);
228
229        self.instruments_initialized = true;
230        Ok(())
231    }
232
233    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
234        if self.instruments_initialized {
235            return Ok(());
236        }
237
238        let runtime = get_runtime();
239        runtime.block_on(self.ensure_instruments_initialized_async())
240    }
241
242    async fn refresh_account_state(&self) -> anyhow::Result<()> {
243        let account_state = self
244            .http_client
245            .request_account_state(self.core.account_id)
246            .await
247            .context("failed to request BitMEX account state")?;
248
249        dispatch_account_state(account_state);
250        Ok(())
251    }
252
253    fn update_account_state(&self) -> anyhow::Result<()> {
254        let runtime = get_runtime();
255        runtime.block_on(self.refresh_account_state())
256    }
257
258    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
259        if self.ws_stream_handle.is_some() {
260            return Ok(());
261        }
262
263        let stream = self.ws_client.stream();
264
265        let handle = get_runtime().spawn(async move {
266            pin_mut!(stream);
267            while let Some(message) = stream.next().await {
268                dispatch_ws_message(message);
269            }
270        });
271
272        self.ws_stream_handle = Some(handle);
273        Ok(())
274    }
275}
276
277#[async_trait(?Send)]
278impl ExecutionClient for BitmexExecutionClient {
279    fn is_connected(&self) -> bool {
280        self.connected
281    }
282
283    fn client_id(&self) -> ClientId {
284        self.core.client_id
285    }
286
287    fn account_id(&self) -> AccountId {
288        self.core.account_id
289    }
290
291    fn venue(&self) -> Venue {
292        self.core.venue
293    }
294
295    fn oms_type(&self) -> OmsType {
296        self.core.oms_type
297    }
298
299    fn get_account(&self) -> Option<AccountAny> {
300        self.core.get_account()
301    }
302
303    fn generate_account_state(
304        &self,
305        balances: Vec<AccountBalance>,
306        margins: Vec<MarginBalance>,
307        reported: bool,
308        ts_event: UnixNanos,
309    ) -> anyhow::Result<()> {
310        self.core
311            .generate_account_state(balances, margins, reported, ts_event)
312    }
313
314    fn start(&mut self) -> anyhow::Result<()> {
315        if self.started {
316            return Ok(());
317        }
318
319        self.ensure_instruments_initialized()?;
320        self.started = true;
321        log::info!(
322            "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
323            self.core.client_id,
324            self.core.account_id,
325            self.config.use_testnet,
326            self.config.submitter_pool_size,
327            self.config.canceller_pool_size,
328            self.config.http_proxy_url,
329            self.config.ws_proxy_url,
330            self.config.submitter_proxy_urls,
331            self.config.canceller_proxy_urls,
332        );
333        Ok(())
334    }
335
336    fn stop(&mut self) -> anyhow::Result<()> {
337        if !self.started {
338            return Ok(());
339        }
340
341        self.started = false;
342        self.connected = false;
343        if let Some(handle) = self.ws_stream_handle.take() {
344            handle.abort();
345        }
346        self.abort_pending_tasks();
347        log::info!("BitMEX execution client {} stopped", self.core.client_id);
348        Ok(())
349    }
350
351    async fn connect(&mut self) -> anyhow::Result<()> {
352        if self.connected {
353            return Ok(());
354        }
355
356        self.ensure_instruments_initialized_async().await?;
357
358        self._submitter.start().await?;
359        self._canceller.start().await?;
360
361        self.ws_client.connect().await?;
362        self.ws_client.wait_until_active(10.0).await?;
363
364        self.ws_client.subscribe_orders().await?;
365        self.ws_client.subscribe_executions().await?;
366        self.ws_client.subscribe_positions().await?;
367        self.ws_client.subscribe_wallet().await?;
368        if let Err(e) = self.ws_client.subscribe_margin().await {
369            log::debug!("Margin subscription unavailable: {e:?}");
370        }
371
372        self.start_ws_stream()?;
373        self.refresh_account_state().await?;
374
375        self.connected = true;
376        self.core.set_connected(true);
377        log::info!("Connected: client_id={}", self.core.client_id);
378        Ok(())
379    }
380
381    async fn disconnect(&mut self) -> anyhow::Result<()> {
382        if !self.connected {
383            return Ok(());
384        }
385
386        self.http_client.cancel_all_requests();
387        self._submitter.stop().await;
388        self._canceller.stop().await;
389
390        if let Err(e) = self.ws_client.close().await {
391            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
392        }
393
394        if let Some(handle) = self.ws_stream_handle.take() {
395            handle.abort();
396        }
397
398        self.abort_pending_tasks();
399        self.connected = false;
400        self.core.set_connected(false);
401        log::info!("Disconnected: client_id={}", self.core.client_id);
402        Ok(())
403    }
404
405    async fn generate_order_status_report(
406        &self,
407        cmd: &GenerateOrderStatusReport,
408    ) -> anyhow::Result<Option<OrderStatusReport>> {
409        let instrument_id = cmd
410            .instrument_id
411            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
412
413        self.http_client
414            .query_order(
415                instrument_id,
416                cmd.client_order_id,
417                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
418            )
419            .await
420            .context("failed to query BitMEX order status")
421    }
422
423    async fn generate_order_status_reports(
424        &self,
425        cmd: &GenerateOrderStatusReports,
426    ) -> anyhow::Result<Vec<OrderStatusReport>> {
427        let reports = self
428            .http_client
429            .request_order_status_reports(cmd.instrument_id, cmd.open_only, None)
430            .await
431            .context("failed to request BitMEX order status reports")?;
432        Ok(reports)
433    }
434
435    async fn generate_fill_reports(
436        &self,
437        cmd: GenerateFillReports,
438    ) -> anyhow::Result<Vec<FillReport>> {
439        let mut reports = self
440            .http_client
441            .request_fill_reports(cmd.instrument_id, None)
442            .await
443            .context("failed to request BitMEX fill reports")?;
444
445        if let Some(order_id) = cmd.venue_order_id {
446            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
447        }
448
449        Ok(reports)
450    }
451
452    async fn generate_position_status_reports(
453        &self,
454        cmd: &GeneratePositionStatusReports,
455    ) -> anyhow::Result<Vec<PositionStatusReport>> {
456        let mut reports = self
457            .http_client
458            .request_position_status_reports()
459            .await
460            .context("failed to request BitMEX position reports")?;
461
462        if let Some(instrument_id) = cmd.instrument_id {
463            reports.retain(|report| report.instrument_id == instrument_id);
464        }
465
466        Ok(reports)
467    }
468
469    async fn generate_mass_status(
470        &self,
471        _lookback_mins: Option<u64>,
472    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
473        log::warn!("generate_mass_status not yet implemented for BitMEX execution client");
474        Ok(None)
475    }
476
477    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
478        self.update_account_state()
479    }
480
481    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
482        let http_client = self.http_client.clone();
483        let instrument_id = cmd.instrument_id;
484        let client_order_id = Some(cmd.client_order_id);
485        let venue_order_id = cmd.venue_order_id;
486
487        self.spawn_task("query_order", async move {
488            match http_client
489                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
490                .await
491            {
492                Ok(report) => dispatch_order_status_report(report),
493                Err(e) => log::error!("BitMEX query order failed: {e:?}"),
494            }
495            Ok(())
496        });
497
498        Ok(())
499    }
500
501    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
502        let order = cmd.order.clone();
503
504        if order.is_closed() {
505            log::warn!("Cannot submit closed order {}", order.client_order_id());
506            return Ok(());
507        }
508
509        self.core.generate_order_submitted(
510            order.strategy_id(),
511            order.instrument_id(),
512            order.client_order_id(),
513            cmd.ts_init,
514        );
515
516        let submit_tries = cmd
517            .params
518            .as_ref()
519            .and_then(|params| params.get("submit_tries"))
520            .and_then(|s| s.parse::<usize>().ok())
521            .filter(|&n| n > 0);
522
523        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
524
525        let http_client = self.http_client.clone();
526        let submitter = self._submitter.clone_for_async();
527        let trader_id = self.core.trader_id;
528        let strategy_id = order.strategy_id();
529        let instrument_id = order.instrument_id();
530        let account_id = self.core.account_id;
531        let client_order_id = order.client_order_id();
532        let order_side = order.order_side();
533        let order_type = order.order_type();
534        let quantity = order.quantity();
535        let time_in_force = order.time_in_force();
536        let price = order.price();
537        let trigger_price = order.trigger_price();
538        let trigger_type = order.trigger_type();
539        let display_qty = order.display_qty();
540        let post_only = order.is_post_only();
541        let reduce_only = order.is_reduce_only();
542        let order_list_id = order.order_list_id();
543        let contingency_type = order.contingency_type();
544        let ts_event = cmd.ts_init;
545
546        self.spawn_task("submit_order", async move {
547            let result = if use_broadcaster {
548                submitter
549                    .broadcast_submit(
550                        instrument_id,
551                        client_order_id,
552                        order_side,
553                        order_type,
554                        quantity,
555                        time_in_force,
556                        price,
557                        trigger_price,
558                        trigger_type,
559                        display_qty,
560                        post_only,
561                        reduce_only,
562                        order_list_id,
563                        contingency_type,
564                        submit_tries,
565                    )
566                    .await
567            } else {
568                http_client
569                    .submit_order(
570                        instrument_id,
571                        client_order_id,
572                        order_side,
573                        order_type,
574                        quantity,
575                        time_in_force,
576                        price,
577                        trigger_price,
578                        trigger_type,
579                        display_qty,
580                        post_only,
581                        reduce_only,
582                        order_list_id,
583                        contingency_type,
584                    )
585                    .await
586            };
587
588            match result {
589                Ok(report) => dispatch_order_status_report(report),
590                Err(e) => {
591                    let event = OrderRejected::new(
592                        trader_id,
593                        strategy_id,
594                        instrument_id,
595                        client_order_id,
596                        account_id,
597                        format!("submit-order-error: {e}").into(),
598                        UUID4::new(),
599                        ts_event,
600                        get_atomic_clock_realtime().get_time_ns(),
601                        false,
602                        post_only,
603                    );
604                    dispatch_order_event(OrderEventAny::Rejected(event));
605                }
606            }
607            Ok(())
608        });
609
610        Ok(())
611    }
612
613    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
614        log::warn!(
615            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
616            cmd.order_list.orders.len()
617        );
618        Ok(())
619    }
620
621    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
622        let http_client = self.http_client.clone();
623        let instrument_id = cmd.instrument_id;
624        let client_order_id = Some(cmd.client_order_id);
625        let venue_order_id = cmd.venue_order_id;
626        let quantity = cmd.quantity;
627        let price = cmd.price;
628        let trigger_price = cmd.trigger_price;
629
630        self.spawn_task("modify_order", async move {
631            match http_client
632                .modify_order(
633                    instrument_id,
634                    client_order_id,
635                    venue_order_id,
636                    quantity,
637                    price,
638                    trigger_price,
639                )
640                .await
641            {
642                Ok(report) => dispatch_order_status_report(report),
643                Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
644            }
645            Ok(())
646        });
647
648        Ok(())
649    }
650
651    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
652        let canceller = self._canceller.clone_for_async();
653        let instrument_id = cmd.instrument_id;
654        let client_order_id = Some(cmd.client_order_id);
655        let venue_order_id = cmd.venue_order_id;
656
657        self.spawn_task("cancel_order", async move {
658            match canceller
659                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
660                .await
661            {
662                Ok(Some(report)) => dispatch_order_status_report(report),
663                Ok(None) => {
664                    // Idempotent success - order already cancelled
665                    log::debug!("Order already cancelled: {client_order_id:?}");
666                }
667                Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
668            }
669            Ok(())
670        });
671
672        Ok(())
673    }
674
675    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
676        let canceller = self._canceller.clone_for_async();
677        let instrument_id = cmd.instrument_id;
678        let order_side = Some(cmd.order_side);
679
680        self.spawn_task("cancel_all_orders", async move {
681            match canceller
682                .broadcast_cancel_all(instrument_id, order_side)
683                .await
684            {
685                Ok(reports) => {
686                    for report in reports {
687                        dispatch_order_status_report(report);
688                    }
689                }
690                Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
691            }
692            Ok(())
693        });
694
695        Ok(())
696    }
697
698    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
699        let canceller = self._canceller.clone_for_async();
700        let instrument_id = cmd.instrument_id;
701        let venue_ids: Vec<VenueOrderId> = cmd
702            .cancels
703            .iter()
704            .filter_map(|cancel| cancel.venue_order_id)
705            .collect();
706
707        self.spawn_task("batch_cancel_orders", async move {
708            match canceller
709                .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
710                .await
711            {
712                Ok(reports) => {
713                    for report in reports {
714                        dispatch_order_status_report(report);
715                    }
716                }
717                Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
718            }
719            Ok(())
720        });
721
722        Ok(())
723    }
724}
725
726fn dispatch_ws_message(message: NautilusWsMessage) {
727    match message {
728        NautilusWsMessage::OrderStatusReports(reports) => {
729            for report in reports {
730                dispatch_order_status_report(report);
731            }
732        }
733        NautilusWsMessage::FillReports(reports) => {
734            for report in reports {
735                dispatch_fill_report(report);
736            }
737        }
738        NautilusWsMessage::PositionStatusReport(report) => {
739            dispatch_position_status_report(report);
740        }
741        NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
742        NautilusWsMessage::OrderUpdated(event) => {
743            dispatch_order_event(OrderEventAny::Updated(event));
744        }
745        NautilusWsMessage::Data(_)
746        | NautilusWsMessage::Instruments(_)
747        | NautilusWsMessage::FundingRateUpdates(_) => {
748            log::debug!("Ignoring BitMEX data message on execution stream");
749        }
750        NautilusWsMessage::Reconnected => {
751            log::info!("BitMEX execution websocket reconnected");
752        }
753        NautilusWsMessage::Authenticated => {
754            log::debug!("BitMEX execution websocket authenticated");
755        }
756    }
757}
758
759fn dispatch_account_state(state: AccountState) {
760    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
761}
762
763fn dispatch_order_status_report(report: OrderStatusReport) {
764    let sender = get_exec_event_sender();
765    let exec_report = ExecutionReport::Order(Box::new(report));
766    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
767        log::warn!("Failed to send order status report: {e}");
768    }
769}
770
771fn dispatch_fill_report(report: FillReport) {
772    let sender = get_exec_event_sender();
773    let exec_report = ExecutionReport::Fill(Box::new(report));
774    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
775        log::warn!("Failed to send fill report: {e}");
776    }
777}
778
779fn dispatch_position_status_report(report: PositionStatusReport) {
780    let sender = get_exec_event_sender();
781    let exec_report = ExecutionReport::Position(Box::new(report));
782    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
783        log::warn!("Failed to send position status report: {e}");
784    }
785}
786
787fn dispatch_order_event(event: OrderEventAny) {
788    let sender = get_exec_event_sender();
789    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
790        log::warn!("Failed to send order event: {e}");
791    }
792}