Skip to main content

nautilus_bybit/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Bybit adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::{AHashMap, AHashSet};
27use anyhow::Context;
28use futures_util::{StreamExt, pin_mut};
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
36            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices,
38            SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
39            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
40            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
41        },
42    },
43};
44use nautilus_core::{
45    MUTEX_POISONED,
46    datetime::datetime_to_unix_nanos,
47    time::{AtomicTime, get_atomic_clock_realtime},
48};
49use nautilus_model::{
50    data::{Data, OrderBookDeltas_API},
51    enums::BookType,
52    identifiers::{ClientId, InstrumentId, Venue},
53    instruments::{Instrument, InstrumentAny},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57
58use crate::{
59    common::{
60        consts::{BYBIT_DEFAULT_ORDERBOOK_DEPTH, BYBIT_VENUE},
61        enums::BybitProductType,
62        parse::extract_raw_symbol,
63    },
64    config::BybitDataClientConfig,
65    http::client::BybitHttpClient,
66    websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
67};
68
69/// Live market data client for Bybit.
70#[derive(Debug)]
71pub struct BybitDataClient {
72    client_id: ClientId,
73    config: BybitDataClientConfig,
74    http_client: BybitHttpClient,
75    ws_clients: Vec<BybitWebSocketClient>,
76    is_connected: AtomicBool,
77    cancellation_token: CancellationToken,
78    tasks: Vec<JoinHandle<()>>,
79    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
80    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
81    book_depths: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
82    quote_depths: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
83    ticker_subs: Arc<RwLock<AHashMap<InstrumentId, AHashSet<&'static str>>>>,
84    clock: &'static AtomicTime,
85}
86
87impl BybitDataClient {
88    /// Creates a new [`BybitDataClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the client fails to initialize.
93    pub fn new(client_id: ClientId, config: BybitDataClientConfig) -> anyhow::Result<Self> {
94        let clock = get_atomic_clock_realtime();
95        let data_sender = get_data_event_sender();
96
97        let http_client = if let (Some(api_key), Some(api_secret)) =
98            (config.api_key.clone(), config.api_secret.clone())
99        {
100            BybitHttpClient::with_credentials(
101                api_key,
102                api_secret,
103                Some(config.http_base_url()),
104                config.http_timeout_secs,
105                config.max_retries,
106                config.retry_delay_initial_ms,
107                config.retry_delay_max_ms,
108                config.recv_window_ms,
109                config.http_proxy_url.clone(),
110            )?
111        } else {
112            BybitHttpClient::new(
113                Some(config.http_base_url()),
114                config.http_timeout_secs,
115                config.max_retries,
116                config.retry_delay_initial_ms,
117                config.retry_delay_max_ms,
118                config.recv_window_ms,
119                config.http_proxy_url.clone(),
120            )?
121        };
122
123        // Create a WebSocket client for each product type (default to Linear if empty)
124        let product_types = if config.product_types.is_empty() {
125            vec![BybitProductType::Linear]
126        } else {
127            config.product_types.clone()
128        };
129
130        let ws_clients: Vec<BybitWebSocketClient> = product_types
131            .iter()
132            .map(|product_type| {
133                BybitWebSocketClient::new_public_with(
134                    *product_type,
135                    config.environment,
136                    Some(config.ws_public_url_for(*product_type)),
137                    config.heartbeat_interval_secs,
138                )
139            })
140            .collect();
141
142        Ok(Self {
143            client_id,
144            config,
145            http_client,
146            ws_clients,
147            is_connected: AtomicBool::new(false),
148            cancellation_token: CancellationToken::new(),
149            tasks: Vec::new(),
150            data_sender,
151            instruments: Arc::new(RwLock::new(AHashMap::new())),
152            book_depths: Arc::new(RwLock::new(AHashMap::new())),
153            quote_depths: Arc::new(RwLock::new(AHashMap::new())),
154            ticker_subs: Arc::new(RwLock::new(AHashMap::new())),
155            clock,
156        })
157    }
158
159    fn venue(&self) -> Venue {
160        *BYBIT_VENUE
161    }
162
163    fn get_ws_client_for_product(
164        &self,
165        product_type: BybitProductType,
166    ) -> Option<&BybitWebSocketClient> {
167        self.ws_clients
168            .iter()
169            .find(|ws| ws.product_type() == Some(product_type))
170    }
171
172    fn get_product_type_for_instrument(
173        &self,
174        instrument_id: InstrumentId,
175    ) -> Option<BybitProductType> {
176        let guard = self.instruments.read().expect(MUTEX_POISONED);
177        guard.get(&instrument_id).map(|inst| {
178            // Determine product type based on instrument characteristics
179            let symbol_str = instrument_id.symbol.as_str();
180            if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
181                BybitProductType::Spot
182            } else if symbol_str.ends_with("-OPTION") {
183                BybitProductType::Option
184            } else if inst.is_inverse() {
185                BybitProductType::Inverse
186            } else {
187                BybitProductType::Linear
188            }
189        })
190    }
191
192    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
193        if let Err(e) = sender.send(DataEvent::Data(data)) {
194            log::error!("Failed to emit data event: {e}");
195        }
196    }
197
198    fn spawn_ws<F>(&self, fut: F, context: &'static str)
199    where
200        F: Future<Output = anyhow::Result<()>> + Send + 'static,
201    {
202        get_runtime().spawn(async move {
203            if let Err(e) = fut.await {
204                log::error!("{context}: {e:?}");
205            }
206        });
207    }
208
209    fn handle_ws_message(
210        message: NautilusWsMessage,
211        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
212        _instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
213        ticker_subs: &Arc<RwLock<AHashMap<InstrumentId, AHashSet<&'static str>>>>,
214        quote_depths: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
215        book_depths: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
216    ) {
217        match message {
218            NautilusWsMessage::Data(payloads) => {
219                let ticker = ticker_subs.read().expect(MUTEX_POISONED);
220                let depths = quote_depths.read().expect(MUTEX_POISONED);
221                for data in payloads {
222                    // Filter quotes - only emit if subscribed via ticker (LINEAR) or depth (SPOT)
223                    if let Data::Quote(ref quote) = data {
224                        let has_ticker_sub = ticker
225                            .get(&quote.instrument_id)
226                            .is_some_and(|s| s.contains("quotes"));
227                        let has_depth_sub = depths.contains_key(&quote.instrument_id);
228                        if !has_ticker_sub && !has_depth_sub {
229                            continue;
230                        }
231                    }
232                    Self::send_data(data_sender, data);
233                }
234            }
235            NautilusWsMessage::Deltas(deltas) => {
236                let books = book_depths.read().expect(MUTEX_POISONED);
237                if books.contains_key(&deltas.instrument_id) {
238                    Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
239                }
240            }
241            NautilusWsMessage::FundingRates(updates) => {
242                let subs = ticker_subs.read().expect(MUTEX_POISONED);
243                for update in updates {
244                    if !subs
245                        .get(&update.instrument_id)
246                        .is_some_and(|s| s.contains("funding"))
247                    {
248                        continue;
249                    }
250                    if let Err(e) = data_sender.send(DataEvent::FundingRate(update)) {
251                        log::error!("Failed to emit funding rate event: {e}");
252                    }
253                }
254            }
255            NautilusWsMessage::MarkPrices(updates) => {
256                let subs = ticker_subs.read().expect(MUTEX_POISONED);
257                for update in updates {
258                    if subs
259                        .get(&update.instrument_id)
260                        .is_some_and(|s| s.contains("mark_prices"))
261                    {
262                        Self::send_data(data_sender, Data::MarkPriceUpdate(update));
263                    }
264                }
265            }
266            NautilusWsMessage::IndexPrices(updates) => {
267                let subs = ticker_subs.read().expect(MUTEX_POISONED);
268                for update in updates {
269                    if subs
270                        .get(&update.instrument_id)
271                        .is_some_and(|s| s.contains("index_prices"))
272                    {
273                        Self::send_data(data_sender, Data::IndexPriceUpdate(update));
274                    }
275                }
276            }
277            NautilusWsMessage::OrderStatusReports(_)
278            | NautilusWsMessage::FillReports(_)
279            | NautilusWsMessage::PositionStatusReport(_)
280            | NautilusWsMessage::AccountState(_)
281            | NautilusWsMessage::OrderRejected(_)
282            | NautilusWsMessage::OrderCancelRejected(_)
283            | NautilusWsMessage::OrderModifyRejected(_) => {
284                log::debug!("Ignoring trading message on data client");
285            }
286            NautilusWsMessage::Error(e) => {
287                log::error!(
288                    "Bybit websocket error: code={} message={}",
289                    e.code,
290                    e.message
291                );
292            }
293            NautilusWsMessage::Reconnected => {
294                log::info!("Websocket reconnected");
295            }
296            NautilusWsMessage::Authenticated => {
297                log::debug!("Websocket authenticated");
298            }
299        }
300    }
301}
302
303fn upsert_instrument(
304    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
305    instrument: InstrumentAny,
306) {
307    let mut guard = cache.write().expect(MUTEX_POISONED);
308    guard.insert(instrument.id(), instrument);
309}
310
311#[async_trait::async_trait(?Send)]
312impl DataClient for BybitDataClient {
313    fn client_id(&self) -> ClientId {
314        self.client_id
315    }
316
317    fn venue(&self) -> Option<Venue> {
318        Some(self.venue())
319    }
320
321    fn start(&mut self) -> anyhow::Result<()> {
322        log::info!(
323            "Started: client_id={}, product_types={:?}, environment={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
324            self.client_id,
325            self.config.product_types,
326            self.config.environment,
327            self.config.http_proxy_url,
328            self.config.ws_proxy_url,
329        );
330        Ok(())
331    }
332
333    fn stop(&mut self) -> anyhow::Result<()> {
334        log::info!("Stopping {id}", id = self.client_id);
335        self.cancellation_token.cancel();
336        self.is_connected.store(false, Ordering::Relaxed);
337        Ok(())
338    }
339
340    fn reset(&mut self) -> anyhow::Result<()> {
341        log::debug!("Resetting {id}", id = self.client_id);
342        self.is_connected.store(false, Ordering::Relaxed);
343        self.cancellation_token = CancellationToken::new();
344        self.tasks.clear();
345        self.book_depths.write().expect(MUTEX_POISONED).clear();
346        self.quote_depths.write().expect(MUTEX_POISONED).clear();
347        self.ticker_subs.write().expect(MUTEX_POISONED).clear();
348        Ok(())
349    }
350
351    fn dispose(&mut self) -> anyhow::Result<()> {
352        log::debug!("Disposing {id}", id = self.client_id);
353        self.stop()
354    }
355
356    async fn connect(&mut self) -> anyhow::Result<()> {
357        if self.is_connected() {
358            return Ok(());
359        }
360
361        let product_types = if self.config.product_types.is_empty() {
362            vec![BybitProductType::Linear]
363        } else {
364            self.config.product_types.clone()
365        };
366
367        let mut all_instruments = Vec::new();
368        for product_type in &product_types {
369            let fetched = self
370                .http_client
371                .request_instruments(*product_type, None)
372                .await
373                .with_context(|| {
374                    format!("failed to request Bybit instruments for {product_type:?}")
375                })?;
376
377            self.http_client.cache_instruments(fetched.clone());
378
379            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
380            for instrument in &fetched {
381                guard.insert(instrument.id(), instrument.clone());
382            }
383            drop(guard);
384
385            all_instruments.extend(fetched);
386        }
387
388        for instrument in all_instruments {
389            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
390                log::warn!("Failed to send instrument: {e}");
391            }
392        }
393
394        for ws_client in &mut self.ws_clients {
395            // Cache instruments before connecting so parser has price/size precision
396            let instruments: Vec<_> = self
397                .instruments
398                .read()
399                .expect(MUTEX_POISONED)
400                .values()
401                .cloned()
402                .collect();
403            ws_client.cache_instruments(instruments);
404
405            ws_client
406                .connect()
407                .await
408                .context("failed to connect Bybit WebSocket")?;
409            ws_client
410                .wait_until_active(10.0)
411                .await
412                .context("WebSocket did not become active")?;
413
414            let stream = ws_client.stream();
415            let sender = self.data_sender.clone();
416            let insts = self.instruments.clone();
417            let ticker_subs = self.ticker_subs.clone();
418            let quote_depths = self.quote_depths.clone();
419            let book_depths = self.book_depths.clone();
420            let cancel = self.cancellation_token.clone();
421            let handle = get_runtime().spawn(async move {
422                pin_mut!(stream);
423                loop {
424                    tokio::select! {
425                        Some(message) = stream.next() => {
426                            Self::handle_ws_message(message, &sender, &insts, &ticker_subs, &quote_depths, &book_depths);
427                        }
428                        () = cancel.cancelled() => {
429                            log::debug!("WebSocket stream task cancelled");
430                            break;
431                        }
432                    }
433                }
434            });
435            self.tasks.push(handle);
436        }
437
438        self.is_connected.store(true, Ordering::Release);
439        log::info!("Connected: client_id={}", self.client_id);
440        Ok(())
441    }
442
443    async fn disconnect(&mut self) -> anyhow::Result<()> {
444        if self.is_disconnected() {
445            return Ok(());
446        }
447
448        self.cancellation_token.cancel();
449
450        // Reinitialize token so reconnect can spawn new stream tasks
451        self.cancellation_token = CancellationToken::new();
452
453        for ws_client in &mut self.ws_clients {
454            if let Err(e) = ws_client.close().await {
455                log::warn!("Error closing WebSocket: {e:?}");
456            }
457        }
458
459        // Allow time for unsubscribe confirmations
460        tokio::time::sleep(Duration::from_millis(500)).await;
461
462        let handles: Vec<_> = self.tasks.drain(..).collect();
463        for handle in handles {
464            if let Err(e) = handle.await {
465                log::error!("Error joining WebSocket task: {e}");
466            }
467        }
468
469        self.book_depths.write().expect(MUTEX_POISONED).clear();
470        self.quote_depths.write().expect(MUTEX_POISONED).clear();
471        self.ticker_subs.write().expect(MUTEX_POISONED).clear();
472        self.is_connected.store(false, Ordering::Release);
473        log::info!("Disconnected: client_id={}", self.client_id);
474        Ok(())
475    }
476
477    fn is_connected(&self) -> bool {
478        self.is_connected.load(Ordering::Relaxed)
479    }
480
481    fn is_disconnected(&self) -> bool {
482        !self.is_connected()
483    }
484
485    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
486        if cmd.book_type != BookType::L2_MBP {
487            anyhow::bail!("Bybit only supports L2_MBP order book deltas");
488        }
489
490        let depth = cmd
491            .depth
492            .map_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH, |d| d.get() as u32);
493        if !matches!(depth, 1 | 50 | 200 | 500) {
494            anyhow::bail!("invalid depth {depth}; valid values are 1, 50, 200, or 500");
495        }
496
497        let instrument_id = cmd.instrument_id;
498        let product_type = self
499            .get_product_type_for_instrument(instrument_id)
500            .unwrap_or(BybitProductType::Linear);
501
502        let ws = self
503            .get_ws_client_for_product(product_type)
504            .context("no WebSocket client for product type")?
505            .clone();
506
507        let book_depths = Arc::clone(&self.book_depths);
508
509        self.spawn_ws(
510            async move {
511                ws.subscribe_orderbook(instrument_id, depth)
512                    .await
513                    .context("orderbook subscription")?;
514                book_depths
515                    .write()
516                    .expect("book depths cache lock poisoned")
517                    .insert(instrument_id, depth);
518                Ok(())
519            },
520            "order book delta subscription",
521        );
522
523        Ok(())
524    }
525
526    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
527        let instrument_id = cmd.instrument_id;
528        let product_type = self
529            .get_product_type_for_instrument(instrument_id)
530            .unwrap_or(BybitProductType::Linear);
531
532        let ws = self
533            .get_ws_client_for_product(product_type)
534            .context("no WebSocket client for product type")?
535            .clone();
536
537        // SPOT ticker channel doesn't include bid/ask, use orderbook depth=1
538        if product_type == BybitProductType::Spot {
539            let depth = 1;
540            self.quote_depths
541                .write()
542                .expect(MUTEX_POISONED)
543                .insert(instrument_id, depth);
544
545            self.spawn_ws(
546                async move {
547                    ws.subscribe_orderbook(instrument_id, depth)
548                        .await
549                        .context("orderbook subscription for quotes")
550                },
551                "quote subscription (spot orderbook)",
552            );
553        } else {
554            let should_subscribe = {
555                let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
556                let entry = subs.entry(instrument_id).or_default();
557                let is_first = entry.is_empty();
558                entry.insert("quotes");
559                is_first
560            };
561
562            if should_subscribe {
563                self.spawn_ws(
564                    async move {
565                        ws.subscribe_ticker(instrument_id)
566                            .await
567                            .context("ticker subscription")
568                    },
569                    "quote subscription",
570                );
571            }
572        }
573        Ok(())
574    }
575
576    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
577        let instrument_id = cmd.instrument_id;
578        let product_type = self
579            .get_product_type_for_instrument(instrument_id)
580            .unwrap_or(BybitProductType::Linear);
581
582        let ws = self
583            .get_ws_client_for_product(product_type)
584            .context("no WebSocket client for product type")?
585            .clone();
586
587        self.spawn_ws(
588            async move {
589                ws.subscribe_trades(instrument_id)
590                    .await
591                    .context("trades subscription")
592            },
593            "trade subscription",
594        );
595        Ok(())
596    }
597
598    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
599        let instrument_id = cmd.instrument_id;
600        let product_type = self
601            .get_product_type_for_instrument(instrument_id)
602            .unwrap_or(BybitProductType::Linear);
603
604        if product_type == BybitProductType::Spot {
605            anyhow::bail!("Funding rates not available for Spot instruments");
606        }
607
608        let should_subscribe = {
609            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
610            let entry = subs.entry(instrument_id).or_default();
611            let first = entry.is_empty();
612            entry.insert("funding");
613            first
614        };
615
616        if should_subscribe {
617            let ws = self
618                .get_ws_client_for_product(product_type)
619                .context("no WebSocket client for product type")?
620                .clone();
621
622            self.spawn_ws(
623                async move {
624                    ws.subscribe_ticker(instrument_id)
625                        .await
626                        .context("ticker subscription for funding rates")
627                },
628                "funding rate subscription",
629            );
630        }
631        Ok(())
632    }
633
634    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
635        let instrument_id = cmd.instrument_id;
636        let product_type = self
637            .get_product_type_for_instrument(instrument_id)
638            .unwrap_or(BybitProductType::Linear);
639
640        if product_type == BybitProductType::Spot {
641            anyhow::bail!("Mark prices not available for Spot instruments");
642        }
643
644        let should_subscribe = {
645            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
646            let entry = subs.entry(instrument_id).or_default();
647            let first = entry.is_empty();
648            entry.insert("mark_prices");
649            first
650        };
651
652        if should_subscribe {
653            let ws = self
654                .get_ws_client_for_product(product_type)
655                .context("no WebSocket client for product type")?
656                .clone();
657
658            self.spawn_ws(
659                async move {
660                    ws.subscribe_ticker(instrument_id)
661                        .await
662                        .context("ticker subscription for mark prices")
663                },
664                "mark price subscription",
665            );
666        }
667        Ok(())
668    }
669
670    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
671        let instrument_id = cmd.instrument_id;
672        let product_type = self
673            .get_product_type_for_instrument(instrument_id)
674            .unwrap_or(BybitProductType::Linear);
675
676        if product_type == BybitProductType::Spot {
677            anyhow::bail!("Index prices not available for Spot instruments");
678        }
679
680        let should_subscribe = {
681            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
682            let entry = subs.entry(instrument_id).or_default();
683            let first = entry.is_empty();
684            entry.insert("index_prices");
685            first
686        };
687
688        if should_subscribe {
689            let ws = self
690                .get_ws_client_for_product(product_type)
691                .context("no WebSocket client for product type")?
692                .clone();
693
694            self.spawn_ws(
695                async move {
696                    ws.subscribe_ticker(instrument_id)
697                        .await
698                        .context("ticker subscription for index prices")
699                },
700                "index price subscription",
701            );
702        }
703        Ok(())
704    }
705
706    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
707        let bar_type = cmd.bar_type;
708        let instrument_id = bar_type.instrument_id();
709        let product_type = self
710            .get_product_type_for_instrument(instrument_id)
711            .unwrap_or(BybitProductType::Linear);
712
713        let ws = self
714            .get_ws_client_for_product(product_type)
715            .context("no WebSocket client for product type")?
716            .clone();
717
718        self.spawn_ws(
719            async move {
720                ws.subscribe_bars(bar_type)
721                    .await
722                    .context("bars subscription")
723            },
724            "bar subscription",
725        );
726        Ok(())
727    }
728
729    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
730        let instrument_id = cmd.instrument_id;
731        let depth = self
732            .book_depths
733            .write()
734            .expect(MUTEX_POISONED)
735            .remove(&instrument_id)
736            .unwrap_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH);
737
738        let product_type = self
739            .get_product_type_for_instrument(instrument_id)
740            .unwrap_or(BybitProductType::Linear);
741
742        // Check if spot quote subscription is using the same depth
743        let quote_using_same_depth = self
744            .quote_depths
745            .read()
746            .expect(MUTEX_POISONED)
747            .get(&instrument_id)
748            .is_some_and(|&d| d == depth);
749
750        if quote_using_same_depth {
751            return Ok(());
752        }
753
754        let ws = self
755            .get_ws_client_for_product(product_type)
756            .context("no WebSocket client for product type")?
757            .clone();
758
759        self.spawn_ws(
760            async move {
761                ws.unsubscribe_orderbook(instrument_id, depth)
762                    .await
763                    .context("orderbook unsubscribe")
764            },
765            "order book unsubscribe",
766        );
767        Ok(())
768    }
769
770    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
771        let instrument_id = cmd.instrument_id;
772        let product_type = self
773            .get_product_type_for_instrument(instrument_id)
774            .unwrap_or(BybitProductType::Linear);
775
776        let ws = self
777            .get_ws_client_for_product(product_type)
778            .context("no WebSocket client for product type")?
779            .clone();
780
781        if product_type == BybitProductType::Spot {
782            let depth = self
783                .quote_depths
784                .write()
785                .expect(MUTEX_POISONED)
786                .remove(&instrument_id)
787                .unwrap_or(1);
788
789            // Check if book deltas subscription is using the same depth
790            let book_using_same_depth = self
791                .book_depths
792                .read()
793                .expect(MUTEX_POISONED)
794                .get(&instrument_id)
795                .is_some_and(|&d| d == depth);
796
797            if !book_using_same_depth {
798                self.spawn_ws(
799                    async move {
800                        ws.unsubscribe_orderbook(instrument_id, depth)
801                            .await
802                            .context("orderbook unsubscribe for quotes")
803                    },
804                    "quote unsubscribe (spot orderbook)",
805                );
806            }
807        } else {
808            let should_unsubscribe = {
809                let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
810                if let Some(entry) = subs.get_mut(&instrument_id) {
811                    entry.remove("quotes");
812                    if entry.is_empty() {
813                        subs.remove(&instrument_id);
814                        true
815                    } else {
816                        false
817                    }
818                } else {
819                    false
820                }
821            };
822
823            if should_unsubscribe {
824                self.spawn_ws(
825                    async move {
826                        ws.unsubscribe_ticker(instrument_id)
827                            .await
828                            .context("ticker unsubscribe")
829                    },
830                    "quote unsubscribe",
831                );
832            }
833        }
834        Ok(())
835    }
836
837    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
838        let instrument_id = cmd.instrument_id;
839        let product_type = self
840            .get_product_type_for_instrument(instrument_id)
841            .unwrap_or(BybitProductType::Linear);
842
843        let ws = self
844            .get_ws_client_for_product(product_type)
845            .context("no WebSocket client for product type")?
846            .clone();
847
848        self.spawn_ws(
849            async move {
850                ws.unsubscribe_trades(instrument_id)
851                    .await
852                    .context("trades unsubscribe")
853            },
854            "trade unsubscribe",
855        );
856        Ok(())
857    }
858
859    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
860        let instrument_id = cmd.instrument_id;
861        let product_type = self
862            .get_product_type_for_instrument(instrument_id)
863            .unwrap_or(BybitProductType::Linear);
864
865        let should_unsubscribe = {
866            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
867            if let Some(entry) = subs.get_mut(&instrument_id) {
868                entry.remove("funding");
869                if entry.is_empty() {
870                    subs.remove(&instrument_id);
871                    true
872                } else {
873                    false
874                }
875            } else {
876                false
877            }
878        };
879
880        if should_unsubscribe {
881            let ws = self
882                .get_ws_client_for_product(product_type)
883                .context("no WebSocket client for product type")?
884                .clone();
885
886            self.spawn_ws(
887                async move {
888                    ws.unsubscribe_ticker(instrument_id)
889                        .await
890                        .context("ticker unsubscribe for funding rates")
891                },
892                "funding rate unsubscribe",
893            );
894        }
895        Ok(())
896    }
897
898    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
899        let instrument_id = cmd.instrument_id;
900        let product_type = self
901            .get_product_type_for_instrument(instrument_id)
902            .unwrap_or(BybitProductType::Linear);
903
904        let should_unsubscribe = {
905            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
906            if let Some(entry) = subs.get_mut(&instrument_id) {
907                entry.remove("mark_prices");
908                if entry.is_empty() {
909                    subs.remove(&instrument_id);
910                    true
911                } else {
912                    false
913                }
914            } else {
915                false
916            }
917        };
918
919        if should_unsubscribe {
920            let ws = self
921                .get_ws_client_for_product(product_type)
922                .context("no WebSocket client for product type")?
923                .clone();
924
925            self.spawn_ws(
926                async move {
927                    ws.unsubscribe_ticker(instrument_id)
928                        .await
929                        .context("ticker unsubscribe for mark prices")
930                },
931                "mark price unsubscribe",
932            );
933        }
934        Ok(())
935    }
936
937    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
938        let instrument_id = cmd.instrument_id;
939        let product_type = self
940            .get_product_type_for_instrument(instrument_id)
941            .unwrap_or(BybitProductType::Linear);
942
943        let should_unsubscribe = {
944            let mut subs = self.ticker_subs.write().expect(MUTEX_POISONED);
945            if let Some(entry) = subs.get_mut(&instrument_id) {
946                entry.remove("index_prices");
947                if entry.is_empty() {
948                    subs.remove(&instrument_id);
949                    true
950                } else {
951                    false
952                }
953            } else {
954                false
955            }
956        };
957
958        if should_unsubscribe {
959            let ws = self
960                .get_ws_client_for_product(product_type)
961                .context("no WebSocket client for product type")?
962                .clone();
963
964            self.spawn_ws(
965                async move {
966                    ws.unsubscribe_ticker(instrument_id)
967                        .await
968                        .context("ticker unsubscribe for index prices")
969                },
970                "index price unsubscribe",
971            );
972        }
973        Ok(())
974    }
975
976    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
977        let bar_type = cmd.bar_type;
978        let instrument_id = bar_type.instrument_id();
979        let product_type = self
980            .get_product_type_for_instrument(instrument_id)
981            .unwrap_or(BybitProductType::Linear);
982
983        let ws = self
984            .get_ws_client_for_product(product_type)
985            .context("no WebSocket client for product type")?
986            .clone();
987
988        self.spawn_ws(
989            async move {
990                ws.unsubscribe_bars(bar_type)
991                    .await
992                    .context("bars unsubscribe")
993            },
994            "bar unsubscribe",
995        );
996        Ok(())
997    }
998
999    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1000        let http = self.http_client.clone();
1001        let sender = self.data_sender.clone();
1002        let instruments_cache = self.instruments.clone();
1003        let request_id = request.request_id;
1004        let client_id = request.client_id.unwrap_or(self.client_id);
1005        let venue = self.venue();
1006        let start = request.start;
1007        let end = request.end;
1008        let params = request.params;
1009        let clock = self.clock;
1010        let start_nanos = datetime_to_unix_nanos(start);
1011        let end_nanos = datetime_to_unix_nanos(end);
1012        let product_types = if self.config.product_types.is_empty() {
1013            vec![BybitProductType::Linear]
1014        } else {
1015            self.config.product_types.clone()
1016        };
1017
1018        get_runtime().spawn(async move {
1019            let mut all_instruments = Vec::new();
1020
1021            for product_type in product_types {
1022                match http.request_instruments(product_type, None).await {
1023                    Ok(instruments) => {
1024                        for instrument in instruments {
1025                            upsert_instrument(&instruments_cache, instrument.clone());
1026                            all_instruments.push(instrument);
1027                        }
1028                    }
1029                    Err(e) => {
1030                        log::error!("Failed to fetch instruments for {product_type:?}: {e:?}");
1031                    }
1032                }
1033            }
1034
1035            let response = DataResponse::Instruments(InstrumentsResponse::new(
1036                request_id,
1037                client_id,
1038                venue,
1039                all_instruments,
1040                start_nanos,
1041                end_nanos,
1042                clock.get_time_ns(),
1043                params,
1044            ));
1045
1046            if let Err(e) = sender.send(DataEvent::Response(response)) {
1047                log::error!("Failed to send instruments response: {e}");
1048            }
1049        });
1050
1051        Ok(())
1052    }
1053
1054    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1055        let http = self.http_client.clone();
1056        let sender = self.data_sender.clone();
1057        let instruments = self.instruments.clone();
1058        let instrument_id = request.instrument_id;
1059        let request_id = request.request_id;
1060        let client_id = request.client_id.unwrap_or(self.client_id);
1061        let start = request.start;
1062        let end = request.end;
1063        let params = request.params;
1064        let clock = self.clock;
1065        let start_nanos = datetime_to_unix_nanos(start);
1066        let end_nanos = datetime_to_unix_nanos(end);
1067
1068        // Determine product type from symbol
1069        let symbol_str = instrument_id.symbol.as_str();
1070        let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1071            BybitProductType::Spot
1072        } else if symbol_str.ends_with("-OPTION") {
1073            BybitProductType::Option
1074        } else if symbol_str.contains("USD")
1075            && !symbol_str.contains("USDT")
1076            && !symbol_str.contains("USDC")
1077        {
1078            BybitProductType::Inverse
1079        } else {
1080            BybitProductType::Linear
1081        };
1082        let raw_symbol = extract_raw_symbol(symbol_str).to_string();
1083
1084        get_runtime().spawn(async move {
1085            match http
1086                .request_instruments(product_type, Some(raw_symbol))
1087                .await
1088                .context("fetch instrument from API")
1089            {
1090                Ok(fetched) => {
1091                    if let Some(instrument) = fetched.into_iter().find(|i| i.id() == instrument_id)
1092                    {
1093                        upsert_instrument(&instruments, instrument.clone());
1094
1095                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1096                            request_id,
1097                            client_id,
1098                            instrument.id(),
1099                            instrument,
1100                            start_nanos,
1101                            end_nanos,
1102                            clock.get_time_ns(),
1103                            params,
1104                        )));
1105
1106                        if let Err(e) = sender.send(DataEvent::Response(response)) {
1107                            log::error!("Failed to send instrument response: {e}");
1108                        }
1109                    } else {
1110                        log::error!("Instrument not found: {instrument_id}");
1111                    }
1112                }
1113                Err(e) => log::error!("Instrument request failed: {e:?}"),
1114            }
1115        });
1116
1117        Ok(())
1118    }
1119
1120    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1121        let http = self.http_client.clone();
1122        let sender = self.data_sender.clone();
1123        let instrument_id = request.instrument_id;
1124        let start = request.start;
1125        let end = request.end;
1126        let limit = request.limit.map(|n| n.get() as u32);
1127        let request_id = request.request_id;
1128        let client_id = request.client_id.unwrap_or(self.client_id);
1129        let params = request.params;
1130        let clock = self.clock;
1131        let start_nanos = datetime_to_unix_nanos(start);
1132        let end_nanos = datetime_to_unix_nanos(end);
1133
1134        // Determine product type from symbol
1135        let symbol_str = instrument_id.symbol.as_str();
1136        let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1137            BybitProductType::Spot
1138        } else if symbol_str.ends_with("-OPTION") {
1139            BybitProductType::Option
1140        } else if symbol_str.contains("USD")
1141            && !symbol_str.contains("USDT")
1142            && !symbol_str.contains("USDC")
1143        {
1144            BybitProductType::Inverse
1145        } else {
1146            BybitProductType::Linear
1147        };
1148
1149        get_runtime().spawn(async move {
1150            match http
1151                .request_trades(product_type, instrument_id, limit)
1152                .await
1153                .context("failed to request trades from Bybit")
1154            {
1155                Ok(trades) => {
1156                    let response = DataResponse::Trades(TradesResponse::new(
1157                        request_id,
1158                        client_id,
1159                        instrument_id,
1160                        trades,
1161                        start_nanos,
1162                        end_nanos,
1163                        clock.get_time_ns(),
1164                        params,
1165                    ));
1166                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1167                        log::error!("Failed to send trades response: {e}");
1168                    }
1169                }
1170                Err(e) => log::error!("Trade request failed: {e:?}"),
1171            }
1172        });
1173
1174        Ok(())
1175    }
1176
1177    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1178        let http = self.http_client.clone();
1179        let sender = self.data_sender.clone();
1180        let bar_type = request.bar_type;
1181        let start = request.start;
1182        let end = request.end;
1183        let limit = request.limit.map(|n| n.get() as u32);
1184        let request_id = request.request_id;
1185        let client_id = request.client_id.unwrap_or(self.client_id);
1186        let params = request.params;
1187        let clock = self.clock;
1188        let start_nanos = datetime_to_unix_nanos(start);
1189        let end_nanos = datetime_to_unix_nanos(end);
1190
1191        // Determine product type from symbol
1192        let instrument_id = bar_type.instrument_id();
1193        let symbol_str = instrument_id.symbol.as_str();
1194        let product_type = if symbol_str.ends_with("-SPOT") || !symbol_str.contains('-') {
1195            BybitProductType::Spot
1196        } else if symbol_str.ends_with("-OPTION") {
1197            BybitProductType::Option
1198        } else if symbol_str.contains("USD")
1199            && !symbol_str.contains("USDT")
1200            && !symbol_str.contains("USDC")
1201        {
1202            BybitProductType::Inverse
1203        } else {
1204            BybitProductType::Linear
1205        };
1206
1207        get_runtime().spawn(async move {
1208            match http
1209                .request_bars(product_type, bar_type, start, end, limit, true)
1210                .await
1211                .context("failed to request bars from Bybit")
1212            {
1213                Ok(bars) => {
1214                    let response = DataResponse::Bars(BarsResponse::new(
1215                        request_id,
1216                        client_id,
1217                        bar_type,
1218                        bars,
1219                        start_nanos,
1220                        end_nanos,
1221                        clock.get_time_ns(),
1222                        params,
1223                    ));
1224                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1225                        log::error!("Failed to send bars response: {e}");
1226                    }
1227                }
1228                Err(e) => log::error!("Bar request failed: {e:?}"),
1229            }
1230        });
1231
1232        Ok(())
1233    }
1234}