nautilus_binance/futures/websocket/
handler.rs1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU64, Ordering},
24 },
25};
26
27use nautilus_model::{
28 data::Data,
29 instruments::{Instrument, InstrumentAny},
30};
31use nautilus_network::{
32 RECONNECTED,
33 websocket::{SubscriptionState, WebSocketClient},
34};
35use ustr::Ustr;
36
37use super::{
38 messages::{
39 BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40 BinanceFuturesHandlerCommand, BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg,
41 BinanceFuturesWsErrorResponse, BinanceFuturesWsSubscribeRequest,
42 BinanceFuturesWsSubscribeResponse, NautilusFuturesWsMessage,
43 },
44 parse::{
45 extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
46 parse_trade,
47 },
48};
49use crate::common::enums::{BinanceWsEventType, BinanceWsMethod};
50
51pub struct BinanceFuturesWsFeedHandler {
53 #[allow(dead_code)] signal: Arc<AtomicBool>,
55 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
56 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
57 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
59 client: Option<WebSocketClient>,
60 instruments: HashMap<Ustr, InstrumentAny>,
61 subscriptions_state: SubscriptionState,
62 request_id_counter: Arc<AtomicU64>,
63 pending_requests: HashMap<u64, Vec<String>>,
64}
65
66impl Debug for BinanceFuturesWsFeedHandler {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("BinanceFuturesWsFeedHandler")
69 .field("instruments_count", &self.instruments.len())
70 .field("pending_requests", &self.pending_requests.len())
71 .finish_non_exhaustive()
72 }
73}
74
75impl BinanceFuturesWsFeedHandler {
76 pub fn new(
78 signal: Arc<AtomicBool>,
79 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
80 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
81 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
82 subscriptions_state: SubscriptionState,
83 request_id_counter: Arc<AtomicU64>,
84 ) -> Self {
85 Self {
86 signal,
87 cmd_rx,
88 raw_rx,
89 out_tx,
90 client: None,
91 instruments: HashMap::new(),
92 subscriptions_state,
93 request_id_counter,
94 pending_requests: HashMap::new(),
95 }
96 }
97
98 pub async fn next(&mut self) -> Option<NautilusFuturesWsMessage> {
102 loop {
103 if self.signal.load(Ordering::Relaxed) {
104 return None;
105 }
106
107 tokio::select! {
108 Some(cmd) = self.cmd_rx.recv() => {
109 self.handle_command(cmd).await;
110 }
111 Some(raw) = self.raw_rx.recv() => {
112 if let Some(msg) = self.handle_raw_message(raw).await {
113 return Some(msg);
114 }
115 }
116 else => {
117 return None;
118 }
119 }
120 }
121 }
122
123 async fn handle_command(&mut self, cmd: BinanceFuturesHandlerCommand) {
124 match cmd {
125 BinanceFuturesHandlerCommand::SetClient(client) => {
126 self.client = Some(client);
127 }
128 BinanceFuturesHandlerCommand::Disconnect => {
129 if let Some(client) = &self.client {
130 let _ = client.disconnect().await;
131 }
132 self.client = None;
133 }
134 BinanceFuturesHandlerCommand::InitializeInstruments(instruments) => {
135 for inst in instruments {
136 self.instruments.insert(inst.raw_symbol().inner(), inst);
137 }
138 }
139 BinanceFuturesHandlerCommand::UpdateInstrument(instrument) => {
140 self.instruments
141 .insert(instrument.raw_symbol().inner(), instrument);
142 }
143 BinanceFuturesHandlerCommand::Subscribe { streams } => {
144 self.send_subscribe(streams).await;
145 }
146 BinanceFuturesHandlerCommand::Unsubscribe { streams } => {
147 self.send_unsubscribe(streams).await;
148 }
149 }
150 }
151
152 async fn send_subscribe(&mut self, streams: Vec<String>) {
153 let Some(client) = &self.client else {
154 tracing::warn!("Cannot subscribe: no client connected");
155 return;
156 };
157
158 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
159
160 self.pending_requests.insert(request_id, streams.clone());
162
163 for stream in &streams {
165 self.subscriptions_state.mark_subscribe(stream);
166 }
167
168 let request = BinanceFuturesWsSubscribeRequest {
169 method: BinanceWsMethod::Subscribe,
170 params: streams,
171 id: request_id,
172 };
173
174 let json = match serde_json::to_string(&request) {
175 Ok(j) => j,
176 Err(e) => {
177 tracing::error!(error = %e, "Failed to serialize subscribe request");
178 return;
179 }
180 };
181
182 if let Err(e) = client.send_text(json, None).await {
183 tracing::error!(error = %e, "Failed to send subscribe request");
184 }
185 }
186
187 async fn send_unsubscribe(&mut self, streams: Vec<String>) {
188 let Some(client) = &self.client else {
189 tracing::warn!("Cannot unsubscribe: no client connected");
190 return;
191 };
192
193 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
194
195 let request = BinanceFuturesWsSubscribeRequest {
196 method: BinanceWsMethod::Unsubscribe,
197 params: streams.clone(),
198 id: request_id,
199 };
200
201 let json = match serde_json::to_string(&request) {
202 Ok(j) => j,
203 Err(e) => {
204 tracing::error!(error = %e, "Failed to serialize unsubscribe request");
205 return;
206 }
207 };
208
209 if let Err(e) = client.send_text(json, None).await {
210 tracing::error!(error = %e, "Failed to send unsubscribe request");
211 }
212
213 for stream in &streams {
215 self.subscriptions_state.confirm_unsubscribe(stream);
216 }
217 }
218
219 async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusFuturesWsMessage> {
220 if let Ok(text) = std::str::from_utf8(&raw)
222 && text == RECONNECTED
223 {
224 tracing::info!("WebSocket reconnected signal received");
225 return Some(NautilusFuturesWsMessage::Reconnected);
226 }
227
228 let json: serde_json::Value = match serde_json::from_slice(&raw) {
230 Ok(j) => j,
231 Err(e) => {
232 tracing::warn!(error = %e, "Failed to parse JSON message");
233 return None;
234 }
235 };
236
237 if json.get("result").is_some() || json.get("id").is_some() {
239 self.handle_subscription_response(&json);
240 return None;
241 }
242
243 if let Some(code) = json.get("code")
245 && let Some(code) = code.as_i64()
246 {
247 let msg = json
248 .get("msg")
249 .and_then(|m| m.as_str())
250 .unwrap_or("Unknown error")
251 .to_string();
252 return Some(NautilusFuturesWsMessage::Error(BinanceFuturesWsErrorMsg {
253 code,
254 msg,
255 }));
256 }
257
258 self.handle_stream_data(&json)
260 }
261
262 fn handle_subscription_response(&mut self, json: &serde_json::Value) {
263 if let Ok(response) =
264 serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
265 {
266 if let Some(streams) = self.pending_requests.remove(&response.id) {
267 if response.result.is_none() {
268 for stream in &streams {
270 self.subscriptions_state.confirm_subscribe(stream);
271 }
272 tracing::debug!(streams = ?streams, "Subscription confirmed");
273 } else {
274 for stream in &streams {
276 self.subscriptions_state.mark_failure(stream);
277 }
278 tracing::warn!(streams = ?streams, result = ?response.result, "Subscription failed");
279 }
280 }
281 } else if let Ok(error) =
282 serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
283 {
284 if let Some(id) = error.id
285 && let Some(streams) = self.pending_requests.remove(&id)
286 {
287 for stream in &streams {
288 self.subscriptions_state.mark_failure(stream);
289 }
290 }
291 tracing::warn!(code = error.code, msg = %error.msg, "WebSocket error response");
292 }
293 }
294
295 fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusFuturesWsMessage> {
296 let event_type = extract_event_type(json)?;
297 let symbol = extract_symbol(json)?;
298
299 let Some(instrument) = self.instruments.get(&symbol) else {
301 tracing::warn!(
302 symbol = %symbol,
303 event_type = ?event_type,
304 "No instrument in cache, dropping message"
305 );
306 return None;
307 };
308
309 match event_type {
310 BinanceWsEventType::AggTrade => {
311 if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
312 match parse_agg_trade(&msg, instrument) {
313 Ok(trade) => {
314 return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
315 }
316 Err(e) => {
317 tracing::warn!(error = %e, "Failed to parse aggregate trade");
318 }
319 }
320 }
321 }
322 BinanceWsEventType::Trade => {
323 if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
324 match parse_trade(&msg, instrument) {
325 Ok(trade) => {
326 return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
327 }
328 Err(e) => {
329 tracing::warn!(error = %e, "Failed to parse trade");
330 }
331 }
332 }
333 }
334 BinanceWsEventType::BookTicker => {
335 if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
336 {
337 match parse_book_ticker(&msg, instrument) {
338 Ok(quote) => {
339 return Some(NautilusFuturesWsMessage::Data(vec![Data::Quote(quote)]));
340 }
341 Err(e) => {
342 tracing::warn!(error = %e, "Failed to parse book ticker");
343 }
344 }
345 }
346 }
347 BinanceWsEventType::DepthUpdate => {
348 if let Ok(msg) =
349 serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
350 {
351 match parse_depth_update(&msg, instrument) {
352 Ok(deltas) => {
353 return Some(NautilusFuturesWsMessage::Deltas(deltas));
354 }
355 Err(e) => {
356 tracing::warn!(error = %e, "Failed to parse depth update");
357 }
358 }
359 }
360 }
361 BinanceWsEventType::MarkPriceUpdate
362 | BinanceWsEventType::Kline
363 | BinanceWsEventType::ForceOrder
364 | BinanceWsEventType::Ticker24Hr
365 | BinanceWsEventType::MiniTicker24Hr => {
366 return Some(NautilusFuturesWsMessage::RawJson(json.clone()));
368 }
369 BinanceWsEventType::Unknown => {
370 tracing::debug!(event_type = ?json.get("e"), "Unknown event type");
371 }
372 }
373
374 None
375 }
376}