1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30 live::{runner::get_data_event_sender, runtime::get_runtime},
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37 SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43};
44use nautilus_core::{
45 datetime::datetime_to_unix_nanos,
46 time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_data::client::DataClient;
49use nautilus_model::{
50 data::Data,
51 enums::BookType,
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59 common::consts::BITMEX_VENUE,
60 config::BitmexDataClientConfig,
61 http::client::BitmexHttpClient,
62 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
65#[derive(Clone, Copy, Debug, Eq, PartialEq)]
66enum BitmexBookChannel {
67 OrderBookL2,
68 OrderBookL2_25,
69 OrderBook10,
70}
71
72#[derive(Debug)]
73pub struct BitmexDataClient {
74 client_id: ClientId,
75 config: BitmexDataClientConfig,
76 http_client: BitmexHttpClient,
77 ws_client: Option<BitmexWebSocketClient>,
78 is_connected: AtomicBool,
79 cancellation_token: CancellationToken,
80 tasks: Vec<JoinHandle<()>>,
81 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
82 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
83 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
84 clock: &'static AtomicTime,
85 instrument_refresh_active: bool,
86}
87
88impl BitmexDataClient {
89 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
95 let clock = get_atomic_clock_realtime();
96 let data_sender = get_data_event_sender();
97
98 let http_client = BitmexHttpClient::new(
99 Some(config.http_base_url()),
100 config.api_key.clone(),
101 config.api_secret.clone(),
102 config.use_testnet,
103 config.http_timeout_secs,
104 config.max_retries,
105 config.retry_delay_initial_ms,
106 config.retry_delay_max_ms,
107 config.recv_window_ms,
108 config.max_requests_per_second,
109 config.max_requests_per_minute,
110 config.http_proxy_url.clone(),
111 )
112 .context("failed to construct BitMEX HTTP client")?;
113
114 Ok(Self {
115 client_id,
116 config,
117 http_client,
118 ws_client: None,
119 is_connected: AtomicBool::new(false),
120 cancellation_token: CancellationToken::new(),
121 tasks: Vec::new(),
122 data_sender,
123 instruments: Arc::new(RwLock::new(AHashMap::new())),
124 book_channels: Arc::new(RwLock::new(AHashMap::new())),
125 clock,
126 instrument_refresh_active: false,
127 })
128 }
129
130 fn venue(&self) -> Venue {
131 *BITMEX_VENUE
132 }
133
134 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135 self.ws_client
136 .as_ref()
137 .context("websocket client not initialized; call connect first")
138 }
139
140 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141 self.ws_client
142 .as_mut()
143 .context("websocket client not initialized; call connect first")
144 }
145
146 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147 if let Err(e) = sender.send(DataEvent::Data(data)) {
148 tracing::error!("Failed to emit data event: {e}");
149 }
150 }
151
152 fn spawn_ws<F>(&self, fut: F, context: &'static str)
153 where
154 F: Future<Output = anyhow::Result<()>> + Send + 'static,
155 {
156 get_runtime().spawn(async move {
157 if let Err(e) = fut.await {
158 tracing::error!("{context}: {e:?}");
159 }
160 });
161 }
162
163 fn spawn_stream_task(
164 &mut self,
165 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166 ) -> anyhow::Result<()> {
167 let data_sender = self.data_sender.clone();
168 let instruments = Arc::clone(&self.instruments);
169 let cancellation = self.cancellation_token.clone();
170
171 let handle = get_runtime().spawn(async move {
172 tokio::pin!(stream);
173
174 loop {
175 tokio::select! {
176 maybe_msg = stream.next() => {
177 match maybe_msg {
178 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179 None => {
180 tracing::debug!("BitMEX websocket stream ended");
181 break;
182 }
183 }
184 }
185 _ = cancellation.cancelled() => {
186 tracing::debug!("BitMEX websocket stream task cancelled");
187 break;
188 }
189 }
190 }
191 });
192
193 self.tasks.push(handle);
194 Ok(())
195 }
196
197 fn handle_ws_message(
198 message: NautilusWsMessage,
199 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201 ) {
202 match message {
203 NautilusWsMessage::Data(payloads) => {
204 for data in payloads {
205 Self::send_data(sender, data);
206 }
207 }
208 NautilusWsMessage::Instruments(insts) => {
209 let mut guard = instruments.write().expect("instrument cache lock poisoned");
210 for instrument in insts {
211 let instrument_id = instrument.id();
212 guard.insert(instrument_id, instrument);
213 }
214 let _ = sender;
216 }
217 NautilusWsMessage::FundingRateUpdates(updates) => {
218 for update in updates {
219 tracing::debug!(
220 instrument = %update.instrument_id,
221 rate = %update.rate,
222 "Funding rate update received (not forwarded)",
223 );
224 }
225 }
226 NautilusWsMessage::OrderStatusReports(_)
227 | NautilusWsMessage::OrderUpdated(_)
228 | NautilusWsMessage::FillReports(_)
229 | NautilusWsMessage::PositionStatusReport(_)
230 | NautilusWsMessage::AccountState(_) => {
231 tracing::debug!("Ignoring trading message on data client");
232 }
233 NautilusWsMessage::Reconnected => {
234 tracing::info!("BitMEX websocket reconnected");
235 }
236 NautilusWsMessage::Authenticated => {
237 tracing::debug!("BitMEX websocket authenticated");
238 }
239 }
240 }
241
242 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
243 let http = self.http_client.clone();
244 let mut instruments = http
245 .request_instruments(self.config.active_only)
246 .await
247 .context("failed to request BitMEX instruments")?;
248
249 instruments.sort_by_key(|instrument| instrument.id());
250
251 {
252 let mut guard = self
253 .instruments
254 .write()
255 .expect("instrument cache lock poisoned");
256 guard.clear();
257 for instrument in &instruments {
258 guard.insert(instrument.id(), instrument.clone());
259 }
260 }
261
262 for instrument in &instruments {
263 self.http_client.cache_instrument(instrument.clone());
264 }
265
266 Ok(instruments)
267 }
268
269 fn is_connected(&self) -> bool {
270 self.is_connected.load(Ordering::Relaxed)
271 }
272
273 fn is_disconnected(&self) -> bool {
274 !self.is_connected()
275 }
276
277 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
278 let Some(minutes) = self.config.update_instruments_interval_mins else {
279 return Ok(());
280 };
281
282 if minutes == 0 || self.instrument_refresh_active {
283 return Ok(());
284 }
285
286 let interval_secs = minutes.saturating_mul(60);
287 if interval_secs == 0 {
288 return Ok(());
289 }
290
291 let interval = Duration::from_secs(interval_secs);
292 let cancellation = self.cancellation_token.clone();
293 let instruments_cache = Arc::clone(&self.instruments);
294 let active_only = self.config.active_only;
295 let client_id = self.client_id;
296 let http_client = self.http_client.clone();
297
298 let handle = get_runtime().spawn(async move {
299 let http_client = http_client;
300 loop {
301 let sleep = tokio::time::sleep(interval);
302 tokio::pin!(sleep);
303 tokio::select! {
304 _ = cancellation.cancelled() => {
305 tracing::debug!("BitMEX instrument refresh task cancelled");
306 break;
307 }
308 _ = &mut sleep => {
309 match http_client.request_instruments(active_only).await {
310 Ok(mut instruments) => {
311 instruments.sort_by_key(|instrument| instrument.id());
312
313 {
314 let mut guard = instruments_cache
315 .write()
316 .expect("instrument cache lock poisoned");
317 guard.clear();
318 for instrument in &instruments {
319 guard.insert(instrument.id(), instrument.clone());
320 }
321 }
322
323 for instrument in instruments {
324 http_client.cache_instrument(instrument);
325 }
326
327 tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
328 }
329 Err(e) => {
330 tracing::warn!(client_id=%client_id, error=?e, "Failed to refresh BitMEX instruments");
331 }
332 }
333 }
334 }
335 }
336 });
337
338 self.tasks.push(handle);
339 self.instrument_refresh_active = true;
340 Ok(())
341 }
342}
343
344#[async_trait::async_trait(?Send)]
345impl DataClient for BitmexDataClient {
346 fn client_id(&self) -> ClientId {
347 self.client_id
348 }
349
350 fn venue(&self) -> Option<Venue> {
351 Some(self.venue())
352 }
353
354 fn start(&mut self) -> anyhow::Result<()> {
355 tracing::info!(
356 client_id = %self.client_id,
357 use_testnet = self.config.use_testnet,
358 http_proxy_url = ?self.config.http_proxy_url,
359 ws_proxy_url = ?self.config.ws_proxy_url,
360 "Starting BitMEX data client"
361 );
362 Ok(())
363 }
364
365 fn stop(&mut self) -> anyhow::Result<()> {
366 tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
367 self.cancellation_token.cancel();
368 self.is_connected.store(false, Ordering::Relaxed);
369 self.instrument_refresh_active = false;
370 Ok(())
371 }
372
373 fn reset(&mut self) -> anyhow::Result<()> {
374 tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
375 self.is_connected.store(false, Ordering::Relaxed);
376 self.cancellation_token = CancellationToken::new();
377 self.tasks.clear();
378 self.book_channels
379 .write()
380 .expect("book channel cache lock poisoned")
381 .clear();
382 self.instrument_refresh_active = false;
383 Ok(())
384 }
385
386 fn dispose(&mut self) -> anyhow::Result<()> {
387 self.stop()
388 }
389
390 async fn connect(&mut self) -> anyhow::Result<()> {
391 if self.is_connected() {
392 return Ok(());
393 }
394
395 if self.ws_client.is_none() {
396 let ws = BitmexWebSocketClient::new(
397 Some(self.config.ws_url()),
398 self.config.api_key.clone(),
399 self.config.api_secret.clone(),
400 None,
401 self.config.heartbeat_interval_secs,
402 )
403 .context("failed to construct BitMEX websocket client")?;
404 self.ws_client = Some(ws);
405 }
406
407 let instruments = self.bootstrap_instruments().await?;
408 if let Some(ws) = self.ws_client.as_mut() {
409 ws.cache_instruments(instruments);
410 }
411
412 let ws = self.ws_client_mut()?;
413 ws.connect()
414 .await
415 .context("failed to connect BitMEX websocket")?;
416 ws.wait_until_active(10.0)
417 .await
418 .context("BitMEX websocket did not become active")?;
419
420 let stream = ws.stream();
421 self.spawn_stream_task(stream)?;
422 self.maybe_spawn_instrument_refresh()?;
423
424 self.is_connected.store(true, Ordering::Relaxed);
425 tracing::info!("Connected");
426 Ok(())
427 }
428
429 async fn disconnect(&mut self) -> anyhow::Result<()> {
430 if self.is_disconnected() {
431 return Ok(());
432 }
433
434 self.cancellation_token.cancel();
435
436 if let Some(ws) = self.ws_client.as_mut()
437 && let Err(e) = ws.close().await
438 {
439 tracing::warn!("Error while closing BitMEX websocket: {e:?}");
440 }
441
442 for handle in self.tasks.drain(..) {
443 if let Err(e) = handle.await {
444 tracing::error!("Error joining websocket task: {e:?}");
445 }
446 }
447
448 self.cancellation_token = CancellationToken::new();
449 self.is_connected.store(false, Ordering::Relaxed);
450 self.book_channels
451 .write()
452 .expect("book channel cache lock poisoned")
453 .clear();
454 self.instrument_refresh_active = false;
455
456 tracing::info!("Disconnected");
457 Ok(())
458 }
459
460 fn is_connected(&self) -> bool {
461 self.is_connected()
462 }
463
464 fn is_disconnected(&self) -> bool {
465 self.is_disconnected()
466 }
467
468 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
469 if cmd.book_type != BookType::L2_MBP {
470 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
471 }
472
473 let instrument_id = cmd.instrument_id;
474 let depth = cmd.depth.map_or(0, |d| d.get());
475 let channel = if depth > 0 && depth <= 25 {
476 BitmexBookChannel::OrderBookL2_25
477 } else {
478 BitmexBookChannel::OrderBookL2
479 };
480
481 let ws = self.ws_client()?.clone();
482 let book_channels = Arc::clone(&self.book_channels);
483
484 self.spawn_ws(
485 async move {
486 match channel {
487 BitmexBookChannel::OrderBookL2 => ws
488 .subscribe_book(instrument_id)
489 .await
490 .map_err(|err| anyhow::anyhow!(err))?,
491 BitmexBookChannel::OrderBookL2_25 => ws
492 .subscribe_book_25(instrument_id)
493 .await
494 .map_err(|err| anyhow::anyhow!(err))?,
495 BitmexBookChannel::OrderBook10 => unreachable!(),
496 }
497 book_channels
498 .write()
499 .expect("book channel cache lock poisoned")
500 .insert(instrument_id, channel);
501 Ok(())
502 },
503 "BitMEX book delta subscription",
504 );
505
506 Ok(())
507 }
508
509 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
510 let instrument_id = cmd.instrument_id;
511 let ws = self.ws_client()?.clone();
512 let book_channels = Arc::clone(&self.book_channels);
513
514 self.spawn_ws(
515 async move {
516 ws.subscribe_book_depth10(instrument_id)
517 .await
518 .map_err(|err| anyhow::anyhow!(err))?;
519 book_channels
520 .write()
521 .expect("book channel cache lock poisoned")
522 .insert(instrument_id, BitmexBookChannel::OrderBook10);
523 Ok(())
524 },
525 "BitMEX book depth10 subscription",
526 );
527 Ok(())
528 }
529
530 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
531 if cmd.book_type != BookType::L2_MBP {
532 anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
533 }
534
535 let depth = cmd.depth.map_or(10, |d| d.get());
536 if depth != 10 {
537 tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
538 }
539
540 let instrument_id = cmd.instrument_id;
541 let ws = self.ws_client()?.clone();
542 let book_channels = Arc::clone(&self.book_channels);
543
544 self.spawn_ws(
545 async move {
546 ws.subscribe_book_depth10(instrument_id)
547 .await
548 .map_err(|err| anyhow::anyhow!(err))?;
549 book_channels
550 .write()
551 .expect("book channel cache lock poisoned")
552 .insert(instrument_id, BitmexBookChannel::OrderBook10);
553 Ok(())
554 },
555 "BitMEX book snapshot subscription",
556 );
557 Ok(())
558 }
559
560 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
561 let instrument_id = cmd.instrument_id;
562 let ws = self.ws_client()?.clone();
563
564 self.spawn_ws(
565 async move {
566 ws.subscribe_quotes(instrument_id)
567 .await
568 .map_err(|err| anyhow::anyhow!(err))
569 },
570 "BitMEX quote subscription",
571 );
572 Ok(())
573 }
574
575 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
576 let instrument_id = cmd.instrument_id;
577 let ws = self.ws_client()?.clone();
578
579 self.spawn_ws(
580 async move {
581 ws.subscribe_trades(instrument_id)
582 .await
583 .map_err(|err| anyhow::anyhow!(err))
584 },
585 "BitMEX trade subscription",
586 );
587 Ok(())
588 }
589
590 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
591 let instrument_id = cmd.instrument_id;
592 let ws = self.ws_client()?.clone();
593
594 self.spawn_ws(
595 async move {
596 ws.subscribe_mark_prices(instrument_id)
597 .await
598 .map_err(|err| anyhow::anyhow!(err))
599 },
600 "BitMEX mark price subscription",
601 );
602 Ok(())
603 }
604
605 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
606 let instrument_id = cmd.instrument_id;
607 let ws = self.ws_client()?.clone();
608
609 self.spawn_ws(
610 async move {
611 ws.subscribe_index_prices(instrument_id)
612 .await
613 .map_err(|err| anyhow::anyhow!(err))
614 },
615 "BitMEX index price subscription",
616 );
617 Ok(())
618 }
619
620 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
621 let instrument_id = cmd.instrument_id;
622 let ws = self.ws_client()?.clone();
623
624 self.spawn_ws(
625 async move {
626 ws.subscribe_funding_rates(instrument_id)
627 .await
628 .map_err(|err| anyhow::anyhow!(err))
629 },
630 "BitMEX funding rate subscription",
631 );
632 Ok(())
633 }
634
635 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
636 let bar_type = cmd.bar_type;
637 let ws = self.ws_client()?.clone();
638
639 self.spawn_ws(
640 async move {
641 ws.subscribe_bars(bar_type)
642 .await
643 .map_err(|err| anyhow::anyhow!(err))
644 },
645 "BitMEX bar subscription",
646 );
647 Ok(())
648 }
649
650 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
651 let instrument_id = cmd.instrument_id;
652 let ws = self.ws_client()?.clone();
653 let book_channels = Arc::clone(&self.book_channels);
654
655 self.spawn_ws(
656 async move {
657 let channel = book_channels
658 .write()
659 .expect("book channel cache lock poisoned")
660 .remove(&instrument_id);
661
662 match channel {
663 Some(BitmexBookChannel::OrderBookL2) => ws
664 .unsubscribe_book(instrument_id)
665 .await
666 .map_err(|err| anyhow::anyhow!(err))?,
667 Some(BitmexBookChannel::OrderBookL2_25) => ws
668 .unsubscribe_book_25(instrument_id)
669 .await
670 .map_err(|err| anyhow::anyhow!(err))?,
671 Some(BitmexBookChannel::OrderBook10) | None => ws
672 .unsubscribe_book(instrument_id)
673 .await
674 .map_err(|err| anyhow::anyhow!(err))?,
675 }
676 Ok(())
677 },
678 "BitMEX book delta unsubscribe",
679 );
680 Ok(())
681 }
682
683 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
684 let instrument_id = cmd.instrument_id;
685 let ws = self.ws_client()?.clone();
686 let book_channels = Arc::clone(&self.book_channels);
687
688 self.spawn_ws(
689 async move {
690 book_channels
691 .write()
692 .expect("book channel cache lock poisoned")
693 .remove(&instrument_id);
694 ws.unsubscribe_book_depth10(instrument_id)
695 .await
696 .map_err(|err| anyhow::anyhow!(err))
697 },
698 "BitMEX book depth10 unsubscribe",
699 );
700 Ok(())
701 }
702
703 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
704 let instrument_id = cmd.instrument_id;
705 let ws = self.ws_client()?.clone();
706 let book_channels = Arc::clone(&self.book_channels);
707
708 self.spawn_ws(
709 async move {
710 book_channels
711 .write()
712 .expect("book channel cache lock poisoned")
713 .remove(&instrument_id);
714 ws.unsubscribe_book_depth10(instrument_id)
715 .await
716 .map_err(|err| anyhow::anyhow!(err))
717 },
718 "BitMEX book snapshot unsubscribe",
719 );
720 Ok(())
721 }
722
723 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
724 let instrument_id = cmd.instrument_id;
725 let ws = self.ws_client()?.clone();
726
727 self.spawn_ws(
728 async move {
729 ws.unsubscribe_quotes(instrument_id)
730 .await
731 .map_err(|err| anyhow::anyhow!(err))
732 },
733 "BitMEX quote unsubscribe",
734 );
735 Ok(())
736 }
737
738 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
739 let instrument_id = cmd.instrument_id;
740 let ws = self.ws_client()?.clone();
741
742 self.spawn_ws(
743 async move {
744 ws.unsubscribe_trades(instrument_id)
745 .await
746 .map_err(|err| anyhow::anyhow!(err))
747 },
748 "BitMEX trade unsubscribe",
749 );
750 Ok(())
751 }
752
753 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
754 let ws = self.ws_client()?.clone();
755 let instrument_id = cmd.instrument_id;
756
757 self.spawn_ws(
758 async move {
759 ws.unsubscribe_mark_prices(instrument_id)
760 .await
761 .map_err(|err| anyhow::anyhow!(err))
762 },
763 "BitMEX mark price unsubscribe",
764 );
765 Ok(())
766 }
767
768 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
769 let ws = self.ws_client()?.clone();
770 let instrument_id = cmd.instrument_id;
771
772 self.spawn_ws(
773 async move {
774 ws.unsubscribe_index_prices(instrument_id)
775 .await
776 .map_err(|err| anyhow::anyhow!(err))
777 },
778 "BitMEX index price unsubscribe",
779 );
780 Ok(())
781 }
782
783 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
784 let ws = self.ws_client()?.clone();
785 let instrument_id = cmd.instrument_id;
786
787 self.spawn_ws(
788 async move {
789 ws.unsubscribe_funding_rates(instrument_id)
790 .await
791 .map_err(|err| anyhow::anyhow!(err))
792 },
793 "BitMEX funding rate unsubscribe",
794 );
795 Ok(())
796 }
797
798 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
799 let bar_type = cmd.bar_type;
800 let ws = self.ws_client()?.clone();
801
802 self.spawn_ws(
803 async move {
804 ws.unsubscribe_bars(bar_type)
805 .await
806 .map_err(|err| anyhow::anyhow!(err))
807 },
808 "BitMEX bar unsubscribe",
809 );
810 Ok(())
811 }
812
813 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
814 let venue = request.venue.unwrap_or_else(|| self.venue());
815 if let Some(req_venue) = request.venue
816 && req_venue != self.venue()
817 {
818 tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
819 }
820
821 let http = self.http_client.clone();
822 let instruments_cache = Arc::clone(&self.instruments);
823 let sender = self.data_sender.clone();
824 let request_id = request.request_id;
825 let client_id = request.client_id.unwrap_or(self.client_id);
826 let params = request.params.clone();
827 let start_nanos = datetime_to_unix_nanos(request.start);
828 let end_nanos = datetime_to_unix_nanos(request.end);
829 let clock = self.clock;
830 let active_only = self.config.active_only;
831
832 get_runtime().spawn(async move {
833 let http_client = http;
834 match http_client
835 .request_instruments(active_only)
836 .await
837 .context("failed to request instruments from BitMEX")
838 {
839 Ok(instruments) => {
840 {
841 let mut guard = instruments_cache
842 .write()
843 .expect("instrument cache lock poisoned");
844 guard.clear();
845 for instrument in &instruments {
846 guard.insert(instrument.id(), instrument.clone());
847 http_client.cache_instrument(instrument.clone());
848 }
849 }
850
851 let response = DataResponse::Instruments(InstrumentsResponse::new(
852 request_id,
853 client_id,
854 venue,
855 instruments,
856 start_nanos,
857 end_nanos,
858 clock.get_time_ns(),
859 params,
860 ));
861 if let Err(e) = sender.send(DataEvent::Response(response)) {
862 tracing::error!("Failed to send instruments response: {e}");
863 }
864 }
865 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
866 }
867 });
868
869 Ok(())
870 }
871
872 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
873 if let Some(instrument) = self
874 .instruments
875 .read()
876 .expect("instrument cache lock poisoned")
877 .get(&request.instrument_id)
878 .cloned()
879 {
880 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
881 request.request_id,
882 request.client_id.unwrap_or(self.client_id),
883 instrument.id(),
884 instrument,
885 datetime_to_unix_nanos(request.start),
886 datetime_to_unix_nanos(request.end),
887 self.clock.get_time_ns(),
888 request.params.clone(),
889 )));
890 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
891 tracing::error!("Failed to send instrument response: {e}");
892 }
893 return Ok(());
894 }
895
896 let http_client = self.http_client.clone();
897 let instruments_cache = Arc::clone(&self.instruments);
898 let sender = self.data_sender.clone();
899 let instrument_id = request.instrument_id;
900 let request_id = request.request_id;
901 let client_id = request.client_id.unwrap_or(self.client_id);
902 let start = request.start;
903 let end = request.end;
904 let params = request.params.clone();
905 let clock = self.clock;
906
907 get_runtime().spawn(async move {
908 match http_client
909 .request_instrument(instrument_id)
910 .await
911 .context("failed to request instrument from BitMEX")
912 {
913 Ok(Some(instrument)) => {
914 http_client.cache_instrument(instrument.clone());
915 {
916 let mut guard = instruments_cache
917 .write()
918 .expect("instrument cache lock poisoned");
919 guard.insert(instrument.id(), instrument.clone());
920 }
921
922 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
923 request_id,
924 client_id,
925 instrument.id(),
926 instrument,
927 datetime_to_unix_nanos(start),
928 datetime_to_unix_nanos(end),
929 clock.get_time_ns(),
930 params,
931 )));
932 if let Err(e) = sender.send(DataEvent::Response(response)) {
933 tracing::error!("Failed to send instrument response: {e}");
934 }
935 }
936 Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
937 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
938 }
939 });
940
941 Ok(())
942 }
943
944 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
945 let http = self.http_client.clone();
946 let sender = self.data_sender.clone();
947 let instrument_id = request.instrument_id;
948 let start = request.start;
949 let end = request.end;
950 let limit = request.limit.map(|n| n.get() as u32);
951 let request_id = request.request_id;
952 let client_id = request.client_id.unwrap_or(self.client_id);
953 let params = request.params.clone();
954 let clock = self.clock;
955 let start_nanos = datetime_to_unix_nanos(start);
956 let end_nanos = datetime_to_unix_nanos(end);
957
958 get_runtime().spawn(async move {
959 match http
960 .request_trades(instrument_id, start, end, limit)
961 .await
962 .context("failed to request trades from BitMEX")
963 {
964 Ok(trades) => {
965 let response = DataResponse::Trades(TradesResponse::new(
966 request_id,
967 client_id,
968 instrument_id,
969 trades,
970 start_nanos,
971 end_nanos,
972 clock.get_time_ns(),
973 params,
974 ));
975 if let Err(e) = sender.send(DataEvent::Response(response)) {
976 tracing::error!("Failed to send trades response: {e}");
977 }
978 }
979 Err(e) => tracing::error!("Trade request failed: {e:?}"),
980 }
981 });
982
983 Ok(())
984 }
985
986 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
987 let http = self.http_client.clone();
988 let sender = self.data_sender.clone();
989 let bar_type = request.bar_type;
990 let start = request.start;
991 let end = request.end;
992 let limit = request.limit.map(|n| n.get() as u32);
993 let request_id = request.request_id;
994 let client_id = request.client_id.unwrap_or(self.client_id);
995 let params = request.params.clone();
996 let clock = self.clock;
997 let start_nanos = datetime_to_unix_nanos(start);
998 let end_nanos = datetime_to_unix_nanos(end);
999
1000 get_runtime().spawn(async move {
1001 match http
1002 .request_bars(bar_type, start, end, limit, false)
1003 .await
1004 .context("failed to request bars from BitMEX")
1005 {
1006 Ok(bars) => {
1007 let response = DataResponse::Bars(BarsResponse::new(
1008 request_id,
1009 client_id,
1010 bar_type,
1011 bars,
1012 start_nanos,
1013 end_nanos,
1014 clock.get_time_ns(),
1015 params,
1016 ));
1017 if let Err(e) = sender.send(DataEvent::Response(response)) {
1018 tracing::error!("Failed to send bars response: {e}");
1019 }
1020 }
1021 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1022 }
1023 });
1024
1025 Ok(())
1026 }
1027}