1use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23 common::enums::{BybitEnvironment, BybitProductType},
24 websocket::{client::BybitWebSocketClient, messages::BybitWebSocketMessage},
25};
26use tokio::{pin, signal};
27use tracing::level_filters::LevelFilter;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31 tracing_subscriber::fmt()
32 .with_max_level(LevelFilter::INFO)
33 .with_target(false)
34 .compact()
35 .init();
36
37 let mut client = BybitWebSocketClient::new_public_with(
38 BybitProductType::Linear,
39 BybitEnvironment::Mainnet,
40 None,
41 None,
42 );
43 client.connect().await?;
44
45 client
46 .subscribe(vec![
47 "orderbook.1.BTCUSDT".to_string(),
48 "publicTrade.BTCUSDT".to_string(),
49 "tickers.BTCUSDT".to_string(),
50 ])
51 .await?;
52
53 let stream = client.stream();
54 let shutdown = signal::ctrl_c();
55 pin!(stream);
56 pin!(shutdown);
57
58 tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
59
60 loop {
61 tokio::select! {
62 Some(event) = stream.next() => {
63 match event {
64 BybitWebSocketMessage::Orderbook(msg) => {
65 tracing::info!(topic = %msg.topic, "orderbook update");
66 }
67 BybitWebSocketMessage::Trade(msg) => {
68 tracing::info!(topic = %msg.topic, trades = msg.data.len(), "trade update");
69 }
70 BybitWebSocketMessage::TickerLinear(msg) => {
71 tracing::info!(topic = %msg.topic, bid = ?msg.data.bid1_price, ask = ?msg.data.ask1_price, "linear ticker update");
72 }
73 BybitWebSocketMessage::TickerOption(msg) => {
74 tracing::info!(topic = %msg.topic, bid = %msg.data.bid_price, ask = %msg.data.ask_price, "option ticker update");
75 }
76 BybitWebSocketMessage::Response(msg) => {
77 tracing::debug!(?msg, "response frame");
78 }
79 BybitWebSocketMessage::Subscription(msg) => {
80 tracing::info!(op = %msg.op, success = msg.success, "subscription ack");
81 }
82 BybitWebSocketMessage::Auth(msg) => {
83 tracing::info!(op = %msg.op, "auth ack");
84 }
85 BybitWebSocketMessage::Error(err) => {
86 tracing::error!(code = err.code, message = %err.message, "bybit websocket error");
87 }
88 BybitWebSocketMessage::Raw(value) => {
89 tracing::debug!(payload = %value, "raw message");
90 }
91 BybitWebSocketMessage::Reconnected => {
92 tracing::warn!("WebSocket reconnected");
93 }
94 BybitWebSocketMessage::Pong => {
95 tracing::trace!("Received pong");
96 }
97 BybitWebSocketMessage::Kline(msg) => {
98 tracing::info!(topic = %msg.topic, bars = msg.data.len(), "kline update");
99 }
100 BybitWebSocketMessage::AccountOrder(msg) => {
101 tracing::info!(topic = %msg.topic, orders = msg.data.len(), "account order update");
102 }
103 BybitWebSocketMessage::AccountExecution(msg) => {
104 tracing::info!(topic = %msg.topic, executions = msg.data.len(), "account execution update");
105 }
106 BybitWebSocketMessage::AccountWallet(msg) => {
107 tracing::info!(topic = %msg.topic, wallets = msg.data.len(), "account wallet update");
108 }
109 BybitWebSocketMessage::AccountPosition(msg) => {
110 tracing::info!(topic = %msg.topic, positions = msg.data.len(), "account position update");
111 }
112 }
113 }
114 _ = &mut shutdown => {
115 tracing::info!("Received Ctrl+C, closing connection");
116 client.close().await?;
117 break;
118 }
119 else => break,
120 }
121 }
122
123 Ok(())
124}