bybit_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Connects to the Bybit public WebSocket feed and streams specified market data topics.
17//! Useful when manually validating the Rust WebSocket client implementation.
18
19use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23    common::enums::{BybitEnvironment, BybitProductType},
24    websocket::{client::BybitWebSocketClient, messages::BybitWebSocketMessage},
25};
26use tokio::{pin, signal};
27use tracing::level_filters::LevelFilter;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31    tracing_subscriber::fmt()
32        .with_max_level(LevelFilter::INFO)
33        .with_target(false)
34        .compact()
35        .init();
36
37    let mut client = BybitWebSocketClient::new_public_with(
38        BybitProductType::Linear,
39        BybitEnvironment::Mainnet,
40        None,
41        None,
42    );
43    client.connect().await?;
44
45    client
46        .subscribe(vec![
47            "orderbook.1.BTCUSDT".to_string(),
48            "publicTrade.BTCUSDT".to_string(),
49            "tickers.BTCUSDT".to_string(),
50        ])
51        .await?;
52
53    let stream = client.stream();
54    let shutdown = signal::ctrl_c();
55    pin!(stream);
56    pin!(shutdown);
57
58    tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
59
60    loop {
61        tokio::select! {
62            Some(event) = stream.next() => {
63                match event {
64                    BybitWebSocketMessage::Orderbook(msg) => {
65                        tracing::info!(topic = %msg.topic, "orderbook update");
66                    }
67                    BybitWebSocketMessage::Trade(msg) => {
68                        tracing::info!(topic = %msg.topic, trades = msg.data.len(), "trade update");
69                    }
70                    BybitWebSocketMessage::TickerLinear(msg) => {
71                        tracing::info!(topic = %msg.topic, bid = ?msg.data.bid1_price, ask = ?msg.data.ask1_price, "linear ticker update");
72                    }
73                    BybitWebSocketMessage::TickerOption(msg) => {
74                        tracing::info!(topic = %msg.topic, bid = %msg.data.bid_price, ask = %msg.data.ask_price, "option ticker update");
75                    }
76                    BybitWebSocketMessage::Response(msg) => {
77                        tracing::debug!(?msg, "response frame");
78                    }
79                    BybitWebSocketMessage::Subscription(msg) => {
80                        tracing::info!(op = %msg.op, success = msg.success, "subscription ack");
81                    }
82                    BybitWebSocketMessage::Auth(msg) => {
83                        tracing::info!(op = %msg.op, "auth ack");
84                    }
85                    BybitWebSocketMessage::Error(err) => {
86                        tracing::error!(code = err.code, message = %err.message, "bybit websocket error");
87                    }
88                    BybitWebSocketMessage::Raw(value) => {
89                        tracing::debug!(payload = %value, "raw message");
90                    }
91                    BybitWebSocketMessage::Reconnected => {
92                        tracing::warn!("WebSocket reconnected");
93                    }
94                    BybitWebSocketMessage::Pong => {
95                        tracing::trace!("Received pong");
96                    }
97                    BybitWebSocketMessage::Kline(msg) => {
98                        tracing::info!(topic = %msg.topic, bars = msg.data.len(), "kline update");
99                    }
100                    BybitWebSocketMessage::AccountOrder(msg) => {
101                        tracing::info!(topic = %msg.topic, orders = msg.data.len(), "account order update");
102                    }
103                    BybitWebSocketMessage::AccountExecution(msg) => {
104                        tracing::info!(topic = %msg.topic, executions = msg.data.len(), "account execution update");
105                    }
106                    BybitWebSocketMessage::AccountWallet(msg) => {
107                        tracing::info!(topic = %msg.topic, wallets = msg.data.len(), "account wallet update");
108                    }
109                    BybitWebSocketMessage::AccountPosition(msg) => {
110                        tracing::info!(topic = %msg.topic, positions = msg.data.len(), "account position update");
111                    }
112                }
113            }
114            _ = &mut shutdown => {
115                tracing::info!("Received Ctrl+C, closing connection");
116                client.close().await?;
117                break;
118            }
119            else => break,
120        }
121    }
122
123    Ok(())
124}