nautilus_bitmex/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37            SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39            UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43    runner::get_data_event_sender,
44};
45use nautilus_core::{
46    UnixNanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51    data::Data,
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::consts::BITMEX_VENUE,
61    config::BitmexDataClientConfig,
62    http::client::BitmexHttpClient,
63    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
/// BitMEX order book channel recorded for an instrument after a book subscription.
///
/// The data client stores the variant used so that a later delta unsubscribe can
/// target the matching websocket channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Full-depth L2 book; chosen for delta subscriptions with no depth or a depth above 25.
    OrderBookL2,
    /// L2 book used for delta subscriptions requesting a depth between 1 and 25.
    OrderBookL2_25,
    /// Ten-level book recorded by depth-10 and snapshot subscriptions.
    OrderBook10,
}
72
/// Live market data client for the BitMEX adapter.
///
/// Combines an HTTP client (instrument requests) with a websocket client that is created
/// lazily on the first `connect`. Background tasks whose handles are stored in `tasks`
/// are stopped through `cancellation_token`.
#[derive(Debug)]
pub struct BitmexDataClient {
    /// Identifier returned by `DataClient::client_id` and used as the default response client ID.
    client_id: ClientId,
    /// Adapter configuration (URLs, credentials, retry and refresh settings).
    config: BitmexDataClientConfig,
    /// HTTP client used to request instruments.
    http_client: BitmexHttpClient,
    /// Websocket client; `None` until `connect` constructs it.
    ws_client: Option<BitmexWebSocketClient>,
    /// Connection flag, set at the end of `connect` and cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token observed by the stream and instrument-refresh tasks to know when to exit.
    cancellation_token: CancellationToken,
    /// Handles of the stream task and the instrument-refresh task.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with background tasks; filled from HTTP requests and
    /// websocket instrument messages.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Book channel most recently recorded per instrument by a book subscription.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
    /// Realtime clock used to timestamp request responses.
    clock: &'static AtomicTime,
    /// `true` while an instrument-refresh task has been spawned; prevents spawning a second one.
    instrument_refresh_active: bool,
}
88
89impl BitmexDataClient {
90    /// Creates a new [`BitmexDataClient`] instance.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the HTTP client cannot be constructed.
95    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96        let clock = get_atomic_clock_realtime();
97        let data_sender = get_data_event_sender();
98
99        let http_client = BitmexHttpClient::new(
100            Some(config.http_base_url()),
101            config.api_key.clone(),
102            config.api_secret.clone(),
103            config.use_testnet,
104            config.http_timeout_secs,
105            config.max_retries,
106            config.retry_delay_initial_ms,
107            config.retry_delay_max_ms,
108            config.recv_window_ms,
109            config.max_requests_per_second,
110            config.max_requests_per_minute,
111            config.http_proxy_url.clone(),
112        )
113        .context("failed to construct BitMEX HTTP client")?;
114
115        Ok(Self {
116            client_id,
117            config,
118            http_client,
119            ws_client: None,
120            is_connected: AtomicBool::new(false),
121            cancellation_token: CancellationToken::new(),
122            tasks: Vec::new(),
123            data_sender,
124            instruments: Arc::new(RwLock::new(AHashMap::new())),
125            book_channels: Arc::new(RwLock::new(AHashMap::new())),
126            clock,
127            instrument_refresh_active: false,
128        })
129    }
130
131    fn venue(&self) -> Venue {
132        *BITMEX_VENUE
133    }
134
135    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
136        self.ws_client
137            .as_ref()
138            .context("websocket client not initialized; call connect first")
139    }
140
141    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
142        self.ws_client
143            .as_mut()
144            .context("websocket client not initialized; call connect first")
145    }
146
147    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
148        if let Err(e) = sender.send(DataEvent::Data(data)) {
149            tracing::error!("Failed to emit data event: {e}");
150        }
151    }
152
153    fn spawn_ws<F>(&self, fut: F, context: &'static str)
154    where
155        F: Future<Output = anyhow::Result<()>> + Send + 'static,
156    {
157        tokio::spawn(async move {
158            if let Err(e) = fut.await {
159                tracing::error!("{context}: {e:?}");
160            }
161        });
162    }
163
164    fn spawn_stream_task(
165        &mut self,
166        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
167    ) -> anyhow::Result<()> {
168        let data_sender = self.data_sender.clone();
169        let instruments = Arc::clone(&self.instruments);
170        let cancellation = self.cancellation_token.clone();
171
172        let handle = tokio::spawn(async move {
173            tokio::pin!(stream);
174
175            loop {
176                tokio::select! {
177                    maybe_msg = stream.next() => {
178                        match maybe_msg {
179                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
180                            None => {
181                                tracing::debug!("BitMEX websocket stream ended");
182                                break;
183                            }
184                        }
185                    }
186                    _ = cancellation.cancelled() => {
187                        tracing::debug!("BitMEX websocket stream task cancelled");
188                        break;
189                    }
190                }
191            }
192        });
193
194        self.tasks.push(handle);
195        Ok(())
196    }
197
198    fn handle_ws_message(
199        message: NautilusWsMessage,
200        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
201        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
202    ) {
203        match message {
204            NautilusWsMessage::Data(payloads) => {
205                for data in payloads {
206                    Self::send_data(sender, data);
207                }
208            }
209            NautilusWsMessage::Instruments(insts) => {
210                let mut guard = instruments.write().expect("instrument cache lock poisoned");
211                for instrument in insts {
212                    let instrument_id = instrument.id();
213                    guard.insert(instrument_id, instrument);
214                }
215                // TODO: Send instruments to data engine
216                let _ = sender;
217            }
218            NautilusWsMessage::FundingRateUpdates(updates) => {
219                for update in updates {
220                    tracing::debug!(
221                        instrument = %update.instrument_id,
222                        rate = %update.rate,
223                        "Funding rate update received (not forwarded)",
224                    );
225                }
226            }
227            NautilusWsMessage::OrderStatusReports(_)
228            | NautilusWsMessage::OrderUpdated(_)
229            | NautilusWsMessage::FillReports(_)
230            | NautilusWsMessage::PositionStatusReport(_)
231            | NautilusWsMessage::AccountState(_) => {
232                tracing::debug!("Ignoring trading message on data client");
233            }
234            NautilusWsMessage::Reconnected => {
235                tracing::info!("BitMEX websocket reconnected");
236            }
237            NautilusWsMessage::Authenticated => {
238                tracing::debug!("BitMEX websocket authenticated");
239            }
240        }
241    }
242
243    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
244        let http = self.http_client.clone();
245        let mut instruments = http
246            .request_instruments(self.config.active_only)
247            .await
248            .context("failed to request BitMEX instruments")?;
249
250        instruments.sort_by_key(|instrument| instrument.id());
251
252        {
253            let mut guard = self
254                .instruments
255                .write()
256                .expect("instrument cache lock poisoned");
257            guard.clear();
258            for instrument in &instruments {
259                guard.insert(instrument.id(), instrument.clone());
260            }
261        }
262
263        for instrument in &instruments {
264            self.http_client.cache_instrument(instrument.clone());
265        }
266
267        Ok(instruments)
268    }
269
270    fn is_connected(&self) -> bool {
271        self.is_connected.load(Ordering::Relaxed)
272    }
273
274    fn is_disconnected(&self) -> bool {
275        !self.is_connected()
276    }
277
278    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
279        let Some(minutes) = self.config.update_instruments_interval_mins else {
280            return Ok(());
281        };
282
283        if minutes == 0 || self.instrument_refresh_active {
284            return Ok(());
285        }
286
287        let interval_secs = minutes.saturating_mul(60);
288        if interval_secs == 0 {
289            return Ok(());
290        }
291
292        let interval = Duration::from_secs(interval_secs);
293        let cancellation = self.cancellation_token.clone();
294        let instruments_cache = Arc::clone(&self.instruments);
295        let active_only = self.config.active_only;
296        let client_id = self.client_id;
297        let http_client = self.http_client.clone();
298
299        let handle = tokio::spawn(async move {
300            let http_client = http_client;
301            loop {
302                let sleep = tokio::time::sleep(interval);
303                tokio::pin!(sleep);
304                tokio::select! {
305                    _ = cancellation.cancelled() => {
306                        tracing::debug!("BitMEX instrument refresh task cancelled");
307                        break;
308                    }
309                    _ = &mut sleep => {
310                        match http_client.request_instruments(active_only).await {
311                            Ok(mut instruments) => {
312                                instruments.sort_by_key(|instrument| instrument.id());
313
314                                {
315                                    let mut guard = instruments_cache
316                                        .write()
317                                        .expect("instrument cache lock poisoned");
318                                    guard.clear();
319                                    for instrument in instruments.iter() {
320                                        guard.insert(instrument.id(), instrument.clone());
321                                    }
322                                }
323
324                                for instrument in instruments {
325                                    http_client.cache_instrument(instrument);
326                                }
327
328                                tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
329                            }
330                            Err(e) => {
331                                tracing::warn!(client_id=%client_id, error=?e, "Failed to refresh BitMEX instruments");
332                            }
333                        }
334                    }
335                }
336            }
337        });
338
339        self.tasks.push(handle);
340        self.instrument_refresh_active = true;
341        Ok(())
342    }
343}
344
345fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
346    value
347        .and_then(|dt| dt.timestamp_nanos_opt())
348        .and_then(|nanos| u64::try_from(nanos).ok())
349        .map(UnixNanos::from)
350}
351
352#[async_trait::async_trait]
353impl DataClient for BitmexDataClient {
354    fn client_id(&self) -> ClientId {
355        self.client_id
356    }
357
358    fn venue(&self) -> Option<Venue> {
359        Some(self.venue())
360    }
361
362    fn start(&mut self) -> anyhow::Result<()> {
363        tracing::info!(
364            client_id = %self.client_id,
365            use_testnet = self.config.use_testnet,
366            http_proxy_url = ?self.config.http_proxy_url,
367            ws_proxy_url = ?self.config.ws_proxy_url,
368            "Starting BitMEX data client"
369        );
370        Ok(())
371    }
372
373    fn stop(&mut self) -> anyhow::Result<()> {
374        tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
375        self.cancellation_token.cancel();
376        self.is_connected.store(false, Ordering::Relaxed);
377        self.instrument_refresh_active = false;
378        Ok(())
379    }
380
381    fn reset(&mut self) -> anyhow::Result<()> {
382        tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
383        self.is_connected.store(false, Ordering::Relaxed);
384        self.cancellation_token = CancellationToken::new();
385        self.tasks.clear();
386        self.book_channels
387            .write()
388            .expect("book channel cache lock poisoned")
389            .clear();
390        self.instrument_refresh_active = false;
391        Ok(())
392    }
393
394    fn dispose(&mut self) -> anyhow::Result<()> {
395        self.stop()
396    }
397
398    async fn connect(&mut self) -> anyhow::Result<()> {
399        if self.is_connected() {
400            return Ok(());
401        }
402
403        if self.ws_client.is_none() {
404            let ws = BitmexWebSocketClient::new(
405                Some(self.config.ws_url()),
406                self.config.api_key.clone(),
407                self.config.api_secret.clone(),
408                None,
409                self.config.heartbeat_interval_secs,
410            )
411            .context("failed to construct BitMEX websocket client")?;
412            self.ws_client = Some(ws);
413        }
414
415        let instruments = self.bootstrap_instruments().await?;
416        if let Some(ws) = self.ws_client.as_mut() {
417            ws.cache_instruments(instruments);
418        }
419
420        let ws = self.ws_client_mut()?;
421        ws.connect()
422            .await
423            .context("failed to connect BitMEX websocket")?;
424        ws.wait_until_active(10.0)
425            .await
426            .context("BitMEX websocket did not become active")?;
427
428        let stream = ws.stream();
429        self.spawn_stream_task(stream)?;
430        self.maybe_spawn_instrument_refresh()?;
431
432        self.is_connected.store(true, Ordering::Relaxed);
433        tracing::info!("Connected");
434        Ok(())
435    }
436
437    async fn disconnect(&mut self) -> anyhow::Result<()> {
438        if self.is_disconnected() {
439            return Ok(());
440        }
441
442        self.cancellation_token.cancel();
443
444        if let Some(ws) = self.ws_client.as_mut()
445            && let Err(e) = ws.close().await
446        {
447            tracing::warn!("Error while closing BitMEX websocket: {e:?}");
448        }
449
450        for handle in self.tasks.drain(..) {
451            if let Err(e) = handle.await {
452                tracing::error!("Error joining websocket task: {e:?}");
453            }
454        }
455
456        self.cancellation_token = CancellationToken::new();
457        self.is_connected.store(false, Ordering::Relaxed);
458        self.book_channels
459            .write()
460            .expect("book channel cache lock poisoned")
461            .clear();
462        self.instrument_refresh_active = false;
463
464        tracing::info!("Disconnected");
465        Ok(())
466    }
467
468    fn is_connected(&self) -> bool {
469        self.is_connected()
470    }
471
472    fn is_disconnected(&self) -> bool {
473        self.is_disconnected()
474    }
475
476    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
477        if cmd.book_type != BookType::L2_MBP {
478            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
479        }
480
481        let instrument_id = cmd.instrument_id;
482        let depth = cmd.depth.map_or(0, |d| d.get());
483        let channel = if depth > 0 && depth <= 25 {
484            BitmexBookChannel::OrderBookL2_25
485        } else {
486            BitmexBookChannel::OrderBookL2
487        };
488
489        let ws = self.ws_client()?.clone();
490        let book_channels = Arc::clone(&self.book_channels);
491
492        self.spawn_ws(
493            async move {
494                match channel {
495                    BitmexBookChannel::OrderBookL2 => ws
496                        .subscribe_book(instrument_id)
497                        .await
498                        .map_err(|err| anyhow::anyhow!(err))?,
499                    BitmexBookChannel::OrderBookL2_25 => ws
500                        .subscribe_book_25(instrument_id)
501                        .await
502                        .map_err(|err| anyhow::anyhow!(err))?,
503                    BitmexBookChannel::OrderBook10 => unreachable!(),
504                }
505                book_channels
506                    .write()
507                    .expect("book channel cache lock poisoned")
508                    .insert(instrument_id, channel);
509                Ok(())
510            },
511            "BitMEX book delta subscription",
512        );
513
514        Ok(())
515    }
516
517    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
518        let instrument_id = cmd.instrument_id;
519        let ws = self.ws_client()?.clone();
520        let book_channels = Arc::clone(&self.book_channels);
521
522        self.spawn_ws(
523            async move {
524                ws.subscribe_book_depth10(instrument_id)
525                    .await
526                    .map_err(|err| anyhow::anyhow!(err))?;
527                book_channels
528                    .write()
529                    .expect("book channel cache lock poisoned")
530                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
531                Ok(())
532            },
533            "BitMEX book depth10 subscription",
534        );
535        Ok(())
536    }
537
538    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
539        if cmd.book_type != BookType::L2_MBP {
540            anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
541        }
542
543        let depth = cmd.depth.map_or(10, |d| d.get());
544        if depth != 10 {
545            tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
546        }
547
548        let instrument_id = cmd.instrument_id;
549        let ws = self.ws_client()?.clone();
550        let book_channels = Arc::clone(&self.book_channels);
551
552        self.spawn_ws(
553            async move {
554                ws.subscribe_book_depth10(instrument_id)
555                    .await
556                    .map_err(|err| anyhow::anyhow!(err))?;
557                book_channels
558                    .write()
559                    .expect("book channel cache lock poisoned")
560                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
561                Ok(())
562            },
563            "BitMEX book snapshot subscription",
564        );
565        Ok(())
566    }
567
568    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
569        let instrument_id = cmd.instrument_id;
570        let ws = self.ws_client()?.clone();
571
572        self.spawn_ws(
573            async move {
574                ws.subscribe_quotes(instrument_id)
575                    .await
576                    .map_err(|err| anyhow::anyhow!(err))
577            },
578            "BitMEX quote subscription",
579        );
580        Ok(())
581    }
582
583    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
584        let instrument_id = cmd.instrument_id;
585        let ws = self.ws_client()?.clone();
586
587        self.spawn_ws(
588            async move {
589                ws.subscribe_trades(instrument_id)
590                    .await
591                    .map_err(|err| anyhow::anyhow!(err))
592            },
593            "BitMEX trade subscription",
594        );
595        Ok(())
596    }
597
598    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
599        let instrument_id = cmd.instrument_id;
600        let ws = self.ws_client()?.clone();
601
602        self.spawn_ws(
603            async move {
604                ws.subscribe_mark_prices(instrument_id)
605                    .await
606                    .map_err(|err| anyhow::anyhow!(err))
607            },
608            "BitMEX mark price subscription",
609        );
610        Ok(())
611    }
612
613    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
614        let instrument_id = cmd.instrument_id;
615        let ws = self.ws_client()?.clone();
616
617        self.spawn_ws(
618            async move {
619                ws.subscribe_index_prices(instrument_id)
620                    .await
621                    .map_err(|err| anyhow::anyhow!(err))
622            },
623            "BitMEX index price subscription",
624        );
625        Ok(())
626    }
627
628    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
629        let instrument_id = cmd.instrument_id;
630        let ws = self.ws_client()?.clone();
631
632        self.spawn_ws(
633            async move {
634                ws.subscribe_funding_rates(instrument_id)
635                    .await
636                    .map_err(|err| anyhow::anyhow!(err))
637            },
638            "BitMEX funding rate subscription",
639        );
640        Ok(())
641    }
642
643    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
644        let bar_type = cmd.bar_type;
645        let ws = self.ws_client()?.clone();
646
647        self.spawn_ws(
648            async move {
649                ws.subscribe_bars(bar_type)
650                    .await
651                    .map_err(|err| anyhow::anyhow!(err))
652            },
653            "BitMEX bar subscription",
654        );
655        Ok(())
656    }
657
658    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
659        let instrument_id = cmd.instrument_id;
660        let ws = self.ws_client()?.clone();
661        let book_channels = Arc::clone(&self.book_channels);
662
663        self.spawn_ws(
664            async move {
665                let channel = book_channels
666                    .write()
667                    .expect("book channel cache lock poisoned")
668                    .remove(&instrument_id);
669
670                match channel {
671                    Some(BitmexBookChannel::OrderBookL2) => ws
672                        .unsubscribe_book(instrument_id)
673                        .await
674                        .map_err(|err| anyhow::anyhow!(err))?,
675                    Some(BitmexBookChannel::OrderBookL2_25) => ws
676                        .unsubscribe_book_25(instrument_id)
677                        .await
678                        .map_err(|err| anyhow::anyhow!(err))?,
679                    Some(BitmexBookChannel::OrderBook10) | None => ws
680                        .unsubscribe_book(instrument_id)
681                        .await
682                        .map_err(|err| anyhow::anyhow!(err))?,
683                }
684                Ok(())
685            },
686            "BitMEX book delta unsubscribe",
687        );
688        Ok(())
689    }
690
691    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
692        let instrument_id = cmd.instrument_id;
693        let ws = self.ws_client()?.clone();
694        let book_channels = Arc::clone(&self.book_channels);
695
696        self.spawn_ws(
697            async move {
698                book_channels
699                    .write()
700                    .expect("book channel cache lock poisoned")
701                    .remove(&instrument_id);
702                ws.unsubscribe_book_depth10(instrument_id)
703                    .await
704                    .map_err(|err| anyhow::anyhow!(err))
705            },
706            "BitMEX book depth10 unsubscribe",
707        );
708        Ok(())
709    }
710
711    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
712        let instrument_id = cmd.instrument_id;
713        let ws = self.ws_client()?.clone();
714        let book_channels = Arc::clone(&self.book_channels);
715
716        self.spawn_ws(
717            async move {
718                book_channels
719                    .write()
720                    .expect("book channel cache lock poisoned")
721                    .remove(&instrument_id);
722                ws.unsubscribe_book_depth10(instrument_id)
723                    .await
724                    .map_err(|err| anyhow::anyhow!(err))
725            },
726            "BitMEX book snapshot unsubscribe",
727        );
728        Ok(())
729    }
730
731    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
732        let instrument_id = cmd.instrument_id;
733        let ws = self.ws_client()?.clone();
734
735        self.spawn_ws(
736            async move {
737                ws.unsubscribe_quotes(instrument_id)
738                    .await
739                    .map_err(|err| anyhow::anyhow!(err))
740            },
741            "BitMEX quote unsubscribe",
742        );
743        Ok(())
744    }
745
746    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
747        let instrument_id = cmd.instrument_id;
748        let ws = self.ws_client()?.clone();
749
750        self.spawn_ws(
751            async move {
752                ws.unsubscribe_trades(instrument_id)
753                    .await
754                    .map_err(|err| anyhow::anyhow!(err))
755            },
756            "BitMEX trade unsubscribe",
757        );
758        Ok(())
759    }
760
761    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
762        let ws = self.ws_client()?.clone();
763        let instrument_id = cmd.instrument_id;
764
765        self.spawn_ws(
766            async move {
767                ws.unsubscribe_mark_prices(instrument_id)
768                    .await
769                    .map_err(|err| anyhow::anyhow!(err))
770            },
771            "BitMEX mark price unsubscribe",
772        );
773        Ok(())
774    }
775
776    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
777        let ws = self.ws_client()?.clone();
778        let instrument_id = cmd.instrument_id;
779
780        self.spawn_ws(
781            async move {
782                ws.unsubscribe_index_prices(instrument_id)
783                    .await
784                    .map_err(|err| anyhow::anyhow!(err))
785            },
786            "BitMEX index price unsubscribe",
787        );
788        Ok(())
789    }
790
791    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
792        let ws = self.ws_client()?.clone();
793        let instrument_id = cmd.instrument_id;
794
795        self.spawn_ws(
796            async move {
797                ws.unsubscribe_funding_rates(instrument_id)
798                    .await
799                    .map_err(|err| anyhow::anyhow!(err))
800            },
801            "BitMEX funding rate unsubscribe",
802        );
803        Ok(())
804    }
805
806    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
807        let bar_type = cmd.bar_type;
808        let ws = self.ws_client()?.clone();
809
810        self.spawn_ws(
811            async move {
812                ws.unsubscribe_bars(bar_type)
813                    .await
814                    .map_err(|err| anyhow::anyhow!(err))
815            },
816            "BitMEX bar unsubscribe",
817        );
818        Ok(())
819    }
820
821    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
822        let venue = request.venue.unwrap_or_else(|| self.venue());
823        if let Some(req_venue) = request.venue
824            && req_venue != self.venue()
825        {
826            tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
827        }
828
829        let http = self.http_client.clone();
830        let instruments_cache = Arc::clone(&self.instruments);
831        let sender = self.data_sender.clone();
832        let request_id = request.request_id;
833        let client_id = request.client_id.unwrap_or(self.client_id);
834        let params = request.params.clone();
835        let start_nanos = datetime_to_unix_nanos(request.start);
836        let end_nanos = datetime_to_unix_nanos(request.end);
837        let clock = self.clock;
838        let active_only = self.config.active_only;
839
840        tokio::spawn(async move {
841            let http_client = http;
842            match http_client
843                .request_instruments(active_only)
844                .await
845                .context("failed to request instruments from BitMEX")
846            {
847                Ok(instruments) => {
848                    {
849                        let mut guard = instruments_cache
850                            .write()
851                            .expect("instrument cache lock poisoned");
852                        guard.clear();
853                        for instrument in &instruments {
854                            guard.insert(instrument.id(), instrument.clone());
855                            http_client.cache_instrument(instrument.clone());
856                        }
857                    }
858
859                    let response = DataResponse::Instruments(InstrumentsResponse::new(
860                        request_id,
861                        client_id,
862                        venue,
863                        instruments,
864                        start_nanos,
865                        end_nanos,
866                        clock.get_time_ns(),
867                        params,
868                    ));
869                    if let Err(e) = sender.send(DataEvent::Response(response)) {
870                        tracing::error!("Failed to send instruments response: {e}");
871                    }
872                }
873                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
874            }
875        });
876
877        Ok(())
878    }
879
880    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
881        if let Some(instrument) = self
882            .instruments
883            .read()
884            .expect("instrument cache lock poisoned")
885            .get(&request.instrument_id)
886            .cloned()
887        {
888            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
889                request.request_id,
890                request.client_id.unwrap_or(self.client_id),
891                instrument.id(),
892                instrument,
893                datetime_to_unix_nanos(request.start),
894                datetime_to_unix_nanos(request.end),
895                self.clock.get_time_ns(),
896                request.params.clone(),
897            )));
898            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
899                tracing::error!("Failed to send instrument response: {e}");
900            }
901            return Ok(());
902        }
903
904        let http_client = self.http_client.clone();
905        let instruments_cache = Arc::clone(&self.instruments);
906        let sender = self.data_sender.clone();
907        let instrument_id = request.instrument_id;
908        let request_id = request.request_id;
909        let client_id = request.client_id.unwrap_or(self.client_id);
910        let start = request.start;
911        let end = request.end;
912        let params = request.params.clone();
913        let clock = self.clock;
914
915        tokio::spawn(async move {
916            match http_client
917                .request_instrument(instrument_id)
918                .await
919                .context("failed to request instrument from BitMEX")
920            {
921                Ok(Some(instrument)) => {
922                    http_client.cache_instrument(instrument.clone());
923                    {
924                        let mut guard = instruments_cache
925                            .write()
926                            .expect("instrument cache lock poisoned");
927                        guard.insert(instrument.id(), instrument.clone());
928                    }
929
930                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
931                        request_id,
932                        client_id,
933                        instrument.id(),
934                        instrument,
935                        datetime_to_unix_nanos(start),
936                        datetime_to_unix_nanos(end),
937                        clock.get_time_ns(),
938                        params,
939                    )));
940                    if let Err(e) = sender.send(DataEvent::Response(response)) {
941                        tracing::error!("Failed to send instrument response: {e}");
942                    }
943                }
944                Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
945                Err(e) => tracing::error!("Instrument request failed: {e:?}"),
946            }
947        });
948
949        Ok(())
950    }
951
952    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
953        let http = self.http_client.clone();
954        let sender = self.data_sender.clone();
955        let instrument_id = request.instrument_id;
956        let start = request.start;
957        let end = request.end;
958        let limit = request.limit.map(|n| n.get() as u32);
959        let request_id = request.request_id;
960        let client_id = request.client_id.unwrap_or(self.client_id);
961        let params = request.params.clone();
962        let clock = self.clock;
963        let start_nanos = datetime_to_unix_nanos(start);
964        let end_nanos = datetime_to_unix_nanos(end);
965
966        tokio::spawn(async move {
967            match http
968                .request_trades(instrument_id, start, end, limit)
969                .await
970                .context("failed to request trades from BitMEX")
971            {
972                Ok(trades) => {
973                    let response = DataResponse::Trades(TradesResponse::new(
974                        request_id,
975                        client_id,
976                        instrument_id,
977                        trades,
978                        start_nanos,
979                        end_nanos,
980                        clock.get_time_ns(),
981                        params,
982                    ));
983                    if let Err(e) = sender.send(DataEvent::Response(response)) {
984                        tracing::error!("Failed to send trades response: {e}");
985                    }
986                }
987                Err(e) => tracing::error!("Trade request failed: {e:?}"),
988            }
989        });
990
991        Ok(())
992    }
993
994    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
995        let http = self.http_client.clone();
996        let sender = self.data_sender.clone();
997        let bar_type = request.bar_type;
998        let start = request.start;
999        let end = request.end;
1000        let limit = request.limit.map(|n| n.get() as u32);
1001        let request_id = request.request_id;
1002        let client_id = request.client_id.unwrap_or(self.client_id);
1003        let params = request.params.clone();
1004        let clock = self.clock;
1005        let start_nanos = datetime_to_unix_nanos(start);
1006        let end_nanos = datetime_to_unix_nanos(end);
1007
1008        tokio::spawn(async move {
1009            match http
1010                .request_bars(bar_type, start, end, limit, false)
1011                .await
1012                .context("failed to request bars from BitMEX")
1013            {
1014                Ok(bars) => {
1015                    let response = DataResponse::Bars(BarsResponse::new(
1016                        request_id,
1017                        client_id,
1018                        bar_type,
1019                        bars,
1020                        start_nanos,
1021                        end_nanos,
1022                        clock.get_time_ns(),
1023                        params,
1024                    ));
1025                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1026                        tracing::error!("Failed to send bars response: {e}");
1027                    }
1028                }
1029                Err(e) => tracing::error!("Bar request failed: {e:?}"),
1030            }
1031        });
1032
1033        Ok(())
1034    }
1035}