1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37 SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43 runner::get_data_event_sender,
44};
45use nautilus_core::{
46 UnixNanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51 data::Data,
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::consts::BITMEX_VENUE,
61 config::BitmexDataClientConfig,
62 http::client::BitmexHttpClient,
63 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
66#[derive(Clone, Copy, Debug, Eq, PartialEq)]
67enum BitmexBookChannel {
68 OrderBookL2,
69 OrderBookL2_25,
70 OrderBook10,
71}
72
73#[derive(Debug)]
74pub struct BitmexDataClient {
75 client_id: ClientId,
76 config: BitmexDataClientConfig,
77 http_client: BitmexHttpClient,
78 ws_client: Option<BitmexWebSocketClient>,
79 is_connected: AtomicBool,
80 cancellation_token: CancellationToken,
81 tasks: Vec<JoinHandle<()>>,
82 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
83 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
84 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
85 clock: &'static AtomicTime,
86 instrument_refresh_active: bool,
87}
88
89impl BitmexDataClient {
90 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96 let clock = get_atomic_clock_realtime();
97 let data_sender = get_data_event_sender();
98
99 let http_client = BitmexHttpClient::new(
100 Some(config.http_base_url()),
101 config.api_key.clone(),
102 config.api_secret.clone(),
103 config.use_testnet,
104 config.http_timeout_secs,
105 config.max_retries,
106 config.retry_delay_initial_ms,
107 config.retry_delay_max_ms,
108 config.recv_window_ms,
109 config.max_requests_per_second,
110 config.max_requests_per_minute,
111 )
112 .context("failed to construct BitMEX HTTP client")?;
113
114 Ok(Self {
115 client_id,
116 config,
117 http_client,
118 ws_client: None,
119 is_connected: AtomicBool::new(false),
120 cancellation_token: CancellationToken::new(),
121 tasks: Vec::new(),
122 data_sender,
123 instruments: Arc::new(RwLock::new(AHashMap::new())),
124 book_channels: Arc::new(RwLock::new(AHashMap::new())),
125 clock,
126 instrument_refresh_active: false,
127 })
128 }
129
130 fn venue(&self) -> Venue {
131 *BITMEX_VENUE
132 }
133
134 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135 self.ws_client
136 .as_ref()
137 .context("websocket client not initialized; call connect first")
138 }
139
140 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141 self.ws_client
142 .as_mut()
143 .context("websocket client not initialized; call connect first")
144 }
145
146 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147 if let Err(err) = sender.send(DataEvent::Data(data)) {
148 tracing::error!("Failed to emit data event: {err}");
149 }
150 }
151
152 fn spawn_ws<F>(&self, fut: F, context: &'static str)
153 where
154 F: Future<Output = anyhow::Result<()>> + Send + 'static,
155 {
156 tokio::spawn(async move {
157 if let Err(err) = fut.await {
158 tracing::error!("{context}: {err:?}");
159 }
160 });
161 }
162
163 fn spawn_stream_task(
164 &mut self,
165 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166 ) -> anyhow::Result<()> {
167 let data_sender = self.data_sender.clone();
168 let instruments = Arc::clone(&self.instruments);
169 let cancellation = self.cancellation_token.clone();
170
171 let handle = tokio::spawn(async move {
172 tokio::pin!(stream);
173
174 loop {
175 tokio::select! {
176 maybe_msg = stream.next() => {
177 match maybe_msg {
178 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179 None => {
180 tracing::debug!("BitMEX websocket stream ended");
181 break;
182 }
183 }
184 }
185 _ = cancellation.cancelled() => {
186 tracing::debug!("BitMEX websocket stream task cancelled");
187 break;
188 }
189 }
190 }
191 });
192
193 self.tasks.push(handle);
194 Ok(())
195 }
196
197 fn handle_ws_message(
198 message: NautilusWsMessage,
199 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201 ) {
202 match message {
203 NautilusWsMessage::Data(payloads) => {
204 for data in payloads {
205 Self::send_data(sender, data);
206 }
207 }
208 NautilusWsMessage::FundingRateUpdates(updates) => {
209 for update in updates {
210 tracing::debug!(
211 instrument = %update.instrument_id,
212 rate = %update.rate,
213 "Funding rate update received (not forwarded)",
214 );
215 }
216 }
217 NautilusWsMessage::OrderStatusReports(_)
218 | NautilusWsMessage::OrderUpdated(_)
219 | NautilusWsMessage::FillReports(_)
220 | NautilusWsMessage::PositionStatusReport(_)
221 | NautilusWsMessage::AccountState(_) => {
222 tracing::debug!("Ignoring trading message on data client");
223 }
224 NautilusWsMessage::Reconnected => {
225 tracing::info!("BitMEX websocket reconnected");
226 }
227 }
228
229 let _ = instruments;
232 }
233
234 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
235 let http = self.http_client.clone();
236 let mut instruments = http
237 .request_instruments(self.config.active_only)
238 .await
239 .context("failed to request BitMEX instruments")?;
240
241 instruments.sort_by_key(|instrument| instrument.id());
242
243 {
244 let mut guard = self
245 .instruments
246 .write()
247 .expect("instrument cache lock poisoned");
248 guard.clear();
249 for instrument in &instruments {
250 guard.insert(instrument.id(), instrument.clone());
251 }
252 }
253
254 for instrument in &instruments {
255 self.http_client.add_instrument(instrument.clone());
256 }
257
258 Ok(instruments)
259 }
260
261 fn is_connected(&self) -> bool {
262 self.is_connected.load(Ordering::Relaxed)
263 }
264
265 fn is_disconnected(&self) -> bool {
266 !self.is_connected()
267 }
268
269 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
270 let Some(minutes) = self.config.update_instruments_interval_mins else {
271 return Ok(());
272 };
273
274 if minutes == 0 || self.instrument_refresh_active {
275 return Ok(());
276 }
277
278 let interval_secs = minutes.saturating_mul(60);
279 if interval_secs == 0 {
280 return Ok(());
281 }
282
283 let interval = Duration::from_secs(interval_secs);
284 let cancellation = self.cancellation_token.clone();
285 let instruments_cache = Arc::clone(&self.instruments);
286 let active_only = self.config.active_only;
287 let client_id = self.client_id;
288 let http_client = self.http_client.clone();
289
290 let handle = tokio::spawn(async move {
291 let http_client = http_client;
292 loop {
293 let sleep = tokio::time::sleep(interval);
294 tokio::pin!(sleep);
295 tokio::select! {
296 _ = cancellation.cancelled() => {
297 tracing::debug!("BitMEX instrument refresh task cancelled");
298 break;
299 }
300 _ = &mut sleep => {
301 match http_client.request_instruments(active_only).await {
302 Ok(mut instruments) => {
303 instruments.sort_by_key(|instrument| instrument.id());
304
305 {
306 let mut guard = instruments_cache
307 .write()
308 .expect("instrument cache lock poisoned");
309 guard.clear();
310 for instrument in instruments.iter() {
311 guard.insert(instrument.id(), instrument.clone());
312 }
313 }
314
315 for instrument in instruments {
316 http_client.add_instrument(instrument);
317 }
318
319 tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
320 }
321 Err(err) => {
322 tracing::warn!(client_id=%client_id, error=?err, "Failed to refresh BitMEX instruments");
323 }
324 }
325 }
326 }
327 }
328 });
329
330 self.tasks.push(handle);
331 self.instrument_refresh_active = true;
332 Ok(())
333 }
334}
335
336fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
337 value
338 .and_then(|dt| dt.timestamp_nanos_opt())
339 .and_then(|nanos| u64::try_from(nanos).ok())
340 .map(UnixNanos::from)
341}
342
343#[async_trait::async_trait]
344impl DataClient for BitmexDataClient {
345 fn client_id(&self) -> ClientId {
346 self.client_id
347 }
348
349 fn venue(&self) -> Option<Venue> {
350 Some(self.venue())
351 }
352
353 fn start(&mut self) -> anyhow::Result<()> {
354 tracing::info!("Starting BitMEX data client {id}", id = self.client_id);
355 Ok(())
356 }
357
358 fn stop(&mut self) -> anyhow::Result<()> {
359 tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
360 self.cancellation_token.cancel();
361 self.is_connected.store(false, Ordering::Relaxed);
362 self.instrument_refresh_active = false;
363 Ok(())
364 }
365
366 fn reset(&mut self) -> anyhow::Result<()> {
367 tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
368 self.is_connected.store(false, Ordering::Relaxed);
369 self.cancellation_token = CancellationToken::new();
370 self.tasks.clear();
371 self.book_channels
372 .write()
373 .expect("book channel cache lock poisoned")
374 .clear();
375 self.instrument_refresh_active = false;
376 Ok(())
377 }
378
379 fn dispose(&mut self) -> anyhow::Result<()> {
380 self.stop()
381 }
382
383 async fn connect(&mut self) -> anyhow::Result<()> {
384 if self.is_connected() {
385 return Ok(());
386 }
387
388 if self.ws_client.is_none() {
389 let ws = BitmexWebSocketClient::new(
390 Some(self.config.ws_url()),
391 self.config.api_key.clone(),
392 self.config.api_secret.clone(),
393 None,
394 self.config.heartbeat_interval_secs,
395 )
396 .context("failed to construct BitMEX websocket client")?;
397 self.ws_client = Some(ws);
398 }
399
400 let instruments = self.bootstrap_instruments().await?;
401 if let Some(ws) = self.ws_client.as_mut() {
402 ws.initialize_instruments_cache(instruments);
403 }
404
405 let ws = self.ws_client_mut()?;
406 ws.connect()
407 .await
408 .context("failed to connect BitMEX websocket")?;
409 ws.wait_until_active(10.0)
410 .await
411 .context("BitMEX websocket did not become active")?;
412
413 let stream = ws.stream();
414 self.spawn_stream_task(stream)?;
415 self.maybe_spawn_instrument_refresh()?;
416
417 self.is_connected.store(true, Ordering::Relaxed);
418 tracing::info!("BitMEX data client connected");
419 Ok(())
420 }
421
422 async fn disconnect(&mut self) -> anyhow::Result<()> {
423 if self.is_disconnected() {
424 return Ok(());
425 }
426
427 self.cancellation_token.cancel();
428
429 if let Some(ws) = self.ws_client.as_mut()
430 && let Err(err) = ws.close().await
431 {
432 tracing::warn!("Error while closing BitMEX websocket: {err:?}");
433 }
434
435 for handle in self.tasks.drain(..) {
436 if let Err(err) = handle.await {
437 tracing::error!("Error joining websocket task: {err:?}");
438 }
439 }
440
441 self.cancellation_token = CancellationToken::new();
442 self.is_connected.store(false, Ordering::Relaxed);
443 self.book_channels
444 .write()
445 .expect("book channel cache lock poisoned")
446 .clear();
447 self.instrument_refresh_active = false;
448
449 tracing::info!("BitMEX data client disconnected");
450 Ok(())
451 }
452
453 fn is_connected(&self) -> bool {
454 self.is_connected()
455 }
456
457 fn is_disconnected(&self) -> bool {
458 self.is_disconnected()
459 }
460
461 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462 if cmd.book_type != BookType::L2_MBP {
463 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
464 }
465
466 let instrument_id = cmd.instrument_id;
467 let depth = cmd.depth.map(|d| d.get()).unwrap_or(0);
468 let channel = if depth > 0 && depth <= 25 {
469 BitmexBookChannel::OrderBookL2_25
470 } else {
471 BitmexBookChannel::OrderBookL2
472 };
473
474 let ws = self.ws_client()?.clone();
475 let book_channels = Arc::clone(&self.book_channels);
476 self.spawn_ws(
477 async move {
478 match channel {
479 BitmexBookChannel::OrderBookL2 => ws
480 .subscribe_book(instrument_id)
481 .await
482 .map_err(|err| anyhow::anyhow!(err))?,
483 BitmexBookChannel::OrderBookL2_25 => ws
484 .subscribe_book_25(instrument_id)
485 .await
486 .map_err(|err| anyhow::anyhow!(err))?,
487 BitmexBookChannel::OrderBook10 => unreachable!(),
488 }
489 book_channels
490 .write()
491 .expect("book channel cache lock poisoned")
492 .insert(instrument_id, channel);
493 Ok(())
494 },
495 "BitMEX book delta subscription",
496 );
497
498 Ok(())
499 }
500
501 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
502 let instrument_id = cmd.instrument_id;
503 let ws = self.ws_client()?.clone();
504 let book_channels = Arc::clone(&self.book_channels);
505 self.spawn_ws(
506 async move {
507 ws.subscribe_book_depth10(instrument_id)
508 .await
509 .map_err(|err| anyhow::anyhow!(err))?;
510 book_channels
511 .write()
512 .expect("book channel cache lock poisoned")
513 .insert(instrument_id, BitmexBookChannel::OrderBook10);
514 Ok(())
515 },
516 "BitMEX book depth10 subscription",
517 );
518 Ok(())
519 }
520
521 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
522 if cmd.book_type != BookType::L2_MBP {
523 anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
524 }
525
526 let depth = cmd.depth.map(|d| d.get()).unwrap_or(10);
527 if depth != 10 {
528 tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
529 }
530
531 let instrument_id = cmd.instrument_id;
532 let ws = self.ws_client()?.clone();
533 let book_channels = Arc::clone(&self.book_channels);
534 self.spawn_ws(
535 async move {
536 ws.subscribe_book_depth10(instrument_id)
537 .await
538 .map_err(|err| anyhow::anyhow!(err))?;
539 book_channels
540 .write()
541 .expect("book channel cache lock poisoned")
542 .insert(instrument_id, BitmexBookChannel::OrderBook10);
543 Ok(())
544 },
545 "BitMEX book snapshot subscription",
546 );
547 Ok(())
548 }
549
550 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
551 let instrument_id = cmd.instrument_id;
552 let ws = self.ws_client()?.clone();
553 self.spawn_ws(
554 async move {
555 ws.subscribe_quotes(instrument_id)
556 .await
557 .map_err(|err| anyhow::anyhow!(err))
558 },
559 "BitMEX quote subscription",
560 );
561 Ok(())
562 }
563
564 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
565 let instrument_id = cmd.instrument_id;
566 let ws = self.ws_client()?.clone();
567 self.spawn_ws(
568 async move {
569 ws.subscribe_trades(instrument_id)
570 .await
571 .map_err(|err| anyhow::anyhow!(err))
572 },
573 "BitMEX trade subscription",
574 );
575 Ok(())
576 }
577
578 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
579 let instrument_id = cmd.instrument_id;
580 let ws = self.ws_client()?.clone();
581 self.spawn_ws(
582 async move {
583 ws.subscribe_mark_prices(instrument_id)
584 .await
585 .map_err(|err| anyhow::anyhow!(err))
586 },
587 "BitMEX mark price subscription",
588 );
589 Ok(())
590 }
591
592 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
593 let instrument_id = cmd.instrument_id;
594 let ws = self.ws_client()?.clone();
595 self.spawn_ws(
596 async move {
597 ws.subscribe_index_prices(instrument_id)
598 .await
599 .map_err(|err| anyhow::anyhow!(err))
600 },
601 "BitMEX index price subscription",
602 );
603 Ok(())
604 }
605
606 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
607 let instrument_id = cmd.instrument_id;
608 let ws = self.ws_client()?.clone();
609 self.spawn_ws(
610 async move {
611 ws.subscribe_funding_rates(instrument_id)
612 .await
613 .map_err(|err| anyhow::anyhow!(err))
614 },
615 "BitMEX funding rate subscription",
616 );
617 Ok(())
618 }
619
620 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
621 let bar_type = cmd.bar_type;
622 let ws = self.ws_client()?.clone();
623 self.spawn_ws(
624 async move {
625 ws.subscribe_bars(bar_type)
626 .await
627 .map_err(|err| anyhow::anyhow!(err))
628 },
629 "BitMEX bar subscription",
630 );
631 Ok(())
632 }
633
634 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
635 let instrument_id = cmd.instrument_id;
636 let ws = self.ws_client()?.clone();
637 let book_channels = Arc::clone(&self.book_channels);
638 self.spawn_ws(
639 async move {
640 let channel = book_channels
641 .write()
642 .expect("book channel cache lock poisoned")
643 .remove(&instrument_id);
644
645 match channel {
646 Some(BitmexBookChannel::OrderBookL2) => ws
647 .unsubscribe_book(instrument_id)
648 .await
649 .map_err(|err| anyhow::anyhow!(err))?,
650 Some(BitmexBookChannel::OrderBookL2_25) => ws
651 .unsubscribe_book_25(instrument_id)
652 .await
653 .map_err(|err| anyhow::anyhow!(err))?,
654 Some(BitmexBookChannel::OrderBook10) | None => ws
655 .unsubscribe_book(instrument_id)
656 .await
657 .map_err(|err| anyhow::anyhow!(err))?,
658 }
659 Ok(())
660 },
661 "BitMEX book delta unsubscribe",
662 );
663 Ok(())
664 }
665
666 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
667 let instrument_id = cmd.instrument_id;
668 let ws = self.ws_client()?.clone();
669 let book_channels = Arc::clone(&self.book_channels);
670 self.spawn_ws(
671 async move {
672 book_channels
673 .write()
674 .expect("book channel cache lock poisoned")
675 .remove(&instrument_id);
676 ws.unsubscribe_book_depth10(instrument_id)
677 .await
678 .map_err(|err| anyhow::anyhow!(err))
679 },
680 "BitMEX book depth10 unsubscribe",
681 );
682 Ok(())
683 }
684
685 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
686 let instrument_id = cmd.instrument_id;
687 let ws = self.ws_client()?.clone();
688 let book_channels = Arc::clone(&self.book_channels);
689 self.spawn_ws(
690 async move {
691 book_channels
692 .write()
693 .expect("book channel cache lock poisoned")
694 .remove(&instrument_id);
695 ws.unsubscribe_book_depth10(instrument_id)
696 .await
697 .map_err(|err| anyhow::anyhow!(err))
698 },
699 "BitMEX book snapshot unsubscribe",
700 );
701 Ok(())
702 }
703
704 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
705 let instrument_id = cmd.instrument_id;
706 let ws = self.ws_client()?.clone();
707 self.spawn_ws(
708 async move {
709 ws.unsubscribe_quotes(instrument_id)
710 .await
711 .map_err(|err| anyhow::anyhow!(err))
712 },
713 "BitMEX quote unsubscribe",
714 );
715 Ok(())
716 }
717
718 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
719 let instrument_id = cmd.instrument_id;
720 let ws = self.ws_client()?.clone();
721 self.spawn_ws(
722 async move {
723 ws.unsubscribe_trades(instrument_id)
724 .await
725 .map_err(|err| anyhow::anyhow!(err))
726 },
727 "BitMEX trade unsubscribe",
728 );
729 Ok(())
730 }
731
732 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
733 let ws = self.ws_client()?.clone();
734 let instrument_id = cmd.instrument_id;
735 self.spawn_ws(
736 async move {
737 ws.unsubscribe_mark_prices(instrument_id)
738 .await
739 .map_err(|err| anyhow::anyhow!(err))
740 },
741 "BitMEX mark price unsubscribe",
742 );
743 Ok(())
744 }
745
746 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
747 let ws = self.ws_client()?.clone();
748 let instrument_id = cmd.instrument_id;
749 self.spawn_ws(
750 async move {
751 ws.unsubscribe_index_prices(instrument_id)
752 .await
753 .map_err(|err| anyhow::anyhow!(err))
754 },
755 "BitMEX index price unsubscribe",
756 );
757 Ok(())
758 }
759
760 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
761 let ws = self.ws_client()?.clone();
762 let instrument_id = cmd.instrument_id;
763 self.spawn_ws(
764 async move {
765 ws.unsubscribe_funding_rates(instrument_id)
766 .await
767 .map_err(|err| anyhow::anyhow!(err))
768 },
769 "BitMEX funding rate unsubscribe",
770 );
771 Ok(())
772 }
773
774 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
775 let bar_type = cmd.bar_type;
776 let ws = self.ws_client()?.clone();
777 self.spawn_ws(
778 async move {
779 ws.unsubscribe_bars(bar_type)
780 .await
781 .map_err(|err| anyhow::anyhow!(err))
782 },
783 "BitMEX bar unsubscribe",
784 );
785 Ok(())
786 }
787
788 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
789 let venue = request.venue.unwrap_or_else(|| self.venue());
790 if let Some(req_venue) = request.venue
791 && req_venue != self.venue()
792 {
793 tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
794 }
795
796 let http = self.http_client.clone();
797 let instruments_cache = Arc::clone(&self.instruments);
798 let sender = self.data_sender.clone();
799 let request_id = request.request_id;
800 let client_id = request.client_id.unwrap_or(self.client_id);
801 let params = request.params.clone();
802 let start_nanos = datetime_to_unix_nanos(request.start);
803 let end_nanos = datetime_to_unix_nanos(request.end);
804 let clock = self.clock;
805 let active_only = self.config.active_only;
806
807 tokio::spawn(async move {
808 let http_client = http;
809 match http_client
810 .request_instruments(active_only)
811 .await
812 .context("failed to request instruments from BitMEX")
813 {
814 Ok(instruments) => {
815 {
816 let mut guard = instruments_cache
817 .write()
818 .expect("instrument cache lock poisoned");
819 guard.clear();
820 for instrument in &instruments {
821 guard.insert(instrument.id(), instrument.clone());
822 http_client.add_instrument(instrument.clone());
823 }
824 }
825
826 let response = DataResponse::Instruments(InstrumentsResponse::new(
827 request_id,
828 client_id,
829 venue,
830 instruments,
831 start_nanos,
832 end_nanos,
833 clock.get_time_ns(),
834 params,
835 ));
836 if let Err(err) = sender.send(DataEvent::Response(response)) {
837 tracing::error!("Failed to send instruments response: {err}");
838 }
839 }
840 Err(err) => tracing::error!("Instrument request failed: {err:?}"),
841 }
842 });
843
844 Ok(())
845 }
846
847 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
848 if let Some(instrument) = self
849 .instruments
850 .read()
851 .expect("instrument cache lock poisoned")
852 .get(&request.instrument_id)
853 .cloned()
854 {
855 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
856 request.request_id,
857 request.client_id.unwrap_or(self.client_id),
858 instrument.id(),
859 instrument,
860 datetime_to_unix_nanos(request.start),
861 datetime_to_unix_nanos(request.end),
862 self.clock.get_time_ns(),
863 request.params.clone(),
864 )));
865 if let Err(err) = self.data_sender.send(DataEvent::Response(response)) {
866 tracing::error!("Failed to send instrument response: {err}");
867 }
868 return Ok(());
869 }
870
871 let http_client = self.http_client.clone();
872 let instruments_cache = Arc::clone(&self.instruments);
873 let sender = self.data_sender.clone();
874 let instrument_id = request.instrument_id;
875 let request_id = request.request_id;
876 let client_id = request.client_id.unwrap_or(self.client_id);
877 let start = request.start;
878 let end = request.end;
879 let params = request.params.clone();
880 let clock = self.clock;
881
882 tokio::spawn(async move {
883 match http_client
884 .request_instrument(instrument_id)
885 .await
886 .context("failed to request instrument from BitMEX")
887 {
888 Ok(Some(instrument)) => {
889 http_client.add_instrument(instrument.clone());
890 {
891 let mut guard = instruments_cache
892 .write()
893 .expect("instrument cache lock poisoned");
894 guard.insert(instrument.id(), instrument.clone());
895 }
896
897 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
898 request_id,
899 client_id,
900 instrument.id(),
901 instrument,
902 datetime_to_unix_nanos(start),
903 datetime_to_unix_nanos(end),
904 clock.get_time_ns(),
905 params,
906 )));
907 if let Err(err) = sender.send(DataEvent::Response(response)) {
908 tracing::error!("Failed to send instrument response: {err}");
909 }
910 }
911 Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
912 Err(err) => tracing::error!("Instrument request failed: {err:?}"),
913 }
914 });
915
916 Ok(())
917 }
918
919 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
920 let http = self.http_client.clone();
921 let sender = self.data_sender.clone();
922 let instrument_id = request.instrument_id;
923 let start = request.start;
924 let end = request.end;
925 let limit = request.limit.map(|n| n.get() as u32);
926 let request_id = request.request_id;
927 let client_id = request.client_id.unwrap_or(self.client_id);
928 let params = request.params.clone();
929 let clock = self.clock;
930 let start_nanos = datetime_to_unix_nanos(start);
931 let end_nanos = datetime_to_unix_nanos(end);
932
933 tokio::spawn(async move {
934 match http
935 .request_trades(instrument_id, start, end, limit)
936 .await
937 .context("failed to request trades from BitMEX")
938 {
939 Ok(trades) => {
940 let response = DataResponse::Trades(TradesResponse::new(
941 request_id,
942 client_id,
943 instrument_id,
944 trades,
945 start_nanos,
946 end_nanos,
947 clock.get_time_ns(),
948 params,
949 ));
950 if let Err(err) = sender.send(DataEvent::Response(response)) {
951 tracing::error!("Failed to send trades response: {err}");
952 }
953 }
954 Err(err) => tracing::error!("Trade request failed: {err:?}"),
955 }
956 });
957
958 Ok(())
959 }
960
961 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
962 let http = self.http_client.clone();
963 let sender = self.data_sender.clone();
964 let bar_type = request.bar_type;
965 let start = request.start;
966 let end = request.end;
967 let limit = request.limit.map(|n| n.get() as u32);
968 let request_id = request.request_id;
969 let client_id = request.client_id.unwrap_or(self.client_id);
970 let params = request.params.clone();
971 let clock = self.clock;
972 let start_nanos = datetime_to_unix_nanos(start);
973 let end_nanos = datetime_to_unix_nanos(end);
974
975 tokio::spawn(async move {
976 match http
977 .request_bars(bar_type, start, end, limit, false)
978 .await
979 .context("failed to request bars from BitMEX")
980 {
981 Ok(bars) => {
982 let response = DataResponse::Bars(BarsResponse::new(
983 request_id,
984 client_id,
985 bar_type,
986 bars,
987 start_nanos,
988 end_nanos,
989 clock.get_time_ns(),
990 params,
991 ));
992 if let Err(err) = sender.send(DataEvent::Response(response)) {
993 tracing::error!("Failed to send bars response: {err}");
994 }
995 }
996 Err(err) => tracing::error!("Bar request failed: {err:?}"),
997 }
998 });
999
1000 Ok(())
1001 }
1002}