1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::{AHashMap, AHashSet};
27use anyhow::Context;
28use futures_util::{StreamExt, pin_mut};
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices,
38 SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
40 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41 },
42 },
43};
44use nautilus_core::{
45 MUTEX_POISONED,
46 datetime::datetime_to_unix_nanos,
47 time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50 data::{Data, OrderBookDeltas_API},
51 enums::BookType,
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59 common::{
60 consts::{BYBIT_DEFAULT_ORDERBOOK_DEPTH, BYBIT_VENUE},
61 enums::BybitProductType,
62 parse::extract_raw_symbol,
63 },
64 config::BybitDataClientConfig,
65 http::client::BybitHttpClient,
66 websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
67};
68
/// Market data client for the Bybit venue.
///
/// Combines one HTTP client (instrument, trade and bar requests) with one
/// public WebSocket client per product type, and forwards everything it
/// receives as [`DataEvent`]s on `data_sender`.
#[derive(Debug)]
pub struct BybitDataClient {
    /// Identifier returned by `client_id()` and used as the default client ID
    /// on request responses.
    client_id: ClientId,
    /// Configuration the client was constructed from.
    config: BybitDataClientConfig,
    /// HTTP client used for instrument, trade and bar requests.
    http_client: BybitHttpClient,
    /// One public WebSocket client per configured product type.
    ws_clients: Vec<BybitWebSocketClient>,
    /// Set at the end of `connect`; cleared by `disconnect`, `stop` and `reset`.
    is_connected: AtomicBool,
    /// Token observed by the spawned stream tasks to know when to exit.
    cancellation_token: CancellationToken,
    /// Join handles of the stream tasks spawned in `connect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with spawned tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Depth of the order book delta subscription per instrument; deltas are
    /// only forwarded for instruments present in this map.
    book_depths: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
    /// Order book depth backing the quote subscription of spot instruments.
    quote_depths: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
    /// Logical topics (`"quotes"`, `"funding"`, `"mark_prices"`,
    /// `"index_prices"`) registered per instrument on the ticker stream.
    ticker_subs: Arc<RwLock<AHashMap<InstrumentId, AHashSet<&'static str>>>>,
    /// Clock used to timestamp request responses.
    clock: &'static AtomicTime,
}
86
87impl BybitDataClient {
88 pub fn new(client_id: ClientId, config: BybitDataClientConfig) -> anyhow::Result<Self> {
94 let clock = get_atomic_clock_realtime();
95 let data_sender = get_data_event_sender();
96
97 let http_client = if let (Some(api_key), Some(api_secret)) =
98 (config.api_key.clone(), config.api_secret.clone())
99 {
100 BybitHttpClient::with_credentials(
101 api_key,
102 api_secret,
103 Some(config.http_base_url()),
104 config.http_timeout_secs,
105 config.max_retries,
106 config.retry_delay_initial_ms,
107 config.retry_delay_max_ms,
108 config.recv_window_ms,
109 config.http_proxy_url.clone(),
110 )?
111 } else {
112 BybitHttpClient::new(
113 Some(config.http_base_url()),
114 config.http_timeout_secs,
115 config.max_retries,
116 config.retry_delay_initial_ms,
117 config.retry_delay_max_ms,
118 config.recv_window_ms,
119 config.http_proxy_url.clone(),
120 )?
121 };
122
123 let product_types = if config.product_types.is_empty() {
125 vec![BybitProductType::Linear]
126 } else {
127 config.product_types.clone()
128 };
129
130 let ws_clients: Vec<BybitWebSocketClient> = product_types
131 .iter()
132 .map(|product_type| {
133 BybitWebSocketClient::new_public_with(
134 *product_type,
135 config.environment,
136 Some(config.ws_public_url_for(*product_type)),
137 config.heartbeat_interval_secs,
138 )
139 })
140 .collect();
141
142 Ok(Self {
143 client_id,
144 config,
145 http_client,
146 ws_clients,
147 is_connected: AtomicBool::new(false),
148 cancellation_token: CancellationToken::new(),
149 tasks: Vec::new(),
150 data_sender,
151 instruments: Arc::new(RwLock::new(AHashMap::new())),
152 book_depths: Arc::new(RwLock::new(AHashMap::new())),
153 quote_depths: Arc::new(RwLock::new(AHashMap::new())),
154 ticker_subs: Arc::new(RwLock::new(AHashMap::new())),
155 clock,
156 })
157 }
158
159 fn venue(&self) -> Venue {
160 *BYBIT_VENUE
161 }
162
163 fn get_ws_client_for_product(
164 &self,
165 product_type: BybitProductType,
166 ) -> Option<&BybitWebSocketClient> {
167 self.ws_clients
168 .iter()
169 .find(|ws| ws.product_type() == Some(product_type))
170 }
171
172 fn get_product_type_for_instrument(
173 &self,
174 instrument_id: InstrumentId,
175 ) -> Option<BybitProductType> {
176 let guard = self.instruments.read().expect(MUTEX_POISONED);
177 guard.get(&instrument_id).map(|inst| {
178 let symbol_str = instrument_id.symbol.as_str();
180 if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
181 BybitProductType::Spot
182 } else if symbol_str.ends_with("-OPTION") {
183 BybitProductType::Option
184 } else if inst.is_inverse() {
185 BybitProductType::Inverse
186 } else {
187 BybitProductType::Linear
188 }
189 })
190 }
191
192 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
193 if let Err(e) = sender.send(DataEvent::Data(data)) {
194 log::error!("Failed to emit data event: {e}");
195 }
196 }
197
198 fn spawn_ws<F>(&self, fut: F, context: &'static str)
199 where
200 F: Future<Output = anyhow::Result<()>> + Send + 'static,
201 {
202 get_runtime().spawn(async move {
203 if let Err(e) = fut.await {
204 log::error!("{context}: {e:?}");
205 }
206 });
207 }
208
209 fn handle_ws_message(
210 message: NautilusWsMessage,
211 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
212 _instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
213 ticker_subs: &Arc<RwLock<AHashMap<InstrumentId, AHashSet<&'static str>>>>,
214 quote_depths: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
215 book_depths: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
216 ) {
217 match message {
218 NautilusWsMessage::Data(payloads) => {
219 let ticker = ticker_subs.read().expect(MUTEX_POISONED);
220 let depths = quote_depths.read().expect(MUTEX_POISONED);
221 for data in payloads {
222 if let Data::Quote(ref quote) = data {
224 let has_ticker_sub = ticker
225 .get("e.instrument_id)
226 .is_some_and(|s| s.contains("quotes"));
227 let has_depth_sub = depths.contains_key("e.instrument_id);
228 if !has_ticker_sub && !has_depth_sub {
229 continue;
230 }
231 }
232 Self::send_data(data_sender, data);
233 }
234 }
235 NautilusWsMessage::Deltas(deltas) => {
236 let books = book_depths.read().expect(MUTEX_POISONED);
237 if books.contains_key(&deltas.instrument_id) {
238 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
239 }
240 }
241 NautilusWsMessage::FundingRates(updates) => {
242 let subs = ticker_subs.read().expect(MUTEX_POISONED);
243 for update in updates {
244 if !subs
245 .get(&update.instrument_id)
246 .is_some_and(|s| s.contains("funding"))
247 {
248 continue;
249 }
250 if let Err(e) = data_sender.send(DataEvent::FundingRate(update)) {
251 log::error!("Failed to emit funding rate event: {e}");
252 }
253 }
254 }
255 NautilusWsMessage::MarkPrices(updates) => {
256 let subs = ticker_subs.read().expect(MUTEX_POISONED);
257 for update in updates {
258 if subs
259 .get(&update.instrument_id)
260 .is_some_and(|s| s.contains("mark_prices"))
261 {
262 Self::send_data(data_sender, Data::MarkPriceUpdate(update));
263 }
264 }
265 }
266 NautilusWsMessage::IndexPrices(updates) => {
267 let subs = ticker_subs.read().expect(MUTEX_POISONED);
268 for update in updates {
269 if subs
270 .get(&update.instrument_id)
271 .is_some_and(|s| s.contains("index_prices"))
272 {
273 Self::send_data(data_sender, Data::IndexPriceUpdate(update));
274 }
275 }
276 }
277 NautilusWsMessage::OrderStatusReports(_)
278 | NautilusWsMessage::FillReports(_)
279 | NautilusWsMessage::PositionStatusReport(_)
280 | NautilusWsMessage::AccountState(_)
281 | NautilusWsMessage::OrderRejected(_)
282 | NautilusWsMessage::OrderCancelRejected(_)
283 | NautilusWsMessage::OrderModifyRejected(_) => {
284 log::debug!("Ignoring trading message on data client");
285 }
286 NautilusWsMessage::Error(e) => {
287 log::error!(
288 "Bybit websocket error: code={} message={}",
289 e.code,
290 e.message
291 );
292 }
293 NautilusWsMessage::Reconnected => {
294 log::info!("Websocket reconnected");
295 }
296 NautilusWsMessage::Authenticated => {
297 log::debug!("Websocket authenticated");
298 }
299 }
300 }
301}
302
303fn upsert_instrument(
304 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
305 instrument: InstrumentAny,
306) {
307 let mut guard = cache.write().expect(MUTEX_POISONED);
308 guard.insert(instrument.id(), instrument);
309}
310
311#[async_trait::async_trait(?Send)]
312impl DataClient for BybitDataClient {
313 fn client_id(&self) -> ClientId {
314 self.client_id
315 }
316
317 fn venue(&self) -> Option<Venue> {
318 Some(self.venue())
319 }
320
321 fn start(&mut self) -> anyhow::Result<()> {
322 log::info!(
323 "Started: client_id={}, product_types={:?}, environment={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
324 self.client_id,
325 self.config.product_types,
326 self.config.environment,
327 self.config.http_proxy_url,
328 self.config.ws_proxy_url,
329 );
330 Ok(())
331 }
332
333 fn stop(&mut self) -> anyhow::Result<()> {
334 log::info!("Stopping {id}", id = self.client_id);
335 self.cancellation_token.cancel();
336 self.is_connected.store(false, Ordering::Relaxed);
337 Ok(())
338 }
339
340 fn reset(&mut self) -> anyhow::Result<()> {
341 log::debug!("Resetting {id}", id = self.client_id);
342 self.is_connected.store(false, Ordering::Relaxed);
343 self.cancellation_token = CancellationToken::new();
344 self.tasks.clear();
345 self.book_depths.write().expect(MUTEX_POISONED).clear();
346 self.quote_depths.write().expect(MUTEX_POISONED).clear();
347 self.ticker_subs.write().expect(MUTEX_POISONED).clear();
348 Ok(())
349 }
350
351 fn dispose(&mut self) -> anyhow::Result<()> {
352 log::debug!("Disposing {id}", id = self.client_id);
353 self.stop()
354 }
355
356 async fn connect(&mut self) -> anyhow::Result<()> {
357 if self.is_connected() {
358 return Ok(());
359 }
360
361 let product_types = if self.config.product_types.is_empty() {
362 vec![BybitProductType::Linear]
363 } else {
364 self.config.product_types.clone()
365 };
366
367 let mut all_instruments = Vec::new();
368 for product_type in &product_types {
369 let fetched = self
370 .http_client
371 .request_instruments(*product_type, None)
372 .await
373 .with_context(|| {
374 format!("failed to request Bybit instruments for {product_type:?}")
375 })?;
376
377 self.http_client.cache_instruments(fetched.clone());
378
379 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
380 for instrument in &fetched {
381 guard.insert(instrument.id(), instrument.clone());
382 }
383 drop(guard);
384
385 all_instruments.extend(fetched);
386 }
387
388 for instrument in all_instruments {
389 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
390 log::warn!("Failed to send instrument: {e}");
391 }
392 }
393
394 for ws_client in &mut self.ws_clients {
395 let instruments: Vec<_> = self
397 .instruments
398 .read()
399 .expect(MUTEX_POISONED)
400 .values()
401 .cloned()
402 .collect();
403 ws_client.cache_instruments(instruments);
404
405 ws_client
406 .connect()
407 .await
408 .context("failed to connect Bybit WebSocket")?;
409 ws_client
410 .wait_until_active(10.0)
411 .await
412 .context("WebSocket did not become active")?;
413
414 let stream = ws_client.stream();
415 let sender = self.data_sender.clone();
416 let insts = self.instruments.clone();
417 let ticker_subs = self.ticker_subs.clone();
418 let quote_depths = self.quote_depths.clone();
419 let book_depths = self.book_depths.clone();
420 let cancel = self.cancellation_token.clone();
421 let handle = get_runtime().spawn(async move {
422 pin_mut!(stream);
423 loop {
424 tokio::select! {
425 Some(message) = stream.next() => {
426 Self::handle_ws_message(message, &sender, &insts, &ticker_subs, "e_depths, &book_depths);
427 }
428 () = cancel.cancelled() => {
429 log::debug!("WebSocket stream task cancelled");
430 break;
431 }
432 }
433 }
434 });
435 self.tasks.push(handle);
436 }
437
438 self.is_connected.store(true, Ordering::Release);
439 log::info!("Connected: client_id={}", self.client_id);
440 Ok(())
441 }
442
443 async fn disconnect(&mut self) -> anyhow::Result<()> {
444 if self.is_disconnected() {
445 return Ok(());
446 }
447
448 self.cancellation_token.cancel();
449
450 self.cancellation_token = CancellationToken::new();
452
453 for ws_client in &mut self.ws_clients {
454 if let Err(e) = ws_client.close().await {
455 log::warn!("Error closing WebSocket: {e:?}");
456 }
457 }
458
459 tokio::time::sleep(Duration::from_millis(500)).await;
461
462 let handles: Vec<_> = self.tasks.drain(..).collect();
463 for handle in handles {
464 if let Err(e) = handle.await {
465 log::error!("Error joining WebSocket task: {e}");
466 }
467 }
468
469 self.book_depths.write().expect(MUTEX_POISONED).clear();
470 self.quote_depths.write().expect(MUTEX_POISONED).clear();
471 self.ticker_subs.write().expect(MUTEX_POISONED).clear();
472 self.is_connected.store(false, Ordering::Release);
473 log::info!("Disconnected: client_id={}", self.client_id);
474 Ok(())
475 }
476
477 fn is_connected(&self) -> bool {
478 self.is_connected.load(Ordering::Relaxed)
479 }
480
481 fn is_disconnected(&self) -> bool {
482 !self.is_connected()
483 }
484
485 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
486 if cmd.book_type != BookType::L2_MBP {
487 anyhow::bail!("Bybit only supports L2_MBP order book deltas");
488 }
489
490 let depth = cmd
491 .depth
492 .map_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH, |d| d.get() as u32);
493 if !matches!(depth, 1 | 50 | 200 | 500) {
494 anyhow::bail!("invalid depth {depth}; valid values are 1, 50, 200, or 500");
495 }
496
497 let instrument_id = cmd.instrument_id;
498 let product_type = self
499 .get_product_type_for_instrument(instrument_id)
500 .unwrap_or(BybitProductType::Linear);
501
502 let ws = self
503 .get_ws_client_for_product(product_type)
504 .context("no WebSocket client for product type")?
505 .clone();
506
507 let book_depths = Arc::clone(&self.book_depths);
508
509 self.spawn_ws(
510 async move {
511 ws.subscribe_orderbook(instrument_id, depth)
512 .await
513 .context("orderbook subscription")?;
514 book_depths
515 .write()
516 .expect("book depths cache lock poisoned")
517 .insert(instrument_id, depth);
518 Ok(())
519 },
520 "order book delta subscription",
521 );
522
523 Ok(())
524 }
525
526 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
527 let instrument_id = cmd.instrument_id;
528 let product_type = self
529 .get_product_type_for_instrument(instrument_id)
530 .unwrap_or(BybitProductType::Linear);
531
532 let ws = self
533 .get_ws_client_for_product(product_type)
534 .context("no WebSocket client for product type")?
535 .clone();
536
537 if product_type == BybitProductType::Spot {
539 let depth = 1;
540 self.quote_depths
541 .write()
542 .expect(MUTEX_POISONED)
543 .insert(instrument_id, depth);
544
545 self.spawn_ws(
546 async move {
547 ws.subscribe_orderbook(instrument_id, depth)
548 .await
549 .context("orderbook subscription for quotes")
550 },
551 "quote subscription (spot orderbook)",
552 );
553 } else {
554 let should_subscribe = {
555 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
556 let entry = subs.entry(instrument_id).or_default();
557 let is_first = entry.is_empty();
558 entry.insert("quotes");
559 is_first
560 };
561
562 if should_subscribe {
563 self.spawn_ws(
564 async move {
565 ws.subscribe_ticker(instrument_id)
566 .await
567 .context("ticker subscription")
568 },
569 "quote subscription",
570 );
571 }
572 }
573 Ok(())
574 }
575
576 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
577 let instrument_id = cmd.instrument_id;
578 let product_type = self
579 .get_product_type_for_instrument(instrument_id)
580 .unwrap_or(BybitProductType::Linear);
581
582 let ws = self
583 .get_ws_client_for_product(product_type)
584 .context("no WebSocket client for product type")?
585 .clone();
586
587 self.spawn_ws(
588 async move {
589 ws.subscribe_trades(instrument_id)
590 .await
591 .context("trades subscription")
592 },
593 "trade subscription",
594 );
595 Ok(())
596 }
597
598 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
599 let instrument_id = cmd.instrument_id;
600 let product_type = self
601 .get_product_type_for_instrument(instrument_id)
602 .unwrap_or(BybitProductType::Linear);
603
604 if product_type == BybitProductType::Spot {
605 anyhow::bail!("Funding rates not available for Spot instruments");
606 }
607
608 let should_subscribe = {
609 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
610 let entry = subs.entry(instrument_id).or_default();
611 let first = entry.is_empty();
612 entry.insert("funding");
613 first
614 };
615
616 if should_subscribe {
617 let ws = self
618 .get_ws_client_for_product(product_type)
619 .context("no WebSocket client for product type")?
620 .clone();
621
622 self.spawn_ws(
623 async move {
624 ws.subscribe_ticker(instrument_id)
625 .await
626 .context("ticker subscription for funding rates")
627 },
628 "funding rate subscription",
629 );
630 }
631 Ok(())
632 }
633
634 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
635 let instrument_id = cmd.instrument_id;
636 let product_type = self
637 .get_product_type_for_instrument(instrument_id)
638 .unwrap_or(BybitProductType::Linear);
639
640 if product_type == BybitProductType::Spot {
641 anyhow::bail!("Mark prices not available for Spot instruments");
642 }
643
644 let should_subscribe = {
645 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
646 let entry = subs.entry(instrument_id).or_default();
647 let first = entry.is_empty();
648 entry.insert("mark_prices");
649 first
650 };
651
652 if should_subscribe {
653 let ws = self
654 .get_ws_client_for_product(product_type)
655 .context("no WebSocket client for product type")?
656 .clone();
657
658 self.spawn_ws(
659 async move {
660 ws.subscribe_ticker(instrument_id)
661 .await
662 .context("ticker subscription for mark prices")
663 },
664 "mark price subscription",
665 );
666 }
667 Ok(())
668 }
669
670 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
671 let instrument_id = cmd.instrument_id;
672 let product_type = self
673 .get_product_type_for_instrument(instrument_id)
674 .unwrap_or(BybitProductType::Linear);
675
676 if product_type == BybitProductType::Spot {
677 anyhow::bail!("Index prices not available for Spot instruments");
678 }
679
680 let should_subscribe = {
681 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
682 let entry = subs.entry(instrument_id).or_default();
683 let first = entry.is_empty();
684 entry.insert("index_prices");
685 first
686 };
687
688 if should_subscribe {
689 let ws = self
690 .get_ws_client_for_product(product_type)
691 .context("no WebSocket client for product type")?
692 .clone();
693
694 self.spawn_ws(
695 async move {
696 ws.subscribe_ticker(instrument_id)
697 .await
698 .context("ticker subscription for index prices")
699 },
700 "index price subscription",
701 );
702 }
703 Ok(())
704 }
705
706 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
707 let bar_type = cmd.bar_type;
708 let instrument_id = bar_type.instrument_id();
709 let product_type = self
710 .get_product_type_for_instrument(instrument_id)
711 .unwrap_or(BybitProductType::Linear);
712
713 let ws = self
714 .get_ws_client_for_product(product_type)
715 .context("no WebSocket client for product type")?
716 .clone();
717
718 self.spawn_ws(
719 async move {
720 ws.subscribe_bars(bar_type)
721 .await
722 .context("bars subscription")
723 },
724 "bar subscription",
725 );
726 Ok(())
727 }
728
729 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
730 let instrument_id = cmd.instrument_id;
731 let depth = self
732 .book_depths
733 .write()
734 .expect(MUTEX_POISONED)
735 .remove(&instrument_id)
736 .unwrap_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH);
737
738 let product_type = self
739 .get_product_type_for_instrument(instrument_id)
740 .unwrap_or(BybitProductType::Linear);
741
742 let quote_using_same_depth = self
744 .quote_depths
745 .read()
746 .expect(MUTEX_POISONED)
747 .get(&instrument_id)
748 .is_some_and(|&d| d == depth);
749
750 if quote_using_same_depth {
751 return Ok(());
752 }
753
754 let ws = self
755 .get_ws_client_for_product(product_type)
756 .context("no WebSocket client for product type")?
757 .clone();
758
759 self.spawn_ws(
760 async move {
761 ws.unsubscribe_orderbook(instrument_id, depth)
762 .await
763 .context("orderbook unsubscribe")
764 },
765 "order book unsubscribe",
766 );
767 Ok(())
768 }
769
770 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
771 let instrument_id = cmd.instrument_id;
772 let product_type = self
773 .get_product_type_for_instrument(instrument_id)
774 .unwrap_or(BybitProductType::Linear);
775
776 let ws = self
777 .get_ws_client_for_product(product_type)
778 .context("no WebSocket client for product type")?
779 .clone();
780
781 if product_type == BybitProductType::Spot {
782 let depth = self
783 .quote_depths
784 .write()
785 .expect(MUTEX_POISONED)
786 .remove(&instrument_id)
787 .unwrap_or(1);
788
789 let book_using_same_depth = self
791 .book_depths
792 .read()
793 .expect(MUTEX_POISONED)
794 .get(&instrument_id)
795 .is_some_and(|&d| d == depth);
796
797 if !book_using_same_depth {
798 self.spawn_ws(
799 async move {
800 ws.unsubscribe_orderbook(instrument_id, depth)
801 .await
802 .context("orderbook unsubscribe for quotes")
803 },
804 "quote unsubscribe (spot orderbook)",
805 );
806 }
807 } else {
808 let should_unsubscribe = {
809 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
810 if let Some(entry) = subs.get_mut(&instrument_id) {
811 entry.remove("quotes");
812 if entry.is_empty() {
813 subs.remove(&instrument_id);
814 true
815 } else {
816 false
817 }
818 } else {
819 false
820 }
821 };
822
823 if should_unsubscribe {
824 self.spawn_ws(
825 async move {
826 ws.unsubscribe_ticker(instrument_id)
827 .await
828 .context("ticker unsubscribe")
829 },
830 "quote unsubscribe",
831 );
832 }
833 }
834 Ok(())
835 }
836
837 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
838 let instrument_id = cmd.instrument_id;
839 let product_type = self
840 .get_product_type_for_instrument(instrument_id)
841 .unwrap_or(BybitProductType::Linear);
842
843 let ws = self
844 .get_ws_client_for_product(product_type)
845 .context("no WebSocket client for product type")?
846 .clone();
847
848 self.spawn_ws(
849 async move {
850 ws.unsubscribe_trades(instrument_id)
851 .await
852 .context("trades unsubscribe")
853 },
854 "trade unsubscribe",
855 );
856 Ok(())
857 }
858
859 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
860 let instrument_id = cmd.instrument_id;
861 let product_type = self
862 .get_product_type_for_instrument(instrument_id)
863 .unwrap_or(BybitProductType::Linear);
864
865 let should_unsubscribe = {
866 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
867 if let Some(entry) = subs.get_mut(&instrument_id) {
868 entry.remove("funding");
869 if entry.is_empty() {
870 subs.remove(&instrument_id);
871 true
872 } else {
873 false
874 }
875 } else {
876 false
877 }
878 };
879
880 if should_unsubscribe {
881 let ws = self
882 .get_ws_client_for_product(product_type)
883 .context("no WebSocket client for product type")?
884 .clone();
885
886 self.spawn_ws(
887 async move {
888 ws.unsubscribe_ticker(instrument_id)
889 .await
890 .context("ticker unsubscribe for funding rates")
891 },
892 "funding rate unsubscribe",
893 );
894 }
895 Ok(())
896 }
897
898 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
899 let instrument_id = cmd.instrument_id;
900 let product_type = self
901 .get_product_type_for_instrument(instrument_id)
902 .unwrap_or(BybitProductType::Linear);
903
904 let should_unsubscribe = {
905 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
906 if let Some(entry) = subs.get_mut(&instrument_id) {
907 entry.remove("mark_prices");
908 if entry.is_empty() {
909 subs.remove(&instrument_id);
910 true
911 } else {
912 false
913 }
914 } else {
915 false
916 }
917 };
918
919 if should_unsubscribe {
920 let ws = self
921 .get_ws_client_for_product(product_type)
922 .context("no WebSocket client for product type")?
923 .clone();
924
925 self.spawn_ws(
926 async move {
927 ws.unsubscribe_ticker(instrument_id)
928 .await
929 .context("ticker unsubscribe for mark prices")
930 },
931 "mark price unsubscribe",
932 );
933 }
934 Ok(())
935 }
936
937 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
938 let instrument_id = cmd.instrument_id;
939 let product_type = self
940 .get_product_type_for_instrument(instrument_id)
941 .unwrap_or(BybitProductType::Linear);
942
943 let should_unsubscribe = {
944 let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
945 if let Some(entry) = subs.get_mut(&instrument_id) {
946 entry.remove("index_prices");
947 if entry.is_empty() {
948 subs.remove(&instrument_id);
949 true
950 } else {
951 false
952 }
953 } else {
954 false
955 }
956 };
957
958 if should_unsubscribe {
959 let ws = self
960 .get_ws_client_for_product(product_type)
961 .context("no WebSocket client for product type")?
962 .clone();
963
964 self.spawn_ws(
965 async move {
966 ws.unsubscribe_ticker(instrument_id)
967 .await
968 .context("ticker unsubscribe for index prices")
969 },
970 "index price unsubscribe",
971 );
972 }
973 Ok(())
974 }
975
976 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
977 let bar_type = cmd.bar_type;
978 let instrument_id = bar_type.instrument_id();
979 let product_type = self
980 .get_product_type_for_instrument(instrument_id)
981 .unwrap_or(BybitProductType::Linear);
982
983 let ws = self
984 .get_ws_client_for_product(product_type)
985 .context("no WebSocket client for product type")?
986 .clone();
987
988 self.spawn_ws(
989 async move {
990 ws.unsubscribe_bars(bar_type)
991 .await
992 .context("bars unsubscribe")
993 },
994 "bar unsubscribe",
995 );
996 Ok(())
997 }
998
999 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1000 let http = self.http_client.clone();
1001 let sender = self.data_sender.clone();
1002 let instruments_cache = self.instruments.clone();
1003 let request_id = request.request_id;
1004 let client_id = request.client_id.unwrap_or(self.client_id);
1005 let venue = self.venue();
1006 let start = request.start;
1007 let end = request.end;
1008 let params = request.params;
1009 let clock = self.clock;
1010 let start_nanos = datetime_to_unix_nanos(start);
1011 let end_nanos = datetime_to_unix_nanos(end);
1012 let product_types = if self.config.product_types.is_empty() {
1013 vec![BybitProductType::Linear]
1014 } else {
1015 self.config.product_types.clone()
1016 };
1017
1018 get_runtime().spawn(async move {
1019 let mut all_instruments = Vec::new();
1020
1021 for product_type in product_types {
1022 match http.request_instruments(product_type, None).await {
1023 Ok(instruments) => {
1024 for instrument in instruments {
1025 upsert_instrument(&instruments_cache, instrument.clone());
1026 all_instruments.push(instrument);
1027 }
1028 }
1029 Err(e) => {
1030 log::error!("Failed to fetch instruments for {product_type:?}: {e:?}");
1031 }
1032 }
1033 }
1034
1035 let response = DataResponse::Instruments(InstrumentsResponse::new(
1036 request_id,
1037 client_id,
1038 venue,
1039 all_instruments,
1040 start_nanos,
1041 end_nanos,
1042 clock.get_time_ns(),
1043 params,
1044 ));
1045
1046 if let Err(e) = sender.send(DataEvent::Response(response)) {
1047 log::error!("Failed to send instruments response: {e}");
1048 }
1049 });
1050
1051 Ok(())
1052 }
1053
1054 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1055 let http = self.http_client.clone();
1056 let sender = self.data_sender.clone();
1057 let instruments = self.instruments.clone();
1058 let instrument_id = request.instrument_id;
1059 let request_id = request.request_id;
1060 let client_id = request.client_id.unwrap_or(self.client_id);
1061 let start = request.start;
1062 let end = request.end;
1063 let params = request.params;
1064 let clock = self.clock;
1065 let start_nanos = datetime_to_unix_nanos(start);
1066 let end_nanos = datetime_to_unix_nanos(end);
1067
1068 let symbol_str = instrument_id.symbol.as_str();
1070 let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1071 BybitProductType::Spot
1072 } else if symbol_str.ends_with("-OPTION") {
1073 BybitProductType::Option
1074 } else if symbol_str.contains("USD")
1075 && !symbol_str.contains("USDT")
1076 && !symbol_str.contains("USDC")
1077 {
1078 BybitProductType::Inverse
1079 } else {
1080 BybitProductType::Linear
1081 };
1082 let raw_symbol = extract_raw_symbol(symbol_str).to_string();
1083
1084 get_runtime().spawn(async move {
1085 match http
1086 .request_instruments(product_type, Some(raw_symbol))
1087 .await
1088 .context("fetch instrument from API")
1089 {
1090 Ok(fetched) => {
1091 if let Some(instrument) = fetched.into_iter().find(|i| i.id() == instrument_id)
1092 {
1093 upsert_instrument(&instruments, instrument.clone());
1094
1095 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1096 request_id,
1097 client_id,
1098 instrument.id(),
1099 instrument,
1100 start_nanos,
1101 end_nanos,
1102 clock.get_time_ns(),
1103 params,
1104 )));
1105
1106 if let Err(e) = sender.send(DataEvent::Response(response)) {
1107 log::error!("Failed to send instrument response: {e}");
1108 }
1109 } else {
1110 log::error!("Instrument not found: {instrument_id}");
1111 }
1112 }
1113 Err(e) => log::error!("Instrument request failed: {e:?}"),
1114 }
1115 });
1116
1117 Ok(())
1118 }
1119
1120 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1121 let http = self.http_client.clone();
1122 let sender = self.data_sender.clone();
1123 let instrument_id = request.instrument_id;
1124 let start = request.start;
1125 let end = request.end;
1126 let limit = request.limit.map(|n| n.get() as u32);
1127 let request_id = request.request_id;
1128 let client_id = request.client_id.unwrap_or(self.client_id);
1129 let params = request.params;
1130 let clock = self.clock;
1131 let start_nanos = datetime_to_unix_nanos(start);
1132 let end_nanos = datetime_to_unix_nanos(end);
1133
1134 let symbol_str = instrument_id.symbol.as_str();
1136 let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1137 BybitProductType::Spot
1138 } else if symbol_str.ends_with("-OPTION") {
1139 BybitProductType::Option
1140 } else if symbol_str.contains("USD")
1141 && !symbol_str.contains("USDT")
1142 && !symbol_str.contains("USDC")
1143 {
1144 BybitProductType::Inverse
1145 } else {
1146 BybitProductType::Linear
1147 };
1148
1149 get_runtime().spawn(async move {
1150 match http
1151 .request_trades(product_type, instrument_id, limit)
1152 .await
1153 .context("failed to request trades from Bybit")
1154 {
1155 Ok(trades) => {
1156 let response = DataResponse::Trades(TradesResponse::new(
1157 request_id,
1158 client_id,
1159 instrument_id,
1160 trades,
1161 start_nanos,
1162 end_nanos,
1163 clock.get_time_ns(),
1164 params,
1165 ));
1166 if let Err(e) = sender.send(DataEvent::Response(response)) {
1167 log::error!("Failed to send trades response: {e}");
1168 }
1169 }
1170 Err(e) => log::error!("Trade request failed: {e:?}"),
1171 }
1172 });
1173
1174 Ok(())
1175 }
1176
1177 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1178 let http = self.http_client.clone();
1179 let sender = self.data_sender.clone();
1180 let bar_type = request.bar_type;
1181 let start = request.start;
1182 let end = request.end;
1183 let limit = request.limit.map(|n| n.get() as u32);
1184 let request_id = request.request_id;
1185 let client_id = request.client_id.unwrap_or(self.client_id);
1186 let params = request.params;
1187 let clock = self.clock;
1188 let start_nanos = datetime_to_unix_nanos(start);
1189 let end_nanos = datetime_to_unix_nanos(end);
1190
1191 let instrument_id = bar_type.instrument_id();
1193 let symbol_str = instrument_id.symbol.as_str();
1194 let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1195 BybitProductType::Spot
1196 } else if symbol_str.ends_with("-OPTION") {
1197 BybitProductType::Option
1198 } else if symbol_str.contains("USD")
1199 && !symbol_str.contains("USDT")
1200 && !symbol_str.contains("USDC")
1201 {
1202 BybitProductType::Inverse
1203 } else {
1204 BybitProductType::Linear
1205 };
1206
1207 get_runtime().spawn(async move {
1208 match http
1209 .request_bars(product_type, bar_type, start, end, limit, true)
1210 .await
1211 .context("failed to request bars from Bybit")
1212 {
1213 Ok(bars) => {
1214 let response = DataResponse::Bars(BarsResponse::new(
1215 request_id,
1216 client_id,
1217 bar_type,
1218 bars,
1219 start_nanos,
1220 end_nanos,
1221 clock.get_time_ns(),
1222 params,
1223 ));
1224 if let Err(e) = sender.send(DataEvent::Response(response)) {
1225 log::error!("Failed to send bars response: {e}");
1226 }
1227 }
1228 Err(e) => log::error!("Bar request failed: {e:?}"),
1229 }
1230 });
1231
1232 Ok(())
1233 }
1234}