nautilus_bitmex/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31    messages::{
32        DataEvent,
33        data::{
34            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37            SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39            UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43    runner::get_data_event_sender,
44};
45use nautilus_core::{
46    UnixNanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51    data::Data,
52    enums::BookType,
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60    common::consts::BITMEX_VENUE,
61    config::BitmexDataClientConfig,
62    http::client::BitmexHttpClient,
63    websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
66#[derive(Clone, Copy, Debug, Eq, PartialEq)]
67enum BitmexBookChannel {
68    OrderBookL2,
69    OrderBookL2_25,
70    OrderBook10,
71}
72
73#[derive(Debug)]
74pub struct BitmexDataClient {
75    client_id: ClientId,
76    config: BitmexDataClientConfig,
77    http_client: BitmexHttpClient,
78    ws_client: Option<BitmexWebSocketClient>,
79    is_connected: AtomicBool,
80    cancellation_token: CancellationToken,
81    tasks: Vec<JoinHandle<()>>,
82    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
83    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
84    book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
85    clock: &'static AtomicTime,
86    instrument_refresh_active: bool,
87}
88
89impl BitmexDataClient {
90    /// Creates a new [`BitmexDataClient`] instance.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the HTTP client cannot be constructed.
95    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96        let clock = get_atomic_clock_realtime();
97        let data_sender = get_data_event_sender();
98
99        let http_client = BitmexHttpClient::new(
100            Some(config.http_base_url()),
101            config.api_key.clone(),
102            config.api_secret.clone(),
103            config.use_testnet,
104            config.http_timeout_secs,
105            config.max_retries,
106            config.retry_delay_initial_ms,
107            config.retry_delay_max_ms,
108            config.recv_window_ms,
109            config.max_requests_per_second,
110            config.max_requests_per_minute,
111        )
112        .context("failed to construct BitMEX HTTP client")?;
113
114        Ok(Self {
115            client_id,
116            config,
117            http_client,
118            ws_client: None,
119            is_connected: AtomicBool::new(false),
120            cancellation_token: CancellationToken::new(),
121            tasks: Vec::new(),
122            data_sender,
123            instruments: Arc::new(RwLock::new(AHashMap::new())),
124            book_channels: Arc::new(RwLock::new(AHashMap::new())),
125            clock,
126            instrument_refresh_active: false,
127        })
128    }
129
130    fn venue(&self) -> Venue {
131        *BITMEX_VENUE
132    }
133
134    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135        self.ws_client
136            .as_ref()
137            .context("websocket client not initialized; call connect first")
138    }
139
140    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141        self.ws_client
142            .as_mut()
143            .context("websocket client not initialized; call connect first")
144    }
145
146    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147        if let Err(err) = sender.send(DataEvent::Data(data)) {
148            tracing::error!("Failed to emit data event: {err}");
149        }
150    }
151
152    fn spawn_ws<F>(&self, fut: F, context: &'static str)
153    where
154        F: Future<Output = anyhow::Result<()>> + Send + 'static,
155    {
156        tokio::spawn(async move {
157            if let Err(err) = fut.await {
158                tracing::error!("{context}: {err:?}");
159            }
160        });
161    }
162
163    fn spawn_stream_task(
164        &mut self,
165        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166    ) -> anyhow::Result<()> {
167        let data_sender = self.data_sender.clone();
168        let instruments = Arc::clone(&self.instruments);
169        let cancellation = self.cancellation_token.clone();
170
171        let handle = tokio::spawn(async move {
172            tokio::pin!(stream);
173
174            loop {
175                tokio::select! {
176                    maybe_msg = stream.next() => {
177                        match maybe_msg {
178                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179                            None => {
180                                tracing::debug!("BitMEX websocket stream ended");
181                                break;
182                            }
183                        }
184                    }
185                    _ = cancellation.cancelled() => {
186                        tracing::debug!("BitMEX websocket stream task cancelled");
187                        break;
188                    }
189                }
190            }
191        });
192
193        self.tasks.push(handle);
194        Ok(())
195    }
196
197    fn handle_ws_message(
198        message: NautilusWsMessage,
199        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201    ) {
202        match message {
203            NautilusWsMessage::Data(payloads) => {
204                for data in payloads {
205                    Self::send_data(sender, data);
206                }
207            }
208            NautilusWsMessage::FundingRateUpdates(updates) => {
209                for update in updates {
210                    tracing::debug!(
211                        instrument = %update.instrument_id,
212                        rate = %update.rate,
213                        "Funding rate update received (not forwarded)",
214                    );
215                }
216            }
217            NautilusWsMessage::OrderStatusReports(_)
218            | NautilusWsMessage::OrderUpdated(_)
219            | NautilusWsMessage::FillReports(_)
220            | NautilusWsMessage::PositionStatusReport(_)
221            | NautilusWsMessage::AccountState(_) => {
222                tracing::debug!("Ignoring trading message on data client");
223            }
224            NautilusWsMessage::Reconnected => {
225                tracing::info!("BitMEX websocket reconnected");
226            }
227        }
228
229        // Instrument updates arrive via the REST bootstrap. Keep the argument alive so Clippy
230        // doesn't flag the unused parameter warning when we expand handling later.
231        let _ = instruments;
232    }
233
234    async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
235        let http = self.http_client.clone();
236        let mut instruments = http
237            .request_instruments(self.config.active_only)
238            .await
239            .context("failed to request BitMEX instruments")?;
240
241        instruments.sort_by_key(|instrument| instrument.id());
242
243        {
244            let mut guard = self
245                .instruments
246                .write()
247                .expect("instrument cache lock poisoned");
248            guard.clear();
249            for instrument in &instruments {
250                guard.insert(instrument.id(), instrument.clone());
251            }
252        }
253
254        for instrument in &instruments {
255            self.http_client.add_instrument(instrument.clone());
256        }
257
258        Ok(instruments)
259    }
260
261    fn is_connected(&self) -> bool {
262        self.is_connected.load(Ordering::Relaxed)
263    }
264
265    fn is_disconnected(&self) -> bool {
266        !self.is_connected()
267    }
268
269    fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
270        let Some(minutes) = self.config.update_instruments_interval_mins else {
271            return Ok(());
272        };
273
274        if minutes == 0 || self.instrument_refresh_active {
275            return Ok(());
276        }
277
278        let interval_secs = minutes.saturating_mul(60);
279        if interval_secs == 0 {
280            return Ok(());
281        }
282
283        let interval = Duration::from_secs(interval_secs);
284        let cancellation = self.cancellation_token.clone();
285        let instruments_cache = Arc::clone(&self.instruments);
286        let active_only = self.config.active_only;
287        let client_id = self.client_id;
288        let http_client = self.http_client.clone();
289
290        let handle = tokio::spawn(async move {
291            let http_client = http_client;
292            loop {
293                let sleep = tokio::time::sleep(interval);
294                tokio::pin!(sleep);
295                tokio::select! {
296                    _ = cancellation.cancelled() => {
297                        tracing::debug!("BitMEX instrument refresh task cancelled");
298                        break;
299                    }
300                    _ = &mut sleep => {
301                        match http_client.request_instruments(active_only).await {
302                            Ok(mut instruments) => {
303                                instruments.sort_by_key(|instrument| instrument.id());
304
305                                {
306                                    let mut guard = instruments_cache
307                                        .write()
308                                        .expect("instrument cache lock poisoned");
309                                    guard.clear();
310                                    for instrument in instruments.iter() {
311                                        guard.insert(instrument.id(), instrument.clone());
312                                    }
313                                }
314
315                                for instrument in instruments {
316                                    http_client.add_instrument(instrument);
317                                }
318
319                                tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
320                            }
321                            Err(err) => {
322                                tracing::warn!(client_id=%client_id, error=?err, "Failed to refresh BitMEX instruments");
323                            }
324                        }
325                    }
326                }
327            }
328        });
329
330        self.tasks.push(handle);
331        self.instrument_refresh_active = true;
332        Ok(())
333    }
334}
335
336fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
337    value
338        .and_then(|dt| dt.timestamp_nanos_opt())
339        .and_then(|nanos| u64::try_from(nanos).ok())
340        .map(UnixNanos::from)
341}
342
343#[async_trait::async_trait]
344impl DataClient for BitmexDataClient {
345    fn client_id(&self) -> ClientId {
346        self.client_id
347    }
348
349    fn venue(&self) -> Option<Venue> {
350        Some(self.venue())
351    }
352
353    fn start(&mut self) -> anyhow::Result<()> {
354        tracing::info!("Starting BitMEX data client {id}", id = self.client_id);
355        Ok(())
356    }
357
358    fn stop(&mut self) -> anyhow::Result<()> {
359        tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
360        self.cancellation_token.cancel();
361        self.is_connected.store(false, Ordering::Relaxed);
362        self.instrument_refresh_active = false;
363        Ok(())
364    }
365
366    fn reset(&mut self) -> anyhow::Result<()> {
367        tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
368        self.is_connected.store(false, Ordering::Relaxed);
369        self.cancellation_token = CancellationToken::new();
370        self.tasks.clear();
371        self.book_channels
372            .write()
373            .expect("book channel cache lock poisoned")
374            .clear();
375        self.instrument_refresh_active = false;
376        Ok(())
377    }
378
379    fn dispose(&mut self) -> anyhow::Result<()> {
380        self.stop()
381    }
382
383    async fn connect(&mut self) -> anyhow::Result<()> {
384        if self.is_connected() {
385            return Ok(());
386        }
387
388        if self.ws_client.is_none() {
389            let ws = BitmexWebSocketClient::new(
390                Some(self.config.ws_url()),
391                self.config.api_key.clone(),
392                self.config.api_secret.clone(),
393                None,
394                self.config.heartbeat_interval_secs,
395            )
396            .context("failed to construct BitMEX websocket client")?;
397            self.ws_client = Some(ws);
398        }
399
400        let instruments = self.bootstrap_instruments().await?;
401        if let Some(ws) = self.ws_client.as_mut() {
402            ws.initialize_instruments_cache(instruments);
403        }
404
405        let ws = self.ws_client_mut()?;
406        ws.connect()
407            .await
408            .context("failed to connect BitMEX websocket")?;
409        ws.wait_until_active(10.0)
410            .await
411            .context("BitMEX websocket did not become active")?;
412
413        let stream = ws.stream();
414        self.spawn_stream_task(stream)?;
415        self.maybe_spawn_instrument_refresh()?;
416
417        self.is_connected.store(true, Ordering::Relaxed);
418        tracing::info!("BitMEX data client connected");
419        Ok(())
420    }
421
422    async fn disconnect(&mut self) -> anyhow::Result<()> {
423        if self.is_disconnected() {
424            return Ok(());
425        }
426
427        self.cancellation_token.cancel();
428
429        if let Some(ws) = self.ws_client.as_mut()
430            && let Err(err) = ws.close().await
431        {
432            tracing::warn!("Error while closing BitMEX websocket: {err:?}");
433        }
434
435        for handle in self.tasks.drain(..) {
436            if let Err(err) = handle.await {
437                tracing::error!("Error joining websocket task: {err:?}");
438            }
439        }
440
441        self.cancellation_token = CancellationToken::new();
442        self.is_connected.store(false, Ordering::Relaxed);
443        self.book_channels
444            .write()
445            .expect("book channel cache lock poisoned")
446            .clear();
447        self.instrument_refresh_active = false;
448
449        tracing::info!("BitMEX data client disconnected");
450        Ok(())
451    }
452
453    fn is_connected(&self) -> bool {
454        self.is_connected()
455    }
456
457    fn is_disconnected(&self) -> bool {
458        self.is_disconnected()
459    }
460
461    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462        if cmd.book_type != BookType::L2_MBP {
463            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
464        }
465
466        let instrument_id = cmd.instrument_id;
467        let depth = cmd.depth.map(|d| d.get()).unwrap_or(0);
468        let channel = if depth > 0 && depth <= 25 {
469            BitmexBookChannel::OrderBookL2_25
470        } else {
471            BitmexBookChannel::OrderBookL2
472        };
473
474        let ws = self.ws_client()?.clone();
475        let book_channels = Arc::clone(&self.book_channels);
476        self.spawn_ws(
477            async move {
478                match channel {
479                    BitmexBookChannel::OrderBookL2 => ws
480                        .subscribe_book(instrument_id)
481                        .await
482                        .map_err(|err| anyhow::anyhow!(err))?,
483                    BitmexBookChannel::OrderBookL2_25 => ws
484                        .subscribe_book_25(instrument_id)
485                        .await
486                        .map_err(|err| anyhow::anyhow!(err))?,
487                    BitmexBookChannel::OrderBook10 => unreachable!(),
488                }
489                book_channels
490                    .write()
491                    .expect("book channel cache lock poisoned")
492                    .insert(instrument_id, channel);
493                Ok(())
494            },
495            "BitMEX book delta subscription",
496        );
497
498        Ok(())
499    }
500
501    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
502        let instrument_id = cmd.instrument_id;
503        let ws = self.ws_client()?.clone();
504        let book_channels = Arc::clone(&self.book_channels);
505        self.spawn_ws(
506            async move {
507                ws.subscribe_book_depth10(instrument_id)
508                    .await
509                    .map_err(|err| anyhow::anyhow!(err))?;
510                book_channels
511                    .write()
512                    .expect("book channel cache lock poisoned")
513                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
514                Ok(())
515            },
516            "BitMEX book depth10 subscription",
517        );
518        Ok(())
519    }
520
521    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
522        if cmd.book_type != BookType::L2_MBP {
523            anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
524        }
525
526        let depth = cmd.depth.map(|d| d.get()).unwrap_or(10);
527        if depth != 10 {
528            tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
529        }
530
531        let instrument_id = cmd.instrument_id;
532        let ws = self.ws_client()?.clone();
533        let book_channels = Arc::clone(&self.book_channels);
534        self.spawn_ws(
535            async move {
536                ws.subscribe_book_depth10(instrument_id)
537                    .await
538                    .map_err(|err| anyhow::anyhow!(err))?;
539                book_channels
540                    .write()
541                    .expect("book channel cache lock poisoned")
542                    .insert(instrument_id, BitmexBookChannel::OrderBook10);
543                Ok(())
544            },
545            "BitMEX book snapshot subscription",
546        );
547        Ok(())
548    }
549
550    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
551        let instrument_id = cmd.instrument_id;
552        let ws = self.ws_client()?.clone();
553        self.spawn_ws(
554            async move {
555                ws.subscribe_quotes(instrument_id)
556                    .await
557                    .map_err(|err| anyhow::anyhow!(err))
558            },
559            "BitMEX quote subscription",
560        );
561        Ok(())
562    }
563
564    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
565        let instrument_id = cmd.instrument_id;
566        let ws = self.ws_client()?.clone();
567        self.spawn_ws(
568            async move {
569                ws.subscribe_trades(instrument_id)
570                    .await
571                    .map_err(|err| anyhow::anyhow!(err))
572            },
573            "BitMEX trade subscription",
574        );
575        Ok(())
576    }
577
578    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
579        let instrument_id = cmd.instrument_id;
580        let ws = self.ws_client()?.clone();
581        self.spawn_ws(
582            async move {
583                ws.subscribe_mark_prices(instrument_id)
584                    .await
585                    .map_err(|err| anyhow::anyhow!(err))
586            },
587            "BitMEX mark price subscription",
588        );
589        Ok(())
590    }
591
592    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
593        let instrument_id = cmd.instrument_id;
594        let ws = self.ws_client()?.clone();
595        self.spawn_ws(
596            async move {
597                ws.subscribe_index_prices(instrument_id)
598                    .await
599                    .map_err(|err| anyhow::anyhow!(err))
600            },
601            "BitMEX index price subscription",
602        );
603        Ok(())
604    }
605
606    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
607        let instrument_id = cmd.instrument_id;
608        let ws = self.ws_client()?.clone();
609        self.spawn_ws(
610            async move {
611                ws.subscribe_funding_rates(instrument_id)
612                    .await
613                    .map_err(|err| anyhow::anyhow!(err))
614            },
615            "BitMEX funding rate subscription",
616        );
617        Ok(())
618    }
619
620    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
621        let bar_type = cmd.bar_type;
622        let ws = self.ws_client()?.clone();
623        self.spawn_ws(
624            async move {
625                ws.subscribe_bars(bar_type)
626                    .await
627                    .map_err(|err| anyhow::anyhow!(err))
628            },
629            "BitMEX bar subscription",
630        );
631        Ok(())
632    }
633
634    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
635        let instrument_id = cmd.instrument_id;
636        let ws = self.ws_client()?.clone();
637        let book_channels = Arc::clone(&self.book_channels);
638        self.spawn_ws(
639            async move {
640                let channel = book_channels
641                    .write()
642                    .expect("book channel cache lock poisoned")
643                    .remove(&instrument_id);
644
645                match channel {
646                    Some(BitmexBookChannel::OrderBookL2) => ws
647                        .unsubscribe_book(instrument_id)
648                        .await
649                        .map_err(|err| anyhow::anyhow!(err))?,
650                    Some(BitmexBookChannel::OrderBookL2_25) => ws
651                        .unsubscribe_book_25(instrument_id)
652                        .await
653                        .map_err(|err| anyhow::anyhow!(err))?,
654                    Some(BitmexBookChannel::OrderBook10) | None => ws
655                        .unsubscribe_book(instrument_id)
656                        .await
657                        .map_err(|err| anyhow::anyhow!(err))?,
658                }
659                Ok(())
660            },
661            "BitMEX book delta unsubscribe",
662        );
663        Ok(())
664    }
665
666    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
667        let instrument_id = cmd.instrument_id;
668        let ws = self.ws_client()?.clone();
669        let book_channels = Arc::clone(&self.book_channels);
670        self.spawn_ws(
671            async move {
672                book_channels
673                    .write()
674                    .expect("book channel cache lock poisoned")
675                    .remove(&instrument_id);
676                ws.unsubscribe_book_depth10(instrument_id)
677                    .await
678                    .map_err(|err| anyhow::anyhow!(err))
679            },
680            "BitMEX book depth10 unsubscribe",
681        );
682        Ok(())
683    }
684
685    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
686        let instrument_id = cmd.instrument_id;
687        let ws = self.ws_client()?.clone();
688        let book_channels = Arc::clone(&self.book_channels);
689        self.spawn_ws(
690            async move {
691                book_channels
692                    .write()
693                    .expect("book channel cache lock poisoned")
694                    .remove(&instrument_id);
695                ws.unsubscribe_book_depth10(instrument_id)
696                    .await
697                    .map_err(|err| anyhow::anyhow!(err))
698            },
699            "BitMEX book snapshot unsubscribe",
700        );
701        Ok(())
702    }
703
704    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
705        let instrument_id = cmd.instrument_id;
706        let ws = self.ws_client()?.clone();
707        self.spawn_ws(
708            async move {
709                ws.unsubscribe_quotes(instrument_id)
710                    .await
711                    .map_err(|err| anyhow::anyhow!(err))
712            },
713            "BitMEX quote unsubscribe",
714        );
715        Ok(())
716    }
717
718    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
719        let instrument_id = cmd.instrument_id;
720        let ws = self.ws_client()?.clone();
721        self.spawn_ws(
722            async move {
723                ws.unsubscribe_trades(instrument_id)
724                    .await
725                    .map_err(|err| anyhow::anyhow!(err))
726            },
727            "BitMEX trade unsubscribe",
728        );
729        Ok(())
730    }
731
732    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
733        let ws = self.ws_client()?.clone();
734        let instrument_id = cmd.instrument_id;
735        self.spawn_ws(
736            async move {
737                ws.unsubscribe_mark_prices(instrument_id)
738                    .await
739                    .map_err(|err| anyhow::anyhow!(err))
740            },
741            "BitMEX mark price unsubscribe",
742        );
743        Ok(())
744    }
745
746    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
747        let ws = self.ws_client()?.clone();
748        let instrument_id = cmd.instrument_id;
749        self.spawn_ws(
750            async move {
751                ws.unsubscribe_index_prices(instrument_id)
752                    .await
753                    .map_err(|err| anyhow::anyhow!(err))
754            },
755            "BitMEX index price unsubscribe",
756        );
757        Ok(())
758    }
759
760    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
761        let ws = self.ws_client()?.clone();
762        let instrument_id = cmd.instrument_id;
763        self.spawn_ws(
764            async move {
765                ws.unsubscribe_funding_rates(instrument_id)
766                    .await
767                    .map_err(|err| anyhow::anyhow!(err))
768            },
769            "BitMEX funding rate unsubscribe",
770        );
771        Ok(())
772    }
773
774    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
775        let bar_type = cmd.bar_type;
776        let ws = self.ws_client()?.clone();
777        self.spawn_ws(
778            async move {
779                ws.unsubscribe_bars(bar_type)
780                    .await
781                    .map_err(|err| anyhow::anyhow!(err))
782            },
783            "BitMEX bar unsubscribe",
784        );
785        Ok(())
786    }
787
788    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
789        let venue = request.venue.unwrap_or_else(|| self.venue());
790        if let Some(req_venue) = request.venue
791            && req_venue != self.venue()
792        {
793            tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
794        }
795
796        let http = self.http_client.clone();
797        let instruments_cache = Arc::clone(&self.instruments);
798        let sender = self.data_sender.clone();
799        let request_id = request.request_id;
800        let client_id = request.client_id.unwrap_or(self.client_id);
801        let params = request.params.clone();
802        let start_nanos = datetime_to_unix_nanos(request.start);
803        let end_nanos = datetime_to_unix_nanos(request.end);
804        let clock = self.clock;
805        let active_only = self.config.active_only;
806
807        tokio::spawn(async move {
808            let http_client = http;
809            match http_client
810                .request_instruments(active_only)
811                .await
812                .context("failed to request instruments from BitMEX")
813            {
814                Ok(instruments) => {
815                    {
816                        let mut guard = instruments_cache
817                            .write()
818                            .expect("instrument cache lock poisoned");
819                        guard.clear();
820                        for instrument in &instruments {
821                            guard.insert(instrument.id(), instrument.clone());
822                            http_client.add_instrument(instrument.clone());
823                        }
824                    }
825
826                    let response = DataResponse::Instruments(InstrumentsResponse::new(
827                        request_id,
828                        client_id,
829                        venue,
830                        instruments,
831                        start_nanos,
832                        end_nanos,
833                        clock.get_time_ns(),
834                        params,
835                    ));
836                    if let Err(err) = sender.send(DataEvent::Response(response)) {
837                        tracing::error!("Failed to send instruments response: {err}");
838                    }
839                }
840                Err(err) => tracing::error!("Instrument request failed: {err:?}"),
841            }
842        });
843
844        Ok(())
845    }
846
847    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
848        if let Some(instrument) = self
849            .instruments
850            .read()
851            .expect("instrument cache lock poisoned")
852            .get(&request.instrument_id)
853            .cloned()
854        {
855            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
856                request.request_id,
857                request.client_id.unwrap_or(self.client_id),
858                instrument.id(),
859                instrument,
860                datetime_to_unix_nanos(request.start),
861                datetime_to_unix_nanos(request.end),
862                self.clock.get_time_ns(),
863                request.params.clone(),
864            )));
865            if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
866                tracing::error!("Failed to send instrument response: {err}");
867            }
868            return Ok(());
869        }
870
871        let http_client = self.http_client.clone();
872        let instruments_cache = Arc::clone(&self.instruments);
873        let sender = self.data_sender.clone();
874        let instrument_id = request.instrument_id;
875        let request_id = request.request_id;
876        let client_id = request.client_id.unwrap_or(self.client_id);
877        let start = request.start;
878        let end = request.end;
879        let params = request.params.clone();
880        let clock = self.clock;
881
882        tokio::spawn(async move {
883            match http_client
884                .request_instrument(instrument_id)
885                .await
886                .context("failed to request instrument from BitMEX")
887            {
888                Ok(Some(instrument)) => {
889                    http_client.add_instrument(instrument.clone());
890                    {
891                        let mut guard = instruments_cache
892                            .write()
893                            .expect("instrument cache lock poisoned");
894                        guard.insert(instrument.id(), instrument.clone());
895                    }
896
897                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
898                        request_id,
899                        client_id,
900                        instrument.id(),
901                        instrument,
902                        datetime_to_unix_nanos(start),
903                        datetime_to_unix_nanos(end),
904                        clock.get_time_ns(),
905                        params,
906                    )));
907                    if let Err(err) = sender.send(DataEvent::Response(response)) {
908                        tracing::error!("Failed to send instrument response: {err}");
909                    }
910                }
911                Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
912                Err(err) => tracing::error!("Instrument request failed: {err:?}"),
913            }
914        });
915
916        Ok(())
917    }
918
919    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
920        let http = self.http_client.clone();
921        let sender = self.data_sender.clone();
922        let instrument_id = request.instrument_id;
923        let start = request.start;
924        let end = request.end;
925        let limit = request.limit.map(|n| n.get() as u32);
926        let request_id = request.request_id;
927        let client_id = request.client_id.unwrap_or(self.client_id);
928        let params = request.params.clone();
929        let clock = self.clock;
930        let start_nanos = datetime_to_unix_nanos(start);
931        let end_nanos = datetime_to_unix_nanos(end);
932
933        tokio::spawn(async move {
934            match http
935                .request_trades(instrument_id, start, end, limit)
936                .await
937                .context("failed to request trades from BitMEX")
938            {
939                Ok(trades) => {
940                    let response = DataResponse::Trades(TradesResponse::new(
941                        request_id,
942                        client_id,
943                        instrument_id,
944                        trades,
945                        start_nanos,
946                        end_nanos,
947                        clock.get_time_ns(),
948                        params,
949                    ));
950                    if let Err(err) = sender.send(DataEvent::Response(response)) {
951                        tracing::error!("Failed to send trades response: {err}");
952                    }
953                }
954                Err(err) => tracing::error!("Trade request failed: {err:?}"),
955            }
956        });
957
958        Ok(())
959    }
960
961    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
962        let http = self.http_client.clone();
963        let sender = self.data_sender.clone();
964        let bar_type = request.bar_type;
965        let start = request.start;
966        let end = request.end;
967        let limit = request.limit.map(|n| n.get() as u32);
968        let request_id = request.request_id;
969        let client_id = request.client_id.unwrap_or(self.client_id);
970        let params = request.params.clone();
971        let clock = self.clock;
972        let start_nanos = datetime_to_unix_nanos(start);
973        let end_nanos = datetime_to_unix_nanos(end);
974
975        tokio::spawn(async move {
976            match http
977                .request_bars(bar_type, start, end, limit, false)
978                .await
979                .context("failed to request bars from BitMEX")
980            {
981                Ok(bars) => {
982                    let response = DataResponse::Bars(BarsResponse::new(
983                        request_id,
984                        client_id,
985                        bar_type,
986                        bars,
987                        start_nanos,
988                        end_nanos,
989                        clock.get_time_ns(),
990                        params,
991                    ));
992                    if let Err(err) = sender.send(DataEvent::Response(response)) {
993                        tracing::error!("Failed to send bars response: {err}");
994                    }
995                }
996                Err(err) => tracing::error!("Bar request failed: {err:?}"),
997            }
998        });
999
1000        Ok(())
1001    }
1002}