1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
40 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43};
44use nautilus_core::{
45 datetime::datetime_to_unix_nanos,
46 time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49 data::Data,
50 enums::BookType,
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53};
54use tokio::{task::JoinHandle, time::Duration};
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58 common::consts::BITMEX_VENUE,
59 config::BitmexDataClientConfig,
60 http::client::BitmexHttpClient,
61 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[derive(Clone, Copy, Debug, Eq, PartialEq)]
65enum BitmexBookChannel {
66 OrderBookL2,
67 OrderBookL2_25,
68 OrderBook10,
69}
70
71#[derive(Debug)]
72pub struct BitmexDataClient {
73 client_id: ClientId,
74 config: BitmexDataClientConfig,
75 http_client: BitmexHttpClient,
76 ws_client: Option<BitmexWebSocketClient>,
77 is_connected: AtomicBool,
78 cancellation_token: CancellationToken,
79 tasks: Vec<JoinHandle<()>>,
80 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
81 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
82 book_channels: Arc<RwLock<AHashMap<InstrumentId, BitmexBookChannel>>>,
83 clock: &'static AtomicTime,
84 instrument_refresh_active: bool,
85}
86
87impl BitmexDataClient {
88 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
94 let clock = get_atomic_clock_realtime();
95 let data_sender = get_data_event_sender();
96
97 let http_client = BitmexHttpClient::new(
98 Some(config.http_base_url()),
99 config.api_key.clone(),
100 config.api_secret.clone(),
101 config.use_testnet,
102 config.http_timeout_secs,
103 config.max_retries,
104 config.retry_delay_initial_ms,
105 config.retry_delay_max_ms,
106 config.recv_window_ms,
107 config.max_requests_per_second,
108 config.max_requests_per_minute,
109 config.http_proxy_url.clone(),
110 )
111 .context("failed to construct BitMEX HTTP client")?;
112
113 Ok(Self {
114 client_id,
115 config,
116 http_client,
117 ws_client: None,
118 is_connected: AtomicBool::new(false),
119 cancellation_token: CancellationToken::new(),
120 tasks: Vec::new(),
121 data_sender,
122 instruments: Arc::new(RwLock::new(AHashMap::new())),
123 book_channels: Arc::new(RwLock::new(AHashMap::new())),
124 clock,
125 instrument_refresh_active: false,
126 })
127 }
128
129 fn venue(&self) -> Venue {
130 *BITMEX_VENUE
131 }
132
133 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
134 self.ws_client
135 .as_ref()
136 .context("websocket client not initialized; call connect first")
137 }
138
139 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
140 self.ws_client
141 .as_mut()
142 .context("websocket client not initialized; call connect first")
143 }
144
145 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
146 if let Err(e) = sender.send(DataEvent::Data(data)) {
147 log::error!("Failed to emit data event: {e}");
148 }
149 }
150
151 fn spawn_ws<F>(&self, fut: F, context: &'static str)
152 where
153 F: Future<Output = anyhow::Result<()>> + Send + 'static,
154 {
155 get_runtime().spawn(async move {
156 if let Err(e) = fut.await {
157 log::error!("{context}: {e:?}");
158 }
159 });
160 }
161
162 fn spawn_stream_task(
163 &mut self,
164 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
165 ) -> anyhow::Result<()> {
166 let data_sender = self.data_sender.clone();
167 let instruments = Arc::clone(&self.instruments);
168 let cancellation = self.cancellation_token.clone();
169
170 let handle = get_runtime().spawn(async move {
171 tokio::pin!(stream);
172
173 loop {
174 tokio::select! {
175 maybe_msg = stream.next() => {
176 match maybe_msg {
177 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
178 None => {
179 log::debug!("BitMEX websocket stream ended");
180 break;
181 }
182 }
183 }
184 () = cancellation.cancelled() => {
185 log::debug!("BitMEX websocket stream task cancelled");
186 break;
187 }
188 }
189 }
190 });
191
192 self.tasks.push(handle);
193 Ok(())
194 }
195
196 fn handle_ws_message(
197 message: NautilusWsMessage,
198 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
199 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
200 ) {
201 match message {
202 NautilusWsMessage::Data(payloads) => {
203 for data in payloads {
204 Self::send_data(sender, data);
205 }
206 }
207 NautilusWsMessage::Instruments(insts) => {
208 let mut guard = instruments.write().expect("instrument cache lock poisoned");
209 for instrument in insts {
210 let instrument_id = instrument.id();
211 guard.insert(instrument_id, instrument);
212 }
213 let _ = sender;
215 }
216 NautilusWsMessage::FundingRateUpdates(updates) => {
217 for update in updates {
218 log::debug!(
219 "Funding rate update received (not forwarded): instrument={}, rate={}",
220 update.instrument_id,
221 update.rate,
222 );
223 }
224 }
225 NautilusWsMessage::OrderStatusReports(_)
226 | NautilusWsMessage::OrderUpdated(_)
227 | NautilusWsMessage::FillReports(_)
228 | NautilusWsMessage::PositionStatusReport(_)
229 | NautilusWsMessage::AccountState(_) => {
230 log::debug!("Ignoring trading message on data client");
231 }
232 NautilusWsMessage::Reconnected => {
233 log::info!("BitMEX websocket reconnected");
234 }
235 NautilusWsMessage::Authenticated => {
236 log::debug!("BitMEX websocket authenticated");
237 }
238 }
239 }
240
241 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
242 let http = self.http_client.clone();
243 let mut instruments = http
244 .request_instruments(self.config.active_only)
245 .await
246 .context("failed to request BitMEX instruments")?;
247
248 instruments.sort_by_key(|instrument| instrument.id());
249
250 {
251 let mut guard = self
252 .instruments
253 .write()
254 .expect("instrument cache lock poisoned");
255 guard.clear();
256 for instrument in &instruments {
257 guard.insert(instrument.id(), instrument.clone());
258 }
259 }
260
261 for instrument in &instruments {
262 self.http_client.cache_instrument(instrument.clone());
263 }
264
265 Ok(instruments)
266 }
267
268 fn is_connected(&self) -> bool {
269 self.is_connected.load(Ordering::Relaxed)
270 }
271
272 fn is_disconnected(&self) -> bool {
273 !self.is_connected()
274 }
275
276 fn maybe_spawn_instrument_refresh(&mut self) -> anyhow::Result<()> {
277 let Some(minutes) = self.config.update_instruments_interval_mins else {
278 return Ok(());
279 };
280
281 if minutes == 0 || self.instrument_refresh_active {
282 return Ok(());
283 }
284
285 let interval_secs = minutes.saturating_mul(60);
286 if interval_secs == 0 {
287 return Ok(());
288 }
289
290 let interval = Duration::from_secs(interval_secs);
291 let cancellation = self.cancellation_token.clone();
292 let instruments_cache = Arc::clone(&self.instruments);
293 let active_only = self.config.active_only;
294 let client_id = self.client_id;
295 let http_client = self.http_client.clone();
296
297 let handle = get_runtime().spawn(async move {
298 let http_client = http_client;
299 loop {
300 let sleep = tokio::time::sleep(interval);
301 tokio::pin!(sleep);
302 tokio::select! {
303 () = cancellation.cancelled() => {
304 log::debug!("BitMEX instrument refresh task cancelled");
305 break;
306 }
307 () = &mut sleep => {
308 match http_client.request_instruments(active_only).await {
309 Ok(mut instruments) => {
310 instruments.sort_by_key(|instrument| instrument.id());
311
312 {
313 let mut guard = instruments_cache
314 .write()
315 .expect("instrument cache lock poisoned");
316 guard.clear();
317 for instrument in &instruments {
318 guard.insert(instrument.id(), instrument.clone());
319 }
320 }
321
322 for instrument in instruments {
323 http_client.cache_instrument(instrument);
324 }
325
326 log::debug!("BitMEX instruments refreshed: client_id={client_id}");
327 }
328 Err(e) => {
329 log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
330 }
331 }
332 }
333 }
334 }
335 });
336
337 self.tasks.push(handle);
338 self.instrument_refresh_active = true;
339 Ok(())
340 }
341}
342
343#[async_trait::async_trait(?Send)]
344impl DataClient for BitmexDataClient {
345 fn client_id(&self) -> ClientId {
346 self.client_id
347 }
348
349 fn venue(&self) -> Option<Venue> {
350 Some(self.venue())
351 }
352
353 fn start(&mut self) -> anyhow::Result<()> {
354 log::info!(
355 "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
356 self.client_id,
357 self.config.use_testnet,
358 self.config.http_proxy_url,
359 self.config.ws_proxy_url,
360 );
361 Ok(())
362 }
363
364 fn stop(&mut self) -> anyhow::Result<()> {
365 log::info!("Stopping BitMEX data client {id}", id = self.client_id);
366 self.cancellation_token.cancel();
367 self.is_connected.store(false, Ordering::Relaxed);
368 self.instrument_refresh_active = false;
369 Ok(())
370 }
371
372 fn reset(&mut self) -> anyhow::Result<()> {
373 log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
374 self.is_connected.store(false, Ordering::Relaxed);
375 self.cancellation_token = CancellationToken::new();
376 self.tasks.clear();
377 self.book_channels
378 .write()
379 .expect("book channel cache lock poisoned")
380 .clear();
381 self.instrument_refresh_active = false;
382 Ok(())
383 }
384
385 fn dispose(&mut self) -> anyhow::Result<()> {
386 self.stop()
387 }
388
389 async fn connect(&mut self) -> anyhow::Result<()> {
390 if self.is_connected() {
391 return Ok(());
392 }
393
394 if self.ws_client.is_none() {
395 let ws = BitmexWebSocketClient::new(
396 Some(self.config.ws_url()),
397 self.config.api_key.clone(),
398 self.config.api_secret.clone(),
399 None,
400 self.config.heartbeat_interval_secs,
401 )
402 .context("failed to construct BitMEX websocket client")?;
403 self.ws_client = Some(ws);
404 }
405
406 let instruments = self.bootstrap_instruments().await?;
407 if let Some(ws) = self.ws_client.as_mut() {
408 ws.cache_instruments(instruments);
409 }
410
411 let ws = self.ws_client_mut()?;
412 ws.connect()
413 .await
414 .context("failed to connect BitMEX websocket")?;
415 ws.wait_until_active(10.0)
416 .await
417 .context("BitMEX websocket did not become active")?;
418
419 let stream = ws.stream();
420 self.spawn_stream_task(stream)?;
421 self.maybe_spawn_instrument_refresh()?;
422
423 self.is_connected.store(true, Ordering::Relaxed);
424 log::info!("Connected");
425 Ok(())
426 }
427
428 async fn disconnect(&mut self) -> anyhow::Result<()> {
429 if self.is_disconnected() {
430 return Ok(());
431 }
432
433 self.cancellation_token.cancel();
434
435 if let Some(ws) = self.ws_client.as_mut()
436 && let Err(e) = ws.close().await
437 {
438 log::warn!("Error while closing BitMEX websocket: {e:?}");
439 }
440
441 for handle in self.tasks.drain(..) {
442 if let Err(e) = handle.await {
443 log::error!("Error joining websocket task: {e:?}");
444 }
445 }
446
447 self.cancellation_token = CancellationToken::new();
448 self.is_connected.store(false, Ordering::Relaxed);
449 self.book_channels
450 .write()
451 .expect("book channel cache lock poisoned")
452 .clear();
453 self.instrument_refresh_active = false;
454
455 log::info!("Disconnected");
456 Ok(())
457 }
458
459 fn is_connected(&self) -> bool {
460 self.is_connected()
461 }
462
463 fn is_disconnected(&self) -> bool {
464 self.is_disconnected()
465 }
466
467 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
468 if cmd.book_type != BookType::L2_MBP {
469 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
470 }
471
472 let instrument_id = cmd.instrument_id;
473 let depth = cmd.depth.map_or(0, |d| d.get());
474 let channel = if depth > 0 && depth <= 25 {
475 BitmexBookChannel::OrderBookL2_25
476 } else {
477 BitmexBookChannel::OrderBookL2
478 };
479
480 let ws = self.ws_client()?.clone();
481 let book_channels = Arc::clone(&self.book_channels);
482
483 self.spawn_ws(
484 async move {
485 match channel {
486 BitmexBookChannel::OrderBookL2 => ws
487 .subscribe_book(instrument_id)
488 .await
489 .map_err(|err| anyhow::anyhow!(err))?,
490 BitmexBookChannel::OrderBookL2_25 => ws
491 .subscribe_book_25(instrument_id)
492 .await
493 .map_err(|err| anyhow::anyhow!(err))?,
494 BitmexBookChannel::OrderBook10 => unreachable!(),
495 }
496 book_channels
497 .write()
498 .expect("book channel cache lock poisoned")
499 .insert(instrument_id, channel);
500 Ok(())
501 },
502 "BitMEX book delta subscription",
503 );
504
505 Ok(())
506 }
507
508 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
509 let instrument_id = cmd.instrument_id;
510 let ws = self.ws_client()?.clone();
511 let book_channels = Arc::clone(&self.book_channels);
512
513 self.spawn_ws(
514 async move {
515 ws.subscribe_book_depth10(instrument_id)
516 .await
517 .map_err(|err| anyhow::anyhow!(err))?;
518 book_channels
519 .write()
520 .expect("book channel cache lock poisoned")
521 .insert(instrument_id, BitmexBookChannel::OrderBook10);
522 Ok(())
523 },
524 "BitMEX book depth10 subscription",
525 );
526 Ok(())
527 }
528
529 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
530 let instrument_id = cmd.instrument_id;
531 let ws = self.ws_client()?.clone();
532
533 self.spawn_ws(
534 async move {
535 ws.subscribe_quotes(instrument_id)
536 .await
537 .map_err(|err| anyhow::anyhow!(err))
538 },
539 "BitMEX quote subscription",
540 );
541 Ok(())
542 }
543
544 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
545 let instrument_id = cmd.instrument_id;
546 let ws = self.ws_client()?.clone();
547
548 self.spawn_ws(
549 async move {
550 ws.subscribe_trades(instrument_id)
551 .await
552 .map_err(|err| anyhow::anyhow!(err))
553 },
554 "BitMEX trade subscription",
555 );
556 Ok(())
557 }
558
559 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
560 let instrument_id = cmd.instrument_id;
561 let ws = self.ws_client()?.clone();
562
563 self.spawn_ws(
564 async move {
565 ws.subscribe_mark_prices(instrument_id)
566 .await
567 .map_err(|err| anyhow::anyhow!(err))
568 },
569 "BitMEX mark price subscription",
570 );
571 Ok(())
572 }
573
574 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
575 let instrument_id = cmd.instrument_id;
576 let ws = self.ws_client()?.clone();
577
578 self.spawn_ws(
579 async move {
580 ws.subscribe_index_prices(instrument_id)
581 .await
582 .map_err(|err| anyhow::anyhow!(err))
583 },
584 "BitMEX index price subscription",
585 );
586 Ok(())
587 }
588
589 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
590 let instrument_id = cmd.instrument_id;
591 let ws = self.ws_client()?.clone();
592
593 self.spawn_ws(
594 async move {
595 ws.subscribe_funding_rates(instrument_id)
596 .await
597 .map_err(|err| anyhow::anyhow!(err))
598 },
599 "BitMEX funding rate subscription",
600 );
601 Ok(())
602 }
603
604 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
605 let bar_type = cmd.bar_type;
606 let ws = self.ws_client()?.clone();
607
608 self.spawn_ws(
609 async move {
610 ws.subscribe_bars(bar_type)
611 .await
612 .map_err(|err| anyhow::anyhow!(err))
613 },
614 "BitMEX bar subscription",
615 );
616 Ok(())
617 }
618
619 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
620 let instrument_id = cmd.instrument_id;
621 let ws = self.ws_client()?.clone();
622 let book_channels = Arc::clone(&self.book_channels);
623
624 self.spawn_ws(
625 async move {
626 let channel = book_channels
627 .write()
628 .expect("book channel cache lock poisoned")
629 .remove(&instrument_id);
630
631 match channel {
632 Some(BitmexBookChannel::OrderBookL2) => ws
633 .unsubscribe_book(instrument_id)
634 .await
635 .map_err(|err| anyhow::anyhow!(err))?,
636 Some(BitmexBookChannel::OrderBookL2_25) => ws
637 .unsubscribe_book_25(instrument_id)
638 .await
639 .map_err(|err| anyhow::anyhow!(err))?,
640 Some(BitmexBookChannel::OrderBook10) | None => ws
641 .unsubscribe_book(instrument_id)
642 .await
643 .map_err(|err| anyhow::anyhow!(err))?,
644 }
645 Ok(())
646 },
647 "BitMEX book delta unsubscribe",
648 );
649 Ok(())
650 }
651
652 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
653 let instrument_id = cmd.instrument_id;
654 let ws = self.ws_client()?.clone();
655 let book_channels = Arc::clone(&self.book_channels);
656
657 self.spawn_ws(
658 async move {
659 book_channels
660 .write()
661 .expect("book channel cache lock poisoned")
662 .remove(&instrument_id);
663 ws.unsubscribe_book_depth10(instrument_id)
664 .await
665 .map_err(|err| anyhow::anyhow!(err))
666 },
667 "BitMEX book depth10 unsubscribe",
668 );
669 Ok(())
670 }
671
672 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
673 let instrument_id = cmd.instrument_id;
674 let ws = self.ws_client()?.clone();
675
676 self.spawn_ws(
677 async move {
678 ws.unsubscribe_quotes(instrument_id)
679 .await
680 .map_err(|err| anyhow::anyhow!(err))
681 },
682 "BitMEX quote unsubscribe",
683 );
684 Ok(())
685 }
686
687 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
688 let instrument_id = cmd.instrument_id;
689 let ws = self.ws_client()?.clone();
690
691 self.spawn_ws(
692 async move {
693 ws.unsubscribe_trades(instrument_id)
694 .await
695 .map_err(|err| anyhow::anyhow!(err))
696 },
697 "BitMEX trade unsubscribe",
698 );
699 Ok(())
700 }
701
702 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
703 let ws = self.ws_client()?.clone();
704 let instrument_id = cmd.instrument_id;
705
706 self.spawn_ws(
707 async move {
708 ws.unsubscribe_mark_prices(instrument_id)
709 .await
710 .map_err(|err| anyhow::anyhow!(err))
711 },
712 "BitMEX mark price unsubscribe",
713 );
714 Ok(())
715 }
716
717 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
718 let ws = self.ws_client()?.clone();
719 let instrument_id = cmd.instrument_id;
720
721 self.spawn_ws(
722 async move {
723 ws.unsubscribe_index_prices(instrument_id)
724 .await
725 .map_err(|err| anyhow::anyhow!(err))
726 },
727 "BitMEX index price unsubscribe",
728 );
729 Ok(())
730 }
731
732 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
733 let ws = self.ws_client()?.clone();
734 let instrument_id = cmd.instrument_id;
735
736 self.spawn_ws(
737 async move {
738 ws.unsubscribe_funding_rates(instrument_id)
739 .await
740 .map_err(|err| anyhow::anyhow!(err))
741 },
742 "BitMEX funding rate unsubscribe",
743 );
744 Ok(())
745 }
746
747 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
748 let bar_type = cmd.bar_type;
749 let ws = self.ws_client()?.clone();
750
751 self.spawn_ws(
752 async move {
753 ws.unsubscribe_bars(bar_type)
754 .await
755 .map_err(|err| anyhow::anyhow!(err))
756 },
757 "BitMEX bar unsubscribe",
758 );
759 Ok(())
760 }
761
762 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
763 let venue = request.venue.unwrap_or_else(|| self.venue());
764 if let Some(req_venue) = request.venue
765 && req_venue != self.venue()
766 {
767 log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
768 }
769
770 let http = self.http_client.clone();
771 let instruments_cache = Arc::clone(&self.instruments);
772 let sender = self.data_sender.clone();
773 let request_id = request.request_id;
774 let client_id = request.client_id.unwrap_or(self.client_id);
775 let params = request.params.clone();
776 let start_nanos = datetime_to_unix_nanos(request.start);
777 let end_nanos = datetime_to_unix_nanos(request.end);
778 let clock = self.clock;
779 let active_only = self.config.active_only;
780
781 get_runtime().spawn(async move {
782 let http_client = http;
783 match http_client
784 .request_instruments(active_only)
785 .await
786 .context("failed to request instruments from BitMEX")
787 {
788 Ok(instruments) => {
789 {
790 let mut guard = instruments_cache
791 .write()
792 .expect("instrument cache lock poisoned");
793 guard.clear();
794 for instrument in &instruments {
795 guard.insert(instrument.id(), instrument.clone());
796 http_client.cache_instrument(instrument.clone());
797 }
798 }
799
800 let response = DataResponse::Instruments(InstrumentsResponse::new(
801 request_id,
802 client_id,
803 venue,
804 instruments,
805 start_nanos,
806 end_nanos,
807 clock.get_time_ns(),
808 params,
809 ));
810 if let Err(e) = sender.send(DataEvent::Response(response)) {
811 log::error!("Failed to send instruments response: {e}");
812 }
813 }
814 Err(e) => log::error!("Instrument request failed: {e:?}"),
815 }
816 });
817
818 Ok(())
819 }
820
821 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
822 if let Some(instrument) = self
823 .instruments
824 .read()
825 .expect("instrument cache lock poisoned")
826 .get(&request.instrument_id)
827 .cloned()
828 {
829 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
830 request.request_id,
831 request.client_id.unwrap_or(self.client_id),
832 instrument.id(),
833 instrument,
834 datetime_to_unix_nanos(request.start),
835 datetime_to_unix_nanos(request.end),
836 self.clock.get_time_ns(),
837 request.params.clone(),
838 )));
839 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
840 log::error!("Failed to send instrument response: {e}");
841 }
842 return Ok(());
843 }
844
845 let http_client = self.http_client.clone();
846 let instruments_cache = Arc::clone(&self.instruments);
847 let sender = self.data_sender.clone();
848 let instrument_id = request.instrument_id;
849 let request_id = request.request_id;
850 let client_id = request.client_id.unwrap_or(self.client_id);
851 let start = request.start;
852 let end = request.end;
853 let params = request.params.clone();
854 let clock = self.clock;
855
856 get_runtime().spawn(async move {
857 match http_client
858 .request_instrument(instrument_id)
859 .await
860 .context("failed to request instrument from BitMEX")
861 {
862 Ok(Some(instrument)) => {
863 http_client.cache_instrument(instrument.clone());
864 {
865 let mut guard = instruments_cache
866 .write()
867 .expect("instrument cache lock poisoned");
868 guard.insert(instrument.id(), instrument.clone());
869 }
870
871 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
872 request_id,
873 client_id,
874 instrument.id(),
875 instrument,
876 datetime_to_unix_nanos(start),
877 datetime_to_unix_nanos(end),
878 clock.get_time_ns(),
879 params,
880 )));
881 if let Err(e) = sender.send(DataEvent::Response(response)) {
882 log::error!("Failed to send instrument response: {e}");
883 }
884 }
885 Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
886 Err(e) => log::error!("Instrument request failed: {e:?}"),
887 }
888 });
889
890 Ok(())
891 }
892
893 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
894 let http = self.http_client.clone();
895 let sender = self.data_sender.clone();
896 let instrument_id = request.instrument_id;
897 let start = request.start;
898 let end = request.end;
899 let limit = request.limit.map(|n| n.get() as u32);
900 let request_id = request.request_id;
901 let client_id = request.client_id.unwrap_or(self.client_id);
902 let params = request.params.clone();
903 let clock = self.clock;
904 let start_nanos = datetime_to_unix_nanos(start);
905 let end_nanos = datetime_to_unix_nanos(end);
906
907 get_runtime().spawn(async move {
908 match http
909 .request_trades(instrument_id, start, end, limit)
910 .await
911 .context("failed to request trades from BitMEX")
912 {
913 Ok(trades) => {
914 let response = DataResponse::Trades(TradesResponse::new(
915 request_id,
916 client_id,
917 instrument_id,
918 trades,
919 start_nanos,
920 end_nanos,
921 clock.get_time_ns(),
922 params,
923 ));
924 if let Err(e) = sender.send(DataEvent::Response(response)) {
925 log::error!("Failed to send trades response: {e}");
926 }
927 }
928 Err(e) => log::error!("Trade request failed: {e:?}"),
929 }
930 });
931
932 Ok(())
933 }
934
935 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
936 let http = self.http_client.clone();
937 let sender = self.data_sender.clone();
938 let bar_type = request.bar_type;
939 let start = request.start;
940 let end = request.end;
941 let limit = request.limit.map(|n| n.get() as u32);
942 let request_id = request.request_id;
943 let client_id = request.client_id.unwrap_or(self.client_id);
944 let params = request.params.clone();
945 let clock = self.clock;
946 let start_nanos = datetime_to_unix_nanos(start);
947 let end_nanos = datetime_to_unix_nanos(end);
948
949 get_runtime().spawn(async move {
950 match http
951 .request_bars(bar_type, start, end, limit, false)
952 .await
953 .context("failed to request bars from BitMEX")
954 {
955 Ok(bars) => {
956 let response = DataResponse::Bars(BarsResponse::new(
957 request_id,
958 client_id,
959 bar_type,
960 bars,
961 start_nanos,
962 end_nanos,
963 clock.get_time_ns(),
964 params,
965 ));
966 if let Err(e) = sender.send(DataEvent::Response(response)) {
967 log::error!("Failed to send bars response: {e}");
968 }
969 }
970 Err(e) => log::error!("Bar request failed: {e:?}"),
971 }
972 });
973
974 Ok(())
975 }
976}