nautilus_architect_ax/websocket/data/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Market data WebSocket message handler for Ax.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::Data,
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
33use tokio_tungstenite::tungstenite::Message;
34use ustr::Ustr;
35
36use super::parse::{
37    parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
38    parse_trade_tick,
39};
40use crate::{
41    common::enums::{AxCandleWidth, AxMarketDataLevel},
42    websocket::messages::{
43        AxMdBookL1, AxMdBookL2, AxMdBookL3, AxMdCandle, AxMdHeartbeat, AxMdSubscribe,
44        AxMdSubscribeCandles, AxMdTrade, AxMdUnsubscribe, AxMdUnsubscribeCandles, AxWsError,
45        NautilusWsMessage,
46    },
47};
48
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received by the handler on its command channel and processed
/// one at a time by `FeedHandler::handle_command`.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Set the WebSocket client for this handler.
    ///
    /// The handler stores the client, replacing any client it held before.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler takes its stored client (if any) and disconnects it;
    /// the command is a no-op when no client has been set.
    Disconnect,
    /// Subscribe to market data for a symbol.
    ///
    /// Results in a `"subscribe"` request being sent over the WebSocket.
    Subscribe {
        /// Request ID for correlation.
        request_id: i64,
        /// Instrument symbol.
        symbol: String,
        /// Market data level.
        level: AxMarketDataLevel,
    },
    /// Unsubscribe from market data for a symbol.
    ///
    /// Results in an `"unsubscribe"` request being sent over the WebSocket.
    Unsubscribe {
        /// Request ID for correlation.
        request_id: i64,
        /// Instrument symbol.
        symbol: String,
    },
    /// Subscribe to candle data for a symbol.
    ///
    /// Results in a `"subscribe_candles"` request being sent over the WebSocket.
    SubscribeCandles {
        /// Request ID for correlation.
        request_id: i64,
        /// Instrument symbol.
        symbol: String,
        /// Candle width/interval.
        width: AxCandleWidth,
    },
    /// Unsubscribe from candle data for a symbol.
    ///
    /// Results in an `"unsubscribe_candles"` request being sent over the WebSocket.
    UnsubscribeCandles {
        /// Request ID for correlation.
        request_id: i64,
        /// Instrument symbol.
        symbol: String,
        /// Candle width/interval.
        width: AxCandleWidth,
    },
    /// Initialize the instrument cache with instruments.
    ///
    /// Each instrument is inserted keyed by its symbol. Entries already in the
    /// cache are not cleared; an entry with the same symbol is overwritten.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    ///
    /// Inserts the instrument keyed by its symbol, overwriting any existing entry.
    UpdateInstrument(Box<InstrumentAny>),
}
95
/// Market data feed handler that processes WebSocket messages.
///
/// Runs in a dedicated Tokio task and owns the WebSocket client exclusively.
///
/// Commands arrive on `cmd_rx`, raw WebSocket frames arrive on `raw_rx`, and
/// parsed messages are handed back to the caller through `next`.
pub(crate) struct FeedHandler {
    /// Stop flag; `next` returns `None` once it observes this set to `true`.
    signal: Arc<AtomicBool>,
    /// WebSocket client used for outbound requests and pong replies.
    /// `None` until a `HandlerCommand::SetClient` arrives, and again after a disconnect.
    client: Option<WebSocketClient>,
    /// Receiving end of the command channel from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receiving end of the raw WebSocket frame channel.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Sender for parsed messages; stored but not used by the handler yet.
    #[allow(dead_code)] // TODO: Use for sending parsed messages
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Subscription state; stored but not used by the handler yet.
    #[allow(dead_code)] // TODO: Use for tracking subscriptions
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by instrument symbol, consulted when parsing market data.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Parsed messages waiting to be returned from `next`, in arrival order.
    message_queue: VecDeque<NautilusWsMessage>,
}
111
112impl FeedHandler {
113    /// Creates a new [`FeedHandler`] instance.
114    #[must_use]
115    pub fn new(
116        signal: Arc<AtomicBool>,
117        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
118        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
120        subscriptions: SubscriptionState,
121    ) -> Self {
122        Self {
123            signal,
124            client: None,
125            cmd_rx,
126            raw_rx,
127            out_tx,
128            subscriptions,
129            instruments: AHashMap::new(),
130            message_queue: VecDeque::new(),
131        }
132    }
133
134    /// Generates the current timestamp for `ts_init`.
135    fn generate_ts_init(&self) -> UnixNanos {
136        get_atomic_clock_realtime().get_time_ns()
137    }
138
139    /// Returns the next message from the handler.
140    ///
141    /// This method blocks until a message is available or the handler is stopped.
142    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
143        loop {
144            if let Some(msg) = self.message_queue.pop_front() {
145                return Some(msg);
146            }
147
148            tokio::select! {
149                Some(cmd) = self.cmd_rx.recv() => {
150                    self.handle_command(cmd).await;
151                }
152
153                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
154                    if self.signal.load(Ordering::Relaxed) {
155                        log::debug!("Stop signal received during idle period");
156                        return None;
157                    }
158                    continue;
159                }
160
161                msg = self.raw_rx.recv() => {
162                    let msg = match msg {
163                        Some(msg) => msg,
164                        None => {
165                            log::debug!("WebSocket stream closed");
166                            return None;
167                        }
168                    };
169
170                    if let Message::Ping(data) = &msg {
171                        log::trace!("Received ping frame with {} bytes", data.len());
172                        if let Some(client) = &self.client
173                            && let Err(e) = client.send_pong(data.to_vec()).await
174                        {
175                            log::warn!("Failed to send pong frame: {e}");
176                        }
177                        continue;
178                    }
179
180                    if let Some(messages) = self.parse_raw_message(msg) {
181                        self.message_queue.extend(messages);
182                    }
183
184                    if self.signal.load(Ordering::Relaxed) {
185                        log::debug!("Stop signal received");
186                        return None;
187                    }
188                }
189            }
190        }
191    }
192
193    async fn handle_command(&mut self, cmd: HandlerCommand) {
194        match cmd {
195            HandlerCommand::SetClient(client) => {
196                log::debug!("WebSocketClient received by handler");
197                self.client = Some(client);
198            }
199            HandlerCommand::Disconnect => {
200                log::debug!("Disconnect command received");
201                if let Some(client) = self.client.take() {
202                    client.disconnect().await;
203                }
204            }
205            HandlerCommand::Subscribe {
206                request_id,
207                symbol,
208                level,
209            } => {
210                log::debug!(
211                    "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
212                );
213                self.send_subscribe(request_id, &symbol, level).await;
214            }
215            HandlerCommand::Unsubscribe { request_id, symbol } => {
216                log::debug!(
217                    "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
218                );
219                self.send_unsubscribe(request_id, &symbol).await;
220            }
221            HandlerCommand::SubscribeCandles {
222                request_id,
223                symbol,
224                width,
225            } => {
226                log::debug!(
227                    "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
228                );
229                self.send_subscribe_candles(request_id, &symbol, width)
230                    .await;
231            }
232            HandlerCommand::UnsubscribeCandles {
233                request_id,
234                symbol,
235                width,
236            } => {
237                log::debug!(
238                    "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
239                );
240                self.send_unsubscribe_candles(request_id, &symbol, width)
241                    .await;
242            }
243            HandlerCommand::InitializeInstruments(instruments) => {
244                for inst in instruments {
245                    self.instruments.insert(inst.symbol().inner(), inst);
246                }
247            }
248            HandlerCommand::UpdateInstrument(inst) => {
249                self.instruments.insert(inst.symbol().inner(), *inst);
250            }
251        }
252    }
253
254    async fn send_subscribe(&self, request_id: i64, symbol: &str, level: AxMarketDataLevel) {
255        let msg = AxMdSubscribe {
256            request_id,
257            msg_type: "subscribe".to_string(),
258            symbol: symbol.to_string(),
259            level,
260        };
261
262        if let Err(e) = self.send_json(&msg).await {
263            log::error!("Failed to send subscribe message: {e}");
264        }
265    }
266
267    async fn send_unsubscribe(&self, request_id: i64, symbol: &str) {
268        let msg = AxMdUnsubscribe {
269            request_id,
270            msg_type: "unsubscribe".to_string(),
271            symbol: symbol.to_string(),
272        };
273
274        if let Err(e) = self.send_json(&msg).await {
275            log::error!("Failed to send unsubscribe message: {e}");
276        }
277    }
278
279    async fn send_subscribe_candles(&self, request_id: i64, symbol: &str, width: AxCandleWidth) {
280        let msg = AxMdSubscribeCandles {
281            request_id,
282            msg_type: "subscribe_candles".to_string(),
283            symbol: symbol.to_string(),
284            width,
285        };
286
287        if let Err(e) = self.send_json(&msg).await {
288            log::error!("Failed to send subscribe_candles message: {e}");
289        }
290    }
291
292    async fn send_unsubscribe_candles(&self, request_id: i64, symbol: &str, width: AxCandleWidth) {
293        let msg = AxMdUnsubscribeCandles {
294            request_id,
295            msg_type: "unsubscribe_candles".to_string(),
296            symbol: symbol.to_string(),
297            width,
298        };
299
300        if let Err(e) = self.send_json(&msg).await {
301            log::error!("Failed to send unsubscribe_candles message: {e}");
302        }
303    }
304
305    async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
306        let Some(client) = &self.client else {
307            return Err("No WebSocket client available".to_string());
308        };
309
310        let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
311        log::trace!("Sending: {payload}");
312
313        client
314            .send_text(payload, None)
315            .await
316            .map_err(|e| e.to_string())
317    }
318
319    fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<NautilusWsMessage>> {
320        match msg {
321            Message::Text(text) => {
322                if text == nautilus_network::RECONNECTED {
323                    log::info!("Received WebSocket reconnected signal");
324                    return Some(vec![NautilusWsMessage::Reconnected]);
325                }
326
327                log::trace!("Raw websocket message: {text}");
328
329                let value: serde_json::Value = match serde_json::from_str(&text) {
330                    Ok(v) => v,
331                    Err(e) => {
332                        log::error!("Failed to parse WebSocket message: {e}: {text}");
333                        return None;
334                    }
335                };
336
337                self.classify_and_parse_message(value)
338            }
339            Message::Binary(data) => {
340                log::debug!("Received binary message with {} bytes", data.len());
341                None
342            }
343            Message::Close(_) => {
344                log::debug!("Received close message, waiting for reconnection");
345                None
346            }
347            _ => None,
348        }
349    }
350
351    fn classify_and_parse_message(
352        &self,
353        value: serde_json::Value,
354    ) -> Option<Vec<NautilusWsMessage>> {
355        let obj = value.as_object()?;
356
357        // Check message type field "t"
358        let msg_type = obj.get("t").and_then(|v| v.as_str())?;
359
360        match msg_type {
361            "h" => match serde_json::from_value::<AxMdHeartbeat>(value) {
362                Ok(heartbeat) => {
363                    log::trace!("Received heartbeat ts={}", heartbeat.ts);
364                    Some(vec![NautilusWsMessage::Heartbeat])
365                }
366                Err(e) => {
367                    log::error!("Failed to parse heartbeat: {e}");
368                    None
369                }
370            },
371            "s" | "t" => {
372                // Both "s" (with direction) and "t" are trade messages
373                match serde_json::from_value::<AxMdTrade>(value) {
374                    Ok(trade) => {
375                        log::debug!("Received trade: {} {} @ {}", trade.s, trade.q, trade.p);
376
377                        let Some(instrument) = self.instruments.get(&trade.s) else {
378                            log::error!(
379                                "No instrument cached for symbol '{}' - cannot parse trade",
380                                trade.s
381                            );
382                            return None;
383                        };
384
385                        let ts_init = self.generate_ts_init();
386                        match parse_trade_tick(&trade, instrument, ts_init) {
387                            Ok(tick) => {
388                                Some(vec![NautilusWsMessage::Data(vec![Data::Trade(tick)])])
389                            }
390                            Err(e) => {
391                                log::error!("Failed to parse trade to TradeTick: {e}");
392                                None
393                            }
394                        }
395                    }
396                    Err(e) => {
397                        log::error!("Failed to parse trade: {e}");
398                        None
399                    }
400                }
401            }
402            "c" => match serde_json::from_value::<AxMdCandle>(value) {
403                Ok(candle) => {
404                    log::debug!(
405                        "Received candle: {} {} O={} C={}",
406                        candle.symbol,
407                        candle.width,
408                        candle.open,
409                        candle.close
410                    );
411
412                    let Some(instrument) = self.instruments.get(&candle.symbol) else {
413                        log::error!(
414                            "No instrument cached for symbol '{}' - cannot parse candle",
415                            candle.symbol
416                        );
417                        return None;
418                    };
419
420                    let ts_init = self.generate_ts_init();
421                    match parse_candle_bar(&candle, instrument, ts_init) {
422                        Ok(bar) => Some(vec![NautilusWsMessage::Bar(bar)]),
423                        Err(e) => {
424                            log::error!("Failed to parse candle to Bar: {e}");
425                            None
426                        }
427                    }
428                }
429                Err(e) => {
430                    log::error!("Failed to parse candle: {e}");
431                    None
432                }
433            },
434            "1" => match serde_json::from_value::<AxMdBookL1>(value) {
435                Ok(book) => {
436                    log::debug!("Received book L1: {}", book.s);
437
438                    let Some(instrument) = self.instruments.get(&book.s) else {
439                        log::error!(
440                            "No instrument cached for symbol '{}' - cannot parse L1 book",
441                            book.s
442                        );
443                        return None;
444                    };
445
446                    let ts_init = self.generate_ts_init();
447                    match parse_book_l1_quote(&book, instrument, ts_init) {
448                        Ok(quote) => Some(vec![NautilusWsMessage::Data(vec![Data::Quote(quote)])]),
449                        Err(e) => {
450                            log::error!("Failed to parse L1 to QuoteTick: {e}");
451                            None
452                        }
453                    }
454                }
455                Err(e) => {
456                    log::error!("Failed to parse book L1: {e}");
457                    None
458                }
459            },
460            "2" => match serde_json::from_value::<AxMdBookL2>(value) {
461                Ok(book) => {
462                    log::debug!(
463                        "Received book L2: {} ({} bids, {} asks)",
464                        book.s,
465                        book.b.len(),
466                        book.a.len()
467                    );
468
469                    let Some(instrument) = self.instruments.get(&book.s) else {
470                        log::error!(
471                            "No instrument cached for symbol '{}' - cannot parse L2 book",
472                            book.s
473                        );
474                        return None;
475                    };
476
477                    let ts_init = self.generate_ts_init();
478                    match parse_book_l2_deltas(&book, instrument, ts_init) {
479                        Ok(deltas) => Some(vec![NautilusWsMessage::Deltas(deltas)]),
480                        Err(e) => {
481                            log::error!("Failed to parse L2 to OrderBookDeltas: {e}");
482                            None
483                        }
484                    }
485                }
486                Err(e) => {
487                    log::error!("Failed to parse book L2: {e}");
488                    None
489                }
490            },
491            "3" => match serde_json::from_value::<AxMdBookL3>(value) {
492                Ok(book) => {
493                    log::debug!(
494                        "Received book L3: {} ({} bids, {} asks)",
495                        book.s,
496                        book.b.len(),
497                        book.a.len()
498                    );
499
500                    let Some(instrument) = self.instruments.get(&book.s) else {
501                        log::error!(
502                            "No instrument cached for symbol '{}' - cannot parse L3 book",
503                            book.s
504                        );
505                        return None;
506                    };
507
508                    let ts_init = self.generate_ts_init();
509                    match parse_book_l3_deltas(&book, instrument, ts_init) {
510                        Ok(deltas) => Some(vec![NautilusWsMessage::Deltas(deltas)]),
511                        Err(e) => {
512                            log::error!("Failed to parse L3 to OrderBookDeltas: {e}");
513                            None
514                        }
515                    }
516                }
517                Err(e) => {
518                    log::error!("Failed to parse book L3: {e}");
519                    None
520                }
521            },
522            _ => {
523                log::warn!("Unknown message type: {msg_type}");
524                Some(vec![NautilusWsMessage::Error(AxWsError::new(format!(
525                    "Unknown message type: {msg_type}"
526                )))])
527            }
528        }
529    }
530}