nautilus_bitmex/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43};
44use nautilus_core::{
45    datetime::datetime_to_unix_nanos,
46    time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49    data::Data,
50    enums::BookType,
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53};
54use tokio::{task::JoinHandle, time::Duration};
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58    common::consts::BITMEX_VENUE,
59    config::BitmexDataClientConfig,
60    http::client::BitmexHttpClient,
61    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[derive(Clone, Copy, Debug, Eq, PartialEq)]
65enum BitmexBookChannel {
66    OrderBookL2,
67    OrderBookL2_25,
68    OrderBook10,
69}
70
71#[derive(Debug)]
72pub struct BitmexDataClient {
73    client_id: ClientId,
74    config: BitmexDataClientConfig,
75    http_client: BitmexHttpClient,
76    ws_client: Option<BitmexWebSocketClient>,
77    is_connected: AtomicBool,
78    cancellation_token: CancellationToken,
79    tasks: Vec<JoinHandle<()>>,
80    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
81    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
82    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
83    clock: &'static AtomicTime,
84    instrument_refresh_active: bool,
85}
86
87impl BitmexDataClient {
88    /// Creates a new [`BitmexDataClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the HTTP client cannot be constructed.
93    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
94        let clock = get_atomic_clock_realtime();
95        let data_sender = get_data_event_sender();
96
97        let http_client = BitmexHttpClient::new(
98            Some(config.http_base_url()),
99            config.api_key.clone(),
100            config.api_secret.clone(),
101            config.use_testnet,
102            config.http_timeout_secs,
103            config.max_retries,
104            config.retry_delay_initial_ms,
105            config.retry_delay_max_ms,
106            config.recv_window_ms,
107            config.max_requests_per_second,
108            config.max_requests_per_minute,
109            config.http_proxy_url.clone(),
110        )
111        .context("failed to construct BitMEX HTTP client")?;
112
113        Ok(Self {
114            client_id,
115            config,
116            http_client,
117            ws_client: None,
118            is_connected: AtomicBool::new(false),
119            cancellation_token: CancellationToken::new(),
120            tasks: Vec::new(),
121            data_sender,
122            instruments: Arc::new(RwLock::new(AHashMap::new())),
123            book_channels: Arc::new(RwLock::new(AHashMap::new())),
124            clock,
125            instrument_refresh_active: false,
126        })
127    }
128
129    fn venue(&self) -> Venue {
130        *BITMEX_VENUE
131    }
132
133    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
134        self.ws_client
135            .as_ref()
136            .context("websocket client not initialized; call connect first")
137    }
138
139    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
140        self.ws_client
141            .as_mut()
142            .context("websocket client not initialized; call connect first")
143    }
144
145    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
146        if let Err(e) = sender.send(DataEvent::Data(data)) {
147            log::error!("Failed to emit data event: {e}");
148        }
149    }
150
151    fn spawn_ws<F>(&self, fut: F, context: &'static str)
152    where
153        F: Future<Output = anyhow::Result<()>> + Send + 'static,
154    {
155        get_runtime().spawn(async move {
156            if let Err(e) = fut.await {
157                log::error!("{context}: {e:?}");
158            }
159        });
160    }
161
162    fn spawn_stream_task(
163        &mut self,
164        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
165    ) -> anyhow::Result<()> {
166        let data_sender = self.data_sender.clone();
167        let instruments = Arc::clone(&self.instruments);
168        let cancellation = self.cancellation_token.clone();
169
170        let handle = get_runtime().spawn(async move {
171            tokio::pin!(stream);
172
173            loop {
174                tokio::select! {
175                    maybe_msg = stream.next() => {
176                        match maybe_msg {
177                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
178                            None => {
179                                log::debug!("BitMEX websocket stream ended");
180                                break;
181                            }
182                        }
183                    }
184                    () = cancellation.cancelled() => {
185                        log::debug!("BitMEX websocket stream task cancelled");
186                        break;
187                    }
188                }
189            }
190        });
191
192        self.tasks.push(handle);
193        Ok(())
194    }
195
196    fn handle_ws_message(
197        message: NautilusWsMessage,
198        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
199        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
200    ) {
201        match message {
202            NautilusWsMessage::Data(payloads) => {
203                for data in payloads {
204                    Self::send_data(sender, data);
205                }
206            }
207            NautilusWsMessage::Instruments(insts) => {
208                let mut guard = instruments.write().expect("instrument cache lock poisoned");
209                for instrument in insts {
210                    let instrument_id = instrument.id();
211                    guard.insert(instrument_id, instrument);
212                }
213                // TODO: Send instruments to data engine
214                let _ = sender;
215            }
216            NautilusWsMessage::FundingRateUpdates(updates) => {
217                for update in updates {
218                    log::debug!(
219                        "Funding rate update received (not forwarded): instrument={}, rate={}",
220                        update.instrument_id,
221                        update.rate,
222                    );
223                }
224            }
225            NautilusWsMessage::OrderStatusReports(_)
226            | NautilusWsMessage::OrderUpdated(_)
227            | NautilusWsMessage::FillReports(_)
228            | NautilusWsMessage::PositionStatusReport(_)
229            | NautilusWsMessage::AccountState(_) => {
230                log::debug!("Ignoring trading message on data client");
231            }
232            NautilusWsMessage::Reconnected => {
233                log::info!("BitMEX websocket reconnected");
234            }
235            NautilusWsMessage::Authenticated => {
236                log::debug!("BitMEX websocket authenticated");
237            }
238        }
239    }
240
241    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
242        let http = self.http_client.clone();
243        let mut instruments = http
244            .request_instruments(self.config.active_only)
245            .await
246            .context("failed to request BitMEX instruments")?;
247
248        instruments.sort_by_key(|instrument| instrument.id());
249
250        {
251            let mut guard = self
252                .instruments
253                .write()
254                .expect("instrument cache lock poisoned");
255            guard.clear();
256            for instrument in &instruments {
257                guard.insert(instrument.id(), instrument.clone());
258            }
259        }
260
261        for instrument in &instruments {
262            self.http_client.cache_instrument(instrument.clone());
263        }
264
265        Ok(instruments)
266    }
267
268    fn is_connected(&self) -> bool {
269        self.is_connected.load(Ordering::Relaxed)
270    }
271
272    fn is_disconnected(&self) -> bool {
273        !self.is_connected()
274    }
275
276    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
277        let Some(minutes) = self.config.update_instruments_interval_mins else {
278            return Ok(());
279        };
280
281        if minutes == 0 || self.instrument_refresh_active {
282            return Ok(());
283        }
284
285        let interval_secs = minutes.saturating_mul(60);
286        if interval_secs == 0 {
287            return Ok(());
288        }
289
290        let interval = Duration::from_secs(interval_secs);
291        let cancellation = self.cancellation_token.clone();
292        let instruments_cache = Arc::clone(&self.instruments);
293        let active_only = self.config.active_only;
294        let client_id = self.client_id;
295        let http_client = self.http_client.clone();
296
297        let handle = get_runtime().spawn(async move {
298            let http_client = http_client;
299            loop {
300                let sleep = tokio::time::sleep(interval);
301                tokio::pin!(sleep);
302                tokio::select! {
303                    () = cancellation.cancelled() => {
304                        log::debug!("BitMEX instrument refresh task cancelled");
305                        break;
306                    }
307                    () = &mut sleep => {
308                        match http_client.request_instruments(active_only).await {
309                            Ok(mut instruments) => {
310                                instruments.sort_by_key(|instrument| instrument.id());
311
312                                {
313                                    let mut guard = instruments_cache
314                                        .write()
315                                        .expect("instrument cache lock poisoned");
316                                    guard.clear();
317                                    for instrument in &instruments {
318                                        guard.insert(instrument.id(), instrument.clone());
319                                    }
320                                }
321
322                                for instrument in instruments {
323                                    http_client.cache_instrument(instrument);
324                                }
325
326                                log::debug!("BitMEX instruments refreshed: client_id={client_id}");
327                            }
328                            Err(e) => {
329                                log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
330                            }
331                        }
332                    }
333                }
334            }
335        });
336
337        self.tasks.push(handle);
338        self.instrument_refresh_active = true;
339        Ok(())
340    }
341}
342
343#[async_trait::async_trait(?Send)]
344impl DataClient for BitmexDataClient {
345    fn client_id(&self) -> ClientId {
346        self.client_id
347    }
348
349    fn venue(&self) -> Option<Venue> {
350        Some(self.venue())
351    }
352
353    fn start(&mut self) -> anyhow::Result<()> {
354        log::info!(
355            "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
356            self.client_id,
357            self.config.use_testnet,
358            self.config.http_proxy_url,
359            self.config.ws_proxy_url,
360        );
361        Ok(())
362    }
363
364    fn stop(&mut self) -> anyhow::Result<()> {
365        log::info!("Stopping BitMEX data client {id}", id = self.client_id);
366        self.cancellation_token.cancel();
367        self.is_connected.store(false, Ordering::Relaxed);
368        self.instrument_refresh_active = false;
369        Ok(())
370    }
371
372    fn reset(&mut self) -> anyhow::Result<()> {
373        log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
374        self.is_connected.store(false, Ordering::Relaxed);
375        self.cancellation_token = CancellationToken::new();
376        self.tasks.clear();
377        self.book_channels
378            .write()
379            .expect("book channel cache lock poisoned")
380            .clear();
381        self.instrument_refresh_active = false;
382        Ok(())
383    }
384
385    fn dispose(&mut self) -> anyhow::Result<()> {
386        self.stop()
387    }
388
389    async fn connect(&mut self) -> anyhow::Result<()> {
390        if self.is_connected() {
391            return Ok(());
392        }
393
394        if self.ws_client.is_none() {
395            let ws = BitmexWebSocketClient::new(
396                Some(self.config.ws_url()),
397                self.config.api_key.clone(),
398                self.config.api_secret.clone(),
399                None,
400                self.config.heartbeat_interval_secs,
401            )
402            .context("failed to construct BitMEX websocket client")?;
403            self.ws_client = Some(ws);
404        }
405
406        let instruments = self.bootstrap_instruments().await?;
407        if let Some(ws) = self.ws_client.as_mut() {
408            ws.cache_instruments(instruments);
409        }
410
411        let ws = self.ws_client_mut()?;
412        ws.connect()
413            .await
414            .context("failed to connect BitMEX websocket")?;
415        ws.wait_until_active(10.0)
416            .await
417            .context("BitMEX websocket did not become active")?;
418
419        let stream = ws.stream();
420        self.spawn_stream_task(stream)?;
421        self.maybe_spawn_instrument_refresh()?;
422
423        self.is_connected.store(true, Ordering::Relaxed);
424        log::info!("Connected");
425        Ok(())
426    }
427
428    async fn disconnect(&mut self) -> anyhow::Result<()> {
429        if self.is_disconnected() {
430            return Ok(());
431        }
432
433        self.cancellation_token.cancel();
434
435        if let Some(ws) = self.ws_client.as_mut()
436            && let Err(e) = ws.close().await
437        {
438            log::warn!("Error while closing BitMEX websocket: {e:?}");
439        }
440
441        for handle in self.tasks.drain(..) {
442            if let Err(e) = handle.await {
443                log::error!("Error joining websocket task: {e:?}");
444            }
445        }
446
447        self.cancellation_token = CancellationToken::new();
448        self.is_connected.store(false, Ordering::Relaxed);
449        self.book_channels
450            .write()
451            .expect("book channel cache lock poisoned")
452            .clear();
453        self.instrument_refresh_active = false;
454
455        log::info!("Disconnected");
456        Ok(())
457    }
458
459    fn is_connected(&self) -> bool {
460        self.is_connected()
461    }
462
463    fn is_disconnected(&self) -> bool {
464        self.is_disconnected()
465    }
466
467    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
468        if cmd.book_type != BookType::L2_MBP {
469            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
470        }
471
472        let instrument_id = cmd.instrument_id;
473        let depth = cmd.depth.map_or(0, |d| d.get());
474        let channel = if depth > 0 && depth <= 25 {
475            BitmexBookChannel::OrderBookL2_25
476        } else {
477            BitmexBookChannel::OrderBookL2
478        };
479
480        let ws = self.ws_client()?.clone();
481        let book_channels = Arc::clone(&self.book_channels);
482
483        self.spawn_ws(
484            async move {
485                match channel {
486                    BitmexBookChannel::OrderBookL2 => ws
487                        .subscribe_book(instrument_id)
488                        .await
489                        .map_err(|err| anyhow::anyhow!(err))?,
490                    BitmexBookChannel::OrderBookL2_25 => ws
491                        .subscribe_book_25(instrument_id)
492                        .await
493                        .map_err(|err| anyhow::anyhow!(err))?,
494                    BitmexBookChannel::OrderBook10 => unreachable!(),
495                }
496                book_channels
497                    .write()
498                    .expect("book channel cache lock poisoned")
499                    .insert(instrument_id, channel);
500                Ok(())
501            },
502            "BitMEX book delta subscription",
503        );
504
505        Ok(())
506    }
507
508    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
509        let instrument_id = cmd.instrument_id;
510        let ws = self.ws_client()?.clone();
511        let book_channels = Arc::clone(&self.book_channels);
512
513        self.spawn_ws(
514            async move {
515                ws.subscribe_book_depth10(instrument_id)
516                    .await
517                    .map_err(|err| anyhow::anyhow!(err))?;
518                book_channels
519                    .write()
520                    .expect("book channel cache lock poisoned")
521                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
522                Ok(())
523            },
524            "BitMEX book depth10 subscription",
525        );
526        Ok(())
527    }
528
529    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
530        let instrument_id = cmd.instrument_id;
531        let ws = self.ws_client()?.clone();
532
533        self.spawn_ws(
534            async move {
535                ws.subscribe_quotes(instrument_id)
536                    .await
537                    .map_err(|err| anyhow::anyhow!(err))
538            },
539            "BitMEX quote subscription",
540        );
541        Ok(())
542    }
543
544    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
545        let instrument_id = cmd.instrument_id;
546        let ws = self.ws_client()?.clone();
547
548        self.spawn_ws(
549            async move {
550                ws.subscribe_trades(instrument_id)
551                    .await
552                    .map_err(|err| anyhow::anyhow!(err))
553            },
554            "BitMEX trade subscription",
555        );
556        Ok(())
557    }
558
559    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
560        let instrument_id = cmd.instrument_id;
561        let ws = self.ws_client()?.clone();
562
563        self.spawn_ws(
564            async move {
565                ws.subscribe_mark_prices(instrument_id)
566                    .await
567                    .map_err(|err| anyhow::anyhow!(err))
568            },
569            "BitMEX mark price subscription",
570        );
571        Ok(())
572    }
573
574    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
575        let instrument_id = cmd.instrument_id;
576        let ws = self.ws_client()?.clone();
577
578        self.spawn_ws(
579            async move {
580                ws.subscribe_index_prices(instrument_id)
581                    .await
582                    .map_err(|err| anyhow::anyhow!(err))
583            },
584            "BitMEX index price subscription",
585        );
586        Ok(())
587    }
588
589    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
590        let instrument_id = cmd.instrument_id;
591        let ws = self.ws_client()?.clone();
592
593        self.spawn_ws(
594            async move {
595                ws.subscribe_funding_rates(instrument_id)
596                    .await
597                    .map_err(|err| anyhow::anyhow!(err))
598            },
599            "BitMEX funding rate subscription",
600        );
601        Ok(())
602    }
603
604    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
605        let bar_type = cmd.bar_type;
606        let ws = self.ws_client()?.clone();
607
608        self.spawn_ws(
609            async move {
610                ws.subscribe_bars(bar_type)
611                    .await
612                    .map_err(|err| anyhow::anyhow!(err))
613            },
614            "BitMEX bar subscription",
615        );
616        Ok(())
617    }
618
619    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
620        let instrument_id = cmd.instrument_id;
621        let ws = self.ws_client()?.clone();
622        let book_channels = Arc::clone(&self.book_channels);
623
624        self.spawn_ws(
625            async move {
626                let channel = book_channels
627                    .write()
628                    .expect("book channel cache lock poisoned")
629                    .remove(&instrument_id);
630
631                match channel {
632                    Some(BitmexBookChannel::OrderBookL2) => ws
633                        .unsubscribe_book(instrument_id)
634                        .await
635                        .map_err(|err| anyhow::anyhow!(err))?,
636                    Some(BitmexBookChannel::OrderBookL2_25) => ws
637                        .unsubscribe_book_25(instrument_id)
638                        .await
639                        .map_err(|err| anyhow::anyhow!(err))?,
640                    Some(BitmexBookChannel::OrderBook10) | None => ws
641                        .unsubscribe_book(instrument_id)
642                        .await
643                        .map_err(|err| anyhow::anyhow!(err))?,
644                }
645                Ok(())
646            },
647            "BitMEX book delta unsubscribe",
648        );
649        Ok(())
650    }
651
652    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
653        let instrument_id = cmd.instrument_id;
654        let ws = self.ws_client()?.clone();
655        let book_channels = Arc::clone(&self.book_channels);
656
657        self.spawn_ws(
658            async move {
659                book_channels
660                    .write()
661                    .expect("book channel cache lock poisoned")
662                    .remove(&instrument_id);
663                ws.unsubscribe_book_depth10(instrument_id)
664                    .await
665                    .map_err(|err| anyhow::anyhow!(err))
666            },
667            "BitMEX book depth10 unsubscribe",
668        );
669        Ok(())
670    }
671
672    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
673        let instrument_id = cmd.instrument_id;
674        let ws = self.ws_client()?.clone();
675
676        self.spawn_ws(
677            async move {
678                ws.unsubscribe_quotes(instrument_id)
679                    .await
680                    .map_err(|err| anyhow::anyhow!(err))
681            },
682            "BitMEX quote unsubscribe",
683        );
684        Ok(())
685    }
686
687    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
688        let instrument_id = cmd.instrument_id;
689        let ws = self.ws_client()?.clone();
690
691        self.spawn_ws(
692            async move {
693                ws.unsubscribe_trades(instrument_id)
694                    .await
695                    .map_err(|err| anyhow::anyhow!(err))
696            },
697            "BitMEX trade unsubscribe",
698        );
699        Ok(())
700    }
701
702    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
703        let ws = self.ws_client()?.clone();
704        let instrument_id = cmd.instrument_id;
705
706        self.spawn_ws(
707            async move {
708                ws.unsubscribe_mark_prices(instrument_id)
709                    .await
710                    .map_err(|err| anyhow::anyhow!(err))
711            },
712            "BitMEX mark price unsubscribe",
713        );
714        Ok(())
715    }
716
717    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
718        let ws = self.ws_client()?.clone();
719        let instrument_id = cmd.instrument_id;
720
721        self.spawn_ws(
722            async move {
723                ws.unsubscribe_index_prices(instrument_id)
724                    .await
725                    .map_err(|err| anyhow::anyhow!(err))
726            },
727            "BitMEX index price unsubscribe",
728        );
729        Ok(())
730    }
731
732    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
733        let ws = self.ws_client()?.clone();
734        let instrument_id = cmd.instrument_id;
735
736        self.spawn_ws(
737            async move {
738                ws.unsubscribe_funding_rates(instrument_id)
739                    .await
740                    .map_err(|err| anyhow::anyhow!(err))
741            },
742            "BitMEX funding rate unsubscribe",
743        );
744        Ok(())
745    }
746
747    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
748        let bar_type = cmd.bar_type;
749        let ws = self.ws_client()?.clone();
750
751        self.spawn_ws(
752            async move {
753                ws.unsubscribe_bars(bar_type)
754                    .await
755                    .map_err(|err| anyhow::anyhow!(err))
756            },
757            "BitMEX bar unsubscribe",
758        );
759        Ok(())
760    }
761
762    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
763        let venue = request.venue.unwrap_or_else(|| self.venue());
764        if let Some(req_venue) = request.venue
765            && req_venue != self.venue()
766        {
767            log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
768        }
769
770        let http = self.http_client.clone();
771        let instruments_cache = Arc::clone(&self.instruments);
772        let sender = self.data_sender.clone();
773        let request_id = request.request_id;
774        let client_id = request.client_id.unwrap_or(self.client_id);
775        let params = request.params.clone();
776        let start_nanos = datetime_to_unix_nanos(request.start);
777        let end_nanos = datetime_to_unix_nanos(request.end);
778        let clock = self.clock;
779        let active_only = self.config.active_only;
780
781        get_runtime().spawn(async move {
782            let http_client = http;
783            match http_client
784                .request_instruments(active_only)
785                .await
786                .context("failed to request instruments from BitMEX")
787            {
788                Ok(instruments) => {
789                    {
790                        let mut guard = instruments_cache
791                            .write()
792                            .expect("instrument cache lock poisoned");
793                        guard.clear();
794                        for instrument in &instruments {
795                            guard.insert(instrument.id(), instrument.clone());
796                            http_client.cache_instrument(instrument.clone());
797                        }
798                    }
799
800                    let response = DataResponse::Instruments(InstrumentsResponse::new(
801                        request_id,
802                        client_id,
803                        venue,
804                        instruments,
805                        start_nanos,
806                        end_nanos,
807                        clock.get_time_ns(),
808                        params,
809                    ));
810                    if let Err(e) = sender.send(DataEvent::Response(response)) {
811                        log::error!("Failed to send instruments response: {e}");
812                    }
813                }
814                Err(e) => log::error!("Instrument request failed: {e:?}"),
815            }
816        });
817
818        Ok(())
819    }
820
821    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
822        if let Some(instrument) = self
823            .instruments
824            .read()
825            .expect("instrument cache lock poisoned")
826            .get(&request.instrument_id)
827            .cloned()
828        {
829            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
830                request.request_id,
831                request.client_id.unwrap_or(self.client_id),
832                instrument.id(),
833                instrument,
834                datetime_to_unix_nanos(request.start),
835                datetime_to_unix_nanos(request.end),
836                self.clock.get_time_ns(),
837                request.params.clone(),
838            )));
839            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
840                log::error!("Failed to send instrument response: {e}");
841            }
842            return Ok(());
843        }
844
845        let http_client = self.http_client.clone();
846        let instruments_cache = Arc::clone(&self.instruments);
847        let sender = self.data_sender.clone();
848        let instrument_id = request.instrument_id;
849        let request_id = request.request_id;
850        let client_id = request.client_id.unwrap_or(self.client_id);
851        let start = request.start;
852        let end = request.end;
853        let params = request.params.clone();
854        let clock = self.clock;
855
856        get_runtime().spawn(async move {
857            match http_client
858                .request_instrument(instrument_id)
859                .await
860                .context("failed to request instrument from BitMEX")
861            {
862                Ok(Some(instrument)) => {
863                    http_client.cache_instrument(instrument.clone());
864                    {
865                        let mut guard = instruments_cache
866                            .write()
867                            .expect("instrument cache lock poisoned");
868                        guard.insert(instrument.id(), instrument.clone());
869                    }
870
871                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
872                        request_id,
873                        client_id,
874                        instrument.id(),
875                        instrument,
876                        datetime_to_unix_nanos(start),
877                        datetime_to_unix_nanos(end),
878                        clock.get_time_ns(),
879                        params,
880                    )));
881                    if let Err(e) = sender.send(DataEvent::Response(response)) {
882                        log::error!("Failed to send instrument response: {e}");
883                    }
884                }
885                Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
886                Err(e) => log::error!("Instrument request failed: {e:?}"),
887            }
888        });
889
890        Ok(())
891    }
892
893    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
894        let http = self.http_client.clone();
895        let sender = self.data_sender.clone();
896        let instrument_id = request.instrument_id;
897        let start = request.start;
898        let end = request.end;
899        let limit = request.limit.map(|n| n.get() as u32);
900        let request_id = request.request_id;
901        let client_id = request.client_id.unwrap_or(self.client_id);
902        let params = request.params.clone();
903        let clock = self.clock;
904        let start_nanos = datetime_to_unix_nanos(start);
905        let end_nanos = datetime_to_unix_nanos(end);
906
907        get_runtime().spawn(async move {
908            match http
909                .request_trades(instrument_id, start, end, limit)
910                .await
911                .context("failed to request trades from BitMEX")
912            {
913                Ok(trades) => {
914                    let response = DataResponse::Trades(TradesResponse::new(
915                        request_id,
916                        client_id,
917                        instrument_id,
918                        trades,
919                        start_nanos,
920                        end_nanos,
921                        clock.get_time_ns(),
922                        params,
923                    ));
924                    if let Err(e) = sender.send(DataEvent::Response(response)) {
925                        log::error!("Failed to send trades response: {e}");
926                    }
927                }
928                Err(e) => log::error!("Trade request failed: {e:?}"),
929            }
930        });
931
932        Ok(())
933    }
934
935    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
936        let http = self.http_client.clone();
937        let sender = self.data_sender.clone();
938        let bar_type = request.bar_type;
939        let start = request.start;
940        let end = request.end;
941        let limit = request.limit.map(|n| n.get() as u32);
942        let request_id = request.request_id;
943        let client_id = request.client_id.unwrap_or(self.client_id);
944        let params = request.params.clone();
945        let clock = self.clock;
946        let start_nanos = datetime_to_unix_nanos(start);
947        let end_nanos = datetime_to_unix_nanos(end);
948
949        get_runtime().spawn(async move {
950            match http
951                .request_bars(bar_type, start, end, limit, false)
952                .await
953                .context("failed to request bars from BitMEX")
954            {
955                Ok(bars) => {
956                    let response = DataResponse::Bars(BarsResponse::new(
957                        request_id,
958                        client_id,
959                        bar_type,
960                        bars,
961                        start_nanos,
962                        end_nanos,
963                        clock.get_time_ns(),
964                        params,
965                    ));
966                    if let Err(e) = sender.send(DataEvent::Response(response)) {
967                        log::error!("Failed to send bars response: {e}");
968                    }
969                }
970                Err(e) => log::error!("Bar request failed: {e:?}"),
971            }
972        });
973
974        Ok(())
975    }
976}