Skip to main content

nautilus_bitmex/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38            SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
39            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
40            UnsubscribeBookDepth10, UnsubscribeFundingRates, UnsubscribeIndexPrices,
41            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42        },
43    },
44};
45use nautilus_core::{
46    datetime::datetime_to_unix_nanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50    data::Data,
51    enums::BookType,
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59    common::consts::BITMEX_VENUE,
60    config::BitmexDataClientConfig,
61    http::client::BitmexHttpClient,
62    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
/// BitMEX order book channel used to serve a book subscription.
///
/// The channel is recorded per instrument when a subscription succeeds so that the
/// matching channel can be unsubscribed later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Chosen for delta subscriptions with no depth or a depth above 25.
    OrderBookL2,
    /// Chosen for delta subscriptions with a requested depth of 1 through 25.
    OrderBookL2_25,
    /// Recorded for depth-10 book subscriptions.
    OrderBook10,
}
71
72#[derive(Debug)]
73pub struct BitmexDataClient {
74    client_id: ClientId,
75    config: BitmexDataClientConfig,
76    http_client: BitmexHttpClient,
77    ws_client: Option<BitmexWebSocketClient>,
78    is_connected: AtomicBool,
79    cancellation_token: CancellationToken,
80    tasks: Vec<JoinHandle<()>>,
81    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
82    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
83    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
84    clock: &'static AtomicTime,
85    instrument_refresh_active: bool,
86}
87
88impl BitmexDataClient {
89    /// Creates a new [`BitmexDataClient`] instance.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the HTTP client cannot be constructed.
94    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
95        let clock = get_atomic_clock_realtime();
96        let data_sender = get_data_event_sender();
97
98        let http_client = BitmexHttpClient::new(
99            Some(config.http_base_url()),
100            config.api_key.clone(),
101            config.api_secret.clone(),
102            config.use_testnet,
103            config.http_timeout_secs,
104            config.max_retries,
105            config.retry_delay_initial_ms,
106            config.retry_delay_max_ms,
107            config.recv_window_ms,
108            config.max_requests_per_second,
109            config.max_requests_per_minute,
110            config.http_proxy_url.clone(),
111        )
112        .context("failed to construct BitMEX HTTP client")?;
113
114        Ok(Self {
115            client_id,
116            config,
117            http_client,
118            ws_client: None,
119            is_connected: AtomicBool::new(false),
120            cancellation_token: CancellationToken::new(),
121            tasks: Vec::new(),
122            data_sender,
123            instruments: Arc::new(RwLock::new(AHashMap::new())),
124            book_channels: Arc::new(RwLock::new(AHashMap::new())),
125            clock,
126            instrument_refresh_active: false,
127        })
128    }
129
130    fn venue(&self) -> Venue {
131        *BITMEX_VENUE
132    }
133
134    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135        self.ws_client
136            .as_ref()
137            .context("websocket client not initialized; call connect first")
138    }
139
140    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141        self.ws_client
142            .as_mut()
143            .context("websocket client not initialized; call connect first")
144    }
145
146    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147        if let Err(e) = sender.send(DataEvent::Data(data)) {
148            log::error!("Failed to emit data event: {e}");
149        }
150    }
151
152    fn spawn_ws<F>(&self, fut: F, context: &'static str)
153    where
154        F: Future<Output = anyhow::Result<()>> + Send + 'static,
155    {
156        get_runtime().spawn(async move {
157            if let Err(e) = fut.await {
158                log::error!("{context}: {e:?}");
159            }
160        });
161    }
162
163    fn spawn_stream_task(
164        &mut self,
165        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166    ) -> anyhow::Result<()> {
167        let data_sender = self.data_sender.clone();
168        let instruments = Arc::clone(&self.instruments);
169        let cancellation = self.cancellation_token.clone();
170
171        let handle = get_runtime().spawn(async move {
172            tokio::pin!(stream);
173
174            loop {
175                tokio::select! {
176                    maybe_msg = stream.next() => {
177                        match maybe_msg {
178                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179                            None => {
180                                log::debug!("BitMEX websocket stream ended");
181                                break;
182                            }
183                        }
184                    }
185                    () = cancellation.cancelled() => {
186                        log::debug!("BitMEX websocket stream task cancelled");
187                        break;
188                    }
189                }
190            }
191        });
192
193        self.tasks.push(handle);
194        Ok(())
195    }
196
197    fn handle_ws_message(
198        message: NautilusWsMessage,
199        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201    ) {
202        match message {
203            NautilusWsMessage::Data(payloads) => {
204                for data in payloads {
205                    Self::send_data(sender, data);
206                }
207            }
208            NautilusWsMessage::Instruments(insts) => {
209                let mut guard = instruments.write().expect("instrument cache lock poisoned");
210                for instrument in insts {
211                    let instrument_id = instrument.id();
212                    guard.insert(instrument_id, instrument.clone());
213                    if let Err(e) = sender.send(DataEvent::Instrument(instrument)) {
214                        log::error!("Failed to send instrument event: {e}");
215                    }
216                }
217            }
218            NautilusWsMessage::FundingRateUpdates(updates) => {
219                for update in updates {
220                    log::debug!(
221                        "Funding rate update: instrument={}, rate={}",
222                        update.instrument_id,
223                        update.rate,
224                    );
225                    if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
226                        log::error!("Failed to emit funding rate event: {e}");
227                    }
228                }
229            }
230            NautilusWsMessage::OrderStatusReports(_)
231            | NautilusWsMessage::OrderUpdated(_)
232            | NautilusWsMessage::OrderUpdates(_)
233            | NautilusWsMessage::FillReports(_)
234            | NautilusWsMessage::PositionStatusReports(_)
235            | NautilusWsMessage::AccountStates(_) => {
236                log::debug!("Ignoring trading message on data client");
237            }
238            NautilusWsMessage::Reconnected => {
239                log::info!("BitMEX websocket reconnected");
240            }
241            NautilusWsMessage::Authenticated => {
242                log::debug!("BitMEX websocket authenticated");
243            }
244        }
245    }
246
247    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
248        let http = self.http_client.clone();
249        let mut instruments = http
250            .request_instruments(self.config.active_only)
251            .await
252            .context("failed to request BitMEX instruments")?;
253
254        instruments.sort_by_key(|instrument| instrument.id());
255
256        {
257            let mut guard = self
258                .instruments
259                .write()
260                .expect("instrument cache lock poisoned");
261            guard.clear();
262            for instrument in &instruments {
263                guard.insert(instrument.id(), instrument.clone());
264            }
265        }
266
267        for instrument in &instruments {
268            self.http_client.cache_instrument(instrument.clone());
269        }
270
271        Ok(instruments)
272    }
273
274    fn is_connected(&self) -> bool {
275        self.is_connected.load(Ordering::Relaxed)
276    }
277
278    fn is_disconnected(&self) -> bool {
279        !self.is_connected()
280    }
281
282    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
283        let Some(minutes) = self.config.update_instruments_interval_mins else {
284            return Ok(());
285        };
286
287        if minutes == 0 || self.instrument_refresh_active {
288            return Ok(());
289        }
290
291        let interval_secs = minutes.saturating_mul(60);
292        if interval_secs == 0 {
293            return Ok(());
294        }
295
296        let interval = Duration::from_secs(interval_secs);
297        let cancellation = self.cancellation_token.clone();
298        let instruments_cache = Arc::clone(&self.instruments);
299        let active_only = self.config.active_only;
300        let client_id = self.client_id;
301        let http_client = self.http_client.clone();
302
303        let handle = get_runtime().spawn(async move {
304            let http_client = http_client;
305            loop {
306                let sleep = tokio::time::sleep(interval);
307                tokio::pin!(sleep);
308                tokio::select! {
309                    () = cancellation.cancelled() => {
310                        log::debug!("BitMEX instrument refresh task cancelled");
311                        break;
312                    }
313                    () = &mut sleep => {
314                        match http_client.request_instruments(active_only).await {
315                            Ok(mut instruments) => {
316                                instruments.sort_by_key(|instrument| instrument.id());
317
318                                {
319                                    let mut guard = instruments_cache
320                                        .write()
321                                        .expect("instrument cache lock poisoned");
322                                    guard.clear();
323                                    for instrument in &instruments {
324                                        guard.insert(instrument.id(), instrument.clone());
325                                    }
326                                }
327
328                                for instrument in instruments {
329                                    http_client.cache_instrument(instrument);
330                                }
331
332                                log::debug!("BitMEX instruments refreshed: client_id={client_id}");
333                            }
334                            Err(e) => {
335                                log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
336                            }
337                        }
338                    }
339                }
340            }
341        });
342
343        self.tasks.push(handle);
344        self.instrument_refresh_active = true;
345        Ok(())
346    }
347}
348
349#[async_trait::async_trait(?Send)]
350impl DataClient for BitmexDataClient {
351    fn client_id(&self) -> ClientId {
352        self.client_id
353    }
354
355    fn venue(&self) -> Option<Venue> {
356        Some(self.venue())
357    }
358
359    fn start(&mut self) -> anyhow::Result<()> {
360        log::info!(
361            "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
362            self.client_id,
363            self.config.use_testnet,
364            self.config.http_proxy_url,
365            self.config.ws_proxy_url,
366        );
367        Ok(())
368    }
369
370    fn stop(&mut self) -> anyhow::Result<()> {
371        log::info!("Stopping BitMEX data client {id}", id = self.client_id);
372        self.cancellation_token.cancel();
373        self.is_connected.store(false, Ordering::Relaxed);
374        self.instrument_refresh_active = false;
375        Ok(())
376    }
377
378    fn reset(&mut self) -> anyhow::Result<()> {
379        log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
380        self.is_connected.store(false, Ordering::Relaxed);
381        self.cancellation_token = CancellationToken::new();
382        self.tasks.clear();
383        self.book_channels
384            .write()
385            .expect("book channel cache lock poisoned")
386            .clear();
387        self.instrument_refresh_active = false;
388        Ok(())
389    }
390
391    fn dispose(&mut self) -> anyhow::Result<()> {
392        self.stop()
393    }
394
395    async fn connect(&mut self) -> anyhow::Result<()> {
396        if self.is_connected() {
397            return Ok(());
398        }
399
400        if self.ws_client.is_none() {
401            let ws = BitmexWebSocketClient::new(
402                Some(self.config.ws_url()),
403                self.config.api_key.clone(),
404                self.config.api_secret.clone(),
405                None,
406                self.config.heartbeat_interval_secs,
407            )
408            .context("failed to construct BitMEX websocket client")?;
409            self.ws_client = Some(ws);
410        }
411
412        let instruments = self.bootstrap_instruments().await?;
413        if let Some(ws) = self.ws_client.as_mut() {
414            ws.cache_instruments(instruments);
415        }
416
417        let ws = self.ws_client_mut()?;
418        ws.connect()
419            .await
420            .context("failed to connect BitMEX websocket")?;
421        ws.wait_until_active(10.0)
422            .await
423            .context("BitMEX websocket did not become active")?;
424
425        let stream = ws.stream();
426        self.spawn_stream_task(stream)?;
427        self.maybe_spawn_instrument_refresh()?;
428
429        self.is_connected.store(true, Ordering::Relaxed);
430        log::info!("Connected");
431        Ok(())
432    }
433
434    async fn disconnect(&mut self) -> anyhow::Result<()> {
435        if self.is_disconnected() {
436            return Ok(());
437        }
438
439        self.cancellation_token.cancel();
440
441        if let Some(ws) = self.ws_client.as_mut()
442            && let Err(e) = ws.close().await
443        {
444            log::warn!("Error while closing BitMEX websocket: {e:?}");
445        }
446
447        for handle in self.tasks.drain(..) {
448            if let Err(e) = handle.await {
449                log::error!("Error joining websocket task: {e:?}");
450            }
451        }
452
453        self.cancellation_token = CancellationToken::new();
454        self.is_connected.store(false, Ordering::Relaxed);
455        self.book_channels
456            .write()
457            .expect("book channel cache lock poisoned")
458            .clear();
459        self.instrument_refresh_active = false;
460
461        log::info!("Disconnected");
462        Ok(())
463    }
464
465    fn is_connected(&self) -> bool {
466        self.is_connected()
467    }
468
469    fn is_disconnected(&self) -> bool {
470        self.is_disconnected()
471    }
472
473    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
474        let ws = self.ws_client()?.clone();
475
476        self.spawn_ws(
477            async move {
478                ws.subscribe_instruments()
479                    .await
480                    .map_err(|e| anyhow::anyhow!(e))
481            },
482            "BitMEX instruments subscription",
483        );
484        Ok(())
485    }
486
487    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
488        let instrument_id = cmd.instrument_id;
489
490        if let Some(instrument) = self
491            .instruments
492            .read()
493            .expect("instrument cache lock poisoned")
494            .get(&instrument_id)
495            .cloned()
496        {
497            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
498                log::error!("Failed to send instrument event for {instrument_id}: {e}");
499            }
500            return Ok(());
501        }
502
503        log::warn!("Instrument {instrument_id} not found in BitMEX cache");
504
505        let ws = self.ws_client()?.clone();
506        self.spawn_ws(
507            async move {
508                ws.subscribe_instrument(instrument_id)
509                    .await
510                    .map_err(|e| anyhow::anyhow!(e))
511            },
512            "BitMEX instrument subscription",
513        );
514
515        Ok(())
516    }
517
518    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
519        if cmd.book_type != BookType::L2_MBP {
520            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
521        }
522
523        let instrument_id = cmd.instrument_id;
524        let depth = cmd.depth.map_or(0, |d| d.get());
525        let channel = if depth > 0 && depth <= 25 {
526            if depth != 25 {
527                log::info!(
528                    "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
529                );
530            }
531            BitmexBookChannel::OrderBookL2_25
532        } else {
533            BitmexBookChannel::OrderBookL2
534        };
535
536        let ws = self.ws_client()?.clone();
537        let book_channels = Arc::clone(&self.book_channels);
538
539        self.spawn_ws(
540            async move {
541                match channel {
542                    BitmexBookChannel::OrderBookL2 => ws
543                        .subscribe_book(instrument_id)
544                        .await
545                        .map_err(|e| anyhow::anyhow!(e))?,
546                    BitmexBookChannel::OrderBookL2_25 => ws
547                        .subscribe_book_25(instrument_id)
548                        .await
549                        .map_err(|e| anyhow::anyhow!(e))?,
550                    BitmexBookChannel::OrderBook10 => unreachable!(),
551                }
552                book_channels
553                    .write()
554                    .expect("book channel cache lock poisoned")
555                    .insert(instrument_id, channel);
556                Ok(())
557            },
558            "BitMEX book delta subscription",
559        );
560
561        Ok(())
562    }
563
564    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
565        let instrument_id = cmd.instrument_id;
566        let ws = self.ws_client()?.clone();
567        let book_channels = Arc::clone(&self.book_channels);
568
569        self.spawn_ws(
570            async move {
571                ws.subscribe_book_depth10(instrument_id)
572                    .await
573                    .map_err(|e| anyhow::anyhow!(e))?;
574                book_channels
575                    .write()
576                    .expect("book channel cache lock poisoned")
577                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
578                Ok(())
579            },
580            "BitMEX book depth10 subscription",
581        );
582        Ok(())
583    }
584
585    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
586        let instrument_id = cmd.instrument_id;
587        let ws = self.ws_client()?.clone();
588
589        self.spawn_ws(
590            async move {
591                ws.subscribe_quotes(instrument_id)
592                    .await
593                    .map_err(|e| anyhow::anyhow!(e))
594            },
595            "BitMEX quote subscription",
596        );
597        Ok(())
598    }
599
600    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
601        let instrument_id = cmd.instrument_id;
602        let ws = self.ws_client()?.clone();
603
604        self.spawn_ws(
605            async move {
606                ws.subscribe_trades(instrument_id)
607                    .await
608                    .map_err(|e| anyhow::anyhow!(e))
609            },
610            "BitMEX trade subscription",
611        );
612        Ok(())
613    }
614
615    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
616        let instrument_id = cmd.instrument_id;
617        let ws = self.ws_client()?.clone();
618
619        self.spawn_ws(
620            async move {
621                ws.subscribe_mark_prices(instrument_id)
622                    .await
623                    .map_err(|e| anyhow::anyhow!(e))
624            },
625            "BitMEX mark price subscription",
626        );
627        Ok(())
628    }
629
630    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
631        let instrument_id = cmd.instrument_id;
632        let ws = self.ws_client()?.clone();
633
634        self.spawn_ws(
635            async move {
636                ws.subscribe_index_prices(instrument_id)
637                    .await
638                    .map_err(|e| anyhow::anyhow!(e))
639            },
640            "BitMEX index price subscription",
641        );
642        Ok(())
643    }
644
645    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
646        let instrument_id = cmd.instrument_id;
647        let ws = self.ws_client()?.clone();
648
649        self.spawn_ws(
650            async move {
651                ws.subscribe_funding_rates(instrument_id)
652                    .await
653                    .map_err(|e| anyhow::anyhow!(e))
654            },
655            "BitMEX funding rate subscription",
656        );
657        Ok(())
658    }
659
660    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
661        let bar_type = cmd.bar_type;
662        let ws = self.ws_client()?.clone();
663
664        self.spawn_ws(
665            async move {
666                ws.subscribe_bars(bar_type)
667                    .await
668                    .map_err(|e| anyhow::anyhow!(e))
669            },
670            "BitMEX bar subscription",
671        );
672        Ok(())
673    }
674
675    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
676        let instrument_id = cmd.instrument_id;
677        let ws = self.ws_client()?.clone();
678        let book_channels = Arc::clone(&self.book_channels);
679
680        self.spawn_ws(
681            async move {
682                let channel = book_channels
683                    .write()
684                    .expect("book channel cache lock poisoned")
685                    .remove(&instrument_id);
686
687                match channel {
688                    Some(BitmexBookChannel::OrderBookL2) => ws
689                        .unsubscribe_book(instrument_id)
690                        .await
691                        .map_err(|e| anyhow::anyhow!(e))?,
692                    Some(BitmexBookChannel::OrderBookL2_25) => ws
693                        .unsubscribe_book_25(instrument_id)
694                        .await
695                        .map_err(|e| anyhow::anyhow!(e))?,
696                    Some(BitmexBookChannel::OrderBook10) => ws
697                        .unsubscribe_book_depth10(instrument_id)
698                        .await
699                        .map_err(|e| anyhow::anyhow!(e))?,
700                    None => ws
701                        .unsubscribe_book(instrument_id)
702                        .await
703                        .map_err(|e| anyhow::anyhow!(e))?,
704                }
705                Ok(())
706            },
707            "BitMEX book delta unsubscribe",
708        );
709        Ok(())
710    }
711
712    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
713        let instrument_id = cmd.instrument_id;
714        let ws = self.ws_client()?.clone();
715        let book_channels = Arc::clone(&self.book_channels);
716
717        self.spawn_ws(
718            async move {
719                book_channels
720                    .write()
721                    .expect("book channel cache lock poisoned")
722                    .remove(&instrument_id);
723                ws.unsubscribe_book_depth10(instrument_id)
724                    .await
725                    .map_err(|e| anyhow::anyhow!(e))
726            },
727            "BitMEX book depth10 unsubscribe",
728        );
729        Ok(())
730    }
731
732    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
733        let instrument_id = cmd.instrument_id;
734        let ws = self.ws_client()?.clone();
735
736        self.spawn_ws(
737            async move {
738                ws.unsubscribe_quotes(instrument_id)
739                    .await
740                    .map_err(|e| anyhow::anyhow!(e))
741            },
742            "BitMEX quote unsubscribe",
743        );
744        Ok(())
745    }
746
747    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
748        let instrument_id = cmd.instrument_id;
749        let ws = self.ws_client()?.clone();
750
751        self.spawn_ws(
752            async move {
753                ws.unsubscribe_trades(instrument_id)
754                    .await
755                    .map_err(|e| anyhow::anyhow!(e))
756            },
757            "BitMEX trade unsubscribe",
758        );
759        Ok(())
760    }
761
762    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
763        let ws = self.ws_client()?.clone();
764        let instrument_id = cmd.instrument_id;
765
766        self.spawn_ws(
767            async move {
768                ws.unsubscribe_mark_prices(instrument_id)
769                    .await
770                    .map_err(|e| anyhow::anyhow!(e))
771            },
772            "BitMEX mark price unsubscribe",
773        );
774        Ok(())
775    }
776
777    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
778        let ws = self.ws_client()?.clone();
779        let instrument_id = cmd.instrument_id;
780
781        self.spawn_ws(
782            async move {
783                ws.unsubscribe_index_prices(instrument_id)
784                    .await
785                    .map_err(|e| anyhow::anyhow!(e))
786            },
787            "BitMEX index price unsubscribe",
788        );
789        Ok(())
790    }
791
792    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
793        let ws = self.ws_client()?.clone();
794        let instrument_id = cmd.instrument_id;
795
796        self.spawn_ws(
797            async move {
798                ws.unsubscribe_funding_rates(instrument_id)
799                    .await
800                    .map_err(|e| anyhow::anyhow!(e))
801            },
802            "BitMEX funding rate unsubscribe",
803        );
804        Ok(())
805    }
806
807    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
808        let bar_type = cmd.bar_type;
809        let ws = self.ws_client()?.clone();
810
811        self.spawn_ws(
812            async move {
813                ws.unsubscribe_bars(bar_type)
814                    .await
815                    .map_err(|e| anyhow::anyhow!(e))
816            },
817            "BitMEX bar unsubscribe",
818        );
819        Ok(())
820    }
821
822    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
823        if let Some(req_venue) = request.venue
824            && req_venue != self.venue()
825        {
826            log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
827        }
828        let venue = self.venue();
829
830        let http = self.http_client.clone();
831        let instruments_cache = Arc::clone(&self.instruments);
832        let sender = self.data_sender.clone();
833        let request_id = request.request_id;
834        let client_id = request.client_id.unwrap_or(self.client_id);
835        let params = request.params;
836        let start_nanos = datetime_to_unix_nanos(request.start);
837        let end_nanos = datetime_to_unix_nanos(request.end);
838        let clock = self.clock;
839        let active_only = self.config.active_only;
840
841        get_runtime().spawn(async move {
842            let http_client = http;
843            match http_client
844                .request_instruments(active_only)
845                .await
846                .context("failed to request instruments from BitMEX")
847            {
848                Ok(instruments) => {
849                    {
850                        let mut guard = instruments_cache
851                            .write()
852                            .expect("instrument cache lock poisoned");
853                        guard.clear();
854                        for instrument in &instruments {
855                            guard.insert(instrument.id(), instrument.clone());
856                            http_client.cache_instrument(instrument.clone());
857                        }
858                    }
859
860                    let response = DataResponse::Instruments(InstrumentsResponse::new(
861                        request_id,
862                        client_id,
863                        venue,
864                        instruments,
865                        start_nanos,
866                        end_nanos,
867                        clock.get_time_ns(),
868                        params,
869                    ));
870                    if let Err(e) = sender.send(DataEvent::Response(response)) {
871                        log::error!("Failed to send instruments response: {e}");
872                    }
873                }
874                Err(e) => log::error!("Instrument request failed: {e:?}"),
875            }
876        });
877
878        Ok(())
879    }
880
881    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
882        if let Some(instrument) = self
883            .instruments
884            .read()
885            .expect("instrument cache lock poisoned")
886            .get(&request.instrument_id)
887            .cloned()
888        {
889            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
890                request.request_id,
891                request.client_id.unwrap_or(self.client_id),
892                instrument.id(),
893                instrument,
894                datetime_to_unix_nanos(request.start),
895                datetime_to_unix_nanos(request.end),
896                self.clock.get_time_ns(),
897                request.params,
898            )));
899            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
900                log::error!("Failed to send instrument response: {e}");
901            }
902            return Ok(());
903        }
904
905        let http_client = self.http_client.clone();
906        let instruments_cache = Arc::clone(&self.instruments);
907        let sender = self.data_sender.clone();
908        let instrument_id = request.instrument_id;
909        let request_id = request.request_id;
910        let client_id = request.client_id.unwrap_or(self.client_id);
911        let start = request.start;
912        let end = request.end;
913        let params = request.params;
914        let clock = self.clock;
915
916        get_runtime().spawn(async move {
917            match http_client
918                .request_instrument(instrument_id)
919                .await
920                .context("failed to request instrument from BitMEX")
921            {
922                Ok(Some(instrument)) => {
923                    http_client.cache_instrument(instrument.clone());
924                    {
925                        let mut guard = instruments_cache
926                            .write()
927                            .expect("instrument cache lock poisoned");
928                        guard.insert(instrument.id(), instrument.clone());
929                    }
930
931                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
932                        request_id,
933                        client_id,
934                        instrument.id(),
935                        instrument,
936                        datetime_to_unix_nanos(start),
937                        datetime_to_unix_nanos(end),
938                        clock.get_time_ns(),
939                        params,
940                    )));
941                    if let Err(e) = sender.send(DataEvent::Response(response)) {
942                        log::error!("Failed to send instrument response: {e}");
943                    }
944                }
945                Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
946                Err(e) => log::error!("Instrument request failed: {e:?}"),
947            }
948        });
949
950        Ok(())
951    }
952
953    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
954        let http = self.http_client.clone();
955        let sender = self.data_sender.clone();
956        let instrument_id = request.instrument_id;
957        let start = request.start;
958        let end = request.end;
959        let limit = request.limit.map(|n| n.get() as u32);
960        let request_id = request.request_id;
961        let client_id = request.client_id.unwrap_or(self.client_id);
962        let params = request.params;
963        let clock = self.clock;
964        let start_nanos = datetime_to_unix_nanos(start);
965        let end_nanos = datetime_to_unix_nanos(end);
966
967        get_runtime().spawn(async move {
968            match http
969                .request_trades(instrument_id, start, end, limit)
970                .await
971                .context("failed to request trades from BitMEX")
972            {
973                Ok(trades) => {
974                    let response = DataResponse::Trades(TradesResponse::new(
975                        request_id,
976                        client_id,
977                        instrument_id,
978                        trades,
979                        start_nanos,
980                        end_nanos,
981                        clock.get_time_ns(),
982                        params,
983                    ));
984                    if let Err(e) = sender.send(DataEvent::Response(response)) {
985                        log::error!("Failed to send trades response: {e}");
986                    }
987                }
988                Err(e) => log::error!("Trade request failed: {e:?}"),
989            }
990        });
991
992        Ok(())
993    }
994
995    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
996        let http = self.http_client.clone();
997        let sender = self.data_sender.clone();
998        let bar_type = request.bar_type;
999        let start = request.start;
1000        let end = request.end;
1001        let limit = request.limit.map(|n| n.get() as u32);
1002        let request_id = request.request_id;
1003        let client_id = request.client_id.unwrap_or(self.client_id);
1004        let params = request.params;
1005        let clock = self.clock;
1006        let start_nanos = datetime_to_unix_nanos(start);
1007        let end_nanos = datetime_to_unix_nanos(end);
1008
1009        get_runtime().spawn(async move {
1010            match http
1011                .request_bars(bar_type, start, end, limit, false)
1012                .await
1013                .context("failed to request bars from BitMEX")
1014            {
1015                Ok(bars) => {
1016                    let response = DataResponse::Bars(BarsResponse::new(
1017                        request_id,
1018                        client_id,
1019                        bar_type,
1020                        bars,
1021                        start_nanos,
1022                        end_nanos,
1023                        clock.get_time_ns(),
1024                        params,
1025                    ));
1026                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1027                        log::error!("Failed to send bars response: {e}");
1028                    }
1029                }
1030                Err(e) => log::error!("Bar request failed: {e:?}"),
1031            }
1032        });
1033
1034        Ok(())
1035    }
1036}