1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38 SubscribeInstrument, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
39 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
40 UnsubscribeBookDepth10, UnsubscribeFundingRates, UnsubscribeIndexPrices,
41 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
42 },
43 },
44};
45use nautilus_core::{
46 datetime::datetime_to_unix_nanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50 data::Data,
51 enums::BookType,
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59 common::consts::BITMEX_VENUE,
60 config::BitmexDataClientConfig,
61 http::client::BitmexHttpClient,
62 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
/// Order book channel recorded for an instrument's book subscription.
///
/// The variant stored at subscribe time selects which unsubscribe call is issued later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Recorded when book deltas are subscribed with no depth or a depth above 25.
    OrderBookL2,
    /// Recorded when book deltas are subscribed with a depth between 1 and 25.
    OrderBookL2_25,
    /// Recorded by the depth-10 book subscription.
    OrderBook10,
}
71
72#[derive(Debug)]
73pub struct BitmexDataClient {
74 client_id: ClientId,
75 config: BitmexDataClientConfig,
76 http_client: BitmexHttpClient,
77 ws_client: Option<BitmexWebSocketClient>,
78 is_connected: AtomicBool,
79 cancellation_token: CancellationToken,
80 tasks: Vec<JoinHandle<()>>,
81 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
82 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
83 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
84 clock: &'static AtomicTime,
85 instrument_refresh_active: bool,
86}
87
88impl BitmexDataClient {
89 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
95 let clock = get_atomic_clock_realtime();
96 let data_sender = get_data_event_sender();
97
98 let http_client = BitmexHttpClient::new(
99 Some(config.http_base_url()),
100 config.api_key.clone(),
101 config.api_secret.clone(),
102 config.use_testnet,
103 config.http_timeout_secs,
104 config.max_retries,
105 config.retry_delay_initial_ms,
106 config.retry_delay_max_ms,
107 config.recv_window_ms,
108 config.max_requests_per_second,
109 config.max_requests_per_minute,
110 config.http_proxy_url.clone(),
111 )
112 .context("failed to construct BitMEX HTTP client")?;
113
114 Ok(Self {
115 client_id,
116 config,
117 http_client,
118 ws_client: None,
119 is_connected: AtomicBool::new(false),
120 cancellation_token: CancellationToken::new(),
121 tasks: Vec::new(),
122 data_sender,
123 instruments: Arc::new(RwLock::new(AHashMap::new())),
124 book_channels: Arc::new(RwLock::new(AHashMap::new())),
125 clock,
126 instrument_refresh_active: false,
127 })
128 }
129
130 fn venue(&self) -> Venue {
131 *BITMEX_VENUE
132 }
133
134 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
135 self.ws_client
136 .as_ref()
137 .context("websocket client not initialized; call connect first")
138 }
139
140 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
141 self.ws_client
142 .as_mut()
143 .context("websocket client not initialized; call connect first")
144 }
145
146 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
147 if let Err(e) = sender.send(DataEvent::Data(data)) {
148 log::error!("Failed to emit data event: {e}");
149 }
150 }
151
152 fn spawn_ws<F>(&self, fut: F, context: &'static str)
153 where
154 F: Future<Output = anyhow::Result<()>> + Send + 'static,
155 {
156 get_runtime().spawn(async move {
157 if let Err(e) = fut.await {
158 log::error!("{context}: {e:?}");
159 }
160 });
161 }
162
163 fn spawn_stream_task(
164 &mut self,
165 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
166 ) -> anyhow::Result<()> {
167 let data_sender = self.data_sender.clone();
168 let instruments = Arc::clone(&self.instruments);
169 let cancellation = self.cancellation_token.clone();
170
171 let handle = get_runtime().spawn(async move {
172 tokio::pin!(stream);
173
174 loop {
175 tokio::select! {
176 maybe_msg = stream.next() => {
177 match maybe_msg {
178 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
179 None => {
180 log::debug!("BitMEX websocket stream ended");
181 break;
182 }
183 }
184 }
185 () = cancellation.cancelled() => {
186 log::debug!("BitMEX websocket stream task cancelled");
187 break;
188 }
189 }
190 }
191 });
192
193 self.tasks.push(handle);
194 Ok(())
195 }
196
197 fn handle_ws_message(
198 message: NautilusWsMessage,
199 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
200 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
201 ) {
202 match message {
203 NautilusWsMessage::Data(payloads) => {
204 for data in payloads {
205 Self::send_data(sender, data);
206 }
207 }
208 NautilusWsMessage::Instruments(insts) => {
209 let mut guard = instruments.write().expect("instrument cache lock poisoned");
210 for instrument in insts {
211 let instrument_id = instrument.id();
212 guard.insert(instrument_id, instrument.clone());
213 if let Err(e) = sender.send(DataEvent::Instrument(instrument)) {
214 log::error!("Failed to send instrument event: {e}");
215 }
216 }
217 }
218 NautilusWsMessage::FundingRateUpdates(updates) => {
219 for update in updates {
220 log::debug!(
221 "Funding rate update: instrument={}, rate={}",
222 update.instrument_id,
223 update.rate,
224 );
225 if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
226 log::error!("Failed to emit funding rate event: {e}");
227 }
228 }
229 }
230 NautilusWsMessage::OrderStatusReports(_)
231 | NautilusWsMessage::OrderUpdated(_)
232 | NautilusWsMessage::OrderUpdates(_)
233 | NautilusWsMessage::FillReports(_)
234 | NautilusWsMessage::PositionStatusReports(_)
235 | NautilusWsMessage::AccountStates(_) => {
236 log::debug!("Ignoring trading message on data client");
237 }
238 NautilusWsMessage::Reconnected => {
239 log::info!("BitMEX websocket reconnected");
240 }
241 NautilusWsMessage::Authenticated => {
242 log::debug!("BitMEX websocket authenticated");
243 }
244 }
245 }
246
247 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
248 let http = self.http_client.clone();
249 let mut instruments = http
250 .request_instruments(self.config.active_only)
251 .await
252 .context("failed to request BitMEX instruments")?;
253
254 instruments.sort_by_key(|instrument| instrument.id());
255
256 {
257 let mut guard = self
258 .instruments
259 .write()
260 .expect("instrument cache lock poisoned");
261 guard.clear();
262 for instrument in &instruments {
263 guard.insert(instrument.id(), instrument.clone());
264 }
265 }
266
267 for instrument in &instruments {
268 self.http_client.cache_instrument(instrument.clone());
269 }
270
271 Ok(instruments)
272 }
273
274 fn is_connected(&self) -> bool {
275 self.is_connected.load(Ordering::Relaxed)
276 }
277
278 fn is_disconnected(&self) -> bool {
279 !self.is_connected()
280 }
281
282 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
283 let Some(minutes) = self.config.update_instruments_interval_mins else {
284 return Ok(());
285 };
286
287 if minutes == 0 || self.instrument_refresh_active {
288 return Ok(());
289 }
290
291 let interval_secs = minutes.saturating_mul(60);
292 if interval_secs == 0 {
293 return Ok(());
294 }
295
296 let interval = Duration::from_secs(interval_secs);
297 let cancellation = self.cancellation_token.clone();
298 let instruments_cache = Arc::clone(&self.instruments);
299 let active_only = self.config.active_only;
300 let client_id = self.client_id;
301 let http_client = self.http_client.clone();
302
303 let handle = get_runtime().spawn(async move {
304 let http_client = http_client;
305 loop {
306 let sleep = tokio::time::sleep(interval);
307 tokio::pin!(sleep);
308 tokio::select! {
309 () = cancellation.cancelled() => {
310 log::debug!("BitMEX instrument refresh task cancelled");
311 break;
312 }
313 () = &mut sleep => {
314 match http_client.request_instruments(active_only).await {
315 Ok(mut instruments) => {
316 instruments.sort_by_key(|instrument| instrument.id());
317
318 {
319 let mut guard = instruments_cache
320 .write()
321 .expect("instrument cache lock poisoned");
322 guard.clear();
323 for instrument in &instruments {
324 guard.insert(instrument.id(), instrument.clone());
325 }
326 }
327
328 for instrument in instruments {
329 http_client.cache_instrument(instrument);
330 }
331
332 log::debug!("BitMEX instruments refreshed: client_id={client_id}");
333 }
334 Err(e) => {
335 log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
336 }
337 }
338 }
339 }
340 }
341 });
342
343 self.tasks.push(handle);
344 self.instrument_refresh_active = true;
345 Ok(())
346 }
347}
348
349#[async_trait::async_trait(?Send)]
350impl DataClient for BitmexDataClient {
351 fn client_id(&self) -> ClientId {
352 self.client_id
353 }
354
355 fn venue(&self) -> Option<Venue> {
356 Some(self.venue())
357 }
358
359 fn start(&mut self) -> anyhow::Result<()> {
360 log::info!(
361 "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
362 self.client_id,
363 self.config.use_testnet,
364 self.config.http_proxy_url,
365 self.config.ws_proxy_url,
366 );
367 Ok(())
368 }
369
370 fn stop(&mut self) -> anyhow::Result<()> {
371 log::info!("Stopping BitMEX data client {id}", id = self.client_id);
372 self.cancellation_token.cancel();
373 self.is_connected.store(false, Ordering::Relaxed);
374 self.instrument_refresh_active = false;
375 Ok(())
376 }
377
378 fn reset(&mut self) -> anyhow::Result<()> {
379 log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
380 self.is_connected.store(false, Ordering::Relaxed);
381 self.cancellation_token = CancellationToken::new();
382 self.tasks.clear();
383 self.book_channels
384 .write()
385 .expect("book channel cache lock poisoned")
386 .clear();
387 self.instrument_refresh_active = false;
388 Ok(())
389 }
390
391 fn dispose(&mut self) -> anyhow::Result<()> {
392 self.stop()
393 }
394
395 async fn connect(&mut self) -> anyhow::Result<()> {
396 if self.is_connected() {
397 return Ok(());
398 }
399
400 if self.ws_client.is_none() {
401 let ws = BitmexWebSocketClient::new(
402 Some(self.config.ws_url()),
403 self.config.api_key.clone(),
404 self.config.api_secret.clone(),
405 None,
406 self.config.heartbeat_interval_secs,
407 )
408 .context("failed to construct BitMEX websocket client")?;
409 self.ws_client = Some(ws);
410 }
411
412 let instruments = self.bootstrap_instruments().await?;
413 if let Some(ws) = self.ws_client.as_mut() {
414 ws.cache_instruments(instruments);
415 }
416
417 let ws = self.ws_client_mut()?;
418 ws.connect()
419 .await
420 .context("failed to connect BitMEX websocket")?;
421 ws.wait_until_active(10.0)
422 .await
423 .context("BitMEX websocket did not become active")?;
424
425 let stream = ws.stream();
426 self.spawn_stream_task(stream)?;
427 self.maybe_spawn_instrument_refresh()?;
428
429 self.is_connected.store(true, Ordering::Relaxed);
430 log::info!("Connected");
431 Ok(())
432 }
433
434 async fn disconnect(&mut self) -> anyhow::Result<()> {
435 if self.is_disconnected() {
436 return Ok(());
437 }
438
439 self.cancellation_token.cancel();
440
441 if let Some(ws) = self.ws_client.as_mut()
442 && let Err(e) = ws.close().await
443 {
444 log::warn!("Error while closing BitMEX websocket: {e:?}");
445 }
446
447 for handle in self.tasks.drain(..) {
448 if let Err(e) = handle.await {
449 log::error!("Error joining websocket task: {e:?}");
450 }
451 }
452
453 self.cancellation_token = CancellationToken::new();
454 self.is_connected.store(false, Ordering::Relaxed);
455 self.book_channels
456 .write()
457 .expect("book channel cache lock poisoned")
458 .clear();
459 self.instrument_refresh_active = false;
460
461 log::info!("Disconnected");
462 Ok(())
463 }
464
465 fn is_connected(&self) -> bool {
466 self.is_connected()
467 }
468
469 fn is_disconnected(&self) -> bool {
470 self.is_disconnected()
471 }
472
473 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
474 let ws = self.ws_client()?.clone();
475
476 self.spawn_ws(
477 async move {
478 ws.subscribe_instruments()
479 .await
480 .map_err(|e| anyhow::anyhow!(e))
481 },
482 "BitMEX instruments subscription",
483 );
484 Ok(())
485 }
486
487 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
488 let instrument_id = cmd.instrument_id;
489
490 if let Some(instrument) = self
491 .instruments
492 .read()
493 .expect("instrument cache lock poisoned")
494 .get(&instrument_id)
495 .cloned()
496 {
497 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
498 log::error!("Failed to send instrument event for {instrument_id}: {e}");
499 }
500 return Ok(());
501 }
502
503 log::warn!("Instrument {instrument_id} not found in BitMEX cache");
504
505 let ws = self.ws_client()?.clone();
506 self.spawn_ws(
507 async move {
508 ws.subscribe_instrument(instrument_id)
509 .await
510 .map_err(|e| anyhow::anyhow!(e))
511 },
512 "BitMEX instrument subscription",
513 );
514
515 Ok(())
516 }
517
518 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
519 if cmd.book_type != BookType::L2_MBP {
520 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
521 }
522
523 let instrument_id = cmd.instrument_id;
524 let depth = cmd.depth.map_or(0, |d| d.get());
525 let channel = if depth > 0 && depth <= 25 {
526 if depth != 25 {
527 log::info!(
528 "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
529 );
530 }
531 BitmexBookChannel::OrderBookL2_25
532 } else {
533 BitmexBookChannel::OrderBookL2
534 };
535
536 let ws = self.ws_client()?.clone();
537 let book_channels = Arc::clone(&self.book_channels);
538
539 self.spawn_ws(
540 async move {
541 match channel {
542 BitmexBookChannel::OrderBookL2 => ws
543 .subscribe_book(instrument_id)
544 .await
545 .map_err(|e| anyhow::anyhow!(e))?,
546 BitmexBookChannel::OrderBookL2_25 => ws
547 .subscribe_book_25(instrument_id)
548 .await
549 .map_err(|e| anyhow::anyhow!(e))?,
550 BitmexBookChannel::OrderBook10 => unreachable!(),
551 }
552 book_channels
553 .write()
554 .expect("book channel cache lock poisoned")
555 .insert(instrument_id, channel);
556 Ok(())
557 },
558 "BitMEX book delta subscription",
559 );
560
561 Ok(())
562 }
563
564 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
565 let instrument_id = cmd.instrument_id;
566 let ws = self.ws_client()?.clone();
567 let book_channels = Arc::clone(&self.book_channels);
568
569 self.spawn_ws(
570 async move {
571 ws.subscribe_book_depth10(instrument_id)
572 .await
573 .map_err(|e| anyhow::anyhow!(e))?;
574 book_channels
575 .write()
576 .expect("book channel cache lock poisoned")
577 .insert(instrument_id, BitmexBookChannel::OrderBook10);
578 Ok(())
579 },
580 "BitMEX book depth10 subscription",
581 );
582 Ok(())
583 }
584
585 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
586 let instrument_id = cmd.instrument_id;
587 let ws = self.ws_client()?.clone();
588
589 self.spawn_ws(
590 async move {
591 ws.subscribe_quotes(instrument_id)
592 .await
593 .map_err(|e| anyhow::anyhow!(e))
594 },
595 "BitMEX quote subscription",
596 );
597 Ok(())
598 }
599
600 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
601 let instrument_id = cmd.instrument_id;
602 let ws = self.ws_client()?.clone();
603
604 self.spawn_ws(
605 async move {
606 ws.subscribe_trades(instrument_id)
607 .await
608 .map_err(|e| anyhow::anyhow!(e))
609 },
610 "BitMEX trade subscription",
611 );
612 Ok(())
613 }
614
615 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
616 let instrument_id = cmd.instrument_id;
617 let ws = self.ws_client()?.clone();
618
619 self.spawn_ws(
620 async move {
621 ws.subscribe_mark_prices(instrument_id)
622 .await
623 .map_err(|e| anyhow::anyhow!(e))
624 },
625 "BitMEX mark price subscription",
626 );
627 Ok(())
628 }
629
630 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
631 let instrument_id = cmd.instrument_id;
632 let ws = self.ws_client()?.clone();
633
634 self.spawn_ws(
635 async move {
636 ws.subscribe_index_prices(instrument_id)
637 .await
638 .map_err(|e| anyhow::anyhow!(e))
639 },
640 "BitMEX index price subscription",
641 );
642 Ok(())
643 }
644
645 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
646 let instrument_id = cmd.instrument_id;
647 let ws = self.ws_client()?.clone();
648
649 self.spawn_ws(
650 async move {
651 ws.subscribe_funding_rates(instrument_id)
652 .await
653 .map_err(|e| anyhow::anyhow!(e))
654 },
655 "BitMEX funding rate subscription",
656 );
657 Ok(())
658 }
659
660 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
661 let bar_type = cmd.bar_type;
662 let ws = self.ws_client()?.clone();
663
664 self.spawn_ws(
665 async move {
666 ws.subscribe_bars(bar_type)
667 .await
668 .map_err(|e| anyhow::anyhow!(e))
669 },
670 "BitMEX bar subscription",
671 );
672 Ok(())
673 }
674
675 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
676 let instrument_id = cmd.instrument_id;
677 let ws = self.ws_client()?.clone();
678 let book_channels = Arc::clone(&self.book_channels);
679
680 self.spawn_ws(
681 async move {
682 let channel = book_channels
683 .write()
684 .expect("book channel cache lock poisoned")
685 .remove(&instrument_id);
686
687 match channel {
688 Some(BitmexBookChannel::OrderBookL2) => ws
689 .unsubscribe_book(instrument_id)
690 .await
691 .map_err(|e| anyhow::anyhow!(e))?,
692 Some(BitmexBookChannel::OrderBookL2_25) => ws
693 .unsubscribe_book_25(instrument_id)
694 .await
695 .map_err(|e| anyhow::anyhow!(e))?,
696 Some(BitmexBookChannel::OrderBook10) => ws
697 .unsubscribe_book_depth10(instrument_id)
698 .await
699 .map_err(|e| anyhow::anyhow!(e))?,
700 None => ws
701 .unsubscribe_book(instrument_id)
702 .await
703 .map_err(|e| anyhow::anyhow!(e))?,
704 }
705 Ok(())
706 },
707 "BitMEX book delta unsubscribe",
708 );
709 Ok(())
710 }
711
712 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
713 let instrument_id = cmd.instrument_id;
714 let ws = self.ws_client()?.clone();
715 let book_channels = Arc::clone(&self.book_channels);
716
717 self.spawn_ws(
718 async move {
719 book_channels
720 .write()
721 .expect("book channel cache lock poisoned")
722 .remove(&instrument_id);
723 ws.unsubscribe_book_depth10(instrument_id)
724 .await
725 .map_err(|e| anyhow::anyhow!(e))
726 },
727 "BitMEX book depth10 unsubscribe",
728 );
729 Ok(())
730 }
731
732 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
733 let instrument_id = cmd.instrument_id;
734 let ws = self.ws_client()?.clone();
735
736 self.spawn_ws(
737 async move {
738 ws.unsubscribe_quotes(instrument_id)
739 .await
740 .map_err(|e| anyhow::anyhow!(e))
741 },
742 "BitMEX quote unsubscribe",
743 );
744 Ok(())
745 }
746
747 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
748 let instrument_id = cmd.instrument_id;
749 let ws = self.ws_client()?.clone();
750
751 self.spawn_ws(
752 async move {
753 ws.unsubscribe_trades(instrument_id)
754 .await
755 .map_err(|e| anyhow::anyhow!(e))
756 },
757 "BitMEX trade unsubscribe",
758 );
759 Ok(())
760 }
761
762 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
763 let ws = self.ws_client()?.clone();
764 let instrument_id = cmd.instrument_id;
765
766 self.spawn_ws(
767 async move {
768 ws.unsubscribe_mark_prices(instrument_id)
769 .await
770 .map_err(|e| anyhow::anyhow!(e))
771 },
772 "BitMEX mark price unsubscribe",
773 );
774 Ok(())
775 }
776
777 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
778 let ws = self.ws_client()?.clone();
779 let instrument_id = cmd.instrument_id;
780
781 self.spawn_ws(
782 async move {
783 ws.unsubscribe_index_prices(instrument_id)
784 .await
785 .map_err(|e| anyhow::anyhow!(e))
786 },
787 "BitMEX index price unsubscribe",
788 );
789 Ok(())
790 }
791
792 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
793 let ws = self.ws_client()?.clone();
794 let instrument_id = cmd.instrument_id;
795
796 self.spawn_ws(
797 async move {
798 ws.unsubscribe_funding_rates(instrument_id)
799 .await
800 .map_err(|e| anyhow::anyhow!(e))
801 },
802 "BitMEX funding rate unsubscribe",
803 );
804 Ok(())
805 }
806
807 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
808 let bar_type = cmd.bar_type;
809 let ws = self.ws_client()?.clone();
810
811 self.spawn_ws(
812 async move {
813 ws.unsubscribe_bars(bar_type)
814 .await
815 .map_err(|e| anyhow::anyhow!(e))
816 },
817 "BitMEX bar unsubscribe",
818 );
819 Ok(())
820 }
821
822 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
823 if let Some(req_venue) = request.venue
824 && req_venue != self.venue()
825 {
826 log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
827 }
828 let venue = self.venue();
829
830 let http = self.http_client.clone();
831 let instruments_cache = Arc::clone(&self.instruments);
832 let sender = self.data_sender.clone();
833 let request_id = request.request_id;
834 let client_id = request.client_id.unwrap_or(self.client_id);
835 let params = request.params;
836 let start_nanos = datetime_to_unix_nanos(request.start);
837 let end_nanos = datetime_to_unix_nanos(request.end);
838 let clock = self.clock;
839 let active_only = self.config.active_only;
840
841 get_runtime().spawn(async move {
842 let http_client = http;
843 match http_client
844 .request_instruments(active_only)
845 .await
846 .context("failed to request instruments from BitMEX")
847 {
848 Ok(instruments) => {
849 {
850 let mut guard = instruments_cache
851 .write()
852 .expect("instrument cache lock poisoned");
853 guard.clear();
854 for instrument in &instruments {
855 guard.insert(instrument.id(), instrument.clone());
856 http_client.cache_instrument(instrument.clone());
857 }
858 }
859
860 let response = DataResponse::Instruments(InstrumentsResponse::new(
861 request_id,
862 client_id,
863 venue,
864 instruments,
865 start_nanos,
866 end_nanos,
867 clock.get_time_ns(),
868 params,
869 ));
870 if let Err(e) = sender.send(DataEvent::Response(response)) {
871 log::error!("Failed to send instruments response: {e}");
872 }
873 }
874 Err(e) => log::error!("Instrument request failed: {e:?}"),
875 }
876 });
877
878 Ok(())
879 }
880
881 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
882 if let Some(instrument) = self
883 .instruments
884 .read()
885 .expect("instrument cache lock poisoned")
886 .get(&request.instrument_id)
887 .cloned()
888 {
889 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
890 request.request_id,
891 request.client_id.unwrap_or(self.client_id),
892 instrument.id(),
893 instrument,
894 datetime_to_unix_nanos(request.start),
895 datetime_to_unix_nanos(request.end),
896 self.clock.get_time_ns(),
897 request.params,
898 )));
899 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
900 log::error!("Failed to send instrument response: {e}");
901 }
902 return Ok(());
903 }
904
905 let http_client = self.http_client.clone();
906 let instruments_cache = Arc::clone(&self.instruments);
907 let sender = self.data_sender.clone();
908 let instrument_id = request.instrument_id;
909 let request_id = request.request_id;
910 let client_id = request.client_id.unwrap_or(self.client_id);
911 let start = request.start;
912 let end = request.end;
913 let params = request.params;
914 let clock = self.clock;
915
916 get_runtime().spawn(async move {
917 match http_client
918 .request_instrument(instrument_id)
919 .await
920 .context("failed to request instrument from BitMEX")
921 {
922 Ok(Some(instrument)) => {
923 http_client.cache_instrument(instrument.clone());
924 {
925 let mut guard = instruments_cache
926 .write()
927 .expect("instrument cache lock poisoned");
928 guard.insert(instrument.id(), instrument.clone());
929 }
930
931 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
932 request_id,
933 client_id,
934 instrument.id(),
935 instrument,
936 datetime_to_unix_nanos(start),
937 datetime_to_unix_nanos(end),
938 clock.get_time_ns(),
939 params,
940 )));
941 if let Err(e) = sender.send(DataEvent::Response(response)) {
942 log::error!("Failed to send instrument response: {e}");
943 }
944 }
945 Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
946 Err(e) => log::error!("Instrument request failed: {e:?}"),
947 }
948 });
949
950 Ok(())
951 }
952
953 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
954 let http = self.http_client.clone();
955 let sender = self.data_sender.clone();
956 let instrument_id = request.instrument_id;
957 let start = request.start;
958 let end = request.end;
959 let limit = request.limit.map(|n| n.get() as u32);
960 let request_id = request.request_id;
961 let client_id = request.client_id.unwrap_or(self.client_id);
962 let params = request.params;
963 let clock = self.clock;
964 let start_nanos = datetime_to_unix_nanos(start);
965 let end_nanos = datetime_to_unix_nanos(end);
966
967 get_runtime().spawn(async move {
968 match http
969 .request_trades(instrument_id, start, end, limit)
970 .await
971 .context("failed to request trades from BitMEX")
972 {
973 Ok(trades) => {
974 let response = DataResponse::Trades(TradesResponse::new(
975 request_id,
976 client_id,
977 instrument_id,
978 trades,
979 start_nanos,
980 end_nanos,
981 clock.get_time_ns(),
982 params,
983 ));
984 if let Err(e) = sender.send(DataEvent::Response(response)) {
985 log::error!("Failed to send trades response: {e}");
986 }
987 }
988 Err(e) => log::error!("Trade request failed: {e:?}"),
989 }
990 });
991
992 Ok(())
993 }
994
995 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
996 let http = self.http_client.clone();
997 let sender = self.data_sender.clone();
998 let bar_type = request.bar_type;
999 let start = request.start;
1000 let end = request.end;
1001 let limit = request.limit.map(|n| n.get() as u32);
1002 let request_id = request.request_id;
1003 let client_id = request.client_id.unwrap_or(self.client_id);
1004 let params = request.params;
1005 let clock = self.clock;
1006 let start_nanos = datetime_to_unix_nanos(start);
1007 let end_nanos = datetime_to_unix_nanos(end);
1008
1009 get_runtime().spawn(async move {
1010 match http
1011 .request_bars(bar_type, start, end, limit, false)
1012 .await
1013 .context("failed to request bars from BitMEX")
1014 {
1015 Ok(bars) => {
1016 let response = DataResponse::Bars(BarsResponse::new(
1017 request_id,
1018 client_id,
1019 bar_type,
1020 bars,
1021 start_nanos,
1022 end_nanos,
1023 clock.get_time_ns(),
1024 params,
1025 ));
1026 if let Err(e) = sender.send(DataEvent::Response(response)) {
1027 log::error!("Failed to send bars response: {e}");
1028 }
1029 }
1030 Err(e) => log::error!("Bar request failed: {e:?}"),
1031 }
1032 });
1033
1034 Ok(())
1035 }
1036}