1use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23 common::enums::{BybitEnvironment, BybitProductType},
24 websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
25};
26use nautilus_model::data::Data;
27use tokio::{pin, signal};
28use tracing::level_filters::LevelFilter;
29
30#[tokio::main]
31async fn main() -> Result<(), Box<dyn Error>> {
32 tracing_subscriber::fmt()
33 .with_max_level(LevelFilter::INFO)
34 .with_target(false)
35 .compact()
36 .init();
37
38 let mut client = BybitWebSocketClient::new_public_with(
39 BybitProductType::Linear,
40 BybitEnvironment::Mainnet,
41 None,
42 None,
43 );
44 client.connect().await?;
45
46 client
47 .subscribe(vec![
48 "orderbook.1.BTCUSDT".to_string(),
49 "publicTrade.BTCUSDT".to_string(),
50 "tickers.BTCUSDT".to_string(),
51 ])
52 .await?;
53
54 let stream = client.stream();
55 let shutdown = signal::ctrl_c();
56 pin!(stream);
57 pin!(shutdown);
58
59 tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
60
61 loop {
62 tokio::select! {
63 Some(event) = stream.next() => {
64 match event {
65 NautilusWsMessage::Data(data_vec) => {
66 tracing::info!(count = data_vec.len(), "data update");
67 for data in data_vec {
68 match data {
69 Data::Trade(tick) => {
70 tracing::info!(instrument = %tick.instrument_id, price = %tick.price, size = %tick.size, "trade");
71 }
72 Data::Quote(quote) => {
73 tracing::info!(instrument = %quote.instrument_id, bid = %quote.bid_price, ask = %quote.ask_price, "quote");
74 }
75 Data::Bar(bar) => {
76 tracing::info!(bar_type = %bar.bar_type, close = %bar.close, "bar");
77 }
78 _ => {
79 tracing::debug!("other data type");
80 }
81 }
82 }
83 }
84 NautilusWsMessage::Deltas(deltas) => {
85 tracing::info!(instrument = %deltas.instrument_id, sequence = deltas.sequence, "orderbook deltas");
86 }
87 NautilusWsMessage::FundingRates(rates) => {
88 tracing::info!(count = rates.len(), "funding rate updates");
89 for rate in rates {
90 tracing::info!(
91 instrument = %rate.instrument_id,
92 rate = %rate.rate,
93 next_funding = ?rate.next_funding_ns,
94 "funding rate"
95 );
96 }
97 }
98 NautilusWsMessage::OrderStatusReports(reports) => {
99 tracing::info!(count = reports.len(), "order status reports");
100 for report in reports {
101 tracing::info!(
102 instrument = %report.instrument_id,
103 client_order_id = ?report.client_order_id,
104 venue_order_id = ?report.venue_order_id,
105 status = ?report.order_status,
106 "order status report"
107 );
108 }
109 }
110 NautilusWsMessage::FillReports(reports) => {
111 tracing::info!(count = reports.len(), "fill reports");
112 for report in reports {
113 tracing::info!(
114 instrument = %report.instrument_id,
115 client_order_id = ?report.client_order_id,
116 venue_order_id = ?report.venue_order_id,
117 last_qty = %report.last_qty,
118 last_px = %report.last_px,
119 "fill report"
120 );
121 }
122 }
123 NautilusWsMessage::PositionStatusReport(report) => {
124 tracing::info!(instrument = %report.instrument_id, quantity = %report.quantity, "position status report");
125 }
126 NautilusWsMessage::AccountState(state) => {
127 tracing::info!(account_id = %state.account_id, balances = state.balances.len(), "account state");
128 }
129 NautilusWsMessage::OrderRejected(event) => {
130 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order rejected");
131 }
132 NautilusWsMessage::OrderCancelRejected(event) => {
133 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order cancel rejected");
134 }
135 NautilusWsMessage::OrderModifyRejected(event) => {
136 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order modify rejected");
137 }
138 NautilusWsMessage::Error(err) => {
139 tracing::error!(code = err.code, message = %err.message, "websocket error");
140 }
141 NautilusWsMessage::Reconnected => {
142 tracing::warn!("WebSocket reconnected");
143 }
144 NautilusWsMessage::Authenticated => {
145 tracing::info!("Authenticated successfully");
146 }
147 }
148 }
149 _ = &mut shutdown => {
150 tracing::info!("Received Ctrl+C, closing connection");
151 client.close().await?;
152 break;
153 }
154 else => break,
155 }
156 }
157
158 Ok(())
159}