bybit_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Connects to the Bybit public WebSocket feed and streams specified market data topics.
17//! Useful when manually validating the Rust WebSocket client implementation.
18
19use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23    common::enums::{BybitEnvironment, BybitProductType},
24    websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
25};
26use tokio::{pin, signal};
27use tracing::level_filters::LevelFilter;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31    tracing_subscriber::fmt()
32        .with_max_level(LevelFilter::INFO)
33        .with_target(false)
34        .compact()
35        .init();
36
37    let mut client = BybitWebSocketClient::new_public_with(
38        BybitProductType::Linear,
39        BybitEnvironment::Mainnet,
40        None,
41        None,
42    );
43    client.connect().await?;
44
45    client
46        .subscribe(vec![
47            "orderbook.1.BTCUSDT".to_string(),
48            "publicTrade.BTCUSDT".to_string(),
49            "tickers.BTCUSDT".to_string(),
50        ])
51        .await?;
52
53    let stream = client.stream();
54    let shutdown = signal::ctrl_c();
55    pin!(stream);
56    pin!(shutdown);
57
58    tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
59
60    loop {
61        tokio::select! {
62            Some(event) = stream.next() => {
63                match event {
64                    NautilusWsMessage::Data(data_vec) => {
65                        tracing::info!(count = data_vec.len(), "data update");
66                        for data in data_vec {
67                            match data {
68                                nautilus_model::data::Data::Trade(tick) => {
69                                    tracing::info!(instrument = %tick.instrument_id, price = %tick.price, size = %tick.size, "trade");
70                                }
71                                nautilus_model::data::Data::Quote(quote) => {
72                                    tracing::info!(instrument = %quote.instrument_id, bid = %quote.bid_price, ask = %quote.ask_price, "quote");
73                                }
74                                nautilus_model::data::Data::Bar(bar) => {
75                                    tracing::info!(bar_type = %bar.bar_type, close = %bar.close, "bar");
76                                }
77                                _ => {
78                                    tracing::debug!("other data type");
79                                }
80                            }
81                        }
82                    }
83                    NautilusWsMessage::Deltas(deltas) => {
84                        tracing::info!(instrument = %deltas.instrument_id, sequence = deltas.sequence, "orderbook deltas");
85                    }
86                    NautilusWsMessage::FundingRates(rates) => {
87                        tracing::info!(count = rates.len(), "funding rate updates");
88                        for rate in rates {
89                            tracing::info!(
90                                instrument = %rate.instrument_id,
91                                rate = %rate.rate,
92                                next_funding = ?rate.next_funding_ns,
93                                "funding rate"
94                            );
95                        }
96                    }
97                    NautilusWsMessage::OrderStatusReports(reports) => {
98                        tracing::info!(count = reports.len(), "order status reports");
99                        for report in reports {
100                            tracing::info!(
101                                instrument = %report.instrument_id,
102                                client_order_id = ?report.client_order_id,
103                                venue_order_id = ?report.venue_order_id,
104                                status = ?report.order_status,
105                                "order status report"
106                            );
107                        }
108                    }
109                    NautilusWsMessage::FillReports(reports) => {
110                        tracing::info!(count = reports.len(), "fill reports");
111                        for report in reports {
112                            tracing::info!(
113                                instrument = %report.instrument_id,
114                                client_order_id = ?report.client_order_id,
115                                venue_order_id = ?report.venue_order_id,
116                                last_qty = %report.last_qty,
117                                last_px = %report.last_px,
118                                "fill report"
119                            );
120                        }
121                    }
122                    NautilusWsMessage::PositionStatusReport(report) => {
123                        tracing::info!(instrument = %report.instrument_id, quantity = %report.quantity, "position status report");
124                    }
125                    NautilusWsMessage::AccountState(state) => {
126                        tracing::info!(account_id = %state.account_id, balances = state.balances.len(), "account state");
127                    }
128                    NautilusWsMessage::OrderRejected(event) => {
129                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order rejected");
130                    }
131                    NautilusWsMessage::OrderCancelRejected(event) => {
132                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order cancel rejected");
133                    }
134                    NautilusWsMessage::OrderModifyRejected(event) => {
135                        tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order modify rejected");
136                    }
137                    NautilusWsMessage::Error(err) => {
138                        tracing::error!(code = err.code, message = %err.message, "websocket error");
139                    }
140                    NautilusWsMessage::Reconnected => {
141                        tracing::warn!("WebSocket reconnected");
142                    }
143                    NautilusWsMessage::Authenticated => {
144                        tracing::info!("Authenticated successfully");
145                    }
146                }
147            }
148            _ = &mut shutdown => {
149                tracing::info!("Received Ctrl+C, closing connection");
150                client.close().await?;
151                break;
152            }
153            else => break,
154        }
155    }
156
157    Ok(())
158}