1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_core::{
29 nanos::UnixNanos,
30 time::{AtomicTime, get_atomic_clock_realtime},
31};
32use nautilus_model::{
33 data::Data,
34 instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 client::SymbolDataTypes,
42 parse::{
43 parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
44 parse_trade_tick,
45 },
46};
47use crate::{
48 common::enums::{AxCandleWidth, AxMarketDataLevel, AxMdRequestType},
49 websocket::{
50 messages::{
51 AxMdCandle, AxMdMessage, AxMdSubscribe, AxMdSubscribeCandles, AxMdUnsubscribe,
52 AxMdUnsubscribeCandles, NautilusDataWsMessage,
53 },
54 parse::parse_md_message,
55 },
56};
57
/// Commands sent to the `FeedHandler` over its command channel.
///
/// Each variant is processed by `FeedHandler::handle_command`.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Supplies the WebSocket client the handler uses for outbound messages.
    SetClient(WebSocketClient),
    /// Clears the book sequence counters and candle cache, then disconnects the
    /// current client if one is set.
    Disconnect,
    /// Re-sends subscribe requests for every topic tracked in the subscription state.
    ReplaySubscriptions,
    /// Subscribes to market data for a symbol at the given level.
    Subscribe {
        /// Request ID sent to the venue as `rid`; also keys the pending-request map.
        request_id: i64,
        /// Symbol to subscribe to.
        symbol: Ustr,
        /// Market data level requested.
        level: AxMarketDataLevel,
    },
    /// Unsubscribes from market data for a symbol.
    Unsubscribe {
        /// Request ID sent to the venue as `rid`.
        request_id: i64,
        /// Symbol to unsubscribe from.
        symbol: Ustr,
    },
    /// Subscribes to candles for a symbol at the given width.
    SubscribeCandles {
        /// Request ID sent to the venue as `rid`; also keys the pending-request map.
        request_id: i64,
        /// Symbol to subscribe to.
        symbol: Ustr,
        /// Candle width requested.
        width: AxCandleWidth,
    },
    /// Unsubscribes from candles for a symbol and width; the cached candle for that
    /// pair is dropped as well.
    UnsubscribeCandles {
        /// Request ID sent to the venue as `rid`.
        request_id: i64,
        /// Symbol to unsubscribe from.
        symbol: Ustr,
        /// Candle width to unsubscribe from.
        width: AxCandleWidth,
    },
    /// Inserts a batch of instruments into the handler's cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's cache.
    UpdateInstrument(Box<InstrumentAny>),
}
106
/// Processes handler commands and raw WebSocket frames, producing
/// [`NautilusDataWsMessage`] values through `FeedHandler::next`.
pub(crate) struct FeedHandler {
    /// Clock used to stamp `ts_init` on parsed data.
    clock: &'static AtomicTime,
    /// Stop flag polled by `next`; when set, `next` returns `None`.
    signal: Arc<AtomicBool>,
    /// WebSocket client used for outbound messages; `None` until `SetClient` arrives
    /// and again after `Disconnect`.
    client: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    // Not used by any code visible in this file; kept on the struct and marked
    // `dead_code` to silence the warning.
    #[allow(dead_code)]
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusDataWsMessage>,
    /// Subscription topic tracking, used for replay and for rolling back failed requests.
    subscriptions: SubscriptionState,
    /// Shared per-symbol flags consulted to decide whether L1 quotes and trades are emitted.
    symbol_data_types: Arc<DashMap<String, SymbolDataTypes>>,
    /// Instrument cache keyed by symbol, required to parse book, trade and candle messages.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Parsed messages waiting to be returned from `next`.
    message_queue: VecDeque<NautilusDataWsMessage>,
    /// Last request ID issued for a replayed subscription; decremented before each use.
    replay_request_id: i64,
    /// Set when the reconnected signal is seen; cleared when the replay runs.
    needs_subscription_replay: bool,
    /// Per-symbol counter incremented for every L2/L3 book message received.
    book_sequences: AHashMap<Ustr, u64>,
    /// Most recent candle seen per (symbol, width); the previous one is emitted as a bar
    /// once a candle with a different `ts` arrives.
    candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
    /// Topic for each subscribe request awaiting a response, keyed by request ID.
    pending_subscribe_requests: AHashMap<i64, String>,
}
129
130impl FeedHandler {
131 #[must_use]
133 pub fn new(
134 signal: Arc<AtomicBool>,
135 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
136 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
137 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusDataWsMessage>,
138 subscriptions: SubscriptionState,
139 symbol_data_types: Arc<DashMap<String, SymbolDataTypes>>,
140 ) -> Self {
141 Self {
142 clock: get_atomic_clock_realtime(),
143 signal,
144 client: None,
145 cmd_rx,
146 raw_rx,
147 out_tx,
148 subscriptions,
149 symbol_data_types,
150 instruments: AHashMap::new(),
151 message_queue: VecDeque::new(),
152 replay_request_id: -1,
153 needs_subscription_replay: false,
154 book_sequences: AHashMap::new(),
155 candle_cache: AHashMap::new(),
156 pending_subscribe_requests: AHashMap::new(),
157 }
158 }
159
160 fn next_replay_request_id(&mut self) -> i64 {
161 self.replay_request_id -= 1;
162 self.replay_request_id
163 }
164
165 async fn replay_subscriptions(&mut self) {
166 self.candle_cache.clear();
168
169 let topics = self.subscriptions.all_topics();
170 if topics.is_empty() {
171 log::debug!("No subscriptions to replay after reconnect");
172 return;
173 }
174
175 log::info!("Replaying {} subscriptions after reconnect", topics.len());
176
177 for topic in topics {
178 self.subscriptions.mark_subscribe(&topic);
179
180 if let Some(rest) = topic.strip_prefix("candles:") {
182 if let Some((symbol, width_str)) = rest.rsplit_once(':') {
183 if let Some(width) = Self::parse_candle_width(width_str) {
184 let request_id = self.next_replay_request_id();
185 log::debug!(
186 "Replaying candle subscription: symbol={symbol}, width={width:?}"
187 );
188 self.send_subscribe_candles(request_id, Ustr::from(symbol), width)
189 .await;
190 } else {
191 log::warn!("Failed to parse candle width from topic: {topic}");
192 }
193 } else {
194 log::warn!("Invalid candle topic format: {topic}");
195 }
196 } else if let Some((symbol, level_str)) = topic.rsplit_once(':') {
197 if let Some(level) = Self::parse_market_data_level(level_str) {
198 let request_id = self.next_replay_request_id();
199 log::debug!(
200 "Replaying market data subscription: symbol={symbol}, level={level:?}"
201 );
202 self.send_subscribe(request_id, Ustr::from(symbol), level)
203 .await;
204 } else {
205 log::warn!("Failed to parse market data level from topic: {topic}");
206 }
207 } else {
208 log::warn!("Unknown topic format: {topic}");
209 }
210 }
211
212 log::info!("Subscription replay completed");
213 }
214
215 fn parse_market_data_level(s: &str) -> Option<AxMarketDataLevel> {
216 match s {
217 "Level1" => Some(AxMarketDataLevel::Level1),
218 "Level2" => Some(AxMarketDataLevel::Level2),
219 "Level3" => Some(AxMarketDataLevel::Level3),
220 _ => None,
221 }
222 }
223
224 fn parse_candle_width(s: &str) -> Option<AxCandleWidth> {
225 match s {
226 "Seconds1" => Some(AxCandleWidth::Seconds1),
227 "Seconds5" => Some(AxCandleWidth::Seconds5),
228 "Minutes1" => Some(AxCandleWidth::Minutes1),
229 "Minutes5" => Some(AxCandleWidth::Minutes5),
230 "Minutes15" => Some(AxCandleWidth::Minutes15),
231 "Hours1" => Some(AxCandleWidth::Hours1),
232 "Days1" => Some(AxCandleWidth::Days1),
233 _ => None,
234 }
235 }
236
237 fn generate_ts_init(&self) -> UnixNanos {
238 self.clock.get_time_ns()
239 }
240
241 fn next_book_sequence(&mut self, symbol: Ustr) -> u64 {
242 let seq = self.book_sequences.entry(symbol).or_insert(0);
243 *seq += 1;
244 *seq
245 }
246
    /// Drives the handler until there is a message to hand to the caller.
    ///
    /// Each loop iteration:
    /// 1. replays subscriptions if a reconnect was flagged and the output queue is empty,
    /// 2. returns the next queued message if there is one,
    /// 3. otherwise waits for a handler command, a 100 ms idle tick (used to poll the
    ///    stop signal), or a raw WebSocket frame.
    ///
    /// Returns `None` when the stop signal is observed or the raw frame channel is closed.
    pub async fn next(&mut self) -> Option<NautilusDataWsMessage> {
        loop {
            // Replay only once the queue has drained, so anything queued earlier
            // (including the `Reconnected` notification) is returned first.
            if self.needs_subscription_replay && self.message_queue.is_empty() {
                self.needs_subscription_replay = false;
                self.replay_subscriptions().await;
            }

            if let Some(msg) = self.message_queue.pop_front() {
                return Some(msg);
            }

            tokio::select! {
                // Only `Some` matches here, so a closed command channel does not end the loop.
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(cmd).await;
                }

                // Idle tick: lets the stop signal be noticed when nothing else arrives.
                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                    if self.signal.load(Ordering::Acquire) {
                        log::debug!("Stop signal received during idle period");
                        return None;
                    }
                    continue;
                }

                msg = self.raw_rx.recv() => {
                    let msg = match msg {
                        Some(msg) => msg,
                        None => {
                            log::debug!("WebSocket stream closed");
                            return None;
                        }
                    };

                    // Answer pings directly; they never reach the parser.
                    if let Message::Ping(data) = &msg {
                        log::trace!("Received ping frame with {} bytes", data.len());
                        if let Some(client) = &self.client
                            && let Err(e) = client.send_pong(data.to_vec()).await
                        {
                            log::warn!("Failed to send pong frame: {e}");
                        }
                        continue;
                    }

                    if let Some(messages) = self.parse_raw_message(msg) {
                        self.message_queue.extend(messages);
                    }

                    // Checked after queueing: if the stop signal is already set, the
                    // messages queued from this frame are not returned.
                    if self.signal.load(Ordering::Acquire) {
                        log::debug!("Stop signal received");
                        return None;
                    }
                }
            }
        }
    }
