1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::Data,
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
33use tokio_tungstenite::tungstenite::Message;
34use ustr::Ustr;
35
36use super::parse::{
37 parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
38 parse_trade_tick,
39};
40use crate::{
41 common::enums::{AxCandleWidth, AxMarketDataLevel},
42 websocket::messages::{
43 AxMdBookL1, AxMdBookL2, AxMdBookL3, AxMdCandle, AxMdHeartbeat, AxMdSubscribe,
44 AxMdSubscribeCandles, AxMdTrade, AxMdUnsubscribe, AxMdUnsubscribeCandles, AxWsError,
45 NautilusWsMessage,
46 },
47};
48
49#[derive(Debug)]
51pub enum HandlerCommand {
52 SetClient(WebSocketClient),
54 Disconnect,
56 Subscribe {
58 request_id: i64,
60 symbol: String,
62 level: AxMarketDataLevel,
64 },
65 Unsubscribe {
67 request_id: i64,
69 symbol: String,
71 },
72 SubscribeCandles {
74 request_id: i64,
76 symbol: String,
78 width: AxCandleWidth,
80 },
81 UnsubscribeCandles {
83 request_id: i64,
85 symbol: String,
87 width: AxCandleWidth,
89 },
90 InitializeInstruments(Vec<InstrumentAny>),
92 UpdateInstrument(Box<InstrumentAny>),
94}
95
96pub(crate) struct FeedHandler {
100 signal: Arc<AtomicBool>,
101 client: Option<WebSocketClient>,
102 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
103 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
104 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
106 #[allow(dead_code)] subscriptions: SubscriptionState,
108 instruments: AHashMap<Ustr, InstrumentAny>,
109 message_queue: VecDeque<NautilusWsMessage>,
110}
111
112impl FeedHandler {
113 #[must_use]
115 pub fn new(
116 signal: Arc<AtomicBool>,
117 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
118 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
120 subscriptions: SubscriptionState,
121 ) -> Self {
122 Self {
123 signal,
124 client: None,
125 cmd_rx,
126 raw_rx,
127 out_tx,
128 subscriptions,
129 instruments: AHashMap::new(),
130 message_queue: VecDeque::new(),
131 }
132 }
133
134 fn generate_ts_init(&self) -> UnixNanos {
136 get_atomic_clock_realtime().get_time_ns()
137 }
138
139 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
143 loop {
144 if let Some(msg) = self.message_queue.pop_front() {
145 return Some(msg);
146 }
147
148 tokio::select! {
149 Some(cmd) = self.cmd_rx.recv() => {
150 self.handle_command(cmd).await;
151 }
152
153 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
154 if self.signal.load(Ordering::Relaxed) {
155 log::debug!("Stop signal received during idle period");
156 return None;
157 }
158 continue;
159 }
160
161 msg = self.raw_rx.recv() => {
162 let msg = match msg {
163 Some(msg) => msg,
164 None => {
165 log::debug!("WebSocket stream closed");
166 return None;
167 }
168 };
169
170 if let Message::Ping(data) = &msg {
171 log::trace!("Received ping frame with {} bytes", data.len());
172 if let Some(client) = &self.client
173 && let Err(e) = client.send_pong(data.to_vec()).await
174 {
175 log::warn!("Failed to send pong frame: {e}");
176 }
177 continue;
178 }
179
180 if let Some(messages) = self.parse_raw_message(msg) {
181 self.message_queue.extend(messages);
182 }
183
184 if self.signal.load(Ordering::Relaxed) {
185 log::debug!("Stop signal received");
186 return None;
187 }
188 }
189 }
190 }
191 }
192
193 async fn handle_command(&mut self, cmd: HandlerCommand) {
194 match cmd {
195 HandlerCommand::SetClient(client) => {
196 log::debug!("WebSocketClient received by handler");
197 self.client = Some(client);
198 }
199 HandlerCommand::Disconnect => {
200 log::debug!("Disconnect command received");
201 if let Some(client) = self.client.take() {
202 client.disconnect().await;
203 }
204 }
205 HandlerCommand::Subscribe {
206 request_id,
207 symbol,
208 level,
209 } => {
210 log::debug!(
211 "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
212 );
213 self.send_subscribe(request_id, &symbol, level).await;
214 }
215 HandlerCommand::Unsubscribe { request_id, symbol } => {
216 log::debug!(
217 "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
218 );
219 self.send_unsubscribe(request_id, &symbol).await;
220 }
221 HandlerCommand::SubscribeCandles {
222 request_id,
223 symbol,
224 width,
225 } => {
226 log::debug!(
227 "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
228 );
229 self.send_subscribe_candles(request_id, &symbol, width)
230 .await;
231 }
232 HandlerCommand::UnsubscribeCandles {
233 request_id,
234 symbol,
235 width,
236 } => {
237 log::debug!(
238 "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
239 );
240 self.send_unsubscribe_candles(request_id, &symbol, width)
241 .await;
242 }
243 HandlerCommand::InitializeInstruments(instruments) => {
244 for inst in instruments {
245 self.instruments.insert(inst.symbol().inner(), inst);
246 }
247 }
248 HandlerCommand::UpdateInstrument(inst) => {
249 self.instruments.insert(inst.symbol().inner(), *inst);
250 }
251 }
252 }
253
254 async fn send_subscribe(&self, request_id: i64, symbol: &str, level: AxMarketDataLevel) {
255 let msg = AxMdSubscribe {
256 request_id,
257 msg_type: "subscribe".to_string(),
258 symbol: symbol.to_string(),
259 level,
260 };
261
262 if let Err(e) = self.send_json(&msg).await {
263 log::error!("Failed to send subscribe message: {e}");
264 }
265 }
266
267 async fn send_unsubscribe(&self, request_id: i64, symbol: &str) {
268 let msg = AxMdUnsubscribe {
269 request_id,
270 msg_type: "unsubscribe".to_string(),
271 symbol: symbol.to_string(),
272 };
273
274 if let Err(e) = self.send_json(&msg).await {
275 log::error!("Failed to send unsubscribe message: {e}");
276 }
277 }
278
279 async fn send_subscribe_candles(&self, request_id: i64, symbol: &str, width: AxCandleWidth) {
280 let msg = AxMdSubscribeCandles {
281 request_id,
282 msg_type: "subscribe_candles".to_string(),
283 symbol: symbol.to_string(),
284 width,
285 };
286
287 if let Err(e) = self.send_json(&msg).await {
288 log::error!("Failed to send subscribe_candles message: {e}");
289 }
290 }
291
292 async fn send_unsubscribe_candles(&self, request_id: i64, symbol: &str, width: AxCandleWidth) {
293 let msg = AxMdUnsubscribeCandles {
294 request_id,
295 msg_type: "unsubscribe_candles".to_string(),
296 symbol: symbol.to_string(),
297 width,
298 };
299
300 if let Err(e) = self.send_json(&msg).await {
301 log::error!("Failed to send unsubscribe_candles message: {e}");
302 }
303 }
304
305 async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
306 let Some(client) = &self.client else {
307 return Err("No WebSocket client available".to_string());
308 };
309
310 let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
311 log::trace!("Sending: {payload}");
312
313 client
314 .send_text(payload, None)
315 .await
316 .map_err(|e| e.to_string())
317 }
318
319 fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<NautilusWsMessage>> {
320 match msg {
321 Message::Text(text) => {
322 if text == nautilus_network::RECONNECTED {
323 log::info!("Received WebSocket reconnected signal");
324 return Some(vec![NautilusWsMessage::Reconnected]);
325 }
326
327 log::trace!("Raw websocket message: {text}");
328
329 let value: serde_json::Value = match serde_json::from_str(&text) {
330 Ok(v) => v,
331 Err(e) => {
332 log::error!("Failed to parse WebSocket message: {e}: {text}");
333 return None;
334 }
335 };
336
337 self.classify_and_parse_message(value)
338 }
339 Message::Binary(data) => {
340 log::debug!("Received binary message with {} bytes", data.len());
341 None
342 }
343 Message::Close(_) => {
344 log::debug!("Received close message, waiting for reconnection");
345 None
346 }
347 _ => None,
348 }
349 }
350
351 fn classify_and_parse_message(
352 &self,
353 value: serde_json::Value,
354 ) -> Option<Vec<NautilusWsMessage>> {
355 let obj = value.as_object()?;
356
357 let msg_type = obj.get("t").and_then(|v| v.as_str())?;
359
360 match msg_type {
361 "h" => match serde_json::from_value::<AxMdHeartbeat>(value) {
362 Ok(heartbeat) => {
363 log::trace!("Received heartbeat ts={}", heartbeat.ts);
364 Some(vec![NautilusWsMessage::Heartbeat])
365 }
366 Err(e) => {
367 log::error!("Failed to parse heartbeat: {e}");
368 None
369 }
370 },
371 "s" | "t" => {
372 match serde_json::from_value::<AxMdTrade>(value) {
374 Ok(trade) => {
375 log::debug!("Received trade: {} {} @ {}", trade.s, trade.q, trade.p);
376
377 let Some(instrument) = self.instruments.get(&trade.s) else {
378 log::error!(
379 "No instrument cached for symbol '{}' - cannot parse trade",
380 trade.s
381 );
382 return None;
383 };
384
385 let ts_init = self.generate_ts_init();
386 match parse_trade_tick(&trade, instrument, ts_init) {
387 Ok(tick) => {
388 Some(vec![NautilusWsMessage::Data(vec![Data::Trade(tick)])])
389 }
390 Err(e) => {
391 log::error!("Failed to parse trade to TradeTick: {e}");
392 None
393 }
394 }
395 }
396 Err(e) => {
397 log::error!("Failed to parse trade: {e}");
398 None
399 }
400 }
401 }
402 "c" => match serde_json::from_value::<AxMdCandle>(value) {
403 Ok(candle) => {
404 log::debug!(
405 "Received candle: {} {} O={} C={}",
406 candle.symbol,
407 candle.width,
408 candle.open,
409 candle.close
410 );
411
412 let Some(instrument) = self.instruments.get(&candle.symbol) else {
413 log::error!(
414 "No instrument cached for symbol '{}' - cannot parse candle",
415 candle.symbol
416 );
417 return None;
418 };
419
420 let ts_init = self.generate_ts_init();
421 match parse_candle_bar(&candle, instrument, ts_init) {
422 Ok(bar) => Some(vec![NautilusWsMessage::Bar(bar)]),
423 Err(e) => {
424 log::error!("Failed to parse candle to Bar: {e}");
425 None
426 }
427 }
428 }
429 Err(e) => {
430 log::error!("Failed to parse candle: {e}");
431 None
432 }
433 },
434 "1" => match serde_json::from_value::<AxMdBookL1>(value) {
435 Ok(book) => {
436 log::debug!("Received book L1: {}", book.s);
437
438 let Some(instrument) = self.instruments.get(&book.s) else {
439 log::error!(
440 "No instrument cached for symbol '{}' - cannot parse L1 book",
441 book.s
442 );
443 return None;
444 };
445
446 let ts_init = self.generate_ts_init();
447 match parse_book_l1_quote(&book, instrument, ts_init) {
448 Ok(quote) => Some(vec![NautilusWsMessage::Data(vec![Data::Quote(quote)])]),
449 Err(e) => {
450 log::error!("Failed to parse L1 to QuoteTick: {e}");
451 None
452 }
453 }
454 }
455 Err(e) => {
456 log::error!("Failed to parse book L1: {e}");
457 None
458 }
459 },
460 "2" => match serde_json::from_value::<AxMdBookL2>(value) {
461 Ok(book) => {
462 log::debug!(
463 "Received book L2: {} ({} bids, {} asks)",
464 book.s,
465 book.b.len(),
466 book.a.len()
467 );
468
469 let Some(instrument) = self.instruments.get(&book.s) else {
470 log::error!(
471 "No instrument cached for symbol '{}' - cannot parse L2 book",
472 book.s
473 );
474 return None;
475 };
476
477 let ts_init = self.generate_ts_init();
478 match parse_book_l2_deltas(&book, instrument, ts_init) {
479 Ok(deltas) => Some(vec![NautilusWsMessage::Deltas(deltas)]),
480 Err(e) => {
481 log::error!("Failed to parse L2 to OrderBookDeltas: {e}");
482 None
483 }
484 }
485 }
486 Err(e) => {
487 log::error!("Failed to parse book L2: {e}");
488 None
489 }
490 },
491 "3" => match serde_json::from_value::<AxMdBookL3>(value) {
492 Ok(book) => {
493 log::debug!(
494 "Received book L3: {} ({} bids, {} asks)",
495 book.s,
496 book.b.len(),
497 book.a.len()
498 );
499
500 let Some(instrument) = self.instruments.get(&book.s) else {
501 log::error!(
502 "No instrument cached for symbol '{}' - cannot parse L3 book",
503 book.s
504 );
505 return None;
506 };
507
508 let ts_init = self.generate_ts_init();
509 match parse_book_l3_deltas(&book, instrument, ts_init) {
510 Ok(deltas) => Some(vec![NautilusWsMessage::Deltas(deltas)]),
511 Err(e) => {
512 log::error!("Failed to parse L3 to OrderBookDeltas: {e}");
513 None
514 }
515 }
516 }
517 Err(e) => {
518 log::error!("Failed to parse book L3: {e}");
519 None
520 }
521 },
522 _ => {
523 log::warn!("Unknown message type: {msg_type}");
524 Some(vec![NautilusWsMessage::Error(AxWsError::new(format!(
525 "Unknown message type: {msg_type}"
526 )))])
527 }
528 }
529 }
530}