Skip to main content

nautilus_architect_ax/websocket/data/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Market data WebSocket message handler for Ax.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_core::{
29    nanos::UnixNanos,
30    time::{AtomicTime, get_atomic_clock_realtime},
31};
32use nautilus_model::{
33    data::Data,
34    instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41    client::SymbolDataTypes,
42    parse::{
43        parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
44        parse_trade_tick,
45    },
46};
47use crate::{
48    common::enums::{AxCandleWidth, AxMarketDataLevel, AxMdRequestType},
49    websocket::{
50        messages::{
51            AxMdCandle, AxMdMessage, AxMdSubscribe, AxMdSubscribeCandles, AxMdUnsubscribe,
52            AxMdUnsubscribeCandles, NautilusDataWsMessage,
53        },
54        parse::parse_md_message,
55    },
56};
57
58/// Commands sent from the outer client to the inner message handler.
59#[derive(Debug)]
60pub enum HandlerCommand {
61    /// Set the WebSocket client for this handler.
62    SetClient(WebSocketClient),
63    /// Disconnect the WebSocket connection.
64    Disconnect,
65    /// Replay all subscriptions after a reconnection.
66    ReplaySubscriptions,
67    /// Subscribe to market data for a symbol.
68    Subscribe {
69        /// Request ID for correlation.
70        request_id: i64,
71        /// Instrument symbol.
72        symbol: Ustr,
73        /// Market data level.
74        level: AxMarketDataLevel,
75    },
76    /// Unsubscribe from market data for a symbol.
77    Unsubscribe {
78        /// Request ID for correlation.
79        request_id: i64,
80        /// Instrument symbol.
81        symbol: Ustr,
82    },
83    /// Subscribe to candle data for a symbol.
84    SubscribeCandles {
85        /// Request ID for correlation.
86        request_id: i64,
87        /// Instrument symbol.
88        symbol: Ustr,
89        /// Candle width/interval.
90        width: AxCandleWidth,
91    },
92    /// Unsubscribe from candle data for a symbol.
93    UnsubscribeCandles {
94        /// Request ID for correlation.
95        request_id: i64,
96        /// Instrument symbol.
97        symbol: Ustr,
98        /// Candle width/interval.
99        width: AxCandleWidth,
100    },
101    /// Initialize the instrument cache with instruments.
102    InitializeInstruments(Vec<InstrumentAny>),
103    /// Update a single instrument in the cache.
104    UpdateInstrument(Box<InstrumentAny>),
105}
106
107/// Market data feed handler that processes WebSocket messages.
108///
109/// Runs in a dedicated Tokio task and owns the WebSocket client exclusively.
110pub(crate) struct FeedHandler {
111    clock: &'static AtomicTime,
112    signal: Arc<AtomicBool>,
113    client: Option<WebSocketClient>,
114    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
115    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
116    #[allow(dead_code)]
117    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusDataWsMessage>,
118    subscriptions: SubscriptionState,
119    symbol_data_types: Arc<DashMap<String, SymbolDataTypes>>,
120    instruments: AHashMap<Ustr, InstrumentAny>,
121    message_queue: VecDeque<NautilusDataWsMessage>,
122    replay_request_id: i64,
123    needs_subscription_replay: bool,
124    book_sequences: AHashMap<Ustr, u64>,
125    candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
126    /// Maps request_id -> subscription topic for pending subscribe requests
127    pending_subscribe_requests: AHashMap<i64, String>,
128}
129
130impl FeedHandler {
131    /// Creates a new [`FeedHandler`] instance.
132    #[must_use]
133    pub fn new(
134        signal: Arc<AtomicBool>,
135        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
136        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
137        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusDataWsMessage>,
138        subscriptions: SubscriptionState,
139        symbol_data_types: Arc<DashMap<String, SymbolDataTypes>>,
140    ) -> Self {
141        Self {
142            clock: get_atomic_clock_realtime(),
143            signal,
144            client: None,
145            cmd_rx,
146            raw_rx,
147            out_tx,
148            subscriptions,
149            symbol_data_types,
150            instruments: AHashMap::new(),
151            message_queue: VecDeque::new(),
152            replay_request_id: -1,
153            needs_subscription_replay: false,
154            book_sequences: AHashMap::new(),
155            candle_cache: AHashMap::new(),
156            pending_subscribe_requests: AHashMap::new(),
157        }
158    }
159
160    fn next_replay_request_id(&mut self) -> i64 {
161        self.replay_request_id -= 1;
162        self.replay_request_id
163    }
164
165    async fn replay_subscriptions(&mut self) {
166        // Clear stale candle data (book sequences persist to maintain monotonicity)
167        self.candle_cache.clear();
168
169        let topics = self.subscriptions.all_topics();
170        if topics.is_empty() {
171            log::debug!("No subscriptions to replay after reconnect");
172            return;
173        }
174
175        log::info!("Replaying {} subscriptions after reconnect", topics.len());
176
177        for topic in topics {
178            self.subscriptions.mark_subscribe(&topic);
179
180            // Topic format: "symbol:Level" or "candles:symbol:Width"
181            if let Some(rest) = topic.strip_prefix("candles:") {
182                if let Some((symbol, width_str)) = rest.rsplit_once(':') {
183                    if let Some(width) = Self::parse_candle_width(width_str) {
184                        let request_id = self.next_replay_request_id();
185                        log::debug!(
186                            "Replaying candle subscription: symbol={symbol}, width={width:?}"
187                        );
188                        self.send_subscribe_candles(request_id, Ustr::from(symbol), width)
189                            .await;
190                    } else {
191                        log::warn!("Failed to parse candle width from topic: {topic}");
192                    }
193                } else {
194                    log::warn!("Invalid candle topic format: {topic}");
195                }
196            } else if let Some((symbol, level_str)) = topic.rsplit_once(':') {
197                if let Some(level) = Self::parse_market_data_level(level_str) {
198                    let request_id = self.next_replay_request_id();
199                    log::debug!(
200                        "Replaying market data subscription: symbol={symbol}, level={level:?}"
201                    );
202                    self.send_subscribe(request_id, Ustr::from(symbol), level)
203                        .await;
204                } else {
205                    log::warn!("Failed to parse market data level from topic: {topic}");
206                }
207            } else {
208                log::warn!("Unknown topic format: {topic}");
209            }
210        }
211
212        log::info!("Subscription replay completed");
213    }
214
215    fn parse_market_data_level(s: &str) -> Option<AxMarketDataLevel> {
216        match s {
217            "Level1" => Some(AxMarketDataLevel::Level1),
218            "Level2" => Some(AxMarketDataLevel::Level2),
219            "Level3" => Some(AxMarketDataLevel::Level3),
220            _ => None,
221        }
222    }
223
224    fn parse_candle_width(s: &str) -> Option<AxCandleWidth> {
225        match s {
226            "Seconds1" => Some(AxCandleWidth::Seconds1),
227            "Seconds5" => Some(AxCandleWidth::Seconds5),
228            "Minutes1" => Some(AxCandleWidth::Minutes1),
229            "Minutes5" => Some(AxCandleWidth::Minutes5),
230            "Minutes15" => Some(AxCandleWidth::Minutes15),
231            "Hours1" => Some(AxCandleWidth::Hours1),
232            "Days1" => Some(AxCandleWidth::Days1),
233            _ => None,
234        }
235    }
236
237    fn generate_ts_init(&self) -> UnixNanos {
238        self.clock.get_time_ns()
239    }
240
241    fn next_book_sequence(&mut self, symbol: Ustr) -> u64 {
242        let seq = self.book_sequences.entry(symbol).or_insert(0);
243        *seq += 1;
244        *seq
245    }
246
247    /// Returns the next message from the handler.
248    ///
249    /// This method blocks until a message is available or the handler is stopped.
250    pub async fn next(&mut self) -> Option<NautilusDataWsMessage> {
251        loop {
252            if self.needs_subscription_replay && self.message_queue.is_empty() {
253                self.needs_subscription_replay = false;
254                self.replay_subscriptions().await;
255            }
256
257            if let Some(msg) = self.message_queue.pop_front() {
258                return Some(msg);
259            }
260
261            tokio::select! {
262                Some(cmd) = self.cmd_rx.recv() => {
263                    self.handle_command(cmd).await;
264                }
265
266                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
267                    if self.signal.load(Ordering::Acquire) {
268                        log::debug!("Stop signal received during idle period");
269                        return None;
270                    }
271                    continue;
272                }
273
274                msg = self.raw_rx.recv() => {
275                    let msg = match msg {
276                        Some(msg) => msg,
277                        None => {
278                            log::debug!("WebSocket stream closed");
279                            return None;
280                        }
281                    };
282
283                    if let Message::Ping(data) = &msg {
284                        log::trace!("Received ping frame with {} bytes", data.len());
285                        if let Some(client) = &self.client
286                            && let Err(e) = client.send_pong(data.to_vec()).await
287                        {
288                            log::warn!("Failed to send pong frame: {e}");
289                        }
290                        continue;
291                    }
292
293                    if let Some(messages) = self.parse_raw_message(msg) {
294                        self.message_queue.extend(messages);
295                    }
296
297                    if self.signal.load(Ordering::Acquire) {
298                        log::debug!("Stop signal received");
299                        return None;
300                    }
301                }
302            }
303        }
304    }
305
306    async fn handle_command(&mut self, cmd: HandlerCommand) {
307        match cmd {
308            HandlerCommand::SetClient(client) => {
309                log::debug!("WebSocketClient received by handler");
310                self.client = Some(client);
311            }
312            HandlerCommand::Disconnect => {
313                log::debug!("Disconnect command received");
314                self.book_sequences.clear();
315                self.candle_cache.clear();
316                if let Some(client) = self.client.take() {
317                    client.disconnect().await;
318                }
319            }
320            HandlerCommand::ReplaySubscriptions => {
321                log::debug!("ReplaySubscriptions command received");
322                self.replay_subscriptions().await;
323            }
324            HandlerCommand::Subscribe {
325                request_id,
326                symbol,
327                level,
328            } => {
329                log::debug!(
330                    "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
331                );
332                let topic = format!("{symbol}:{level:?}");
333                self.pending_subscribe_requests.insert(request_id, topic);
334                self.send_subscribe(request_id, symbol, level).await;
335            }
336            HandlerCommand::Unsubscribe { request_id, symbol } => {
337                log::debug!(
338                    "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
339                );
340                self.send_unsubscribe(request_id, symbol).await;
341            }
342            HandlerCommand::SubscribeCandles {
343                request_id,
344                symbol,
345                width,
346            } => {
347                log::debug!(
348                    "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
349                );
350                let topic = format!("candles:{symbol}:{width:?}");
351                self.pending_subscribe_requests.insert(request_id, topic);
352                self.send_subscribe_candles(request_id, symbol, width).await;
353            }
354            HandlerCommand::UnsubscribeCandles {
355                request_id,
356                symbol,
357                width,
358            } => {
359                log::debug!(
360                    "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
361                );
362                self.candle_cache.remove(&(symbol, width));
363                self.send_unsubscribe_candles(request_id, symbol, width)
364                    .await;
365            }
366            HandlerCommand::InitializeInstruments(instruments) => {
367                for inst in instruments {
368                    self.instruments.insert(inst.symbol().inner(), inst);
369                }
370            }
371            HandlerCommand::UpdateInstrument(inst) => {
372                self.instruments.insert(inst.symbol().inner(), *inst);
373            }
374        }
375    }
376
377    async fn send_subscribe(&self, request_id: i64, symbol: Ustr, level: AxMarketDataLevel) {
378        let msg = AxMdSubscribe {
379            rid: request_id,
380            msg_type: AxMdRequestType::Subscribe,
381            symbol,
382            level,
383        };
384
385        if let Err(e) = self.send_json(&msg).await {
386            log::error!("Failed to send subscribe message: {e}");
387        }
388    }
389
390    async fn send_unsubscribe(&self, request_id: i64, symbol: Ustr) {
391        let msg = AxMdUnsubscribe {
392            rid: request_id,
393            msg_type: AxMdRequestType::Unsubscribe,
394            symbol,
395        };
396
397        if let Err(e) = self.send_json(&msg).await {
398            log::error!("Failed to send unsubscribe message: {e}");
399        }
400    }
401
402    async fn send_subscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
403        let msg = AxMdSubscribeCandles {
404            rid: request_id,
405            msg_type: AxMdRequestType::SubscribeCandles,
406            symbol,
407            width,
408        };
409
410        if let Err(e) = self.send_json(&msg).await {
411            log::error!("Failed to send subscribe_candles message: {e}");
412        }
413    }
414
415    async fn send_unsubscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
416        let msg = AxMdUnsubscribeCandles {
417            rid: request_id,
418            msg_type: AxMdRequestType::UnsubscribeCandles,
419            symbol,
420            width,
421        };
422
423        if let Err(e) = self.send_json(&msg).await {
424            log::error!("Failed to send unsubscribe_candles message: {e}");
425        }
426    }
427
428    async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
429        let Some(client) = &self.client else {
430            return Err("No WebSocket client available".to_string());
431        };
432
433        let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
434        log::trace!("Sending: {payload}");
435
436        client
437            .send_text(payload, None)
438            .await
439            .map_err(|e| e.to_string())
440    }
441
442    fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<NautilusDataWsMessage>> {
443        match msg {
444            Message::Text(text) => {
445                if text == nautilus_network::RECONNECTED {
446                    log::info!("Received WebSocket reconnected signal");
447                    self.needs_subscription_replay = true;
448                    return Some(vec![NautilusDataWsMessage::Reconnected]);
449                }
450
451                log::trace!("Raw websocket message: {text}");
452
453                match parse_md_message(&text) {
454                    Ok(message) => self.handle_message(message),
455                    Err(e) => {
456                        log::error!("Failed to parse WebSocket message: {e}: {text}");
457                        None
458                    }
459                }
460            }
461            Message::Binary(data) => {
462                log::debug!("Received binary message with {} bytes", data.len());
463                None
464            }
465            Message::Close(_) => {
466                log::debug!("Received close message, waiting for reconnection");
467                None
468            }
469            _ => None,
470        }
471    }
472
473    fn handle_message(&mut self, message: AxMdMessage) -> Option<Vec<NautilusDataWsMessage>> {
474        match message {
475            AxMdMessage::BookL1(book) => {
476                log::debug!("Received book L1: {}", book.s);
477
478                let l1_subscribed = self
479                    .symbol_data_types
480                    .get(book.s.as_str())
481                    .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));
482
483                if !l1_subscribed {
484                    return None;
485                }
486
487                let Some(instrument) = self.instruments.get(&book.s) else {
488                    log::error!(
489                        "No instrument cached for symbol '{}' - cannot parse L1 book",
490                        book.s
491                    );
492                    return None;
493                };
494
495                let ts_init = self.generate_ts_init();
496                match parse_book_l1_quote(&book, instrument, ts_init) {
497                    Ok(quote) => Some(vec![NautilusDataWsMessage::Data(vec![Data::Quote(quote)])]),
498                    Err(e) => {
499                        log::error!("Failed to parse L1 to QuoteTick: {e}");
500                        None
501                    }
502                }
503            }
504            AxMdMessage::BookL2(book) => {
505                log::debug!(
506                    "Received book L2: {} ({} bids, {} asks)",
507                    book.s,
508                    book.b.len(),
509                    book.a.len()
510                );
511
512                let symbol = book.s;
513                let sequence = self.next_book_sequence(symbol);
514
515                let Some(instrument) = self.instruments.get(&symbol) else {
516                    log::error!(
517                        "No instrument cached for symbol '{symbol}' - cannot parse L2 book"
518                    );
519                    return None;
520                };
521
522                let ts_init = self.generate_ts_init();
523                match parse_book_l2_deltas(&book, instrument, sequence, ts_init) {
524                    Ok(deltas) => Some(vec![NautilusDataWsMessage::Deltas(deltas)]),
525                    Err(e) => {
526                        log::error!("Failed to parse L2 to OrderBookDeltas: {e}");
527                        None
528                    }
529                }
530            }
531            AxMdMessage::BookL3(book) => {
532                log::debug!(
533                    "Received book L3: {} ({} bids, {} asks)",
534                    book.s,
535                    book.b.len(),
536                    book.a.len()
537                );
538
539                let symbol = book.s;
540                let sequence = self.next_book_sequence(symbol);
541
542                let Some(instrument) = self.instruments.get(&symbol) else {
543                    log::error!(
544                        "No instrument cached for symbol '{symbol}' - cannot parse L3 book"
545                    );
546                    return None;
547                };
548
549                let ts_init = self.generate_ts_init();
550                match parse_book_l3_deltas(&book, instrument, sequence, ts_init) {
551                    Ok(deltas) => Some(vec![NautilusDataWsMessage::Deltas(deltas)]),
552                    Err(e) => {
553                        log::error!("Failed to parse L3 to OrderBookDeltas: {e}");
554                        None
555                    }
556                }
557            }
558            AxMdMessage::Ticker(ticker) => {
559                // Ticker lacks bid/ask, L1 book subscription provides actual quotes
560                log::debug!(
561                    "Received ticker: {} last={} vol={} oi={:?}",
562                    ticker.s,
563                    ticker.p,
564                    ticker.v,
565                    ticker.oi
566                );
567                None
568            }
569            AxMdMessage::Trade(trade) => {
570                log::debug!("Received trade: {} {} @ {}", trade.s, trade.q, trade.p);
571
572                let trades_subscribed = self
573                    .symbol_data_types
574                    .get(trade.s.as_str())
575                    .is_some_and(|e| e.trades);
576
577                if !trades_subscribed {
578                    return None;
579                }
580
581                let Some(instrument) = self.instruments.get(&trade.s) else {
582                    log::error!(
583                        "No instrument cached for symbol '{}' - cannot parse trade",
584                        trade.s
585                    );
586                    return None;
587                };
588
589                let ts_init = self.generate_ts_init();
590                match parse_trade_tick(&trade, instrument, ts_init) {
591                    Ok(tick) => Some(vec![NautilusDataWsMessage::Data(vec![Data::Trade(tick)])]),
592                    Err(e) => {
593                        log::error!("Failed to parse trade to TradeTick: {e}");
594                        None
595                    }
596                }
597            }
598            AxMdMessage::Candle(candle) => self.handle_candle(candle),
599            AxMdMessage::Heartbeat(heartbeat) => {
600                log::trace!("Received heartbeat ts={}", heartbeat.ts);
601                Some(vec![NautilusDataWsMessage::Heartbeat])
602            }
603            AxMdMessage::Error(error) => {
604                let is_benign = error.message.contains("already subscribed")
605                    || error.message.contains("not subscribed");
606
607                if is_benign {
608                    // Benign: venue state matches intent, just clean up tracking
609                    if let Some(rid) = error.request_id {
610                        self.pending_subscribe_requests.remove(&rid);
611                    }
612                    log::warn!("Subscription state: {}", error.message);
613                } else {
614                    // Real error: roll back subscription state
615                    if let Some(rid) = error.request_id
616                        && let Some(topic) = self.pending_subscribe_requests.remove(&rid)
617                    {
618                        log::warn!(
619                            "Rolling back subscription for topic '{topic}' \
620                             due to error: {}",
621                            error.message
622                        );
623                        self.subscriptions.mark_unsubscribe(&topic);
624                    }
625                    log::error!("Received error from exchange: {}", error.message);
626                }
627                Some(vec![NautilusDataWsMessage::Error(error)])
628            }
629            AxMdMessage::SubscriptionResponse(response) => {
630                self.pending_subscribe_requests.remove(&response.rid);
631
632                if let Some(symbol) = &response.result.subscribed {
633                    log::debug!("Subscription confirmed for symbol: {symbol}");
634                } else if let Some(candle) = &response.result.subscribed_candle {
635                    log::debug!("Candle subscription confirmed: {candle}");
636                } else if let Some(symbol) = &response.result.unsubscribed {
637                    log::debug!("Unsubscription confirmed for symbol: {symbol}");
638                } else if let Some(candle) = &response.result.unsubscribed_candle {
639                    log::debug!("Candle unsubscription confirmed: {candle}");
640                }
641                None
642            }
643        }
644    }
645
646    fn handle_candle(&mut self, candle: AxMdCandle) -> Option<Vec<NautilusDataWsMessage>> {
647        log::debug!(
648            "Received candle: {} {} O={} C={}",
649            candle.symbol,
650            candle.width,
651            candle.open,
652            candle.close
653        );
654
655        let cache_key = (candle.symbol, candle.width);
656
657        // Only emit when timestamp changes (previous candle closed)
658        let closed_candle = if let Some(cached) = self.candle_cache.get(&cache_key) {
659            if cached.ts == candle.ts {
660                None
661            } else {
662                Some(cached.clone())
663            }
664        } else {
665            None
666        };
667
668        self.candle_cache.insert(cache_key, candle);
669
670        let closed = closed_candle?;
671
672        let Some(instrument) = self.instruments.get(&closed.symbol) else {
673            log::error!(
674                "No instrument cached for symbol '{}' - cannot parse candle",
675                closed.symbol
676            );
677            return None;
678        };
679
680        let ts_init = self.generate_ts_init();
681        match parse_candle_bar(&closed, instrument, ts_init) {
682            Ok(bar) => Some(vec![NautilusDataWsMessage::Bar(bar)]),
683            Err(e) => {
684                log::error!("Failed to parse candle to Bar: {e}");
685                None
686            }
687        }
688    }
689}