305
306 async fn handle_command(&mut self, cmd: HandlerCommand) {
307 match cmd {
308 HandlerCommand::SetClient(client) => {
309 log::debug!("WebSocketClient received by handler");
310 self.client = Some(client);
311 }
312 HandlerCommand::Disconnect => {
313 log::debug!("Disconnect command received");
314 self.book_sequences.clear();
315 self.candle_cache.clear();
316 if let Some(client) = self.client.take() {
317 client.disconnect().await;
318 }
319 }
320 HandlerCommand::ReplaySubscriptions => {
321 log::debug!("ReplaySubscriptions command received");
322 self.replay_subscriptions().await;
323 }
324 HandlerCommand::Subscribe {
325 request_id,
326 symbol,
327 level,
328 } => {
329 log::debug!(
330 "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
331 );
332 let topic = format!("{symbol}:{level:?}");
333 self.pending_subscribe_requests.insert(request_id, topic);
334 self.send_subscribe(request_id, symbol, level).await;
335 }
336 HandlerCommand::Unsubscribe { request_id, symbol } => {
337 log::debug!(
338 "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
339 );
340 self.send_unsubscribe(request_id, symbol).await;
341 }
342 HandlerCommand::SubscribeCandles {
343 request_id,
344 symbol,
345 width,
346 } => {
347 log::debug!(
348 "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
349 );
350 let topic = format!("candles:{symbol}:{width:?}");
351 self.pending_subscribe_requests.insert(request_id, topic);
352 self.send_subscribe_candles(request_id, symbol, width).await;
353 }
354 HandlerCommand::UnsubscribeCandles {
355 request_id,
356 symbol,
357 width,
358 } => {
359 log::debug!(
360 "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
361 );
362 self.candle_cache.remove(&(symbol, width));
363 self.send_unsubscribe_candles(request_id, symbol, width)
364 .await;
365 }
366 HandlerCommand::InitializeInstruments(instruments) => {
367 for inst in instruments {
368 self.instruments.insert(inst.symbol().inner(), inst);
369 }
370 }
371 HandlerCommand::UpdateInstrument(inst) => {
372 self.instruments.insert(inst.symbol().inner(), *inst);
373 }
374 }
375 }
376
377 async fn send_subscribe(&self, request_id: i64, symbol: Ustr, level: AxMarketDataLevel) {
378 let msg = AxMdSubscribe {
379 rid: request_id,
380 msg_type: AxMdRequestType::Subscribe,
381 symbol,
382 level,
383 };
384
385 if let Err(e) = self.send_json(&msg).await {
386 log::error!("Failed to send subscribe message: {e}");
387 }
388 }
389
390 async fn send_unsubscribe(&self, request_id: i64, symbol: Ustr) {
391 let msg = AxMdUnsubscribe {
392 rid: request_id,
393 msg_type: AxMdRequestType::Unsubscribe,
394 symbol,
395 };
396
397 if let Err(e) = self.send_json(&msg).await {
398 log::error!("Failed to send unsubscribe message: {e}");
399 }
400 }
401
402 async fn send_subscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
403 let msg = AxMdSubscribeCandles {
404 rid: request_id,
405 msg_type: AxMdRequestType::SubscribeCandles,
406 symbol,
407 width,
408 };
409
410 if let Err(e) = self.send_json(&msg).await {
411 log::error!("Failed to send subscribe_candles message: {e}");
412 }
413 }
414
415 async fn send_unsubscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
416 let msg = AxMdUnsubscribeCandles {
417 rid: request_id,
418 msg_type: AxMdRequestType::UnsubscribeCandles,
419 symbol,
420 width,
421 };
422
423 if let Err(e) = self.send_json(&msg).await {
424 log::error!("Failed to send unsubscribe_candles message: {e}");
425 }
426 }
427
428 async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
429 let Some(client) = &self.client else {
430 return Err("No WebSocket client available".to_string());
431 };
432
433 let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
434 log::trace!("Sending: {payload}");
435
436 client
437 .send_text(payload, None)
438 .await
439 .map_err(|e| e.to_string())
440 }
441
442 fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<NautilusDataWsMessage>> {
443 match msg {
444 Message::Text(text) => {
445 if text == nautilus_network::RECONNECTED {
446 log::info!("Received WebSocket reconnected signal");
447 self.needs_subscription_replay = true;
448 return Some(vec![NautilusDataWsMessage::Reconnected]);
449 }
450
451 log::trace!("Raw websocket message: {text}");
452
453 match parse_md_message(&text) {
454 Ok(message) => self.handle_message(message),
455 Err(e) => {
456 log::error!("Failed to parse WebSocket message: {e}: {text}");
457 None
458 }
459 }
460 }
461 Message::Binary(data) => {
462 log::debug!("Received binary message with {} bytes", data.len());
463 None
464 }
465 Message::Close(_) => {
466 log::debug!("Received close message, waiting for reconnection");
467 None
468 }
469 _ => None,
470 }
471 }
472
    /// Converts a parsed market data message into zero or more output messages.
    ///
    /// Returns `None` when the message yields no output: the symbol is not subscribed
    /// for that data type, no instrument is cached for it, parsing fails, or the
    /// message is informational only (ticker, subscription response).
    fn handle_message(&mut self, message: AxMdMessage) -> Option<Vec<NautilusDataWsMessage>> {
        match message {
            AxMdMessage::BookL1(book) => {
                log::debug!("Received book L1: {}", book.s);

                // Emit quotes only when the symbol has quotes or an L1 book subscription.
                let l1_subscribed = self
                    .symbol_data_types
                    .get(book.s.as_str())
                    .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));

                if !l1_subscribed {
                    return None;
                }

                let Some(instrument) = self.instruments.get(&book.s) else {
                    log::error!(
                        "No instrument cached for symbol '{}' - cannot parse L1 book",
                        book.s
                    );
                    return None;
                };

                let ts_init = self.generate_ts_init();
                match parse_book_l1_quote(&book, instrument, ts_init) {
                    Ok(quote) => Some(vec![NautilusDataWsMessage::Data(vec![Data::Quote(quote)])]),
                    Err(e) => {
                        log::error!("Failed to parse L1 to QuoteTick: {e}");
                        None
                    }
                }
            }
            AxMdMessage::BookL2(book) => {
                log::debug!(
                    "Received book L2: {} ({} bids, {} asks)",
                    book.s,
                    book.b.len(),
                    book.a.len()
                );

                // The sequence is taken before the instrument lookup (it needs `&mut self`),
                // so it also advances for messages that are dropped below.
                let symbol = book.s;
                let sequence = self.next_book_sequence(symbol);

                let Some(instrument) = self.instruments.get(&symbol) else {
                    log::error!(
                        "No instrument cached for symbol '{symbol}' - cannot parse L2 book"
                    );
                    return None;
                };

                let ts_init = self.generate_ts_init();
                match parse_book_l2_deltas(&book, instrument, sequence, ts_init) {
                    Ok(deltas) => Some(vec![NautilusDataWsMessage::Deltas(deltas)]),
                    Err(e) => {
                        log::error!("Failed to parse L2 to OrderBookDeltas: {e}");
                        None
                    }
                }
            }
            AxMdMessage::BookL3(book) => {
                log::debug!(
                    "Received book L3: {} ({} bids, {} asks)",
                    book.s,
                    book.b.len(),
                    book.a.len()
                );

                // Same ordering as the L2 arm: sequence first, then the instrument lookup.
                let symbol = book.s;
                let sequence = self.next_book_sequence(symbol);

                let Some(instrument) = self.instruments.get(&symbol) else {
                    log::error!(
                        "No instrument cached for symbol '{symbol}' - cannot parse L3 book"
                    );
                    return None;
                };

                let ts_init = self.generate_ts_init();
                match parse_book_l3_deltas(&book, instrument, sequence, ts_init) {
                    Ok(deltas) => Some(vec![NautilusDataWsMessage::Deltas(deltas)]),
                    Err(e) => {
                        log::error!("Failed to parse L3 to OrderBookDeltas: {e}");
                        None
                    }
                }
            }
            AxMdMessage::Ticker(ticker) => {
                // Tickers are logged only; nothing is emitted for them.
                log::debug!(
                    "Received ticker: {} last={} vol={} oi={:?}",
                    ticker.s,
                    ticker.p,
                    ticker.v,
                    ticker.oi
                );
                None
            }
            AxMdMessage::Trade(trade) => {
                log::debug!("Received trade: {} {} @ {}", trade.s, trade.q, trade.p);

                // Emit trades only when the symbol has a trades subscription.
                let trades_subscribed = self
                    .symbol_data_types
                    .get(trade.s.as_str())
                    .is_some_and(|e| e.trades);

                if !trades_subscribed {
                    return None;
                }

                let Some(instrument) = self.instruments.get(&trade.s) else {
                    log::error!(
                        "No instrument cached for symbol '{}' - cannot parse trade",
                        trade.s
                    );
                    return None;
                };

                let ts_init = self.generate_ts_init();
                match parse_trade_tick(&trade, instrument, ts_init) {
                    Ok(tick) => Some(vec![NautilusDataWsMessage::Data(vec![Data::Trade(tick)])]),
                    Err(e) => {
                        log::error!("Failed to parse trade to TradeTick: {e}");
                        None
                    }
                }
            }
            AxMdMessage::Candle(candle) => self.handle_candle(candle),
            AxMdMessage::Heartbeat(heartbeat) => {
                log::trace!("Received heartbeat ts={}", heartbeat.ts);
                Some(vec![NautilusDataWsMessage::Heartbeat])
            }
            AxMdMessage::Error(error) => {
                // These two errors are treated as benign and do not roll back the topic.
                let is_benign = error.message.contains("already subscribed")
                    || error.message.contains("not subscribed");

                if is_benign {
                    if let Some(rid) = error.request_id {
                        // The request is settled; just forget it.
                        self.pending_subscribe_requests.remove(&rid);
                    }
                    log::warn!("Subscription state: {}", error.message);
                } else {
                    // A failed subscribe request we still track: undo the topic so it is
                    // no longer recorded as subscribed.
                    if let Some(rid) = error.request_id
                        && let Some(topic) = self.pending_subscribe_requests.remove(&rid)
                    {
                        log::warn!(
                            "Rolling back subscription for topic '{topic}' \
                             due to error: {}",
                            error.message
                        );
                        self.subscriptions.mark_unsubscribe(&topic);
                    }
                    log::error!("Received error from exchange: {}", error.message);
                }
                // The error is forwarded to the caller in both cases.
                Some(vec![NautilusDataWsMessage::Error(error)])
            }
            AxMdMessage::SubscriptionResponse(response) => {
                // Any response settles the pending request with this ID.
                self.pending_subscribe_requests.remove(&response.rid);

                if let Some(symbol) = &response.result.subscribed {
                    log::debug!("Subscription confirmed for symbol: {symbol}");
                } else if let Some(candle) = &response.result.subscribed_candle {
                    log::debug!("Candle subscription confirmed: {candle}");
                } else if let Some(symbol) = &response.result.unsubscribed {
                    log::debug!("Unsubscription confirmed for symbol: {symbol}");
                } else if let Some(candle) = &response.result.unsubscribed_candle {
                    log::debug!("Candle unsubscription confirmed: {candle}");
                }
                None
            }
        }
    }
645
646 fn handle_candle(&mut self, candle: AxMdCandle) -> Option<Vec<NautilusDataWsMessage>> {
647 log::debug!(
648 "Received candle: {} {} O={} C={}",
649 candle.symbol,
650 candle.width,
651 candle.open,
652 candle.close
653 );
654
655 let cache_key = (candle.symbol, candle.width);
656
657 let closed_candle = if let Some(cached) = self.candle_cache.get(&cache_key) {
659 if cached.ts == candle.ts {
660 None
661 } else {
662 Some(cached.clone())
663 }
664 } else {
665 None
666 };
667
668 self.candle_cache.insert(cache_key, candle);
669
670 let closed = closed_candle?;
671
672 let Some(instrument) = self.instruments.get(&closed.symbol) else {
673 log::error!(
674 "No instrument cached for symbol '{}' - cannot parse candle",
675 closed.symbol
676 );
677 return None;
678 };
679
680 let ts_init = self.generate_ts_init();
681 match parse_candle_bar(&closed, instrument, ts_init) {
682 Ok(bar) => Some(vec![NautilusDataWsMessage::Bar(bar)]),
683 Err(e) => {
684 log::error!("Failed to parse candle to Bar: {e}");
685 None
686 }
687 }
688 }
689}