nautilus_kraken/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
25use nautilus_model::{
26    data::Data,
27    instruments::{Instrument, InstrumentAny},
28};
29use nautilus_network::websocket::WebSocketClient;
30use serde_json::Value;
31use tokio_tungstenite::tungstenite::Message;
32use ustr::Ustr;
33
34use super::{
35    enums::KrakenWsChannel,
36    messages::{
37        KrakenWsBookData, KrakenWsMessage, KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData,
38        NautilusWsMessage,
39    },
40    parse::{parse_book_deltas, parse_quote_tick, parse_trade_tick},
41};
42
43/// Commands sent from the outer client to the inner message handler.
44#[derive(Debug)]
45#[allow(
46    clippy::large_enum_variant,
47    reason = "Commands are ephemeral and immediately consumed"
48)]
49pub enum HandlerCommand {
50    /// Set the WebSocketClient for the handler to use.
51    SetClient(WebSocketClient),
52    /// Disconnect the WebSocket connection.
53    Disconnect,
54    /// Send text payload to the WebSocket.
55    SendText { payload: String },
56    /// Initialize the instruments cache with the given instruments.
57    InitializeInstruments(Vec<InstrumentAny>),
58    /// Update a single instrument in the cache.
59    UpdateInstrument(InstrumentAny),
60}
61
62/// WebSocket message handler for Kraken.
63pub(super) struct FeedHandler {
64    clock: &'static AtomicTime,
65    signal: Arc<AtomicBool>,
66    client: Option<WebSocketClient>,
67    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
68    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
69    instruments_cache: AHashMap<Ustr, InstrumentAny>,
70    book_sequence: u64,
71}
72
73impl FeedHandler {
74    /// Creates a new [`FeedHandler`] instance.
75    pub(super) fn new(
76        signal: Arc<AtomicBool>,
77        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
78        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
79    ) -> Self {
80        Self {
81            clock: get_atomic_clock_realtime(),
82            signal,
83            client: None,
84            cmd_rx,
85            raw_rx,
86            instruments_cache: AHashMap::new(),
87            book_sequence: 0,
88        }
89    }
90
91    pub(super) fn is_stopped(&self) -> bool {
92        self.signal.load(Ordering::Relaxed)
93    }
94
95    fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
96        self.instruments_cache.get(symbol).cloned()
97    }
98
99    /// Processes messages and commands, returning when stopped or stream ends.
100    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
101        loop {
102            tokio::select! {
103                Some(cmd) = self.cmd_rx.recv() => {
104                    match cmd {
105                        HandlerCommand::SetClient(client) => {
106                            tracing::debug!("WebSocketClient received by handler");
107                            self.client = Some(client);
108                        }
109                        HandlerCommand::Disconnect => {
110                            tracing::debug!("Disconnect command received");
111                            if let Some(client) = self.client.take() {
112                                client.disconnect().await;
113                            }
114                        }
115                        HandlerCommand::SendText { payload } => {
116                            if let Some(client) = &self.client
117                                && let Err(e) = client.send_text(payload.clone(), None).await
118                            {
119                                tracing::error!(error = %e, "Failed to send text");
120                            }
121                        }
122                        HandlerCommand::InitializeInstruments(instruments) => {
123                            for inst in instruments {
124                                self.instruments_cache.insert(inst.symbol().inner(), inst);
125                            }
126                        }
127                        HandlerCommand::UpdateInstrument(inst) => {
128                            self.instruments_cache.insert(inst.symbol().inner(), inst);
129                        }
130                    }
131                    continue;
132                }
133
134                msg = self.raw_rx.recv() => {
135                    let msg = match msg {
136                        Some(msg) => msg,
137                        None => {
138                            tracing::debug!("WebSocket stream closed");
139                            return None;
140                        }
141                    };
142
143                    if let Message::Ping(data) = &msg {
144                        tracing::trace!("Received ping frame with {} bytes", data.len());
145                        if let Some(client) = &self.client
146                            && let Err(e) = client.send_pong(data.to_vec()).await
147                        {
148                            tracing::warn!(error = %e, "Failed to send pong frame");
149                        }
150                        continue;
151                    }
152
153                    if self.signal.load(Ordering::Relaxed) {
154                        tracing::debug!("Stop signal received");
155                        return None;
156                    }
157
158                    let text = match msg {
159                        Message::Text(text) => text.to_string(),
160                        Message::Binary(data) => {
161                            match String::from_utf8(data.to_vec()) {
162                                Ok(text) => text,
163                                Err(e) => {
164                                    tracing::warn!("Failed to decode binary message: {e}");
165                                    continue;
166                                }
167                            }
168                        }
169                        Message::Pong(_) => {
170                            tracing::trace!("Received pong");
171                            continue;
172                        }
173                        Message::Close(_) => {
174                            tracing::info!("WebSocket connection closed");
175                            return None;
176                        }
177                        Message::Frame(_) => {
178                            tracing::trace!("Received raw frame");
179                            continue;
180                        }
181                        _ => continue,
182                    };
183
184                    let ts_init = self.clock.get_time_ns();
185
186                    if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
187                        return Some(nautilus_msg);
188                    }
189
190                    continue;
191                }
192            }
193        }
194    }
195
196    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
197        // Try to parse as a data message first
198        if let Ok(msg) = serde_json::from_str::<KrakenWsMessage>(text) {
199            return self.handle_data_message(msg, ts_init);
200        }
201
202        // Check for control messages (heartbeat, status, subscription responses)
203        if let Ok(value) = serde_json::from_str::<Value>(text) {
204            if value.get("channel").and_then(|v| v.as_str()) == Some("heartbeat") {
205                tracing::trace!("Received heartbeat");
206                return None;
207            }
208
209            if value.get("channel").and_then(|v| v.as_str()) == Some("status") {
210                tracing::debug!("Received status message");
211                return None;
212            }
213
214            if value.get("method").is_some() {
215                if let Ok(response) = serde_json::from_value::<KrakenWsResponse>(value) {
216                    match response {
217                        KrakenWsResponse::Subscribe(sub) => {
218                            if sub.success {
219                                if let Some(result) = &sub.result {
220                                    tracing::debug!(
221                                        channel = ?result.channel,
222                                        req_id = ?sub.req_id,
223                                        "Subscription confirmed"
224                                    );
225                                } else {
226                                    tracing::debug!(req_id = ?sub.req_id, "Subscription confirmed");
227                                }
228                            } else {
229                                tracing::warn!(
230                                    error = ?sub.error,
231                                    req_id = ?sub.req_id,
232                                    "Subscription failed"
233                                );
234                            }
235                        }
236                        KrakenWsResponse::Unsubscribe(unsub) => {
237                            if unsub.success {
238                                tracing::debug!(req_id = ?unsub.req_id, "Unsubscription confirmed");
239                            } else {
240                                tracing::warn!(
241                                    error = ?unsub.error,
242                                    req_id = ?unsub.req_id,
243                                    "Unsubscription failed"
244                                );
245                            }
246                        }
247                        KrakenWsResponse::Pong(pong) => {
248                            tracing::trace!(req_id = ?pong.req_id, "Received pong");
249                        }
250                        KrakenWsResponse::Other => {
251                            tracing::debug!("Received unknown subscription response");
252                        }
253                    }
254                } else {
255                    tracing::debug!("Received subscription response (failed to parse details)");
256                }
257                return None;
258            }
259        }
260
261        tracing::warn!("Failed to parse message: {text}");
262        None
263    }
264
265    fn handle_data_message(
266        &mut self,
267        msg: KrakenWsMessage,
268        ts_init: UnixNanos,
269    ) -> Option<NautilusWsMessage> {
270        match msg.channel {
271            KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
272            KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
273            KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
274            KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
275            _ => {
276                tracing::warn!("Unhandled channel: {:?}", msg.channel);
277                None
278            }
279        }
280    }
281
282    fn handle_book_message(
283        &mut self,
284        msg: KrakenWsMessage,
285        ts_init: UnixNanos,
286    ) -> Option<NautilusWsMessage> {
287        let mut all_deltas = Vec::new();
288        let mut instrument_id = None;
289
290        for data in msg.data {
291            match serde_json::from_value::<KrakenWsBookData>(data) {
292                Ok(book_data) => {
293                    let instrument = self.get_instrument(&book_data.symbol)?;
294                    instrument_id = Some(instrument.id());
295
296                    match parse_book_deltas(&book_data, &instrument, self.book_sequence, ts_init) {
297                        Ok(mut deltas) => {
298                            self.book_sequence += deltas.len() as u64;
299                            all_deltas.append(&mut deltas);
300                        }
301                        Err(e) => {
302                            tracing::error!("Failed to parse book deltas: {e}");
303                        }
304                    }
305                }
306                Err(e) => {
307                    tracing::error!("Failed to deserialize book data: {e}");
308                }
309            }
310        }
311
312        if all_deltas.is_empty() {
313            None
314        } else {
315            use nautilus_model::data::OrderBookDeltas;
316            let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
317            Some(NautilusWsMessage::Deltas(deltas))
318        }
319    }
320
321    fn handle_ticker_message(
322        &self,
323        msg: KrakenWsMessage,
324        ts_init: UnixNanos,
325    ) -> Option<NautilusWsMessage> {
326        let mut quotes = Vec::new();
327
328        for data in msg.data {
329            match serde_json::from_value::<KrakenWsTickerData>(data) {
330                Ok(ticker_data) => {
331                    let instrument = self.get_instrument(&ticker_data.symbol)?;
332
333                    match parse_quote_tick(&ticker_data, &instrument, ts_init) {
334                        Ok(quote) => quotes.push(Data::Quote(quote)),
335                        Err(e) => {
336                            tracing::error!("Failed to parse quote tick: {e}");
337                        }
338                    }
339                }
340                Err(e) => {
341                    tracing::error!("Failed to deserialize ticker data: {e}");
342                }
343            }
344        }
345
346        if quotes.is_empty() {
347            None
348        } else {
349            Some(NautilusWsMessage::Data(quotes))
350        }
351    }
352
353    fn handle_trade_message(
354        &self,
355        msg: KrakenWsMessage,
356        ts_init: UnixNanos,
357    ) -> Option<NautilusWsMessage> {
358        let mut trades = Vec::new();
359
360        for data in msg.data {
361            match serde_json::from_value::<KrakenWsTradeData>(data) {
362                Ok(trade_data) => {
363                    let instrument = self.get_instrument(&trade_data.symbol)?;
364
365                    match parse_trade_tick(&trade_data, &instrument, ts_init) {
366                        Ok(trade) => trades.push(Data::Trade(trade)),
367                        Err(e) => {
368                            tracing::error!("Failed to parse trade tick: {e}");
369                        }
370                    }
371                }
372                Err(e) => {
373                    tracing::error!("Failed to deserialize trade data: {e}");
374                }
375            }
376        }
377
378        if trades.is_empty() {
379            None
380        } else {
381            Some(NautilusWsMessage::Data(trades))
382        }
383    }
384
385    fn handle_ohlc_message(
386        &self,
387        _msg: KrakenWsMessage,
388        _ts_init: UnixNanos,
389    ) -> Option<NautilusWsMessage> {
390        // OHLC/Bar parsing not yet implemented in parse.rs
391        tracing::debug!("OHLC message received but parsing not yet implemented");
392        None
393    }
394}