1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use chrono::{DateTime, Utc};
29use futures_util::StreamExt;
30use nautilus_common::{
31 messages::{
32 DataEvent,
33 data::{
34 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
35 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
36 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
37 SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices, SubscribeQuotes,
38 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
39 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43 runner::get_data_event_sender,
44};
45use nautilus_core::{
46 UnixNanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_data::client::DataClient;
50use nautilus_model::{
51 data::Data,
52 enums::BookType,
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58
59use crate::{
60 common::consts::BITMEX_VENUE,
61 config::BitmexDataClientConfig,
62 http::client::BitmexHttpClient,
63 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
64};
65
/// Order book channel that a BitMEX book subscription was made on.
///
/// Recorded per instrument so the matching unsubscribe call can be chosen later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BitmexBookChannel {
    /// Recorded by delta subscriptions with no depth, or a depth above 25.
    OrderBookL2,
    /// Recorded by delta subscriptions with a depth between 1 and 25.
    OrderBookL2_25,
    /// Recorded by depth-10 and snapshot subscriptions.
    OrderBook10,
}
72
73#[derive(Debug)]
74pub struct BitmexDataClient {
75 client_id: ClientId,
76 config: BitmexDataClientConfig,
77 http_client: BitmexHttpClient,
78 ws_client: Option<BitmexWebSocketClient>,
79 is_connected: AtomicBool,
80 cancellation_token: CancellationToken,
81 tasks: Vec<JoinHandle<()>>,
82 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
83 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
84 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
85 clock: &'static AtomicTime,
86 instrument_refresh_active: bool,
87}
88
89impl BitmexDataClient {
90 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
96 let clock = get_atomic_clock_realtime();
97 let data_sender = get_data_event_sender();
98
99 let http_client = BitmexHttpClient::new(
100 Some(config.http_base_url()),
101 config.api_key.clone(),
102 config.api_secret.clone(),
103 config.use_testnet,
104 config.http_timeout_secs,
105 config.max_retries,
106 config.retry_delay_initial_ms,
107 config.retry_delay_max_ms,
108 config.recv_window_ms,
109 config.max_requests_per_second,
110 config.max_requests_per_minute,
111 config.http_proxy_url.clone(),
112 )
113 .context("failed to construct BitMEX HTTP client")?;
114
115 Ok(Self {
116 client_id,
117 config,
118 http_client,
119 ws_client: None,
120 is_connected: AtomicBool::new(false),
121 cancellation_token: CancellationToken::new(),
122 tasks: Vec::new(),
123 data_sender,
124 instruments: Arc::new(RwLock::new(AHashMap::new())),
125 book_channels: Arc::new(RwLock::new(AHashMap::new())),
126 clock,
127 instrument_refresh_active: false,
128 })
129 }
130
131 fn venue(&self) -> Venue {
132 *BITMEX_VENUE
133 }
134
135 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
136 self.ws_client
137 .as_ref()
138 .context("websocket client not initialized; call connect first")
139 }
140
141 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
142 self.ws_client
143 .as_mut()
144 .context("websocket client not initialized; call connect first")
145 }
146
147 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
148 if let Err(e) = sender.send(DataEvent::Data(data)) {
149 tracing::error!("Failed to emit data event: {e}");
150 }
151 }
152
153 fn spawn_ws<F>(&self, fut: F, context: &'static str)
154 where
155 F: Future<Output = anyhow::Result<()>> + Send + 'static,
156 {
157 tokio::spawn(async move {
158 if let Err(e) = fut.await {
159 tracing::error!("{context}: {e:?}");
160 }
161 });
162 }
163
164 fn spawn_stream_task(
165 &mut self,
166 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
167 ) -> anyhow::Result<()> {
168 let data_sender = self.data_sender.clone();
169 let instruments = Arc::clone(&self.instruments);
170 let cancellation = self.cancellation_token.clone();
171
172 let handle = tokio::spawn(async move {
173 tokio::pin!(stream);
174
175 loop {
176 tokio::select! {
177 maybe_msg = stream.next() => {
178 match maybe_msg {
179 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
180 None => {
181 tracing::debug!("BitMEX websocket stream ended");
182 break;
183 }
184 }
185 }
186 _ = cancellation.cancelled() => {
187 tracing::debug!("BitMEX websocket stream task cancelled");
188 break;
189 }
190 }
191 }
192 });
193
194 self.tasks.push(handle);
195 Ok(())
196 }
197
198 fn handle_ws_message(
199 message: NautilusWsMessage,
200 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
201 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
202 ) {
203 match message {
204 NautilusWsMessage::Data(payloads) => {
205 for data in payloads {
206 Self::send_data(sender, data);
207 }
208 }
209 NautilusWsMessage::Instruments(insts) => {
210 let mut guard = instruments.write().expect("instrument cache lock poisoned");
211 for instrument in insts {
212 let instrument_id = instrument.id();
213 guard.insert(instrument_id, instrument);
214 }
215 let _ = sender;
217 }
218 NautilusWsMessage::FundingRateUpdates(updates) => {
219 for update in updates {
220 tracing::debug!(
221 instrument = %update.instrument_id,
222 rate = %update.rate,
223 "Funding rate update received (not forwarded)",
224 );
225 }
226 }
227 NautilusWsMessage::OrderStatusReports(_)
228 | NautilusWsMessage::OrderUpdated(_)
229 | NautilusWsMessage::FillReports(_)
230 | NautilusWsMessage::PositionStatusReport(_)
231 | NautilusWsMessage::AccountState(_) => {
232 tracing::debug!("Ignoring trading message on data client");
233 }
234 NautilusWsMessage::Reconnected => {
235 tracing::info!("BitMEX websocket reconnected");
236 }
237 NautilusWsMessage::Authenticated => {
238 tracing::debug!("BitMEX websocket authenticated");
239 }
240 }
241 }
242
243 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
244 let http = self.http_client.clone();
245 let mut instruments = http
246 .request_instruments(self.config.active_only)
247 .await
248 .context("failed to request BitMEX instruments")?;
249
250 instruments.sort_by_key(|instrument| instrument.id());
251
252 {
253 let mut guard = self
254 .instruments
255 .write()
256 .expect("instrument cache lock poisoned");
257 guard.clear();
258 for instrument in &instruments {
259 guard.insert(instrument.id(), instrument.clone());
260 }
261 }
262
263 for instrument in &instruments {
264 self.http_client.cache_instrument(instrument.clone());
265 }
266
267 Ok(instruments)
268 }
269
270 fn is_connected(&self) -> bool {
271 self.is_connected.load(Ordering::Relaxed)
272 }
273
274 fn is_disconnected(&self) -> bool {
275 !self.is_connected()
276 }
277
278 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
279 let Some(minutes) = self.config.update_instruments_interval_mins else {
280 return Ok(());
281 };
282
283 if minutes == 0 || self.instrument_refresh_active {
284 return Ok(());
285 }
286
287 let interval_secs = minutes.saturating_mul(60);
288 if interval_secs == 0 {
289 return Ok(());
290 }
291
292 let interval = Duration::from_secs(interval_secs);
293 let cancellation = self.cancellation_token.clone();
294 let instruments_cache = Arc::clone(&self.instruments);
295 let active_only = self.config.active_only;
296 let client_id = self.client_id;
297 let http_client = self.http_client.clone();
298
299 let handle = tokio::spawn(async move {
300 let http_client = http_client;
301 loop {
302 let sleep = tokio::time::sleep(interval);
303 tokio::pin!(sleep);
304 tokio::select! {
305 _ = cancellation.cancelled() => {
306 tracing::debug!("BitMEX instrument refresh task cancelled");
307 break;
308 }
309 _ = &mut sleep => {
310 match http_client.request_instruments(active_only).await {
311 Ok(mut instruments) => {
312 instruments.sort_by_key(|instrument| instrument.id());
313
314 {
315 let mut guard = instruments_cache
316 .write()
317 .expect("instrument cache lock poisoned");
318 guard.clear();
319 for instrument in instruments.iter() {
320 guard.insert(instrument.id(), instrument.clone());
321 }
322 }
323
324 for instrument in instruments {
325 http_client.cache_instrument(instrument);
326 }
327
328 tracing::debug!(client_id=%client_id, "BitMEX instruments refreshed");
329 }
330 Err(e) => {
331 tracing::warn!(client_id=%client_id, error=?e, "Failed to refresh BitMEX instruments");
332 }
333 }
334 }
335 }
336 }
337 });
338
339 self.tasks.push(handle);
340 self.instrument_refresh_active = true;
341 Ok(())
342 }
343}
344
345fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
346 value
347 .and_then(|dt| dt.timestamp_nanos_opt())
348 .and_then(|nanos| u64::try_from(nanos).ok())
349 .map(UnixNanos::from)
350}
351
352#[async_trait::async_trait]
353impl DataClient for BitmexDataClient {
354 fn client_id(&self) -> ClientId {
355 self.client_id
356 }
357
358 fn venue(&self) -> Option<Venue> {
359 Some(self.venue())
360 }
361
362 fn start(&mut self) -> anyhow::Result<()> {
363 tracing::info!(
364 client_id = %self.client_id,
365 use_testnet = self.config.use_testnet,
366 http_proxy_url = ?self.config.http_proxy_url,
367 ws_proxy_url = ?self.config.ws_proxy_url,
368 "Starting BitMEX data client"
369 );
370 Ok(())
371 }
372
373 fn stop(&mut self) -> anyhow::Result<()> {
374 tracing::info!("Stopping BitMEX data client {id}", id = self.client_id);
375 self.cancellation_token.cancel();
376 self.is_connected.store(false, Ordering::Relaxed);
377 self.instrument_refresh_active = false;
378 Ok(())
379 }
380
381 fn reset(&mut self) -> anyhow::Result<()> {
382 tracing::debug!("Resetting BitMEX data client {id}", id = self.client_id);
383 self.is_connected.store(false, Ordering::Relaxed);
384 self.cancellation_token = CancellationToken::new();
385 self.tasks.clear();
386 self.book_channels
387 .write()
388 .expect("book channel cache lock poisoned")
389 .clear();
390 self.instrument_refresh_active = false;
391 Ok(())
392 }
393
394 fn dispose(&mut self) -> anyhow::Result<()> {
395 self.stop()
396 }
397
398 async fn connect(&mut self) -> anyhow::Result<()> {
399 if self.is_connected() {
400 return Ok(());
401 }
402
403 if self.ws_client.is_none() {
404 let ws = BitmexWebSocketClient::new(
405 Some(self.config.ws_url()),
406 self.config.api_key.clone(),
407 self.config.api_secret.clone(),
408 None,
409 self.config.heartbeat_interval_secs,
410 )
411 .context("failed to construct BitMEX websocket client")?;
412 self.ws_client = Some(ws);
413 }
414
415 let instruments = self.bootstrap_instruments().await?;
416 if let Some(ws) = self.ws_client.as_mut() {
417 ws.cache_instruments(instruments);
418 }
419
420 let ws = self.ws_client_mut()?;
421 ws.connect()
422 .await
423 .context("failed to connect BitMEX websocket")?;
424 ws.wait_until_active(10.0)
425 .await
426 .context("BitMEX websocket did not become active")?;
427
428 let stream = ws.stream();
429 self.spawn_stream_task(stream)?;
430 self.maybe_spawn_instrument_refresh()?;
431
432 self.is_connected.store(true, Ordering::Relaxed);
433 tracing::info!("Connected");
434 Ok(())
435 }
436
437 async fn disconnect(&mut self) -> anyhow::Result<()> {
438 if self.is_disconnected() {
439 return Ok(());
440 }
441
442 self.cancellation_token.cancel();
443
444 if let Some(ws) = self.ws_client.as_mut()
445 && let Err(e) = ws.close().await
446 {
447 tracing::warn!("Error while closing BitMEX websocket: {e:?}");
448 }
449
450 for handle in self.tasks.drain(..) {
451 if let Err(e) = handle.await {
452 tracing::error!("Error joining websocket task: {e:?}");
453 }
454 }
455
456 self.cancellation_token = CancellationToken::new();
457 self.is_connected.store(false, Ordering::Relaxed);
458 self.book_channels
459 .write()
460 .expect("book channel cache lock poisoned")
461 .clear();
462 self.instrument_refresh_active = false;
463
464 tracing::info!("Disconnected");
465 Ok(())
466 }
467
468 fn is_connected(&self) -> bool {
469 self.is_connected()
470 }
471
472 fn is_disconnected(&self) -> bool {
473 self.is_disconnected()
474 }
475
476 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
477 if cmd.book_type != BookType::L2_MBP {
478 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
479 }
480
481 let instrument_id = cmd.instrument_id;
482 let depth = cmd.depth.map_or(0, |d| d.get());
483 let channel = if depth > 0 && depth <= 25 {
484 BitmexBookChannel::OrderBookL2_25
485 } else {
486 BitmexBookChannel::OrderBookL2
487 };
488
489 let ws = self.ws_client()?.clone();
490 let book_channels = Arc::clone(&self.book_channels);
491
492 self.spawn_ws(
493 async move {
494 match channel {
495 BitmexBookChannel::OrderBookL2 => ws
496 .subscribe_book(instrument_id)
497 .await
498 .map_err(|err| anyhow::anyhow!(err))?,
499 BitmexBookChannel::OrderBookL2_25 => ws
500 .subscribe_book_25(instrument_id)
501 .await
502 .map_err(|err| anyhow::anyhow!(err))?,
503 BitmexBookChannel::OrderBook10 => unreachable!(),
504 }
505 book_channels
506 .write()
507 .expect("book channel cache lock poisoned")
508 .insert(instrument_id, channel);
509 Ok(())
510 },
511 "BitMEX book delta subscription",
512 );
513
514 Ok(())
515 }
516
517 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
518 let instrument_id = cmd.instrument_id;
519 let ws = self.ws_client()?.clone();
520 let book_channels = Arc::clone(&self.book_channels);
521
522 self.spawn_ws(
523 async move {
524 ws.subscribe_book_depth10(instrument_id)
525 .await
526 .map_err(|err| anyhow::anyhow!(err))?;
527 book_channels
528 .write()
529 .expect("book channel cache lock poisoned")
530 .insert(instrument_id, BitmexBookChannel::OrderBook10);
531 Ok(())
532 },
533 "BitMEX book depth10 subscription",
534 );
535 Ok(())
536 }
537
538 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
539 if cmd.book_type != BookType::L2_MBP {
540 anyhow::bail!("BitMEX only supports L2_MBP order book snapshots");
541 }
542
543 let depth = cmd.depth.map_or(10, |d| d.get());
544 if depth != 10 {
545 tracing::warn!("BitMEX orderBook10 provides 10 levels; requested depth={depth}");
546 }
547
548 let instrument_id = cmd.instrument_id;
549 let ws = self.ws_client()?.clone();
550 let book_channels = Arc::clone(&self.book_channels);
551
552 self.spawn_ws(
553 async move {
554 ws.subscribe_book_depth10(instrument_id)
555 .await
556 .map_err(|err| anyhow::anyhow!(err))?;
557 book_channels
558 .write()
559 .expect("book channel cache lock poisoned")
560 .insert(instrument_id, BitmexBookChannel::OrderBook10);
561 Ok(())
562 },
563 "BitMEX book snapshot subscription",
564 );
565 Ok(())
566 }
567
568 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
569 let instrument_id = cmd.instrument_id;
570 let ws = self.ws_client()?.clone();
571
572 self.spawn_ws(
573 async move {
574 ws.subscribe_quotes(instrument_id)
575 .await
576 .map_err(|err| anyhow::anyhow!(err))
577 },
578 "BitMEX quote subscription",
579 );
580 Ok(())
581 }
582
583 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
584 let instrument_id = cmd.instrument_id;
585 let ws = self.ws_client()?.clone();
586
587 self.spawn_ws(
588 async move {
589 ws.subscribe_trades(instrument_id)
590 .await
591 .map_err(|err| anyhow::anyhow!(err))
592 },
593 "BitMEX trade subscription",
594 );
595 Ok(())
596 }
597
598 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
599 let instrument_id = cmd.instrument_id;
600 let ws = self.ws_client()?.clone();
601
602 self.spawn_ws(
603 async move {
604 ws.subscribe_mark_prices(instrument_id)
605 .await
606 .map_err(|err| anyhow::anyhow!(err))
607 },
608 "BitMEX mark price subscription",
609 );
610 Ok(())
611 }
612
613 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
614 let instrument_id = cmd.instrument_id;
615 let ws = self.ws_client()?.clone();
616
617 self.spawn_ws(
618 async move {
619 ws.subscribe_index_prices(instrument_id)
620 .await
621 .map_err(|err| anyhow::anyhow!(err))
622 },
623 "BitMEX index price subscription",
624 );
625 Ok(())
626 }
627
628 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
629 let instrument_id = cmd.instrument_id;
630 let ws = self.ws_client()?.clone();
631
632 self.spawn_ws(
633 async move {
634 ws.subscribe_funding_rates(instrument_id)
635 .await
636 .map_err(|err| anyhow::anyhow!(err))
637 },
638 "BitMEX funding rate subscription",
639 );
640 Ok(())
641 }
642
643 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
644 let bar_type = cmd.bar_type;
645 let ws = self.ws_client()?.clone();
646
647 self.spawn_ws(
648 async move {
649 ws.subscribe_bars(bar_type)
650 .await
651 .map_err(|err| anyhow::anyhow!(err))
652 },
653 "BitMEX bar subscription",
654 );
655 Ok(())
656 }
657
658 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
659 let instrument_id = cmd.instrument_id;
660 let ws = self.ws_client()?.clone();
661 let book_channels = Arc::clone(&self.book_channels);
662
663 self.spawn_ws(
664 async move {
665 let channel = book_channels
666 .write()
667 .expect("book channel cache lock poisoned")
668 .remove(&instrument_id);
669
670 match channel {
671 Some(BitmexBookChannel::OrderBookL2) => ws
672 .unsubscribe_book(instrument_id)
673 .await
674 .map_err(|err| anyhow::anyhow!(err))?,
675 Some(BitmexBookChannel::OrderBookL2_25) => ws
676 .unsubscribe_book_25(instrument_id)
677 .await
678 .map_err(|err| anyhow::anyhow!(err))?,
679 Some(BitmexBookChannel::OrderBook10) | None => ws
680 .unsubscribe_book(instrument_id)
681 .await
682 .map_err(|err| anyhow::anyhow!(err))?,
683 }
684 Ok(())
685 },
686 "BitMEX book delta unsubscribe",
687 );
688 Ok(())
689 }
690
691 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
692 let instrument_id = cmd.instrument_id;
693 let ws = self.ws_client()?.clone();
694 let book_channels = Arc::clone(&self.book_channels);
695
696 self.spawn_ws(
697 async move {
698 book_channels
699 .write()
700 .expect("book channel cache lock poisoned")
701 .remove(&instrument_id);
702 ws.unsubscribe_book_depth10(instrument_id)
703 .await
704 .map_err(|err| anyhow::anyhow!(err))
705 },
706 "BitMEX book depth10 unsubscribe",
707 );
708 Ok(())
709 }
710
711 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
712 let instrument_id = cmd.instrument_id;
713 let ws = self.ws_client()?.clone();
714 let book_channels = Arc::clone(&self.book_channels);
715
716 self.spawn_ws(
717 async move {
718 book_channels
719 .write()
720 .expect("book channel cache lock poisoned")
721 .remove(&instrument_id);
722 ws.unsubscribe_book_depth10(instrument_id)
723 .await
724 .map_err(|err| anyhow::anyhow!(err))
725 },
726 "BitMEX book snapshot unsubscribe",
727 );
728 Ok(())
729 }
730
731 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
732 let instrument_id = cmd.instrument_id;
733 let ws = self.ws_client()?.clone();
734
735 self.spawn_ws(
736 async move {
737 ws.unsubscribe_quotes(instrument_id)
738 .await
739 .map_err(|err| anyhow::anyhow!(err))
740 },
741 "BitMEX quote unsubscribe",
742 );
743 Ok(())
744 }
745
746 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
747 let instrument_id = cmd.instrument_id;
748 let ws = self.ws_client()?.clone();
749
750 self.spawn_ws(
751 async move {
752 ws.unsubscribe_trades(instrument_id)
753 .await
754 .map_err(|err| anyhow::anyhow!(err))
755 },
756 "BitMEX trade unsubscribe",
757 );
758 Ok(())
759 }
760
761 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
762 let ws = self.ws_client()?.clone();
763 let instrument_id = cmd.instrument_id;
764
765 self.spawn_ws(
766 async move {
767 ws.unsubscribe_mark_prices(instrument_id)
768 .await
769 .map_err(|err| anyhow::anyhow!(err))
770 },
771 "BitMEX mark price unsubscribe",
772 );
773 Ok(())
774 }
775
776 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
777 let ws = self.ws_client()?.clone();
778 let instrument_id = cmd.instrument_id;
779
780 self.spawn_ws(
781 async move {
782 ws.unsubscribe_index_prices(instrument_id)
783 .await
784 .map_err(|err| anyhow::anyhow!(err))
785 },
786 "BitMEX index price unsubscribe",
787 );
788 Ok(())
789 }
790
791 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
792 let ws = self.ws_client()?.clone();
793 let instrument_id = cmd.instrument_id;
794
795 self.spawn_ws(
796 async move {
797 ws.unsubscribe_funding_rates(instrument_id)
798 .await
799 .map_err(|err| anyhow::anyhow!(err))
800 },
801 "BitMEX funding rate unsubscribe",
802 );
803 Ok(())
804 }
805
806 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
807 let bar_type = cmd.bar_type;
808 let ws = self.ws_client()?.clone();
809
810 self.spawn_ws(
811 async move {
812 ws.unsubscribe_bars(bar_type)
813 .await
814 .map_err(|err| anyhow::anyhow!(err))
815 },
816 "BitMEX bar unsubscribe",
817 );
818 Ok(())
819 }
820
821 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
822 let venue = request.venue.unwrap_or_else(|| self.venue());
823 if let Some(req_venue) = request.venue
824 && req_venue != self.venue()
825 {
826 tracing::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
827 }
828
829 let http = self.http_client.clone();
830 let instruments_cache = Arc::clone(&self.instruments);
831 let sender = self.data_sender.clone();
832 let request_id = request.request_id;
833 let client_id = request.client_id.unwrap_or(self.client_id);
834 let params = request.params.clone();
835 let start_nanos = datetime_to_unix_nanos(request.start);
836 let end_nanos = datetime_to_unix_nanos(request.end);
837 let clock = self.clock;
838 let active_only = self.config.active_only;
839
840 tokio::spawn(async move {
841 let http_client = http;
842 match http_client
843 .request_instruments(active_only)
844 .await
845 .context("failed to request instruments from BitMEX")
846 {
847 Ok(instruments) => {
848 {
849 let mut guard = instruments_cache
850 .write()
851 .expect("instrument cache lock poisoned");
852 guard.clear();
853 for instrument in &instruments {
854 guard.insert(instrument.id(), instrument.clone());
855 http_client.cache_instrument(instrument.clone());
856 }
857 }
858
859 let response = DataResponse::Instruments(InstrumentsResponse::new(
860 request_id,
861 client_id,
862 venue,
863 instruments,
864 start_nanos,
865 end_nanos,
866 clock.get_time_ns(),
867 params,
868 ));
869 if let Err(e) = sender.send(DataEvent::Response(response)) {
870 tracing::error!("Failed to send instruments response: {e}");
871 }
872 }
873 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
874 }
875 });
876
877 Ok(())
878 }
879
880 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
881 if let Some(instrument) = self
882 .instruments
883 .read()
884 .expect("instrument cache lock poisoned")
885 .get(&request.instrument_id)
886 .cloned()
887 {
888 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
889 request.request_id,
890 request.client_id.unwrap_or(self.client_id),
891 instrument.id(),
892 instrument,
893 datetime_to_unix_nanos(request.start),
894 datetime_to_unix_nanos(request.end),
895 self.clock.get_time_ns(),
896 request.params.clone(),
897 )));
898 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
899 tracing::error!("Failed to send instrument response: {e}");
900 }
901 return Ok(());
902 }
903
904 let http_client = self.http_client.clone();
905 let instruments_cache = Arc::clone(&self.instruments);
906 let sender = self.data_sender.clone();
907 let instrument_id = request.instrument_id;
908 let request_id = request.request_id;
909 let client_id = request.client_id.unwrap_or(self.client_id);
910 let start = request.start;
911 let end = request.end;
912 let params = request.params.clone();
913 let clock = self.clock;
914
915 tokio::spawn(async move {
916 match http_client
917 .request_instrument(instrument_id)
918 .await
919 .context("failed to request instrument from BitMEX")
920 {
921 Ok(Some(instrument)) => {
922 http_client.cache_instrument(instrument.clone());
923 {
924 let mut guard = instruments_cache
925 .write()
926 .expect("instrument cache lock poisoned");
927 guard.insert(instrument.id(), instrument.clone());
928 }
929
930 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
931 request_id,
932 client_id,
933 instrument.id(),
934 instrument,
935 datetime_to_unix_nanos(start),
936 datetime_to_unix_nanos(end),
937 clock.get_time_ns(),
938 params,
939 )));
940 if let Err(e) = sender.send(DataEvent::Response(response)) {
941 tracing::error!("Failed to send instrument response: {e}");
942 }
943 }
944 Ok(None) => tracing::warn!("BitMEX instrument {instrument_id} not found"),
945 Err(e) => tracing::error!("Instrument request failed: {e:?}"),
946 }
947 });
948
949 Ok(())
950 }
951
952 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
953 let http = self.http_client.clone();
954 let sender = self.data_sender.clone();
955 let instrument_id = request.instrument_id;
956 let start = request.start;
957 let end = request.end;
958 let limit = request.limit.map(|n| n.get() as u32);
959 let request_id = request.request_id;
960 let client_id = request.client_id.unwrap_or(self.client_id);
961 let params = request.params.clone();
962 let clock = self.clock;
963 let start_nanos = datetime_to_unix_nanos(start);
964 let end_nanos = datetime_to_unix_nanos(end);
965
966 tokio::spawn(async move {
967 match http
968 .request_trades(instrument_id, start, end, limit)
969 .await
970 .context("failed to request trades from BitMEX")
971 {
972 Ok(trades) => {
973 let response = DataResponse::Trades(TradesResponse::new(
974 request_id,
975 client_id,
976 instrument_id,
977 trades,
978 start_nanos,
979 end_nanos,
980 clock.get_time_ns(),
981 params,
982 ));
983 if let Err(e) = sender.send(DataEvent::Response(response)) {
984 tracing::error!("Failed to send trades response: {e}");
985 }
986 }
987 Err(e) => tracing::error!("Trade request failed: {e:?}"),
988 }
989 });
990
991 Ok(())
992 }
993
994 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
995 let http = self.http_client.clone();
996 let sender = self.data_sender.clone();
997 let bar_type = request.bar_type;
998 let start = request.start;
999 let end = request.end;
1000 let limit = request.limit.map(|n| n.get() as u32);
1001 let request_id = request.request_id;
1002 let client_id = request.client_id.unwrap_or(self.client_id);
1003 let params = request.params.clone();
1004 let clock = self.clock;
1005 let start_nanos = datetime_to_unix_nanos(start);
1006 let end_nanos = datetime_to_unix_nanos(end);
1007
1008 tokio::spawn(async move {
1009 match http
1010 .request_bars(bar_type, start, end, limit, false)
1011 .await
1012 .context("failed to request bars from BitMEX")
1013 {
1014 Ok(bars) => {
1015 let response = DataResponse::Bars(BarsResponse::new(
1016 request_id,
1017 client_id,
1018 bar_type,
1019 bars,
1020 start_nanos,
1021 end_nanos,
1022 clock.get_time_ns(),
1023 params,
1024 ));
1025 if let Err(e) = sender.send(DataEvent::Response(response)) {
1026 tracing::error!("Failed to send bars response: {e}");
1027 }
1028 }
1029 Err(e) => tracing::error!("Bar request failed: {e:?}"),
1030 }
1031 });
1032
1033 Ok(())
1034 }
1035}