bybit_ws_data/
ws_data.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! Connects to the Bybit public WebSocket feed and streams specified market data topics.
//! Useful when manually validating the Rust WebSocket client implementation.
18
19use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23    common::enums::{BybitEnvironment, BybitProductType},
24    websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
25};
26use nautilus_model::data::Data;
27use tokio::{pin, signal};
28use tracing::level_filters::LevelFilter;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn Error>> {
32    tracing_subscriber::fmt()
33        .with_max_level(LevelFilter::INFO)
34        .with_target(false)
35        .compact()
36        .init();
37
38    let mut client = BybitWebSocketClient::new_public_with(
39        BybitProductType::Linear,
40        BybitEnvironment::Mainnet,
41        None,
42        None,
43    );
44    client.connect().await?;
45
46    client
47        .subscribe(vec![
48            "orderbook.1.BTCUSDT".to_string(),
49            "publicTrade.BTCUSDT".to_string(),
50            "tickers.BTCUSDT".to_string(),
51        ])
52        .await?;
53
54    let stream = client.stream();
55    let shutdown = signal::ctrl_c();
56    pin!(stream);
57    pin!(shutdown);
58
59    tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
60
61    loop {
62        tokio::select! {
63            Some(event) = stream.next() => {
64                match event {
65                    NautilusWsMessage::Data(data_vec) => {
66                        tracing::info!(count = data_vec.len(), "data update");
67                        for data in data_vec {
68                            match data {
69                                Data::Trade(tick) => {
70                                    tracing::info!(instrument = %tick.instrument_id, price = %tick.price, size = %tick.size, "trade");
71                                }
72                                Data::Quote(quote) => {
73                                    tracing::info!(instrument = %quote.instrument_id, bid = %quote.bid_price, ask = %quote.ask_price, "quote");
74                                }
75                                Data::Bar(bar) => {
76                                    tracing::info!(bar_type = %bar.bar_type, close = %bar.close, "bar");
77                                }
78                                _ => {
79                                    tracing::debug!("other data type");
80                                }
81                            }
82                        }
83                    }
84                    NautilusWsMessage::Deltas(deltas) => {
85                        tracing::info!(instrument = %deltas.instrument_id, sequence = deltas.sequence, "orderbook deltas");
86                    }
87                    NautilusWsMessage::FundingRates(rates) => {
88                        tracing::info!(count = rates.len(), "funding rate updates");
89                        for rate in rates {
90                            tracing::info!(
91                                instrument = %rate.instrument_id,
92                                rate = %rate.rate,
93                                next_funding = ?rate.next_funding_ns,
94                                "funding rate"
95                            );
96                        }
97                    }
98                    NautilusWsMessage::OrderStatusReports(reports) => {
99                        tracing::info!(count = reports.len(), "order status reports");
100                        for report in reports {
101                            tracing::info!(
102                                instrument = %report.instrument_id,
103                                client_order_id = ?report.client_order_id,
104                                venue_order_id = ?report.venue_order_id,
105                                status = ?report.order_status,
106                                "order status report"
107                            );
108                        }
109                    }
110                    NautilusWsMessage::FillReports(reports) => {
111                        tracing::info!(count = reports.len(), "fill reports");
112                        for report in reports {
113                            tracing::info!(
114                                instrument = %report.instrument_id,
115                                client_order_id = ?report.client_order_id,
116                                venue_order_id = ?report.venue_order_id,
117                                last_qty = %report.last_qty,
118                                last_px = %report.last_px,
119                                "fill report"
120                            );
121                        }
122                    }
123                    NautilusWsMessage::PositionStatusReport(report) => {
124                        tracing::info!(instrument = %report.instrument_id, quantity = %report.quantity, "position status report");
125                    }
126                    NautilusWsMessage::AccountState(state) => {
127                        tracing::info!(account_id = %state.account_id, balances = state.balances.len(), "account state");
128                    }
129                    NautilusWsMessage::OrderRejected(event) => {
130                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order rejected");
131                    }
132                    NautilusWsMessage::OrderCancelRejected(event) => {
133                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order cancel rejected");
134                    }
135                    NautilusWsMessage::OrderModifyRejected(event) => {
136                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order modify rejected");
137                    }
138                    NautilusWsMessage::Error(err) => {
139                        tracing::error!(code = err.code, message = %err.message, "websocket error");
140                    }
141                    NautilusWsMessage::Reconnected => {
142                        tracing::warn!("WebSocket reconnected");
143                    }
144                    NautilusWsMessage::Authenticated => {
145                        tracing::info!("Authenticated successfully");
146                    }
147                }
148            }
149            _ = &mut shutdown => {
150                tracing::info!("Received Ctrl+C, closing connection");
151                client.close().await?;
152                break;
153            }
154            else => break,
155        }
156    }
157
158    Ok(())
159}