nautilus_binance/futures/websocket/
handler.rs1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU64, Ordering},
24 },
25};
26
27use nautilus_model::{
28 data::Data,
29 instruments::{Instrument, InstrumentAny},
30};
31use nautilus_network::{
32 RECONNECTED,
33 websocket::{SubscriptionState, WebSocketClient},
34};
35use ustr::Ustr;
36
37use super::{
38 messages::{
39 BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40 BinanceFuturesHandlerCommand, BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg,
41 BinanceFuturesWsErrorResponse, BinanceFuturesWsSubscribeRequest,
42 BinanceFuturesWsSubscribeResponse, NautilusFuturesWsMessage,
43 },
44 parse::{
45 extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
46 parse_trade,
47 },
48};
49use crate::common::enums::{BinanceWsEventType, BinanceWsMethod};
50
51pub struct BinanceFuturesWsFeedHandler {
53 #[allow(dead_code)] signal: Arc<AtomicBool>,
55 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
56 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
57 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
59 client: Option<WebSocketClient>,
60 instruments: HashMap<Ustr, InstrumentAny>,
61 subscriptions_state: SubscriptionState,
62 request_id_counter: Arc<AtomicU64>,
63 pending_requests: HashMap<u64, Vec<String>>,
64}
65
66impl Debug for BinanceFuturesWsFeedHandler {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct(stringify!(BinanceFuturesWsFeedHandler))
69 .field("instruments_count", &self.instruments.len())
70 .field("pending_requests", &self.pending_requests.len())
71 .finish_non_exhaustive()
72 }
73}
74
75impl BinanceFuturesWsFeedHandler {
76 pub fn new(
78 signal: Arc<AtomicBool>,
79 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
80 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
81 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
82 subscriptions_state: SubscriptionState,
83 request_id_counter: Arc<AtomicU64>,
84 ) -> Self {
85 Self {
86 signal,
87 cmd_rx,
88 raw_rx,
89 out_tx,
90 client: None,
91 instruments: HashMap::new(),
92 subscriptions_state,
93 request_id_counter,
94 pending_requests: HashMap::new(),
95 }
96 }
97
98 pub async fn next(&mut self) -> Option<NautilusFuturesWsMessage> {
102 loop {
103 if self.signal.load(Ordering::Relaxed) {
104 return None;
105 }
106
107 tokio::select! {
108 Some(cmd) = self.cmd_rx.recv() => {
109 self.handle_command(cmd).await;
110 }
111 Some(raw) = self.raw_rx.recv() => {
112 if let Some(msg) = self.handle_raw_message(raw).await {
113 return Some(msg);
114 }
115 }
116 else => {
117 return None;
118 }
119 }
120 }
121 }
122
123 async fn handle_command(&mut self, cmd: BinanceFuturesHandlerCommand) {
124 match cmd {
125 BinanceFuturesHandlerCommand::SetClient(client) => {
126 self.client = Some(client);
127 }
128 BinanceFuturesHandlerCommand::Disconnect => {
129 if let Some(client) = &self.client {
130 let () = client.disconnect().await;
131 }
132 self.client = None;
133 }
134 BinanceFuturesHandlerCommand::InitializeInstruments(instruments) => {
135 for inst in instruments {
136 self.instruments.insert(inst.raw_symbol().inner(), inst);
137 }
138 }
139 BinanceFuturesHandlerCommand::UpdateInstrument(instrument) => {
140 self.instruments
141 .insert(instrument.raw_symbol().inner(), instrument);
142 }
143 BinanceFuturesHandlerCommand::Subscribe { streams } => {
144 self.send_subscribe(streams).await;
145 }
146 BinanceFuturesHandlerCommand::Unsubscribe { streams } => {
147 self.send_unsubscribe(streams).await;
148 }
149 }
150 }
151
152 async fn send_subscribe(&mut self, streams: Vec<String>) {
153 let Some(client) = &self.client else {
154 log::warn!("Cannot subscribe: no client connected");
155 return;
156 };
157
158 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
159
160 self.pending_requests.insert(request_id, streams.clone());
162
163 for stream in &streams {
165 self.subscriptions_state.mark_subscribe(stream);
166 }
167
168 let request = BinanceFuturesWsSubscribeRequest {
169 method: BinanceWsMethod::Subscribe,
170 params: streams,
171 id: request_id,
172 };
173
174 let json = match serde_json::to_string(&request) {
175 Ok(j) => j,
176 Err(e) => {
177 log::error!("Failed to serialize subscribe request: {e}");
178 return;
179 }
180 };
181
182 if let Err(e) = client.send_text(json, None).await {
183 log::error!("Failed to send subscribe request: {e}");
184 }
185 }
186
187 async fn send_unsubscribe(&mut self, streams: Vec<String>) {
188 let Some(client) = &self.client else {
189 log::warn!("Cannot unsubscribe: no client connected");
190 return;
191 };
192
193 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
194
195 let request = BinanceFuturesWsSubscribeRequest {
196 method: BinanceWsMethod::Unsubscribe,
197 params: streams.clone(),
198 id: request_id,
199 };
200
201 let json = match serde_json::to_string(&request) {
202 Ok(j) => j,
203 Err(e) => {
204 log::error!("Failed to serialize unsubscribe request: {e}");
205 return;
206 }
207 };
208
209 if let Err(e) = client.send_text(json, None).await {
210 log::error!("Failed to send unsubscribe request: {e}");
211 }
212
213 for stream in &streams {
215 self.subscriptions_state.confirm_unsubscribe(stream);
216 }
217 }
218
219 async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusFuturesWsMessage> {
220 if let Ok(text) = std::str::from_utf8(&raw)
222 && text == RECONNECTED
223 {
224 log::info!("WebSocket reconnected signal received");
225 return Some(NautilusFuturesWsMessage::Reconnected);
226 }
227
228 let json: serde_json::Value = match serde_json::from_slice(&raw) {
230 Ok(j) => j,
231 Err(e) => {
232 log::warn!("Failed to parse JSON message: {e}");
233 return None;
234 }
235 };
236
237 if json.get("result").is_some() || json.get("id").is_some() {
239 self.handle_subscription_response(&json);
240 return None;
241 }
242
243 if let Some(code) = json.get("code")
245 && let Some(code) = code.as_i64()
246 {
247 let msg = json
248 .get("msg")
249 .and_then(|m| m.as_str())
250 .unwrap_or("Unknown error")
251 .to_string();
252 return Some(NautilusFuturesWsMessage::Error(BinanceFuturesWsErrorMsg {
253 code,
254 msg,
255 }));
256 }
257
258 self.handle_stream_data(&json)
260 }
261
262 fn handle_subscription_response(&mut self, json: &serde_json::Value) {
263 if let Ok(response) =
264 serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
265 {
266 if let Some(streams) = self.pending_requests.remove(&response.id) {
267 if response.result.is_none() {
268 for stream in &streams {
270 self.subscriptions_state.confirm_subscribe(stream);
271 }
272 log::debug!("Subscription confirmed: streams={streams:?}");
273 } else {
274 for stream in &streams {
276 self.subscriptions_state.mark_failure(stream);
277 }
278 log::warn!(
279 "Subscription failed: streams={streams:?}, result={:?}",
280 response.result
281 );
282 }
283 }
284 } else if let Ok(error) =
285 serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
286 {
287 if let Some(id) = error.id
288 && let Some(streams) = self.pending_requests.remove(&id)
289 {
290 for stream in &streams {
291 self.subscriptions_state.mark_failure(stream);
292 }
293 }
294 log::warn!(
295 "WebSocket error response: code={}, msg={}",
296 error.code,
297 error.msg
298 );
299 }
300 }
301
302 fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusFuturesWsMessage> {
303 let event_type = extract_event_type(json)?;
304 let symbol = extract_symbol(json)?;
305
306 let Some(instrument) = self.instruments.get(&symbol) else {
308 log::warn!(
309 "No instrument in cache, dropping message: symbol={symbol}, event_type={event_type:?}"
310 );
311 return None;
312 };
313
314 match event_type {
315 BinanceWsEventType::AggTrade => {
316 if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
317 match parse_agg_trade(&msg, instrument) {
318 Ok(trade) => {
319 return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
320 }
321 Err(e) => {
322 log::warn!("Failed to parse aggregate trade: {e}");
323 }
324 }
325 }
326 }
327 BinanceWsEventType::Trade => {
328 if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
329 match parse_trade(&msg, instrument) {
330 Ok(trade) => {
331 return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
332 }
333 Err(e) => {
334 log::warn!("Failed to parse trade: {e}");
335 }
336 }
337 }
338 }
339 BinanceWsEventType::BookTicker => {
340 if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
341 {
342 match parse_book_ticker(&msg, instrument) {
343 Ok(quote) => {
344 return Some(NautilusFuturesWsMessage::Data(vec![Data::Quote(quote)]));
345 }
346 Err(e) => {
347 log::warn!("Failed to parse book ticker: {e}");
348 }
349 }
350 }
351 }
352 BinanceWsEventType::DepthUpdate => {
353 if let Ok(msg) =
354 serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
355 {
356 match parse_depth_update(&msg, instrument) {
357 Ok(deltas) => {
358 return Some(NautilusFuturesWsMessage::Deltas(deltas));
359 }
360 Err(e) => {
361 log::warn!("Failed to parse depth update: {e}");
362 }
363 }
364 }
365 }
366 BinanceWsEventType::MarkPriceUpdate
367 | BinanceWsEventType::Kline
368 | BinanceWsEventType::ForceOrder
369 | BinanceWsEventType::Ticker24Hr
370 | BinanceWsEventType::MiniTicker24Hr => {
371 return Some(NautilusFuturesWsMessage::RawJson(json.clone()));
373 }
374 BinanceWsEventType::Unknown => {
375 log::debug!("Unknown event type: {:?}", json.get("e"));
376 }
377 }
378
379 None
380 }
381}