nautilus_bitmex/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18pub mod canceller;
19
20use std::{any::Any, cell::Ref, future::Future, sync::Mutex};
21
22use anyhow::Context;
23use async_trait::async_trait;
24use futures_util::{StreamExt, pin_mut};
25use nautilus_common::{
26    clock::Clock,
27    messages::{
28        ExecutionEvent,
29        execution::{
30            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
32            QueryOrder, SubmitOrder, SubmitOrderList,
33        },
34    },
35    msgbus,
36    runner::get_exec_event_sender,
37    runtime::get_runtime,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
41use nautilus_live::execution::LiveExecutionClientExt;
42use nautilus_model::{
43    events::{AccountState, OrderEventAny, OrderRejected},
44    identifiers::{AccountId, VenueOrderId},
45    instruments::Instrument,
46    orders::Order,
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    config::BitmexExecClientConfig,
53    http::client::BitmexHttpClient,
54    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
55};
56
57#[derive(Debug)]
58pub struct BitmexExecutionClient {
59    core: ExecutionClientCore,
60    config: BitmexExecClientConfig,
61    http_client: BitmexHttpClient,
62    ws_client: BitmexWebSocketClient,
63    started: bool,
64    connected: bool,
65    instruments_initialized: bool,
66    ws_stream_handle: Option<JoinHandle<()>>,
67    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
68}
69
70impl BitmexExecutionClient {
71    /// Creates a new [`BitmexExecutionClient`].
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if either the HTTP or WebSocket client fail to construct.
76    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
77        if !config.has_api_credentials() {
78            anyhow::bail!("BitMEX execution client requires API key and secret");
79        }
80
81        let http_client = BitmexHttpClient::new(
82            Some(config.http_base_url()),
83            config.api_key.clone(),
84            config.api_secret.clone(),
85            config.use_testnet,
86            config.http_timeout_secs,
87            config.max_retries,
88            config.retry_delay_initial_ms,
89            config.retry_delay_max_ms,
90            config.recv_window_ms,
91            config.max_requests_per_second,
92            config.max_requests_per_minute,
93        )
94        .context("failed to construct BitMEX HTTP client")?;
95
96        let account_id = config.account_id.unwrap_or(core.account_id);
97        let ws_client = BitmexWebSocketClient::new(
98            Some(config.ws_url()),
99            config.api_key.clone(),
100            config.api_secret.clone(),
101            Some(account_id),
102            config.heartbeat_interval_secs,
103        )
104        .context("failed to construct BitMEX execution websocket client")?;
105
106        Ok(Self {
107            core,
108            config,
109            http_client,
110            ws_client,
111            started: false,
112            connected: false,
113            instruments_initialized: false,
114            ws_stream_handle: None,
115            pending_tasks: Mutex::new(Vec::new()),
116        })
117    }
118
119    fn spawn_task<F>(&self, label: &'static str, fut: F)
120    where
121        F: Future<Output = anyhow::Result<()>> + Send + 'static,
122    {
123        let handle = tokio::spawn(async move {
124            if let Err(err) = fut.await {
125                tracing::error!("{label}: {err:?}");
126            }
127        });
128
129        self.pending_tasks
130            .lock()
131            .expect("pending task lock poisoned")
132            .push(handle);
133    }
134
135    fn abort_pending_tasks(&self) {
136        let mut guard = self
137            .pending_tasks
138            .lock()
139            .expect("pending task lock poisoned");
140        for handle in guard.drain(..) {
141            handle.abort();
142        }
143    }
144
145    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
146        if self.instruments_initialized {
147            return Ok(());
148        }
149
150        let http = self.http_client.clone();
151        let mut instruments = http
152            .request_instruments(self.config.active_only)
153            .await
154            .context("failed to request BitMEX instruments")?;
155
156        instruments.sort_by_key(|instrument| instrument.id());
157
158        for instrument in &instruments {
159            self.http_client.add_instrument(instrument.clone());
160        }
161
162        self.ws_client.initialize_instruments_cache(instruments);
163
164        self.instruments_initialized = true;
165        Ok(())
166    }
167
168    fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
169        if self.instruments_initialized {
170            return Ok(());
171        }
172
173        let runtime = get_runtime();
174        runtime.block_on(self.ensure_instruments_initialized_async())
175    }
176
177    async fn refresh_account_state(&self) -> anyhow::Result<()> {
178        let account_state = self
179            .http_client
180            .request_account_state(self.core.account_id)
181            .await
182            .context("failed to request BitMEX account state")?;
183
184        dispatch_account_state(account_state);
185        Ok(())
186    }
187
188    fn update_account_state(&self) -> anyhow::Result<()> {
189        let runtime = get_runtime();
190        runtime.block_on(self.refresh_account_state())
191    }
192
193    fn start_ws_stream(&mut self) -> anyhow::Result<()> {
194        if self.ws_stream_handle.is_some() {
195            return Ok(());
196        }
197
198        let stream = self.ws_client.stream();
199        let handle = tokio::spawn(async move {
200            pin_mut!(stream);
201            while let Some(message) = stream.next().await {
202                dispatch_ws_message(message);
203            }
204        });
205
206        self.ws_stream_handle = Some(handle);
207        Ok(())
208    }
209}
210
211impl ExecutionClient for BitmexExecutionClient {
212    fn is_connected(&self) -> bool {
213        self.connected
214    }
215
216    fn client_id(&self) -> nautilus_model::identifiers::ClientId {
217        self.core.client_id
218    }
219
220    fn account_id(&self) -> AccountId {
221        self.core.account_id
222    }
223
224    fn venue(&self) -> nautilus_model::identifiers::Venue {
225        self.core.venue
226    }
227
228    fn oms_type(&self) -> nautilus_model::enums::OmsType {
229        self.core.oms_type
230    }
231
232    fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
233        self.core.get_account()
234    }
235
236    fn generate_account_state(
237        &self,
238        balances: Vec<nautilus_model::types::AccountBalance>,
239        margins: Vec<nautilus_model::types::MarginBalance>,
240        reported: bool,
241        ts_event: UnixNanos,
242    ) -> anyhow::Result<()> {
243        self.core
244            .generate_account_state(balances, margins, reported, ts_event)
245    }
246
247    fn start(&mut self) -> anyhow::Result<()> {
248        if self.started {
249            return Ok(());
250        }
251
252        self.ensure_instruments_initialized()?;
253        self.started = true;
254        tracing::info!("BitMEX execution client {} started", self.core.client_id);
255        Ok(())
256    }
257
258    fn stop(&mut self) -> anyhow::Result<()> {
259        if !self.started {
260            return Ok(());
261        }
262
263        self.started = false;
264        self.connected = false;
265        if let Some(handle) = self.ws_stream_handle.take() {
266            handle.abort();
267        }
268        self.abort_pending_tasks();
269        tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
270        Ok(())
271    }
272
273    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
274        let order = cmd.order.clone();
275
276        if order.is_closed() {
277            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
278            return Ok(());
279        }
280
281        self.core.generate_order_submitted(
282            order.strategy_id(),
283            order.instrument_id(),
284            order.client_order_id(),
285            cmd.ts_init,
286        );
287
288        let http_client = self.http_client.clone();
289        let trader_id = self.core.trader_id;
290        let strategy_id = order.strategy_id();
291        let instrument_id = order.instrument_id();
292        let account_id = self.core.account_id;
293        let client_order_id = order.client_order_id();
294        let order_side = order.order_side();
295        let order_type = order.order_type();
296        let quantity = order.quantity();
297        let time_in_force = order.time_in_force();
298        let price = order.price();
299        let trigger_price = order.trigger_price();
300        let trigger_type = order.trigger_type();
301        let display_qty = order.display_qty();
302        let post_only = order.is_post_only();
303        let reduce_only = order.is_reduce_only();
304        let order_list_id = order.order_list_id();
305        let contingency_type = order.contingency_type();
306        let ts_event = cmd.ts_init;
307
308        self.spawn_task("submit_order", async move {
309            match http_client
310                .submit_order(
311                    instrument_id,
312                    client_order_id,
313                    order_side,
314                    order_type,
315                    quantity,
316                    time_in_force,
317                    price,
318                    trigger_price,
319                    trigger_type,
320                    display_qty,
321                    post_only,
322                    reduce_only,
323                    order_list_id,
324                    contingency_type,
325                )
326                .await
327            {
328                Ok(report) => dispatch_order_status_report(report),
329                Err(err) => {
330                    let event = OrderRejected::new(
331                        trader_id,
332                        strategy_id,
333                        instrument_id,
334                        client_order_id,
335                        account_id,
336                        format!("submit-order-error: {err}").into(),
337                        UUID4::new(),
338                        ts_event,
339                        get_atomic_clock_realtime().get_time_ns(),
340                        false,
341                        post_only,
342                    );
343                    dispatch_order_event(OrderEventAny::Rejected(event));
344                }
345            }
346            Ok(())
347        });
348
349        Ok(())
350    }
351
352    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
353        tracing::warn!(
354            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
355            cmd.order_list.orders.len()
356        );
357        Ok(())
358    }
359
360    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
361        let http_client = self.http_client.clone();
362        let instrument_id = cmd.instrument_id;
363        let client_order_id = Some(cmd.client_order_id);
364        let venue_order_id = Some(cmd.venue_order_id);
365        let quantity = cmd.quantity;
366        let price = cmd.price;
367        let trigger_price = cmd.trigger_price;
368
369        self.spawn_task("modify_order", async move {
370            match http_client
371                .modify_order(
372                    instrument_id,
373                    client_order_id,
374                    venue_order_id,
375                    quantity,
376                    price,
377                    trigger_price,
378                )
379                .await
380            {
381                Ok(report) => dispatch_order_status_report(report),
382                Err(err) => tracing::error!("BitMEX modify order failed: {err:?}"),
383            }
384            Ok(())
385        });
386
387        Ok(())
388    }
389
390    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
391        let http_client = self.http_client.clone();
392        let instrument_id = cmd.instrument_id;
393        let client_order_id = Some(cmd.client_order_id);
394        let venue_order_id = Some(cmd.venue_order_id);
395
396        self.spawn_task("cancel_order", async move {
397            match http_client
398                .cancel_order(instrument_id, client_order_id, venue_order_id)
399                .await
400            {
401                Ok(report) => dispatch_order_status_report(report),
402                Err(err) => tracing::error!("BitMEX cancel order failed: {err:?}"),
403            }
404            Ok(())
405        });
406
407        Ok(())
408    }
409
410    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
411        let http_client = self.http_client.clone();
412        let instrument_id = cmd.instrument_id;
413        let order_side = Some(cmd.order_side);
414
415        self.spawn_task("cancel_all_orders", async move {
416            match http_client
417                .cancel_all_orders(instrument_id, order_side)
418                .await
419            {
420                Ok(reports) => {
421                    for report in reports {
422                        dispatch_order_status_report(report);
423                    }
424                }
425                Err(err) => tracing::error!("BitMEX cancel all failed: {err:?}"),
426            }
427            Ok(())
428        });
429
430        Ok(())
431    }
432
433    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
434        let http_client = self.http_client.clone();
435        let instrument_id = cmd.instrument_id;
436        let venue_ids: Vec<VenueOrderId> = cmd
437            .cancels
438            .iter()
439            .map(|cancel| cancel.venue_order_id)
440            .collect();
441
442        self.spawn_task("batch_cancel_orders", async move {
443            match http_client
444                .cancel_orders(instrument_id, None, Some(venue_ids))
445                .await
446            {
447                Ok(reports) => {
448                    for report in reports {
449                        dispatch_order_status_report(report);
450                    }
451                }
452                Err(err) => tracing::error!("BitMEX batch cancel failed: {err:?}"),
453            }
454            Ok(())
455        });
456
457        Ok(())
458    }
459
460    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
461        self.update_account_state()
462    }
463
464    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
465        let http_client = self.http_client.clone();
466        let instrument_id = cmd.instrument_id;
467        let client_order_id = Some(cmd.client_order_id);
468        let venue_order_id = Some(cmd.venue_order_id);
469
470        self.spawn_task("query_order", async move {
471            match http_client
472                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
473                .await
474            {
475                Ok(report) => dispatch_order_status_report(report),
476                Err(err) => tracing::error!("BitMEX query order failed: {err:?}"),
477            }
478            Ok(())
479        });
480
481        Ok(())
482    }
483}
484
485#[async_trait(?Send)]
486impl LiveExecutionClient for BitmexExecutionClient {
487    async fn connect(&mut self) -> anyhow::Result<()> {
488        if self.connected {
489            return Ok(());
490        }
491
492        self.ensure_instruments_initialized_async().await?;
493
494        self.ws_client.connect().await?;
495        self.ws_client.wait_until_active(10.0).await?;
496
497        self.ws_client.subscribe_orders().await?;
498        self.ws_client.subscribe_executions().await?;
499        self.ws_client.subscribe_positions().await?;
500        self.ws_client.subscribe_wallet().await?;
501        if let Err(err) = self.ws_client.subscribe_margin().await {
502            tracing::debug!("Margin subscription unavailable: {err:?}");
503        }
504
505        self.start_ws_stream()?;
506        self.refresh_account_state().await?;
507
508        self.connected = true;
509        self.core.set_connected(true);
510        tracing::info!("BitMEX execution client {} connected", self.core.client_id);
511        Ok(())
512    }
513
514    async fn disconnect(&mut self) -> anyhow::Result<()> {
515        if !self.connected {
516            return Ok(());
517        }
518
519        self.http_client.cancel_all_requests();
520        if let Err(err) = self.ws_client.close().await {
521            tracing::warn!("Error while closing BitMEX execution websocket: {err:?}");
522        }
523
524        if let Some(handle) = self.ws_stream_handle.take() {
525            handle.abort();
526        }
527
528        self.abort_pending_tasks();
529        self.connected = false;
530        self.core.set_connected(false);
531        tracing::info!(
532            "BitMEX execution client {} disconnected",
533            self.core.client_id
534        );
535        Ok(())
536    }
537
538    async fn generate_order_status_report(
539        &self,
540        cmd: &GenerateOrderStatusReport,
541    ) -> anyhow::Result<Option<OrderStatusReport>> {
542        let instrument_id = cmd
543            .instrument_id
544            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
545
546        self.http_client
547            .query_order(
548                instrument_id,
549                cmd.client_order_id,
550                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
551            )
552            .await
553            .context("failed to query BitMEX order status")
554    }
555
556    async fn generate_order_status_reports(
557        &self,
558        cmd: &GenerateOrderStatusReport,
559    ) -> anyhow::Result<Vec<OrderStatusReport>> {
560        let reports = self
561            .http_client
562            .request_order_status_reports(cmd.instrument_id, false, None)
563            .await
564            .context("failed to request BitMEX order status reports")?;
565        Ok(reports)
566    }
567
568    async fn generate_fill_reports(
569        &self,
570        cmd: GenerateFillReports,
571    ) -> anyhow::Result<Vec<FillReport>> {
572        let mut reports = self
573            .http_client
574            .request_fill_reports(cmd.instrument_id, None)
575            .await
576            .context("failed to request BitMEX fill reports")?;
577
578        if let Some(order_id) = cmd.venue_order_id {
579            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
580        }
581
582        Ok(reports)
583    }
584
585    async fn generate_position_status_reports(
586        &self,
587        cmd: &GeneratePositionReports,
588    ) -> anyhow::Result<Vec<PositionStatusReport>> {
589        let mut reports = self
590            .http_client
591            .request_position_status_reports()
592            .await
593            .context("failed to request BitMEX position reports")?;
594
595        if let Some(instrument_id) = cmd.instrument_id {
596            reports.retain(|report| report.instrument_id == instrument_id);
597        }
598
599        Ok(reports)
600    }
601
602    async fn generate_mass_status(
603        &self,
604        _lookback_mins: Option<u64>,
605    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
606        tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
607        Ok(None)
608    }
609}
610
611fn dispatch_ws_message(message: NautilusWsMessage) {
612    match message {
613        NautilusWsMessage::OrderStatusReports(reports) => {
614            for report in reports {
615                dispatch_order_status_report(report);
616            }
617        }
618        NautilusWsMessage::FillReports(reports) => {
619            for report in reports {
620                dispatch_fill_report(report);
621            }
622        }
623        NautilusWsMessage::PositionStatusReport(report) => {
624            dispatch_position_status_report(report);
625        }
626        NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
627        NautilusWsMessage::OrderUpdated(event) => {
628            dispatch_order_event(OrderEventAny::Updated(event));
629        }
630        NautilusWsMessage::Data(_) | NautilusWsMessage::FundingRateUpdates(_) => {
631            tracing::debug!("Ignoring BitMEX data message on execution stream");
632        }
633        NautilusWsMessage::Reconnected => {
634            tracing::info!("BitMEX execution websocket reconnected");
635        }
636    }
637}
638
639fn dispatch_account_state(state: AccountState) {
640    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
641}
642
643fn dispatch_order_status_report(report: OrderStatusReport) {
644    let sender = get_exec_event_sender();
645    let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(report));
646    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
647        tracing::warn!("Failed to send order status report: {e}");
648    }
649}
650
651fn dispatch_fill_report(report: FillReport) {
652    let sender = get_exec_event_sender();
653    let exec_report = nautilus_common::messages::ExecutionReport::Fill(Box::new(report));
654    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
655        tracing::warn!("Failed to send fill report: {e}");
656    }
657}
658
659fn dispatch_position_status_report(report: PositionStatusReport) {
660    let sender = get_exec_event_sender();
661    let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
662    if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
663        tracing::warn!("Failed to send position status report: {e}");
664    }
665}
666
667fn dispatch_order_event(event: OrderEventAny) {
668    let sender = get_exec_event_sender();
669    if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
670        tracing::warn!("Failed to send order event: {e}");
671    }
672}
673
674impl LiveExecutionClientExt for BitmexExecutionClient {
675    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
676        get_exec_event_sender()
677    }
678
679    fn get_clock(&self) -> Ref<'_, dyn Clock> {
680        self.core.clock().borrow()
681    }
682}