nautilus_bitmex/execution/
mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Live execution client implementation for the BitMEX adapter.
17
18pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    messages::{
28        ExecutionEvent,
29        execution::{
30            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
32            QueryOrder, SubmitOrder, SubmitOrderList,
33        },
34    },
35    msgbus,
36    runner::get_exec_event_sender,
37    runtime::get_runtime,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
41use nautilus_live::execution::client::LiveExecutionClient;
42use nautilus_model::{
43    events::{AccountState, OrderEventAny, OrderRejected},
44    identifiers::{AccountId, VenueOrderId},
45    instruments::Instrument,
46    orders::Order,
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    config::BitmexExecClientConfig,
53    execution::{
54        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
55        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
56    },
57    http::client::BitmexHttpClient,
58    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
59};
60
61#[derive(Debug)]
62pub struct BitmexExecutionClient {
63    core: ExecutionClientCore,
64    config: BitmexExecClientConfig,
65    http_client: BitmexHttpClient,
66    ws_client: BitmexWebSocketClient,
67    _submitter: SubmitBroadcaster,
68    _canceller: CancelBroadcaster,
69    started: bool,
70    connected: bool,
71    instruments_initialized: bool,
72    ws_stream_handle: Option<JoinHandle<()>>,
73    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
74}
75
76impl BitmexExecutionClient {
77    /// Creates a new [`BitmexExecutionClient`].
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if either the HTTP or WebSocket client fail to construct.
82    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
83        if !config.has_api_credentials() {
84            anyhow::bail!("BitMEX execution client requires API key and secret");
85        }
86
87        let http_client = BitmexHttpClient::new(
88            Some(config.http_base_url()),
89            config.api_key.clone(),
90            config.api_secret.clone(),
91            config.use_testnet,
92            config.http_timeout_secs,
93            config.max_retries,
94            config.retry_delay_initial_ms,
95            config.retry_delay_max_ms,
96            config.recv_window_ms,
97            config.max_requests_per_second,
98            config.max_requests_per_minute,
99            config.http_proxy_url.clone(),
100        )
101        .context("failed to construct BitMEX HTTP client")?;
102
103        let account_id = config.account_id.unwrap_or(core.account_id);
104        let ws_client = BitmexWebSocketClient::new(
105            Some(config.ws_url()),
106            config.api_key.clone(),
107            config.api_secret.clone(),
108            Some(account_id),
109            config.heartbeat_interval_secs,
110        )
111        .context("failed to construct BitMEX execution websocket client")?;
112
113        let pool_size = config.submitter_pool_size.unwrap_or(1);
114        let submitter_proxy_urls = match &config.submitter_proxy_urls {
115            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
116            None => vec![config.http_proxy_url.clone(); pool_size],
117        };
118
119        let submitter_config = SubmitBroadcasterConfig {
120            pool_size,
121            api_key: config.api_key.clone(),
122            api_secret: config.api_secret.clone(),
123            base_url: config.base_url_http.clone(),
124            testnet: config.use_testnet,
125            timeout_secs: config.http_timeout_secs,
126            max_retries: config.max_retries,
127            retry_delay_ms: config.retry_delay_initial_ms,
128            retry_delay_max_ms: config.retry_delay_max_ms,
129            recv_window_ms: config.recv_window_ms,
130            max_requests_per_second: config.max_requests_per_second,
131            max_requests_per_minute: config.max_requests_per_minute,
132            proxy_urls: submitter_proxy_urls,
133            ..Default::default()
134        };
135
136        let _submitter = SubmitBroadcaster::new(submitter_config)
137            .context("failed to create SubmitBroadcaster")?;
138
139        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
140        let canceller_proxy_urls = match &config.canceller_proxy_urls {
141            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
142            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
143        };
144
145        let canceller_config = CancelBroadcasterConfig {
146            pool_size: canceller_pool_size,
147            api_key: config.api_key.clone(),
148            api_secret: config.api_secret.clone(),
149            base_url: config.base_url_http.clone(),
150            testnet: config.use_testnet,
151            timeout_secs: config.http_timeout_secs,
152            max_retries: config.max_retries,
153            retry_delay_ms: config.retry_delay_initial_ms,
154            retry_delay_max_ms: config.retry_delay_max_ms,
155            recv_window_ms: config.recv_window_ms,
156            max_requests_per_second: config.max_requests_per_second,
157            max_requests_per_minute: config.max_requests_per_minute,
158            proxy_urls: canceller_proxy_urls,
159            ..Default::default()
160        };
161
162        let _canceller = CancelBroadcaster::new(canceller_config)
163            .context("failed to create CancelBroadcaster")?;
164
165        Ok(Self {
166            core,
167            config,
168            http_client,
169            ws_client,
170            _submitter,
171            _canceller,
172            started: false,
173            connected: false,
174            instruments_initialized: false,
175            ws_stream_handle: None,
176            pending_tasks: Mutex::new(Vec::new()),
177        })
178    }
179
180    fn spawn_task<F>(&self, label: &'static str, fut: F)
181    where
182        F: Future<Output = anyhow::Result<()>> + Send + 'static,
183    {
184        let handle = tokio::spawn(async move {
185            if let Err(e) = fut.await {
186                tracing::error!("{label}: {e:?}");
187            }
188        });
189
190        self.pending_tasks
191            .lock()
192            .expect("pending task lock poisoned")
193            .push(handle);
194    }
195
196    fn abort_pending_tasks(&self) {
197        let mut guard = self
198            .pending_tasks
199            .lock()
200            .expect("pending task lock poisoned");
201        for handle in guard.drain(..) {
202            handle.abort();
203        }
204    }
205
206    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
207        if self.instruments_initialized {
208            return Ok(());
209        }
210
211        let http = self.http_client.clone();
212        let mut instruments = http
213            .request_instruments(self.config.active_only)
214            .await
215            .context("failed to request BitMEX instruments")?;
216
217        instruments.sort_by_key(|instrument| instrument.id());
218
219        for instrument in &instruments {
220            self.http_client.cache_instrument(instrument.clone());
221            self._submitter.cache_instrument(instrument.clone());
222            self._canceller.cache_instrument(instrument.clone());
223        }
224
225        self.ws_client.cache_instruments(instruments);
226
227        self.instruments_initialized = true;
228        Ok(())
229    }
230
231    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
232        if self.instruments_initialized {
233            return Ok(());
234        }
235
236        let runtime = get_runtime();
237        runtime.block_on(self.ensure_instruments_initialized_async())
238    }
239
240    async fn refresh_account_state(&self) -> anyhow::Result<()> {
241        let account_state = self
242            .http_client
243            .request_account_state(self.core.account_id)
244            .await
245            .context("failed to request BitMEX account state")?;
246
247        dispatch_account_state(account_state);
248        Ok(())
249    }
250
251    fn update_account_state(&self) -> anyhow::Result<()> {
252        let runtime = get_runtime();
253        runtime.block_on(self.refresh_account_state())
254    }
255
256    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
257        if self.ws_stream_handle.is_some() {
258            return Ok(());
259        }
260
261        let stream = self.ws_client.stream();
262
263        let handle = tokio::spawn(async move {
264            pin_mut!(stream);
265            while let Some(message) = stream.next().await {
266                dispatch_ws_message(message);
267            }
268        });
269
270        self.ws_stream_handle = Some(handle);
271        Ok(())
272    }
273}
274
275impl ExecutionClient for BitmexExecutionClient {
276    fn is_connected(&self) -> bool {
277        self.connected
278    }
279
280    fn client_id(&self) -> nautilus_model::identifiers::ClientId {
281        self.core.client_id
282    }
283
284    fn account_id(&self) -> AccountId {
285        self.core.account_id
286    }
287
288    fn venue(&self) -> nautilus_model::identifiers::Venue {
289        self.core.venue
290    }
291
292    fn oms_type(&self) -> nautilus_model::enums::OmsType {
293        self.core.oms_type
294    }
295
296    fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
297        self.core.get_account()
298    }
299
300    fn generate_account_state(
301        &self,
302        balances: Vec<nautilus_model::types::AccountBalance>,
303        margins: Vec<nautilus_model::types::MarginBalance>,
304        reported: bool,
305        ts_event: UnixNanos,
306    ) -> anyhow::Result<()> {
307        self.core
308            .generate_account_state(balances, margins, reported, ts_event)
309    }
310
311    fn start(&mut self) -> anyhow::Result<()> {
312        if self.started {
313            return Ok(());
314        }
315
316        self.ensure_instruments_initialized()?;
317        self.started = true;
318        tracing::info!(
319            client_id = %self.core.client_id,
320            account_id = %self.core.account_id,
321            use_testnet = self.config.use_testnet,
322            submitter_pool_size = ?self.config.submitter_pool_size,
323            canceller_pool_size = ?self.config.canceller_pool_size,
324            http_proxy_url = ?self.config.http_proxy_url,
325            ws_proxy_url = ?self.config.ws_proxy_url,
326            submitter_proxy_urls = ?self.config.submitter_proxy_urls,
327            canceller_proxy_urls = ?self.config.canceller_proxy_urls,
328            "BitMEX execution client started"
329        );
330        Ok(())
331    }
332
333    fn stop(&mut self) -> anyhow::Result<()> {
334        if !self.started {
335            return Ok(());
336        }
337
338        self.started = false;
339        self.connected = false;
340        if let Some(handle) = self.ws_stream_handle.take() {
341            handle.abort();
342        }
343        self.abort_pending_tasks();
344        tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
345        Ok(())
346    }
347
348    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
349        let order = cmd.order.clone();
350
351        if order.is_closed() {
352            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
353            return Ok(());
354        }
355
356        self.core.generate_order_submitted(
357            order.strategy_id(),
358            order.instrument_id(),
359            order.client_order_id(),
360            cmd.ts_init,
361        );
362
363        let submit_tries = cmd
364            .params
365            .as_ref()
366            .and_then(|params| params.get("submit_tries"))
367            .and_then(|s| s.parse::<usize>().ok())
368            .filter(|&n| n > 0);
369
370        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
371
372        let http_client = self.http_client.clone();
373        let submitter = self._submitter.clone_for_async();
374        let trader_id = self.core.trader_id;
375        let strategy_id = order.strategy_id();
376        let instrument_id = order.instrument_id();
377        let account_id = self.core.account_id;
378        let client_order_id = order.client_order_id();
379        let order_side = order.order_side();
380        let order_type = order.order_type();
381        let quantity = order.quantity();
382        let time_in_force = order.time_in_force();
383        let price = order.price();
384        let trigger_price = order.trigger_price();
385        let trigger_type = order.trigger_type();
386        let display_qty = order.display_qty();
387        let post_only = order.is_post_only();
388        let reduce_only = order.is_reduce_only();
389        let order_list_id = order.order_list_id();
390        let contingency_type = order.contingency_type();
391        let ts_event = cmd.ts_init;
392
393        self.spawn_task("submit_order", async move {
394            let result = if use_broadcaster {
395                submitter
396                    .broadcast_submit(
397                        instrument_id,
398                        client_order_id,
399                        order_side,
400                        order_type,
401                        quantity,
402                        time_in_force,
403                        price,
404                        trigger_price,
405                        trigger_type,
406                        display_qty,
407                        post_only,
408                        reduce_only,
409                        order_list_id,
410                        contingency_type,
411                        submit_tries,
412                    )
413                    .await
414            } else {
415                http_client
416                    .submit_order(
417                        instrument_id,
418                        client_order_id,
419                        order_side,
420                        order_type,
421                        quantity,
422                        time_in_force,
423                        price,
424                        trigger_price,
425                        trigger_type,
426                        display_qty,
427                        post_only,
428                        reduce_only,
429                        order_list_id,
430                        contingency_type,
431                    )
432                    .await
433            };
434
435            match result {
436                Ok(report) => dispatch_order_status_report(report),
437                Err(e) => {
438                    let event = OrderRejected::new(
439                        trader_id,
440                        strategy_id,
441                        instrument_id,
442                        client_order_id,
443                        account_id,
444                        format!("submit-order-error: {e}").into(),
445                        UUID4::new(),
446                        ts_event,
447                        get_atomic_clock_realtime().get_time_ns(),
448                        false,
449                        post_only,
450                    );
451                    dispatch_order_event(OrderEventAny::Rejected(event));
452                }
453            }
454            Ok(())
455        });
456
457        Ok(())
458    }
459
460    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
461        tracing::warn!(
462            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
463            cmd.order_list.orders.len()
464        );
465        Ok(())
466    }
467
468    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
469        let http_client = self.http_client.clone();
470        let instrument_id = cmd.instrument_id;
471        let client_order_id = Some(cmd.client_order_id);
472        let venue_order_id = Some(cmd.venue_order_id);
473        let quantity = cmd.quantity;
474        let price = cmd.price;
475        let trigger_price = cmd.trigger_price;
476
477        self.spawn_task("modify_order", async move {
478            match http_client
479                .modify_order(
480                    instrument_id,
481                    client_order_id,
482                    venue_order_id,
483                    quantity,
484                    price,
485                    trigger_price,
486                )
487                .await
488            {
489                Ok(report) => dispatch_order_status_report(report),
490                Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
491            }
492            Ok(())
493        });
494
495        Ok(())
496    }
497
498    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
499        let canceller = self._canceller.clone_for_async();
500        let instrument_id = cmd.instrument_id;
501        let client_order_id = Some(cmd.client_order_id);
502        let venue_order_id = Some(cmd.venue_order_id);
503
504        self.spawn_task("cancel_order", async move {
505            match canceller
506                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
507                .await
508            {
509                Ok(Some(report)) => dispatch_order_status_report(report),
510                Ok(None) => {
511                    // Idempotent success - order already cancelled
512                    tracing::debug!("Order already cancelled: {:?}", client_order_id);
513                }
514                Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
515            }
516            Ok(())
517        });
518
519        Ok(())
520    }
521
522    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
523        let canceller = self._canceller.clone_for_async();
524        let instrument_id = cmd.instrument_id;
525        let order_side = Some(cmd.order_side);
526
527        self.spawn_task("cancel_all_orders", async move {
528            match canceller
529                .broadcast_cancel_all(instrument_id, order_side)
530                .await
531            {
532                Ok(reports) => {
533                    for report in reports {
534                        dispatch_order_status_report(report);
535                    }
536                }
537                Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
538            }
539            Ok(())
540        });
541
542        Ok(())
543    }
544
545    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
546        let canceller = self._canceller.clone_for_async();
547        let instrument_id = cmd.instrument_id;
548        let venue_ids: Vec<VenueOrderId> = cmd
549            .cancels
550            .iter()
551            .map(|cancel| cancel.venue_order_id)
552            .collect();
553
554        self.spawn_task("batch_cancel_orders", async move {
555            match canceller
556                .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
557                .await
558            {
559                Ok(reports) => {
560                    for report in reports {
561                        dispatch_order_status_report(report);
562                    }
563                }
564                Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
565            }
566            Ok(())
567        });
568
569        Ok(())
570    }
571
572    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
573        self.update_account_state()
574    }
575
576    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
577        let http_client = self.http_client.clone();
578        let instrument_id = cmd.instrument_id;
579        let client_order_id = Some(cmd.client_order_id);
580        let venue_order_id = Some(cmd.venue_order_id);
581
582        self.spawn_task("query_order", async move {
583            match http_client
584                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
585                .await
586            {
587                Ok(report) => dispatch_order_status_report(report),
588                Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
589            }
590            Ok(())
591        });
592
593        Ok(())
594    }
595}
596
597#[async_trait(?Send)]
598impl LiveExecutionClient for BitmexExecutionClient {
599    async fn connect(&mut self) -> anyhow::Result<()> {
600        if self.connected {
601            return Ok(());
602        }
603
604        self.ensure_instruments_initialized_async().await?;
605
606        self._submitter.start().await?;
607        self._canceller.start().await?;
608
609        self.ws_client.connect().await?;
610        self.ws_client.wait_until_active(10.0).await?;
611
612        self.ws_client.subscribe_orders().await?;
613        self.ws_client.subscribe_executions().await?;
614        self.ws_client.subscribe_positions().await?;
615        self.ws_client.subscribe_wallet().await?;
616        if let Err(e) = self.ws_client.subscribe_margin().await {
617            tracing::debug!("Margin subscription unavailable: {e:?}");
618        }
619
620        self.start_ws_stream()?;
621        self.refresh_account_state().await?;
622
623        self.connected = true;
624        self.core.set_connected(true);
625        tracing::info!(client_id = %self.core.client_id, "Connected");
626        Ok(())
627    }
628
629    async fn disconnect(&mut self) -> anyhow::Result<()> {
630        if !self.connected {
631            return Ok(());
632        }
633
634        self.http_client.cancel_all_requests();
635        self._submitter.stop().await;
636        self._canceller.stop().await;
637
638        if let Err(e) = self.ws_client.close().await {
639            tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
640        }
641
642        if let Some(handle) = self.ws_stream_handle.take() {
643            handle.abort();
644        }
645
646        self.abort_pending_tasks();
647        self.connected = false;
648        self.core.set_connected(false);
649        tracing::info!(client_id = %self.core.client_id, "Disconnected");
650        Ok(())
651    }
652
653    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
654        get_exec_event_sender()
655    }
656
657    fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
658        self.core.clock().borrow()
659    }
660
661    async fn generate_order_status_report(
662        &self,
663        cmd: &GenerateOrderStatusReport,
664    ) -> anyhow::Result<Option<OrderStatusReport>> {
665        let instrument_id = cmd
666            .instrument_id
667            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
668
669        self.http_client
670            .query_order(
671                instrument_id,
672                cmd.client_order_id,
673                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
674            )
675            .await
676            .context("failed to query BitMEX order status")
677    }
678
679    async fn generate_order_status_reports(
680        &self,
681        cmd: &GenerateOrderStatusReport,
682    ) -> anyhow::Result<Vec<OrderStatusReport>> {
683        let reports = self
684            .http_client
685            .request_order_status_reports(cmd.instrument_id, false, None)
686            .await
687            .context("failed to request BitMEX order status reports")?;
688        Ok(reports)
689    }
690
691    async fn generate_fill_reports(
692        &self,
693        cmd: GenerateFillReports,
694    ) -> anyhow::Result<Vec<FillReport>> {
695        let mut reports = self
696            .http_client
697            .request_fill_reports(cmd.instrument_id, None)
698            .await
699            .context("failed to request BitMEX fill reports")?;
700
701        if let Some(order_id) = cmd.venue_order_id {
702            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
703        }
704
705        Ok(reports)
706    }
707
708    async fn generate_position_status_reports(
709        &self,
710        cmd: &GeneratePositionReports,
711    ) -> anyhow::Result<Vec<PositionStatusReport>> {
712        let mut reports = self
713            .http_client
714            .request_position_status_reports()
715            .await
716            .context("failed to request BitMEX position reports")?;
717
718        if let Some(instrument_id) = cmd.instrument_id {
719            reports.retain(|report| report.instrument_id == instrument_id);
720        }
721
722        Ok(reports)
723    }
724
725    async fn generate_mass_status(
726        &self,
727        _lookback_mins: Option<u64>,
728    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
729        tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
730        Ok(None)
731    }
732}
733
734fn dispatch_ws_message(message: NautilusWsMessage) {
735    match message {
736        NautilusWsMessage::OrderStatusReports(reports) => {
737            for report in reports {
738                dispatch_order_status_report(report);
739            }
740        }
741        NautilusWsMessage::FillReports(reports) => {
742            for report in reports {
743                dispatch_fill_report(report);
744            }
745        }
746        NautilusWsMessage::PositionStatusReport(report) => {
747            dispatch_position_status_report(report);
748        }
749        NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
750        NautilusWsMessage::OrderUpdated(event) => {
751            dispatch_order_event(OrderEventAny::Updated(event));
752        }
753        NautilusWsMessage::Data(_)
754        | NautilusWsMessage::Instruments(_)
755        | NautilusWsMessage::FundingRateUpdates(_) => {
756            tracing::debug!("Ignoring BitMEX data message on execution stream");
757        }
758        NautilusWsMessage::Reconnected => {
759            tracing::info!("BitMEX execution websocket reconnected");
760        }
761        NautilusWsMessage::Authenticated => {
762            tracing::debug!("BitMEX execution websocket authenticated");
763        }
764    }
765}
766
767fn dispatch_account_state(state: AccountState) {
768    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
769}
770
771fn dispatch_order_status_report(report: OrderStatusReport) {
772    let sender = get_exec_event_sender();
773    let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(report));
774    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
775        tracing::warn!("Failed to send order status report: {e}");
776    }
777}
778
779fn dispatch_fill_report(report: FillReport) {
780    let sender = get_exec_event_sender();
781    let exec_report = nautilus_common::messages::ExecutionReport::Fill(Box::new(report));
782    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
783        tracing::warn!("Failed to send fill report: {e}");
784    }
785}
786
787fn dispatch_position_status_report(report: PositionStatusReport) {
788    let sender = get_exec_event_sender();
789    let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
790    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
791        tracing::warn!("Failed to send position status report: {e}");
792    }
793}
794
795fn dispatch_order_event(event: OrderEventAny) {
796    let sender = get_exec_event_sender();
797    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
798        tracing::warn!("Failed to send order event: {e}");
799    }
800}