1use std::error::Error;
20
21use futures_util::StreamExt;
22use nautilus_bybit::{
23 common::enums::{BybitEnvironment, BybitProductType},
24 websocket::{client::BybitWebSocketClient, messages::NautilusWsMessage},
25};
26use tokio::{pin, signal};
27use tracing::level_filters::LevelFilter;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31 tracing_subscriber::fmt()
32 .with_max_level(LevelFilter::INFO)
33 .with_target(false)
34 .compact()
35 .init();
36
37 let mut client = BybitWebSocketClient::new_public_with(
38 BybitProductType::Linear,
39 BybitEnvironment::Mainnet,
40 None,
41 None,
42 );
43 client.connect().await?;
44
45 client
46 .subscribe(vec![
47 "orderbook.1.BTCUSDT".to_string(),
48 "publicTrade.BTCUSDT".to_string(),
49 "tickers.BTCUSDT".to_string(),
50 ])
51 .await?;
52
53 let stream = client.stream();
54 let shutdown = signal::ctrl_c();
55 pin!(stream);
56 pin!(shutdown);
57
58 tracing::info!("Streaming Bybit market data; press Ctrl+C to exit");
59
60 loop {
61 tokio::select! {
62 Some(event) = stream.next() => {
63 match event {
64 NautilusWsMessage::Data(data_vec) => {
65 tracing::info!(count = data_vec.len(), "data update");
66 for data in data_vec {
67 match data {
68 nautilus_model::data::Data::Trade(tick) => {
69 tracing::info!(instrument = %tick.instrument_id, price = %tick.price, size = %tick.size, "trade");
70 }
71 nautilus_model::data::Data::Quote(quote) => {
72 tracing::info!(instrument = %quote.instrument_id, bid = %quote.bid_price, ask = %quote.ask_price, "quote");
73 }
74 nautilus_model::data::Data::Bar(bar) => {
75 tracing::info!(bar_type = %bar.bar_type, close = %bar.close, "bar");
76 }
77 _ => {
78 tracing::debug!("other data type");
79 }
80 }
81 }
82 }
83 NautilusWsMessage::Deltas(deltas) => {
84 tracing::info!(instrument = %deltas.instrument_id, sequence = deltas.sequence, "orderbook deltas");
85 }
86 NautilusWsMessage::FundingRates(rates) => {
87 tracing::info!(count = rates.len(), "funding rate updates");
88 for rate in rates {
89 tracing::info!(
90 instrument = %rate.instrument_id,
91 rate = %rate.rate,
92 next_funding = ?rate.next_funding_ns,
93 "funding rate"
94 );
95 }
96 }
97 NautilusWsMessage::OrderStatusReports(reports) => {
98 tracing::info!(count = reports.len(), "order status reports");
99 for report in reports {
100 tracing::info!(
101 instrument = %report.instrument_id,
102 client_order_id = ?report.client_order_id,
103 venue_order_id = ?report.venue_order_id,
104 status = ?report.order_status,
105 "order status report"
106 );
107 }
108 }
109 NautilusWsMessage::FillReports(reports) => {
110 tracing::info!(count = reports.len(), "fill reports");
111 for report in reports {
112 tracing::info!(
113 instrument = %report.instrument_id,
114 client_order_id = ?report.client_order_id,
115 venue_order_id = ?report.venue_order_id,
116 last_qty = %report.last_qty,
117 last_px = %report.last_px,
118 "fill report"
119 );
120 }
121 }
122 NautilusWsMessage::PositionStatusReport(report) => {
123 tracing::info!(instrument = %report.instrument_id, quantity = %report.quantity, "position status report");
124 }
125 NautilusWsMessage::AccountState(state) => {
126 tracing::info!(account_id = %state.account_id, balances = state.balances.len(), "account state");
127 }
128 NautilusWsMessage::OrderRejected(event) => {
129 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order rejected");
130 }
131 NautilusWsMessage::OrderCancelRejected(event) => {
132 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order cancel rejected");
133 }
134 NautilusWsMessage::OrderModifyRejected(event) => {
135 tracing::warn!(trader_id = %event.trader_id, client_order_id = %event.client_order_id, reason = %event.reason, "order modify rejected");
136 }
137 NautilusWsMessage::Error(err) => {
138 tracing::error!(code = err.code, message = %err.message, "websocket error");
139 }
140 NautilusWsMessage::Reconnected => {
141 tracing::warn!("WebSocket reconnected");
142 }
143 NautilusWsMessage::Authenticated => {
144 tracing::info!("Authenticated successfully");
145 }
146 }
147 }
148 _ = &mut shutdown => {
149 tracing::info!("Received Ctrl+C, closing connection");
150 client.close().await?;
151 break;
152 }
153 else => break,
154 }
155 }
156
157 Ok(())
158}