Skip to main content

nautilus_bitmex/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18use std::{future::Future, str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use indexmap::IndexMap;
24use nautilus_common::{
25    clients::ExecutionClient,
26    enums::LogLevel,
27    live::{get_runtime, runner::get_exec_event_sender},
28    messages::execution::{
29        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
30        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
31        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
32        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
33        SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, OmsType, OrderSide},
44    events::OrderEventAny,
45    identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
46    instruments::{Instrument, InstrumentAny},
47    orders::{Order, OrderAny},
48    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49    types::{AccountBalance, MarginBalance},
50};
51use rust_decimal::prelude::ToPrimitive;
52use tokio::task::JoinHandle;
53
54use crate::{
55    broadcast::{
56        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58    },
59    common::enums::BitmexPegPriceType,
60    config::BitmexExecClientConfig,
61    http::client::BitmexHttpClient,
62    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
65#[derive(Debug)]
66pub struct BitmexExecutionClient {
67    core: ExecutionClientCore,
68    clock: &'static AtomicTime,
69    config: BitmexExecClientConfig,
70    emitter: ExecutionEventEmitter,
71    http_client: BitmexHttpClient,
72    ws_client: BitmexWebSocketClient,
73    _submitter: SubmitBroadcaster,
74    _canceller: CancelBroadcaster,
75    ws_stream_handle: Option<JoinHandle<()>>,
76    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl BitmexExecutionClient {
80    fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
81        let plural = if count == 1 { "" } else { "s" };
82        let message = format!("Received {count} {report_type}{plural}");
83
84        match log_level {
85            LogLevel::Off => {}
86            LogLevel::Trace => log::trace!("{message}"),
87            LogLevel::Debug => log::debug!("{message}"),
88            LogLevel::Info => log::info!("{message}"),
89            LogLevel::Warning => log::warn!("{message}"),
90            LogLevel::Error => log::error!("{message}"),
91        }
92    }
93
94    /// Creates a new [`BitmexExecutionClient`].
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if either the HTTP or WebSocket client fail to construct.
99    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
100        if !config.has_api_credentials() {
101            anyhow::bail!("BitMEX execution client requires API key and secret");
102        }
103
104        let trader_id = core.trader_id;
105        let account_id = config.account_id.unwrap_or(core.account_id);
106        let clock = get_atomic_clock_realtime();
107        let emitter =
108            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
109        let http_client = BitmexHttpClient::new(
110            Some(config.http_base_url()),
111            config.api_key.clone(),
112            config.api_secret.clone(),
113            config.use_testnet,
114            config.http_timeout_secs,
115            config.max_retries,
116            config.retry_delay_initial_ms,
117            config.retry_delay_max_ms,
118            config.recv_window_ms,
119            config.max_requests_per_second,
120            config.max_requests_per_minute,
121            config.http_proxy_url.clone(),
122        )
123        .context("failed to construct BitMEX HTTP client")?;
124        let ws_client = BitmexWebSocketClient::new(
125            Some(config.ws_url()),
126            config.api_key.clone(),
127            config.api_secret.clone(),
128            Some(account_id),
129            config.heartbeat_interval_secs,
130        )
131        .context("failed to construct BitMEX execution websocket client")?;
132
133        let pool_size = config.submitter_pool_size.unwrap_or(1);
134        let submitter_proxy_urls = match &config.submitter_proxy_urls {
135            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
136            None => vec![config.http_proxy_url.clone(); pool_size],
137        };
138
139        let submitter_config = SubmitBroadcasterConfig {
140            pool_size,
141            api_key: config.api_key.clone(),
142            api_secret: config.api_secret.clone(),
143            base_url: config.base_url_http.clone(),
144            testnet: config.use_testnet,
145            timeout_secs: config.http_timeout_secs,
146            max_retries: config.max_retries,
147            retry_delay_ms: config.retry_delay_initial_ms,
148            retry_delay_max_ms: config.retry_delay_max_ms,
149            recv_window_ms: config.recv_window_ms,
150            max_requests_per_second: config.max_requests_per_second,
151            max_requests_per_minute: config.max_requests_per_minute,
152            proxy_urls: submitter_proxy_urls,
153            ..Default::default()
154        };
155
156        let _submitter = SubmitBroadcaster::new(submitter_config)
157            .context("failed to create SubmitBroadcaster")?;
158
159        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
160        let canceller_proxy_urls = match &config.canceller_proxy_urls {
161            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
162            None => vec![config.http_proxy_url.clone(); canceller_pool_size],
163        };
164
165        let canceller_config = CancelBroadcasterConfig {
166            pool_size: canceller_pool_size,
167            api_key: config.api_key.clone(),
168            api_secret: config.api_secret.clone(),
169            base_url: config.base_url_http.clone(),
170            testnet: config.use_testnet,
171            timeout_secs: config.http_timeout_secs,
172            max_retries: config.max_retries,
173            retry_delay_ms: config.retry_delay_initial_ms,
174            retry_delay_max_ms: config.retry_delay_max_ms,
175            recv_window_ms: config.recv_window_ms,
176            max_requests_per_second: config.max_requests_per_second,
177            max_requests_per_minute: config.max_requests_per_minute,
178            proxy_urls: canceller_proxy_urls,
179            ..Default::default()
180        };
181
182        let _canceller = CancelBroadcaster::new(canceller_config)
183            .context("failed to create CancelBroadcaster")?;
184
185        Ok(Self {
186            core,
187            clock,
188            config,
189            emitter,
190            http_client,
191            ws_client,
192            _submitter,
193            _canceller,
194            ws_stream_handle: None,
195            pending_tasks: Mutex::new(Vec::new()),
196        })
197    }
198
199    fn spawn_task<F>(&self, label: &'static str, fut: F)
200    where
201        F: Future<Output = anyhow::Result<()>> + Send + 'static,
202    {
203        let handle = get_runtime().spawn(async move {
204            if let Err(e) = fut.await {
205                log::error!("{label}: {e:?}");
206            }
207        });
208
209        let mut guard = self
210            .pending_tasks
211            .lock()
212            .expect("pending task lock poisoned");
213
214        // Remove completed tasks to prevent unbounded growth
215        guard.retain(|h| !h.is_finished());
216        guard.push(handle);
217    }
218
219    fn abort_pending_tasks(&self) {
220        let mut guard = self
221            .pending_tasks
222            .lock()
223            .expect("pending task lock poisoned");
224        for handle in guard.drain(..) {
225            handle.abort();
226        }
227    }
228
229    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
230        if self.core.instruments_initialized() {
231            return Ok(());
232        }
233
234        let mut instruments: Vec<InstrumentAny> = {
235            let cache = self.core.cache();
236            cache
237                .instruments(&self.core.venue, None)
238                .into_iter()
239                .cloned()
240                .collect()
241        };
242
243        if instruments.is_empty() {
244            let http = self.http_client.clone();
245            instruments = http
246                .request_instruments(self.config.active_only)
247                .await
248                .context("failed to request BitMEX instruments")?;
249        } else {
250            log::debug!(
251                "Reusing {} cached BitMEX instruments for execution client initialization",
252                instruments.len()
253            );
254        }
255
256        instruments.sort_by_key(|instrument| instrument.id());
257
258        for instrument in &instruments {
259            self.http_client.cache_instrument(instrument.clone());
260            self._submitter.cache_instrument(instrument.clone());
261            self._canceller.cache_instrument(instrument.clone());
262        }
263
264        self.ws_client.cache_instruments(instruments);
265
266        self.core.set_instruments_initialized();
267        Ok(())
268    }
269
270    async fn refresh_account_state(&self) -> anyhow::Result<()> {
271        let account_state = self
272            .http_client
273            .request_account_state(self.core.account_id)
274            .await
275            .context("failed to request BitMEX account state")?;
276
277        self.emitter.send_account_state(account_state);
278        Ok(())
279    }
280
281    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
282        if self.ws_stream_handle.is_some() {
283            return Ok(());
284        }
285
286        let stream = self.ws_client.stream();
287        let emitter = self.emitter.clone();
288
289        let handle = get_runtime().spawn(async move {
290            pin_mut!(stream);
291            while let Some(message) = stream.next().await {
292                dispatch_ws_message(message, &emitter);
293            }
294        });
295
296        self.ws_stream_handle = Some(handle);
297        Ok(())
298    }
299
300    fn submit_cached_order(
301        &self,
302        order: OrderAny,
303        submit_tries: Option<usize>,
304        peg_price_type: Option<BitmexPegPriceType>,
305        peg_offset_value: Option<f64>,
306        task_label: &'static str,
307    ) -> anyhow::Result<()> {
308        if order.is_closed() {
309            log::warn!("Cannot submit closed order {}", order.client_order_id());
310            return Ok(());
311        }
312
313        self.emitter.emit_order_submitted(&order);
314
315        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
316        let http_client = self.http_client.clone();
317        let submitter = self._submitter.clone_for_async();
318        let emitter = self.emitter.clone();
319        let clock = self.clock;
320        let strategy_id = order.strategy_id();
321        let instrument_id = order.instrument_id();
322        let client_order_id = order.client_order_id();
323        let order_side = order.order_side();
324        let order_type = order.order_type();
325        let quantity = order.quantity();
326        let time_in_force = order.time_in_force();
327        let price = order.price();
328        let trigger_price = order.trigger_price();
329        let trigger_type = order.trigger_type();
330        let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
331        let trailing_offset_type = order.trailing_offset_type();
332        let display_qty = order.display_qty();
333        let post_only = order.is_post_only();
334        let reduce_only = order.is_reduce_only();
335        let order_list_id = order.order_list_id();
336        let contingency_type = order.contingency_type();
337
338        self.spawn_task(task_label, async move {
339            let result = if use_broadcaster {
340                submitter
341                    .broadcast_submit(
342                        instrument_id,
343                        client_order_id,
344                        order_side,
345                        order_type,
346                        quantity,
347                        time_in_force,
348                        price,
349                        trigger_price,
350                        trigger_type,
351                        trailing_offset,
352                        trailing_offset_type,
353                        display_qty,
354                        post_only,
355                        reduce_only,
356                        order_list_id,
357                        contingency_type,
358                        submit_tries,
359                        peg_price_type,
360                        peg_offset_value,
361                    )
362                    .await
363            } else {
364                http_client
365                    .submit_order(
366                        instrument_id,
367                        client_order_id,
368                        order_side,
369                        order_type,
370                        quantity,
371                        time_in_force,
372                        price,
373                        trigger_price,
374                        trigger_type,
375                        trailing_offset,
376                        trailing_offset_type,
377                        display_qty,
378                        post_only,
379                        reduce_only,
380                        order_list_id,
381                        contingency_type,
382                        peg_price_type,
383                        peg_offset_value,
384                    )
385                    .await
386            };
387
388            match result {
389                Ok(report) => emitter.send_order_status_report(report),
390                Err(e) => {
391                    let error_msg = e.to_string();
392
393                    // If all transports returned "Duplicate clOrdID", the order likely exists
394                    // but the success response was lost. Wait for WebSocket confirmation.
395                    if error_msg.contains("IDEMPOTENT_DUPLICATE") {
396                        log::warn!(
397                            "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
398                             awaiting WebSocket confirmation",
399                        );
400                        return Ok(());
401                    }
402
403                    let ts_event = clock.get_time_ns();
404                    emitter.emit_order_rejected_event(
405                        strategy_id,
406                        instrument_id,
407                        client_order_id,
408                        &format!("submit-order-error: {error_msg}"),
409                        ts_event,
410                        post_only,
411                    );
412                }
413            }
414            Ok(())
415        });
416
417        Ok(())
418    }
419}
420
421#[async_trait(?Send)]
422impl ExecutionClient for BitmexExecutionClient {
423    fn is_connected(&self) -> bool {
424        self.core.is_connected()
425    }
426
427    fn client_id(&self) -> ClientId {
428        self.core.client_id
429    }
430
431    fn account_id(&self) -> AccountId {
432        self.core.account_id
433    }
434
435    fn venue(&self) -> Venue {
436        self.core.venue
437    }
438
439    fn oms_type(&self) -> OmsType {
440        self.core.oms_type
441    }
442
443    fn get_account(&self) -> Option<AccountAny> {
444        self.core.cache().account(&self.core.account_id).cloned()
445    }
446
447    fn generate_account_state(
448        &self,
449        balances: Vec<AccountBalance>,
450        margins: Vec<MarginBalance>,
451        reported: bool,
452        ts_event: UnixNanos,
453    ) -> anyhow::Result<()> {
454        self.emitter
455            .emit_account_state(balances, margins, reported, ts_event);
456        Ok(())
457    }
458
459    fn start(&mut self) -> anyhow::Result<()> {
460        if self.core.is_started() {
461            return Ok(());
462        }
463
464        self.emitter.set_sender(get_exec_event_sender());
465        self.core.set_started();
466        log::info!(
467            "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
468            self.core.client_id,
469            self.core.account_id,
470            self.config.use_testnet,
471            self.config.submitter_pool_size,
472            self.config.canceller_pool_size,
473            self.config.http_proxy_url,
474            self.config.ws_proxy_url,
475            self.config.submitter_proxy_urls,
476            self.config.canceller_proxy_urls,
477        );
478        Ok(())
479    }
480
481    fn stop(&mut self) -> anyhow::Result<()> {
482        if self.core.is_stopped() {
483            return Ok(());
484        }
485
486        self.core.set_stopped();
487        self.core.set_disconnected();
488        if let Some(handle) = self.ws_stream_handle.take() {
489            handle.abort();
490        }
491        self.abort_pending_tasks();
492        log::info!("BitMEX execution client {} stopped", self.core.client_id);
493        Ok(())
494    }
495
496    async fn connect(&mut self) -> anyhow::Result<()> {
497        if self.core.is_connected() {
498            return Ok(());
499        }
500
501        // Reset cancellation token so HTTP requests succeed after reconnect
502        self.http_client.reset_cancellation_token();
503
504        self.ensure_instruments_initialized_async().await?;
505
506        self.ws_client.connect().await?;
507        self.ws_client.wait_until_active(10.0).await?;
508
509        // Start submitter/canceller after WS connection succeeds
510        self._submitter.start().await?;
511        self._canceller.start().await?;
512
513        self.ws_client.subscribe_orders().await?;
514        self.ws_client.subscribe_executions().await?;
515        self.ws_client.subscribe_positions().await?;
516        self.ws_client.subscribe_wallet().await?;
517        if let Err(e) = self.ws_client.subscribe_margin().await {
518            log::debug!("Margin subscription unavailable: {e:?}");
519        }
520
521        self.start_ws_stream()?;
522        self.refresh_account_state().await?;
523
524        self.core.set_connected();
525        log::info!("Connected: client_id={}", self.core.client_id);
526        Ok(())
527    }
528
529    async fn disconnect(&mut self) -> anyhow::Result<()> {
530        if self.core.is_disconnected() {
531            return Ok(());
532        }
533
534        self.http_client.cancel_all_requests();
535        self._submitter.stop().await;
536        self._canceller.stop().await;
537
538        if let Err(e) = self.ws_client.close().await {
539            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
540        }
541
542        if let Some(handle) = self.ws_stream_handle.take() {
543            handle.abort();
544        }
545
546        self.abort_pending_tasks();
547        self.core.set_disconnected();
548        log::info!("Disconnected: client_id={}", self.core.client_id);
549        Ok(())
550    }
551
552    async fn generate_order_status_report(
553        &self,
554        cmd: &GenerateOrderStatusReport,
555    ) -> anyhow::Result<Option<OrderStatusReport>> {
556        let instrument_id = cmd
557            .instrument_id
558            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
559
560        self.http_client
561            .query_order(
562                instrument_id,
563                cmd.client_order_id,
564                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
565            )
566            .await
567            .context("failed to query BitMEX order status")
568    }
569
570    async fn generate_order_status_reports(
571        &self,
572        cmd: &GenerateOrderStatusReports,
573    ) -> anyhow::Result<Vec<OrderStatusReport>> {
574        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
575        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
576
577        let mut reports = self
578            .http_client
579            .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
580            .await
581            .context("failed to request BitMEX order status reports")?;
582
583        if let Some(start) = cmd.start {
584            reports.retain(|report| report.ts_last >= start);
585        }
586
587        if let Some(end) = cmd.end {
588            reports.retain(|report| report.ts_last <= end);
589        }
590
591        Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
592
593        Ok(reports)
594    }
595
596    async fn generate_fill_reports(
597        &self,
598        cmd: GenerateFillReports,
599    ) -> anyhow::Result<Vec<FillReport>> {
600        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
601        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
602
603        let mut reports = self
604            .http_client
605            .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
606            .await
607            .context("failed to request BitMEX fill reports")?;
608
609        if let Some(order_id) = cmd.venue_order_id {
610            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
611        }
612
613        if let Some(start) = cmd.start {
614            reports.retain(|report| report.ts_event >= start);
615        }
616
617        if let Some(end) = cmd.end {
618            reports.retain(|report| report.ts_event <= end);
619        }
620
621        Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
622
623        Ok(reports)
624    }
625
626    async fn generate_position_status_reports(
627        &self,
628        cmd: &GeneratePositionStatusReports,
629    ) -> anyhow::Result<Vec<PositionStatusReport>> {
630        let mut reports = self
631            .http_client
632            .request_position_status_reports()
633            .await
634            .context("failed to request BitMEX position reports")?;
635
636        if let Some(instrument_id) = cmd.instrument_id {
637            reports.retain(|report| report.instrument_id == instrument_id);
638        }
639
640        if let Some(start) = cmd.start {
641            reports.retain(|report| report.ts_last >= start);
642        }
643
644        if let Some(end) = cmd.end {
645            reports.retain(|report| report.ts_last <= end);
646        }
647
648        Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
649
650        Ok(reports)
651    }
652
653    async fn generate_mass_status(
654        &self,
655        lookback_mins: Option<u64>,
656    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
657        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
658
659        let ts_now = self.clock.get_time_ns();
660        let start = lookback_mins.map(|mins| {
661            let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
662            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
663        });
664
665        let order_cmd = GenerateOrderStatusReportsBuilder::default()
666            .ts_init(ts_now)
667            .open_only(false)
668            .start(start)
669            .build()
670            .map_err(|e| anyhow::anyhow!("{e}"))?;
671
672        let fill_cmd = GenerateFillReportsBuilder::default()
673            .ts_init(ts_now)
674            .start(start)
675            .build()
676            .map_err(|e| anyhow::anyhow!("{e}"))?;
677
678        let position_cmd = GeneratePositionStatusReportsBuilder::default()
679            .ts_init(ts_now)
680            .start(start)
681            .build()
682            .map_err(|e| anyhow::anyhow!("{e}"))?;
683
684        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
685            self.generate_order_status_reports(&order_cmd),
686            self.generate_fill_reports(fill_cmd),
687            self.generate_position_status_reports(&position_cmd),
688        )?;
689
690        let mut mass_status = ExecutionMassStatus::new(
691            self.core.client_id,
692            self.core.account_id,
693            self.core.venue,
694            ts_now,
695            None,
696        );
697        mass_status.add_order_reports(order_reports);
698        mass_status.add_fill_reports(fill_reports);
699        mass_status.add_position_reports(position_reports);
700
701        Ok(Some(mass_status))
702    }
703
704    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
705        let http_client = self.http_client.clone();
706        let emitter = self.emitter.clone();
707        let account_id = self.core.account_id;
708
709        self.spawn_task("query_account", async move {
710            match http_client.request_account_state(account_id).await {
711                Ok(account_state) => emitter.send_account_state(account_state),
712                Err(e) => log::error!("BitMEX query account failed: {e:?}"),
713            }
714            Ok(())
715        });
716
717        Ok(())
718    }
719
720    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
721        let http_client = self.http_client.clone();
722        let instrument_id = cmd.instrument_id;
723        let client_order_id = Some(cmd.client_order_id);
724        let venue_order_id = cmd.venue_order_id;
725        let emitter = self.emitter.clone();
726
727        self.spawn_task("query_order", async move {
728            match http_client
729                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
730                .await
731            {
732                Ok(report) => emitter.send_order_status_report(report),
733                Err(e) => log::error!("BitMEX query order failed: {e:?}"),
734            }
735            Ok(())
736        });
737
738        Ok(())
739    }
740
741    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
742        let submit_tries = cmd
743            .params
744            .as_ref()
745            .and_then(|params| params.get("submit_tries"))
746            .and_then(|s| s.parse::<usize>().ok())
747            .filter(|&n| n > 0);
748
749        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
750        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
751
752        let order = self
753            .core
754            .cache()
755            .order(&cmd.client_order_id)
756            .cloned()
757            .ok_or_else(|| {
758                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
759            })?;
760
761        self.submit_cached_order(
762            order,
763            submit_tries,
764            peg_price_type,
765            peg_offset_value,
766            "submit_order",
767        )
768    }
769
770    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
771        if cmd.order_list.client_order_ids.is_empty() {
772            log::debug!("submit_order_list called with empty order list");
773            return Ok(());
774        }
775
776        let submit_tries = cmd
777            .params
778            .as_ref()
779            .and_then(|params| params.get("submit_tries"))
780            .and_then(|s| s.parse::<usize>().ok())
781            .filter(|&n| n > 0);
782
783        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
784        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
785
786        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
787
788        log::info!(
789            "Submitting BitMEX order list: order_list_id={}, count={}",
790            cmd.order_list.id,
791            orders.len(),
792        );
793
794        for order in orders {
795            self.submit_cached_order(
796                order,
797                submit_tries,
798                peg_price_type,
799                peg_offset_value,
800                "submit_order_list_item",
801            )?;
802        }
803
804        Ok(())
805    }
806
807    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
808        let http_client = self.http_client.clone();
809        let emitter = self.emitter.clone();
810        let instrument_id = cmd.instrument_id;
811        let client_order_id = Some(cmd.client_order_id);
812        let venue_order_id = cmd.venue_order_id;
813        let quantity = cmd.quantity;
814        let price = cmd.price;
815        let trigger_price = cmd.trigger_price;
816
817        self.spawn_task("modify_order", async move {
818            match http_client
819                .modify_order(
820                    instrument_id,
821                    client_order_id,
822                    venue_order_id,
823                    quantity,
824                    price,
825                    trigger_price,
826                )
827                .await
828            {
829                Ok(report) => emitter.send_order_status_report(report),
830                Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
831            }
832            Ok(())
833        });
834
835        Ok(())
836    }
837
838    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
839        let canceller = self._canceller.clone_for_async();
840        let emitter = self.emitter.clone();
841        let instrument_id = cmd.instrument_id;
842        let client_order_id = Some(cmd.client_order_id);
843        let venue_order_id = cmd.venue_order_id;
844
845        self.spawn_task("cancel_order", async move {
846            match canceller
847                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
848                .await
849            {
850                Ok(Some(report)) => emitter.send_order_status_report(report),
851                Ok(None) => {
852                    // Idempotent success - order already cancelled
853                    log::debug!("Order already cancelled: {client_order_id:?}");
854                }
855                Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
856            }
857            Ok(())
858        });
859
860        Ok(())
861    }
862
863    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
864        let canceller = self._canceller.clone_for_async();
865        let emitter = self.emitter.clone();
866        let instrument_id = cmd.instrument_id;
867        let order_side = if cmd.order_side == OrderSide::NoOrderSide {
868            log::debug!(
869                "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
870            );
871            None
872        } else {
873            Some(cmd.order_side)
874        };
875
876        self.spawn_task("cancel_all_orders", async move {
877            match canceller
878                .broadcast_cancel_all(instrument_id, order_side)
879                .await
880            {
881                Ok(reports) => {
882                    for report in reports {
883                        emitter.send_order_status_report(report);
884                    }
885                }
886                Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
887            }
888            Ok(())
889        });
890
891        Ok(())
892    }
893
894    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
895        let canceller = self._canceller.clone_for_async();
896        let emitter = self.emitter.clone();
897        let instrument_id = cmd.instrument_id;
898
899        let client_ids: Vec<ClientOrderId> = cmd
900            .cancels
901            .iter()
902            .map(|cancel| cancel.client_order_id)
903            .collect();
904
905        let venue_ids: Vec<VenueOrderId> = cmd
906            .cancels
907            .iter()
908            .filter_map(|cancel| cancel.venue_order_id)
909            .collect();
910
911        let client_ids_opt = if client_ids.is_empty() {
912            None
913        } else {
914            Some(client_ids)
915        };
916
917        let venue_ids_opt = if venue_ids.is_empty() {
918            None
919        } else {
920            Some(venue_ids)
921        };
922
923        self.spawn_task("batch_cancel_orders", async move {
924            match canceller
925                .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
926                .await
927            {
928                Ok(reports) => {
929                    for report in reports {
930                        emitter.send_order_status_report(report);
931                    }
932                }
933                Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
934            }
935            Ok(())
936        });
937
938        Ok(())
939    }
940}
941
942/// Dispatches a WebSocket message using the event emitter.
943fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
944    match message {
945        NautilusWsMessage::OrderStatusReports(reports) => {
946            for report in reports {
947                emitter.send_order_status_report(report);
948            }
949        }
950        NautilusWsMessage::FillReports(reports) => {
951            for report in reports {
952                emitter.send_fill_report(report);
953            }
954        }
955        NautilusWsMessage::PositionStatusReports(reports) => {
956            for report in reports {
957                emitter.send_position_report(report);
958            }
959        }
960        NautilusWsMessage::AccountStates(states) => {
961            for state in states {
962                emitter.send_account_state(state);
963            }
964        }
965        NautilusWsMessage::OrderUpdated(event) => {
966            emitter.send_order_event(OrderEventAny::Updated(*event));
967        }
968        NautilusWsMessage::OrderUpdates(events) => {
969            for event in events {
970                emitter.send_order_event(OrderEventAny::Updated(event));
971            }
972        }
973        NautilusWsMessage::Data(_)
974        | NautilusWsMessage::Instruments(_)
975        | NautilusWsMessage::FundingRateUpdates(_) => {
976            log::debug!("Ignoring BitMEX data message on execution stream");
977        }
978        NautilusWsMessage::Reconnected => {
979            log::info!("BitMEX execution websocket reconnected");
980        }
981        NautilusWsMessage::Authenticated => {
982            log::debug!("BitMEX execution websocket authenticated");
983        }
984    }
985}
986
987fn parse_peg_price_type(
988    params: Option<&IndexMap<String, String>>,
989) -> anyhow::Result<Option<BitmexPegPriceType>> {
990    let value = params.and_then(|p| p.get("peg_price_type"));
991    match value {
992        Some(s) => BitmexPegPriceType::from_str(s)
993            .map(Some)
994            .map_err(|_| anyhow::anyhow!("Invalid peg_price_type: {s}")),
995        None => Ok(None),
996    }
997}
998
999fn parse_peg_offset_value(
1000    params: Option<&IndexMap<String, String>>,
1001) -> anyhow::Result<Option<f64>> {
1002    let value = params.and_then(|p| p.get("peg_offset_value"));
1003    match value {
1004        Some(s) => s
1005            .parse::<f64>()
1006            .map(Some)
1007            .map_err(|_| anyhow::anyhow!("Invalid peg_offset_value: {s}")),
1008        None => Ok(None),
1009    }
1010}