nautilus_bitmex/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30    live::{runner::get_data_event_sender, runtime::get_runtime},
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37            SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39            UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43};
44use nautilus_core::{
45    datetime::datetime_to_unix_nanos,
46    time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_data::client::DataClient;
49use nautilus_model::{
50    data::Data,
51    enums::BookType,
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59    common::consts::BITMEX_VENUE,
60    config::BitmexDataClientConfig,
61    http::client::BitmexHttpClient,
62    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
/// Order book channel recorded per instrument by the book subscription methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Recorded by `subscribe_book_deltas` when no depth, or a depth above 25, is requested.
    OrderBookL2,
    /// Recorded by `subscribe_book_deltas` when the requested depth is between 1 and 25.
    OrderBookL2_25,
    /// Recorded by the depth-10 and snapshot subscriptions.
    OrderBook10,
}
71
/// Live market data client for the BitMEX venue.
///
/// Holds an HTTP client (instrument requests), an optional websocket client (streaming
/// subscriptions) and the background tasks spawned while connected.
#[derive(Debug)]
pub struct BitmexDataClient {
    /// Identifier reported by `client_id()` and used as the default for responses.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: BitmexDataClientConfig,
    /// HTTP client used to request instruments.
    http_client: BitmexHttpClient,
    /// Websocket client; `None` until `connect` constructs it.
    ws_client: Option<BitmexWebSocketClient>,
    /// Set at the end of `connect`; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token cloned into background tasks so they can be asked to exit.
    cancellation_token: CancellationToken,
    /// Handles of the background tasks spawned by this client; joined in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with background tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Book channel last recorded for each instrument by a book subscription.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
    /// Clock used to timestamp responses.
    clock: &'static AtomicTime,
    /// Prevents spawning the periodic instrument refresh task more than once.
    instrument_refresh_active: bool,
}
87
88impl BitmexDataClient {
89    /// Creates a new [`BitmexDataClient`] instance.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the HTTP client cannot be constructed.
94    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
95        let clock = get_atomic_clock_realtime();
96        let data_sender = get_data_event_sender();
97
98        let http_client = BitmexHttpClient::new(
99            Some(config.http_base_url()),
100            config.api_key.clone(),
101            config.api_secret.clone(),
102            config.use_testnet,
103            config.http_timeout_secs,
104            config.max_retries,
105            config.retry_delay_initial_ms,
106            config.retry_delay_max_ms,
107            config.recv_window_ms,
108            config.max_requests_per_second,
109            config.max_requests_per_minute,
110            config.http_proxy_url.clone(),
111        )
112        .context("failed to construct BitMEX HTTP client")?;
113
114        Ok(Self {
115            client_id,
116            config,
117            http_client,
118            ws_client: None,
119            is_connected: AtomicBool::new(false),
120            cancellation_token: CancellationToken::new(),
121            tasks: Vec::new(),
122            data_sender,
123            instruments: Arc::new(RwLock::new(AHashMap::new())),
124            book_channels: Arc::new(RwLock::new(AHashMap::new())),
125            clock,
126            instrument_refresh_active: false,
127        })
128    }
129
130    fn venue(&self) -> Venue {
131        *BITMEX_VENUE
132    }
133
134    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135        self.ws_client
136            .as_ref()
137            .context("websocket client not initialized; call connect first")
138    }
139
140    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141        self.ws_client
142            .as_mut()
143            .context("websocket client not initialized; call connect first")
144    }
145
146    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147        if let Err(e) = sender.send(DataEvent::Data(data)) {
148            tracing::error!("Failed to emit data event: {e}");
149        }
150    }
151
152    fn spawn_ws<F>(&self, fut: F, context: &'static str)
153    where
154        F: Future<Output = anyhow::Result<()>> + Send + 'static,
155    {
156        get_runtime().spawn(async move {
157            if let Err(e) = fut.await {
158                tracing::error!("{context}: {e:?}");
159            }
160        });
161    }
162
163    fn spawn_stream_task(
164        &mut self,
165        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166    ) -> anyhow::Result<()> {
167        let data_sender = self.data_sender.clone();
168        let instruments = Arc::clone(&self.instruments);
169        let cancellation = self.cancellation_token.clone();
170
171        let handle = get_runtime().spawn(async move {
172            tokio::pin!(stream);
173
174            loop {
175                tokio::select! {
176                    maybe_msg = stream.next() => {
177                        match maybe_msg {
178                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179                            None => {
180                                tracing::debug!("BitMEX websocket stream ended");
181                                break;
182                            }
183                        }
184                    }
185                    _ = cancellation.cancelled() => {
186                        tracing::debug!("BitMEX websocket stream task cancelled");
187                        break;
188                    }
189                }
190            }
191        });
192
193        self.tasks.push(handle);
194        Ok(())
195    }
196
197    fn handle_ws_message(
198        message: NautilusWsMessage,
199        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201    ) {
202        match message {
203            NautilusWsMessage::Data(payloads) => {
204                for data in payloads {
205                    Self::send_data(sender, data);
206                }
207            }
208            NautilusWsMessage::Instruments(insts) => {
209                let mut guard = instruments.write().expect("instrument cache lock poisoned");
210                for instrument in insts {
211                    let instrument_id = instrument.id();
212                    guard.insert(instrument_id, instrument);
213                }
214                // TODO: Send instruments to data engine
215                let _ = sender;
216            }
217            NautilusWsMessage::FundingRateUpdates(updates) => {
218                for update in updates {
219                    tracing::debug!(
220                        instrument = %update.instrument_id,
221                        rate = %update.rate,
222                        "Funding rate update received (not forwarded)",
223                    );
224                }
225            }
226            NautilusWsMessage::OrderStatusReports(_)
227            | NautilusWsMessage::OrderUpdated(_)
228            | NautilusWsMessage::FillReports(_)
229            | NautilusWsMessage::PositionStatusReport(_)
230            | NautilusWsMessage::AccountState(_) => {
231                tracing::debug!("Ignoring trading message on data client");
232            }
233            NautilusWsMessage::Reconnected => {
234                tracing::info!("BitMEX websocket reconnected");
235            }
236            NautilusWsMessage::Authenticated => {
237                tracing::debug!("BitMEX websocket authenticated");
238            }
239        }
240    }
241
242    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
243        let http = self.http_client.clone();
244        let mut instruments = http
245            .request_instruments(self.config.active_only)
246            .await
247            .context("failed to request BitMEX instruments")?;
248
249        instruments.sort_by_key(|instrument| instrument.id());
250
251        {
252            let mut guard = self
253                .instruments
254                .write()
255                .expect("instrument cache lock poisoned");
256            guard.clear();
257            for instrument in &instruments {
258                guard.insert(instrument.id(), instrument.clone());
259            }
260        }
261
262        for instrument in &instruments {
263            self.http_client.cache_instrument(instrument.clone());
264        }
265
266        Ok(instruments)
267    }
268
269    fn is_connected(&self) -> bool {
270        self.is_connected.load(Ordering::Relaxed)
271    }
272
273    fn is_disconnected(&self) -> bool {
274        !self.is_connected()
275    }
276
277    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
278        let Some(minutes) = self.config.update_instruments_interval_mins else {
279            return Ok(());
280        };
281
282        if minutes == 0 || self.instrument_refresh_active {
283            return Ok(());
284        }
285
286        let interval_secs = minutes.saturating_mul(60);
287        if interval_secs == 0 {
288            return Ok(());
289        }
290
291        let interval = Duration::from_secs(interval_secs);
292        let cancellation = self.cancellation_token.clone();
293        let instruments_cache = Arc::clone(&self.instruments);
294        let active_only = self.config.active_only;
295        let client_id = self.client_id;
296        let http_client = self.http_client.clone();
297
298        let handle = get_runtime().spawn(async move {
299            let http_client = http_client;
300            loop {
301                let sleep = tokio::time::sleep(interval);
302                tokio::pin!(sleep);
303                tokio::select! {
304                    _ = cancellation.cancelled() => {
305                        tracing::debug!("BitMEX instrument refresh task cancelled");
306                        break;
307                    }
308                    _ = &mut sleep => {
309                        match http_client.request_instruments(active_only).await {
310                            Ok(mut instruments) => {
311                                instruments.sort_by_key(|instrument| instrument.id());
312
313                                {
314                                    let mut guard = instruments_cache
315                                        .write()
316                                        .expect("instrument cache lock poisoned");
317                                    guard.clear();
318                                    for instrument in &instruments {
319                                        guard.insert(instrument.id(), instrument.clone());
320                                    }
321                                }
322
323                                for instrument in instruments {
324                                    http_client.cache_instrument(instrument);
325                                }
326
327                                tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
328                            }
329                            Err(e) => {
330                                tracing::warn!(client_id=%client_id, error=?e, "Failed to refresh BitMEX instruments");
331                            }
332                        }
333                    }
334                }
335            }
336        });
337
338        self.tasks.push(handle);
339        self.instrument_refresh_active = true;
340        Ok(())
341    }
342}
343
344#[async_trait::async_trait(?Send)]
345impl DataClient for BitmexDataClient {
346    fn client_id(&self) -> ClientId {
347        self.client_id
348    }
349
350    fn venue(&self) -> Option<Venue> {
351        Some(self.venue())
352    }
353
354    fn start(&mut self) -> anyhow::Result<()> {
355        tracing::info!(
356            client_id = %self.client_id,
357            use_testnet = self.config.use_testnet,
358            http_proxy_url = ?self.config.http_proxy_url,
359            ws_proxy_url = ?self.config.ws_proxy_url,
360            "Starting BitMEX data client"
361        );
362        Ok(())
363    }
364
365    fn stop(&mut self) -> anyhow::Result<()> {
366        tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
367        self.cancellation_token.cancel();
368        self.is_connected.store(false, Ordering::Relaxed);
369        self.instrument_refresh_active = false;
370        Ok(())
371    }
372
373    fn reset(&mut self) -> anyhow::Result<()> {
374        tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
375        self.is_connected.store(false, Ordering::Relaxed);
376        self.cancellation_token = CancellationToken::new();
377        self.tasks.clear();
378        self.book_channels
379            .write()
380            .expect("book channel cache lock poisoned")
381            .clear();
382        self.instrument_refresh_active = false;
383        Ok(())
384    }
385
386    fn dispose(&mut self) -> anyhow::Result<()> {
387        self.stop()
388    }
389
390    async fn connect(&mut self) -> anyhow::Result<()> {
391        if self.is_connected() {
392            return Ok(());
393        }
394
395        if self.ws_client.is_none() {
396            let ws = BitmexWebSocketClient::new(
397                Some(self.config.ws_url()),
398                self.config.api_key.clone(),
399                self.config.api_secret.clone(),
400                None,
401                self.config.heartbeat_interval_secs,
402            )
403            .context("failed to construct BitMEX websocket client")?;
404            self.ws_client = Some(ws);
405        }
406
407        let instruments = self.bootstrap_instruments().await?;
408        if let Some(ws) = self.ws_client.as_mut() {
409            ws.cache_instruments(instruments);
410        }
411
412        let ws = self.ws_client_mut()?;
413        ws.connect()
414            .await
415            .context("failed to connect BitMEX websocket")?;
416        ws.wait_until_active(10.0)
417            .await
418            .context("BitMEX websocket did not become active")?;
419
420        let stream = ws.stream();
421        self.spawn_stream_task(stream)?;
422        self.maybe_spawn_instrument_refresh()?;
423
424        self.is_connected.store(true, Ordering::Relaxed);
425        tracing::info!("Connected");
426        Ok(())
427    }
428
429    async fn disconnect(&mut self) -> anyhow::Result<()> {
430        if self.is_disconnected() {
431            return Ok(());
432        }
433
434        self.cancellation_token.cancel();
435
436        if let Some(ws) = self.ws_client.as_mut()
437            && let Err(e) = ws.close().await
438        {
439            tracing::warn!("Error while closing BitMEX websocket: {e:?}");
440        }
441
442        for handle in self.tasks.drain(..) {
443            if let Err(e) = handle.await {
444                tracing::error!("Error joining websocket task: {e:?}");
445            }
446        }
447
448        self.cancellation_token = CancellationToken::new();
449        self.is_connected.store(false, Ordering::Relaxed);
450        self.book_channels
451            .write()
452            .expect("book channel cache lock poisoned")
453            .clear();
454        self.instrument_refresh_active = false;
455
456        tracing::info!("Disconnected");
457        Ok(())
458    }
459
460    fn is_connected(&self) -> bool {
461        self.is_connected()
462    }
463
464    fn is_disconnected(&self) -> bool {
465        self.is_disconnected()
466    }
467
468    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
469        if cmd.book_type != BookType::L2_MBP {
470            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
471        }
472
473        let instrument_id = cmd.instrument_id;
474        let depth = cmd.depth.map_or(0, |d| d.get());
475        let channel = if depth > 0 && depth <= 25 {
476            BitmexBookChannel::OrderBookL2_25
477        } else {
478            BitmexBookChannel::OrderBookL2
479        };
480
481        let ws = self.ws_client()?.clone();
482        let book_channels = Arc::clone(&self.book_channels);
483
484        self.spawn_ws(
485            async move {
486                match channel {
487                    BitmexBookChannel::OrderBookL2 => ws
488                        .subscribe_book(instrument_id)
489                        .await
490                        .map_err(|err| anyhow::anyhow!(err))?,
491                    BitmexBookChannel::OrderBookL2_25 => ws
492                        .subscribe_book_25(instrument_id)
493                        .await
494                        .map_err(|err| anyhow::anyhow!(err))?,
495                    BitmexBookChannel::OrderBook10 => unreachable!(),
496                }
497                book_channels
498                    .write()
499                    .expect("book channel cache lock poisoned")
500                    .insert(instrument_id, channel);
501                Ok(())
502            },
503            "BitMEX book delta subscription",
504        );
505
506        Ok(())
507    }
508
509    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
510        let instrument_id = cmd.instrument_id;
511        let ws = self.ws_client()?.clone();
512        let book_channels = Arc::clone(&self.book_channels);
513
514        self.spawn_ws(
515            async move {
516                ws.subscribe_book_depth10(instrument_id)
517                    .await
518                    .map_err(|err| anyhow::anyhow!(err))?;
519                book_channels
520                    .write()
521                    .expect("book channel cache lock poisoned")
522                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
523                Ok(())
524            },
525            "BitMEX book depth10 subscription",
526        );
527        Ok(())
528    }
529
530    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
531        if cmd.book_type != BookType::L2_MBP {
532            anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
533        }
534
535        let depth = cmd.depth.map_or(10, |d| d.get());
536        if depth != 10 {
537            tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
538        }
539
540        let instrument_id = cmd.instrument_id;
541        let ws = self.ws_client()?.clone();
542        let book_channels = Arc::clone(&self.book_channels);
543
544        self.spawn_ws(
545            async move {
546                ws.subscribe_book_depth10(instrument_id)
547                    .await
548                    .map_err(|err| anyhow::anyhow!(err))?;
549                book_channels
550                    .write()
551                    .expect("book channel cache lock poisoned")
552                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
553                Ok(())
554            },
555            "BitMEX book snapshot subscription",
556        );
557        Ok(())
558    }
559
560    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
561        let instrument_id = cmd.instrument_id;
562        let ws = self.ws_client()?.clone();
563
564        self.spawn_ws(
565            async move {
566                ws.subscribe_quotes(instrument_id)
567                    .await
568                    .map_err(|err| anyhow::anyhow!(err))
569            },
570            "BitMEX quote subscription",
571        );
572        Ok(())
573    }
574
575    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
576        let instrument_id = cmd.instrument_id;
577        let ws = self.ws_client()?.clone();
578
579        self.spawn_ws(
580            async move {
581                ws.subscribe_trades(instrument_id)
582                    .await
583                    .map_err(|err| anyhow::anyhow!(err))
584            },
585            "BitMEX trade subscription",
586        );
587        Ok(())
588    }
589
590    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
591        let instrument_id = cmd.instrument_id;
592        let ws = self.ws_client()?.clone();
593
594        self.spawn_ws(
595            async move {
596                ws.subscribe_mark_prices(instrument_id)
597                    .await
598                    .map_err(|err| anyhow::anyhow!(err))
599            },
600            "BitMEX mark price subscription",
601        );
602        Ok(())
603    }
604
605    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
606        let instrument_id = cmd.instrument_id;
607        let ws = self.ws_client()?.clone();
608
609        self.spawn_ws(
610            async move {
611                ws.subscribe_index_prices(instrument_id)
612                    .await
613                    .map_err(|err| anyhow::anyhow!(err))
614            },
615            "BitMEX index price subscription",
616        );
617        Ok(())
618    }
619
620    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
621        let instrument_id = cmd.instrument_id;
622        let ws = self.ws_client()?.clone();
623
624        self.spawn_ws(
625            async move {
626                ws.subscribe_funding_rates(instrument_id)
627                    .await
628                    .map_err(|err| anyhow::anyhow!(err))
629            },
630            "BitMEX funding rate subscription",
631        );
632        Ok(())
633    }
634
635    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
636        let bar_type = cmd.bar_type;
637        let ws = self.ws_client()?.clone();
638
639        self.spawn_ws(
640            async move {
641                ws.subscribe_bars(bar_type)
642                    .await
643                    .map_err(|err| anyhow::anyhow!(err))
644            },
645            "BitMEX bar subscription",
646        );
647        Ok(())
648    }
649
650    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
651        let instrument_id = cmd.instrument_id;
652        let ws = self.ws_client()?.clone();
653        let book_channels = Arc::clone(&self.book_channels);
654
655        self.spawn_ws(
656            async move {
657                let channel = book_channels
658                    .write()
659                    .expect("book channel cache lock poisoned")
660                    .remove(&instrument_id);
661
662                match channel {
663                    Some(BitmexBookChannel::OrderBookL2) => ws
664                        .unsubscribe_book(instrument_id)
665                        .await
666                        .map_err(|err| anyhow::anyhow!(err))?,
667                    Some(BitmexBookChannel::OrderBookL2_25) => ws
668                        .unsubscribe_book_25(instrument_id)
669                        .await
670                        .map_err(|err| anyhow::anyhow!(err))?,
671                    Some(BitmexBookChannel::OrderBook10) | None => ws
672                        .unsubscribe_book(instrument_id)
673                        .await
674                        .map_err(|err| anyhow::anyhow!(err))?,
675                }
676                Ok(())
677            },
678            "BitMEX book delta unsubscribe",
679        );
680        Ok(())
681    }
682
683    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
684        let instrument_id = cmd.instrument_id;
685        let ws = self.ws_client()?.clone();
686        let book_channels = Arc::clone(&self.book_channels);
687
688        self.spawn_ws(
689            async move {
690                book_channels
691                    .write()
692                    .expect("book channel cache lock poisoned")
693                    .remove(&instrument_id);
694                ws.unsubscribe_book_depth10(instrument_id)
695                    .await
696                    .map_err(|err| anyhow::anyhow!(err))
697            },
698            "BitMEX book depth10 unsubscribe",
699        );
700        Ok(())
701    }
702
703    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
704        let instrument_id = cmd.instrument_id;
705        let ws = self.ws_client()?.clone();
706        let book_channels = Arc::clone(&self.book_channels);
707
708        self.spawn_ws(
709            async move {
710                book_channels
711                    .write()
712                    .expect("book channel cache lock poisoned")
713                    .remove(&instrument_id);
714                ws.unsubscribe_book_depth10(instrument_id)
715                    .await
716                    .map_err(|err| anyhow::anyhow!(err))
717            },
718            "BitMEX book snapshot unsubscribe",
719        );
720        Ok(())
721    }
722
723    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
724        let instrument_id = cmd.instrument_id;
725        let ws = self.ws_client()?.clone();
726
727        self.spawn_ws(
728            async move {
729                ws.unsubscribe_quotes(instrument_id)
730                    .await
731                    .map_err(|err| anyhow::anyhow!(err))
732            },
733            "BitMEX quote unsubscribe",
734        );
735        Ok(())
736    }
737
738    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
739        let instrument_id = cmd.instrument_id;
740        let ws = self.ws_client()?.clone();
741
742        self.spawn_ws(
743            async move {
744                ws.unsubscribe_trades(instrument_id)
745                    .await
746                    .map_err(|err| anyhow::anyhow!(err))
747            },
748            "BitMEX trade unsubscribe",
749        );
750        Ok(())
751    }
752
753    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
754        let ws = self.ws_client()?.clone();
755        let instrument_id = cmd.instrument_id;
756
757        self.spawn_ws(
758            async move {
759                ws.unsubscribe_mark_prices(instrument_id)
760                    .await
761                    .map_err(|err| anyhow::anyhow!(err))
762            },
763            "BitMEX mark price unsubscribe",
764        );
765        Ok(())
766    }
767
768    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
769        let ws = self.ws_client()?.clone();
770        let instrument_id = cmd.instrument_id;
771
772        self.spawn_ws(
773            async move {
774                ws.unsubscribe_index_prices(instrument_id)
775                    .await
776                    .map_err(|err| anyhow::anyhow!(err))
777            },
778            "BitMEX index price unsubscribe",
779        );
780        Ok(())
781    }
782
783    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
784        let ws = self.ws_client()?.clone();
785        let instrument_id = cmd.instrument_id;
786
787        self.spawn_ws(
788            async move {
789                ws.unsubscribe_funding_rates(instrument_id)
790                    .await
791                    .map_err(|err| anyhow::anyhow!(err))
792            },
793            "BitMEX funding rate unsubscribe",
794        );
795        Ok(())
796    }
797
798    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
799        let bar_type = cmd.bar_type;
800        let ws = self.ws_client()?.clone();
801
802        self.spawn_ws(
803            async move {
804                ws.unsubscribe_bars(bar_type)
805                    .await
806                    .map_err(|err| anyhow::anyhow!(err))
807            },
808            "BitMEX bar unsubscribe",
809        );
810        Ok(())
811    }
812
813    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
814        let venue = request.venue.unwrap_or_else(|| self.venue());
815        if let Some(req_venue) = request.venue
816            && req_venue != self.venue()
817        {
818            tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
819        }
820
821        let http = self.http_client.clone();
822        let instruments_cache = Arc::clone(&self.instruments);
823        let sender = self.data_sender.clone();
824        let request_id = request.request_id;
825        let client_id = request.client_id.unwrap_or(self.client_id);
826        let params = request.params.clone();
827        let start_nanos = datetime_to_unix_nanos(request.start);
828        let end_nanos = datetime_to_unix_nanos(request.end);
829        let clock = self.clock;
830        let active_only = self.config.active_only;
831
832        get_runtime().spawn(async move {
833            let http_client = http;
834            match http_client
835                .request_instruments(active_only)
836                .await
837                .context("failed to request instruments from BitMEX")
838            {
839                Ok(instruments) => {
840                    {
841                        let mut guard = instruments_cache
842                            .write()
843                            .expect("instrument cache lock poisoned");
844                        guard.clear();
845                        for instrument in &instruments {
846                            guard.insert(instrument.id(), instrument.clone());
847                            http_client.cache_instrument(instrument.clone());
848                        }
849                    }
850
851                    let response = DataResponse::Instruments(InstrumentsResponse::new(
852                        request_id,
853                        client_id,
854                        venue,
855                        instruments,
856                        start_nanos,
857                        end_nanos,
858                        clock.get_time_ns(),
859                        params,
860                    ));
861                    if let Err(e) = sender.send(DataEvent::Response(response)) {
862                        tracing::error!("Failed to send instruments response: {e}");
863                    }
864                }
865                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
866            }
867        });
868
869        Ok(())
870    }
871
872    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
873        if let Some(instrument) = self
874            .instruments
875            .read()
876            .expect("instrument cache lock poisoned")
877            .get(&request.instrument_id)
878            .cloned()
879        {
880            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
881                request.request_id,
882                request.client_id.unwrap_or(self.client_id),
883                instrument.id(),
884                instrument,
885                datetime_to_unix_nanos(request.start),
886                datetime_to_unix_nanos(request.end),
887                self.clock.get_time_ns(),
888                request.params.clone(),
889            )));
890            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
891                tracing::error!("Failed to send instrument response: {e}");
892            }
893            return Ok(());
894        }
895
896        let http_client = self.http_client.clone();
897        let instruments_cache = Arc::clone(&self.instruments);
898        let sender = self.data_sender.clone();
899        let instrument_id = request.instrument_id;
900        let request_id = request.request_id;
901        let client_id = request.client_id.unwrap_or(self.client_id);
902        let start = request.start;
903        let end = request.end;
904        let params = request.params.clone();
905        let clock = self.clock;
906
907        get_runtime().spawn(async move {
908            match http_client
909                .request_instrument(instrument_id)
910                .await
911                .context("failed to request instrument from BitMEX")
912            {
913                Ok(Some(instrument)) => {
914                    http_client.cache_instrument(instrument.clone());
915                    {
916                        let mut guard = instruments_cache
917                            .write()
918                            .expect("instrument cache lock poisoned");
919                        guard.insert(instrument.id(), instrument.clone());
920                    }
921
922                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
923                        request_id,
924                        client_id,
925                        instrument.id(),
926                        instrument,
927                        datetime_to_unix_nanos(start),
928                        datetime_to_unix_nanos(end),
929                        clock.get_time_ns(),
930                        params,
931                    )));
932                    if let Err(e) = sender.send(DataEvent::Response(response)) {
933                        tracing::error!("Failed to send instrument response: {e}");
934                    }
935                }
936                Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
937                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
938            }
939        });
940
941        Ok(())
942    }
943
944    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
945        let http = self.http_client.clone();
946        let sender = self.data_sender.clone();
947        let instrument_id = request.instrument_id;
948        let start = request.start;
949        let end = request.end;
950        let limit = request.limit.map(|n| n.get() as u32);
951        let request_id = request.request_id;
952        let client_id = request.client_id.unwrap_or(self.client_id);
953        let params = request.params.clone();
954        let clock = self.clock;
955        let start_nanos = datetime_to_unix_nanos(start);
956        let end_nanos = datetime_to_unix_nanos(end);
957
958        get_runtime().spawn(async move {
959            match http
960                .request_trades(instrument_id, start, end, limit)
961                .await
962                .context("failed to request trades from BitMEX")
963            {
964                Ok(trades) => {
965                    let response = DataResponse::Trades(TradesResponse::new(
966                        request_id,
967                        client_id,
968                        instrument_id,
969                        trades,
970                        start_nanos,
971                        end_nanos,
972                        clock.get_time_ns(),
973                        params,
974                    ));
975                    if let Err(e) = sender.send(DataEvent::Response(response)) {
976                        tracing::error!("Failed to send trades response: {e}");
977                    }
978                }
979                Err(e) => tracing::error!("Trade request failed: {e:?}"),
980            }
981        });
982
983        Ok(())
984    }
985
986    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
987        let http = self.http_client.clone();
988        let sender = self.data_sender.clone();
989        let bar_type = request.bar_type;
990        let start = request.start;
991        let end = request.end;
992        let limit = request.limit.map(|n| n.get() as u32);
993        let request_id = request.request_id;
994        let client_id = request.client_id.unwrap_or(self.client_id);
995        let params = request.params.clone();
996        let clock = self.clock;
997        let start_nanos = datetime_to_unix_nanos(start);
998        let end_nanos = datetime_to_unix_nanos(end);
999
1000        get_runtime().spawn(async move {
1001            match http
1002                .request_bars(bar_type, start, end, limit, false)
1003                .await
1004                .context("failed to request bars from BitMEX")
1005            {
1006                Ok(bars) => {
1007                    let response = DataResponse::Bars(BarsResponse::new(
1008                        request_id,
1009                        client_id,
1010                        bar_type,
1011                        bars,
1012                        start_nanos,
1013                        end_nanos,
1014                        clock.get_time_ns(),
1015                        params,
1016                    ));
1017                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1018                        tracing::error!("Failed to send bars response: {e}");
1019                    }
1020                }
1021                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1022            }
1023        });
1024
1025        Ok(())
1026    }
1027}