Skip to main content

nautilus_kraken/websocket/spot_v2/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Spot v2.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::{Bar, Data, OrderBookDeltas},
30    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderUpdated},
31    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    types::Quantity,
34};
35use nautilus_network::{
36    RECONNECTED,
37    websocket::{SubscriptionState, WebSocketClient},
38};
39use serde_json::Value;
40use tokio_tungstenite::tungstenite::Message;
41use ustr::Ustr;
42
43use super::{
44    enums::{KrakenExecType, KrakenWsChannel},
45    messages::{
46        KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage, KrakenWsOhlcData,
47        KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData, NautilusWsMessage,
48    },
49    parse::{
50        parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar, parse_ws_fill_report,
51        parse_ws_order_status_report,
52    },
53};
54
55/// Cached information about a client order needed for event generation.
56#[derive(Debug, Clone)]
57struct CachedOrderInfo {
58    instrument_id: InstrumentId,
59    trader_id: TraderId,
60    strategy_id: StrategyId,
61}
62
63/// Commands sent from the outer client to the inner message handler.
64#[derive(Debug)]
65#[allow(
66    clippy::large_enum_variant,
67    reason = "Commands are ephemeral and immediately consumed"
68)]
69pub enum SpotHandlerCommand {
70    SetClient(WebSocketClient),
71    Disconnect,
72    SendText {
73        payload: String,
74    },
75    InitializeInstruments(Vec<InstrumentAny>),
76    UpdateInstrument(InstrumentAny),
77    SetAccountId(AccountId),
78    CacheClientOrder {
79        client_order_id: ClientOrderId,
80        instrument_id: InstrumentId,
81        trader_id: TraderId,
82        strategy_id: StrategyId,
83    },
84}
85
86/// Key for buffering OHLC bars: (symbol, interval).
87type OhlcBufferKey = (Ustr, u32);
88
89/// Buffered OHLC bar with its interval start time for period detection.
90type OhlcBufferEntry = (Bar, UnixNanos);
91
92/// WebSocket message handler for Kraken.
93pub(super) struct SpotFeedHandler {
94    clock: &'static AtomicTime,
95    signal: Arc<AtomicBool>,
96    client: Option<WebSocketClient>,
97    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
98    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
99    subscriptions: SubscriptionState,
100    instruments_cache: AHashMap<Ustr, InstrumentAny>,
101    client_order_cache: AHashMap<ClientOrderId, CachedOrderInfo>,
102    order_qty_cache: AHashMap<VenueOrderId, f64>,
103    book_sequence: u64,
104    pending_messages: VecDeque<NautilusWsMessage>,
105    account_id: Option<AccountId>,
106    ohlc_buffer: AHashMap<OhlcBufferKey, OhlcBufferEntry>,
107}
108
109impl SpotFeedHandler {
110    /// Creates a new [`SpotFeedHandler`] instance.
111    pub(super) fn new(
112        signal: Arc<AtomicBool>,
113        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
114        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
115        subscriptions: SubscriptionState,
116    ) -> Self {
117        Self {
118            clock: get_atomic_clock_realtime(),
119            signal,
120            client: None,
121            cmd_rx,
122            raw_rx,
123            subscriptions,
124            instruments_cache: AHashMap::new(),
125            client_order_cache: AHashMap::new(),
126            order_qty_cache: AHashMap::new(),
127            book_sequence: 0,
128            pending_messages: VecDeque::new(),
129            account_id: None,
130            ohlc_buffer: AHashMap::new(),
131        }
132    }
133
134    pub(super) fn is_stopped(&self) -> bool {
135        self.signal.load(Ordering::Relaxed)
136    }
137
138    /// Checks if a topic is active (confirmed or pending subscribe).
139    fn is_subscribed(&self, topic: &str) -> bool {
140        self.subscriptions.all_topics().iter().any(|t| t == topic)
141    }
142
143    fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
144        self.instruments_cache.get(symbol).cloned()
145    }
146
147    /// Flushes all buffered OHLC bars to pending messages.
148    ///
149    /// Called when the stream ends to ensure the last bar for each symbol/interval
150    /// is not lost.
151    fn flush_ohlc_buffer(&mut self) {
152        if self.ohlc_buffer.is_empty() {
153            return;
154        }
155
156        let bars: Vec<Data> = self
157            .ohlc_buffer
158            .drain()
159            .map(|(_, (bar, _))| Data::Bar(bar))
160            .collect();
161
162        if !bars.is_empty() {
163            log::debug!("Flushing {} buffered OHLC bars on stream end", bars.len());
164            self.pending_messages
165                .push_back(NautilusWsMessage::Data(bars));
166        }
167    }
168
169    /// Processes messages and commands, returning when stopped or stream ends.
170    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
171        // Check for pending messages first (e.g., from multi-message scenarios like trades)
172        if let Some(msg) = self.pending_messages.pop_front() {
173            return Some(msg);
174        }
175
176        loop {
177            tokio::select! {
178                Some(cmd) = self.cmd_rx.recv() => {
179                    match cmd {
180                        SpotHandlerCommand::SetClient(client) => {
181                            log::debug!("WebSocketClient received by handler");
182                            self.client = Some(client);
183                        }
184                        SpotHandlerCommand::Disconnect => {
185                            log::debug!("Disconnect command received");
186                            if let Some(client) = self.client.take() {
187                                client.disconnect().await;
188                            }
189                        }
190                        SpotHandlerCommand::SendText { payload } => {
191                            if let Some(client) = &self.client
192                                && let Err(e) = client.send_text(payload.clone(), None).await
193                            {
194                                log::error!("Failed to send text: {e}");
195                            }
196                        }
197                        SpotHandlerCommand::InitializeInstruments(instruments) => {
198                            for inst in instruments {
199                                // Cache by symbol (ISO 4217-A3 format like "ETH/USD")
200                                // which matches what v2 WebSocket messages use
201                                self.instruments_cache.insert(inst.symbol().inner(), inst);
202                            }
203                        }
204                        SpotHandlerCommand::UpdateInstrument(inst) => {
205                            self.instruments_cache.insert(inst.symbol().inner(), inst);
206                        }
207                        SpotHandlerCommand::SetAccountId(account_id) => {
208                            log::debug!("Account ID set for execution reports: {account_id}");
209                            self.account_id = Some(account_id);
210                        }
211                        SpotHandlerCommand::CacheClientOrder {
212                            client_order_id,
213                            instrument_id,
214                            trader_id,
215                            strategy_id,
216                        } => {
217                            log::debug!(
218                                "Cached client order info: \
219                                client_order_id={client_order_id}, instrument_id={instrument_id}"
220                            );
221                            self.client_order_cache.insert(
222                                client_order_id,
223                                CachedOrderInfo {
224                                    instrument_id,
225                                    trader_id,
226                                    strategy_id,
227                                },
228                            );
229                        }
230                    }
231                    continue;
232                }
233
234                msg = self.raw_rx.recv() => {
235                    let msg = match msg {
236                        Some(msg) => msg,
237                        None => {
238                            log::debug!("WebSocket stream closed");
239                            self.flush_ohlc_buffer();
240                            return self.pending_messages.pop_front();
241                        }
242                    };
243
244                    if let Message::Ping(data) = &msg {
245                        log::trace!("Received ping frame with {} bytes", data.len());
246                        if let Some(client) = &self.client
247                            && let Err(e) = client.send_pong(data.to_vec()).await
248                        {
249                            log::warn!("Failed to send pong frame: {e}");
250                        }
251                        continue;
252                    }
253
254                    if self.signal.load(Ordering::Relaxed) {
255                        log::debug!("Stop signal received");
256                        self.flush_ohlc_buffer();
257                        return self.pending_messages.pop_front();
258                    }
259
260                    let text = match msg {
261                        Message::Text(text) => text.to_string(),
262                        Message::Binary(data) => {
263                            match String::from_utf8(data.to_vec()) {
264                                Ok(text) => text,
265                                Err(e) => {
266                                    log::warn!("Failed to decode binary message: {e}");
267                                    continue;
268                                }
269                            }
270                        }
271                        Message::Pong(_) => {
272                            log::trace!("Received pong");
273                            continue;
274                        }
275                        Message::Close(_) => {
276                            log::info!("WebSocket connection closed");
277                            self.flush_ohlc_buffer();
278                            return self.pending_messages.pop_front();
279                        }
280                        Message::Frame(_) => {
281                            log::trace!("Received raw frame");
282                            continue;
283                        }
284                        _ => continue,
285                    };
286
287                    if text == RECONNECTED {
288                        log::info!("Received WebSocket reconnected signal");
289                        return Some(NautilusWsMessage::Reconnected);
290                    }
291
292                    let ts_init = self.clock.get_time_ns();
293
294                    if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
295                        return Some(nautilus_msg);
296                    }
297
298                    continue;
299                }
300            }
301        }
302    }
303
304    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
305        // Fast pre-filter for high-frequency control messages (no JSON parsing)
306        // Heartbeats and status messages are short and share common prefix
307        if text.len() < 50 && text.starts_with("{\"channel\":\"") {
308            if text.contains("heartbeat") {
309                log::trace!("Received heartbeat");
310                return None;
311            }
312            if text.contains("status") {
313                log::debug!("Received status message");
314                return None;
315            }
316        }
317
318        let value: Value = match serde_json::from_str(text) {
319            Ok(v) => v,
320            Err(e) => {
321                log::warn!("Failed to parse message: {e}");
322                return None;
323            }
324        };
325
326        // Control messages have "method" field
327        if value.get("method").is_some() {
328            self.handle_control_message(value);
329            return None;
330        }
331
332        // Data messages have "channel" and "data" fields
333        if value.get("channel").is_some() && value.get("data").is_some() {
334            match serde_json::from_value::<KrakenWsMessage>(value) {
335                Ok(msg) => return self.handle_data_message(msg, ts_init),
336                Err(e) => {
337                    log::debug!("Failed to parse data message: {e}");
338                    return None;
339                }
340            }
341        }
342
343        log::debug!("Unhandled message structure: {text}");
344        None
345    }
346
347    fn handle_control_message(&self, value: Value) {
348        match serde_json::from_value::<KrakenWsResponse>(value) {
349            Ok(response) => match response {
350                KrakenWsResponse::Subscribe(sub) => {
351                    if sub.success {
352                        if let Some(result) = &sub.result {
353                            log::debug!(
354                                "Subscription confirmed: channel={:?}, req_id={:?}",
355                                result.channel,
356                                sub.req_id
357                            );
358                        } else {
359                            log::debug!("Subscription confirmed: req_id={:?}", sub.req_id);
360                        }
361                    } else {
362                        log::warn!(
363                            "Subscription failed: error={:?}, req_id={:?}",
364                            sub.error,
365                            sub.req_id
366                        );
367                    }
368                }
369                KrakenWsResponse::Unsubscribe(unsub) => {
370                    if unsub.success {
371                        log::debug!("Unsubscription confirmed: req_id={:?}", unsub.req_id);
372                    } else {
373                        log::warn!(
374                            "Unsubscription failed: error={:?}, req_id={:?}",
375                            unsub.error,
376                            unsub.req_id
377                        );
378                    }
379                }
380                KrakenWsResponse::Pong(pong) => {
381                    log::trace!("Received pong: req_id={:?}", pong.req_id);
382                }
383                KrakenWsResponse::Other => {
384                    log::debug!("Received unknown control response");
385                }
386            },
387            Err(_) => {
388                log::debug!("Received control message (failed to parse details)");
389            }
390        }
391    }
392
393    fn handle_data_message(
394        &mut self,
395        msg: KrakenWsMessage,
396        ts_init: UnixNanos,
397    ) -> Option<NautilusWsMessage> {
398        match msg.channel {
399            KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
400            KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
401            KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
402            KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
403            KrakenWsChannel::Executions => self.handle_executions_message(msg, ts_init),
404            _ => {
405                log::warn!("Unhandled channel: {:?}", msg.channel);
406                None
407            }
408        }
409    }
410
411    fn handle_book_message(
412        &mut self,
413        msg: KrakenWsMessage,
414        ts_init: UnixNanos,
415    ) -> Option<NautilusWsMessage> {
416        let mut all_deltas = Vec::new();
417        let mut instrument_id = None;
418
419        for data in msg.data {
420            match serde_json::from_value::<KrakenWsBookData>(data) {
421                Ok(book_data) => {
422                    let symbol = &book_data.symbol;
423
424                    if !self.is_subscribed(&format!("book:{symbol}")) {
425                        continue;
426                    }
427
428                    let instrument = self.get_instrument(symbol)?;
429                    instrument_id = Some(instrument.id());
430
431                    match parse_book_deltas(&book_data, &instrument, self.book_sequence, ts_init) {
432                        Ok(mut deltas) => {
433                            self.book_sequence += deltas.len() as u64;
434                            all_deltas.append(&mut deltas);
435                        }
436                        Err(e) => {
437                            log::error!("Failed to parse book deltas: {e}");
438                        }
439                    }
440                }
441                Err(e) => {
442                    log::error!("Failed to deserialize book data: {e}");
443                }
444            }
445        }
446
447        if all_deltas.is_empty() {
448            None
449        } else {
450            let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
451            Some(NautilusWsMessage::Deltas(deltas))
452        }
453    }
454
455    fn handle_ticker_message(
456        &self,
457        msg: KrakenWsMessage,
458        ts_init: UnixNanos,
459    ) -> Option<NautilusWsMessage> {
460        let mut quotes = Vec::new();
461
462        for data in msg.data {
463            match serde_json::from_value::<KrakenWsTickerData>(data) {
464                Ok(ticker_data) => {
465                    let symbol = &ticker_data.symbol;
466
467                    // Accept both quotes:{symbol} (BBO via subscribe_quotes) and
468                    // ticker:{symbol} (raw ticker via subscribe API).
469                    let quotes_key = format!("quotes:{symbol}");
470                    let ticker_key = format!("ticker:{symbol}");
471                    if !self.is_subscribed(&quotes_key) && !self.is_subscribed(&ticker_key) {
472                        continue;
473                    }
474
475                    let instrument = self.get_instrument(symbol)?;
476
477                    match parse_quote_tick(&ticker_data, &instrument, ts_init) {
478                        Ok(quote) => quotes.push(Data::Quote(quote)),
479                        Err(e) => {
480                            log::error!("Failed to parse quote tick: {e}");
481                        }
482                    }
483                }
484                Err(e) => {
485                    log::error!("Failed to deserialize ticker data: {e}");
486                }
487            }
488        }
489
490        if quotes.is_empty() {
491            None
492        } else {
493            Some(NautilusWsMessage::Data(quotes))
494        }
495    }
496
497    fn handle_trade_message(
498        &self,
499        msg: KrakenWsMessage,
500        ts_init: UnixNanos,
501    ) -> Option<NautilusWsMessage> {
502        let mut trades = Vec::new();
503
504        for data in msg.data {
505            match serde_json::from_value::<KrakenWsTradeData>(data) {
506                Ok(trade_data) => {
507                    let instrument = self.get_instrument(&trade_data.symbol)?;
508
509                    match parse_trade_tick(&trade_data, &instrument, ts_init) {
510                        Ok(trade) => trades.push(Data::Trade(trade)),
511                        Err(e) => {
512                            log::error!("Failed to parse trade tick: {e}");
513                        }
514                    }
515                }
516                Err(e) => {
517                    log::error!("Failed to deserialize trade data: {e}");
518                }
519            }
520        }
521
522        if trades.is_empty() {
523            None
524        } else {
525            Some(NautilusWsMessage::Data(trades))
526        }
527    }
528
529    fn handle_ohlc_message(
530        &mut self,
531        msg: KrakenWsMessage,
532        ts_init: UnixNanos,
533    ) -> Option<NautilusWsMessage> {
534        let mut closed_bars = Vec::new();
535
536        for data in msg.data {
537            match serde_json::from_value::<KrakenWsOhlcData>(data) {
538                Ok(ohlc_data) => {
539                    let instrument = self.get_instrument(&ohlc_data.symbol)?;
540
541                    match parse_ws_bar(&ohlc_data, &instrument, ts_init) {
542                        Ok(new_bar) => {
543                            let key = (ohlc_data.symbol, ohlc_data.interval);
544                            let new_interval_begin = UnixNanos::from(
545                                ohlc_data.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
546                            );
547
548                            // Check if we have a buffered bar for this symbol/interval
549                            if let Some((buffered_bar, buffered_interval_begin)) =
550                                self.ohlc_buffer.get(&key)
551                            {
552                                // If interval_begin changed, the buffered bar is closed
553                                if new_interval_begin != *buffered_interval_begin {
554                                    closed_bars.push(Data::Bar(*buffered_bar));
555                                }
556                            }
557
558                            // Update buffer with the new (potentially incomplete) bar
559                            self.ohlc_buffer.insert(key, (new_bar, new_interval_begin));
560                        }
561                        Err(e) => {
562                            log::error!("Failed to parse bar: {e}");
563                        }
564                    }
565                }
566                Err(e) => {
567                    log::error!("Failed to deserialize OHLC data: {e}");
568                }
569            }
570        }
571
572        if closed_bars.is_empty() {
573            None
574        } else {
575            Some(NautilusWsMessage::Data(closed_bars))
576        }
577    }
578
579    fn handle_executions_message(
580        &mut self,
581        msg: KrakenWsMessage,
582        ts_init: UnixNanos,
583    ) -> Option<NautilusWsMessage> {
584        let Some(account_id) = self.account_id else {
585            log::warn!("Cannot process execution message: account_id not set");
586            return None;
587        };
588
589        // Process all executions in batch and queue them (snapshots can have many records)
590        for data in msg.data {
591            match serde_json::from_value::<KrakenWsExecutionData>(data) {
592                Ok(exec_data) => {
593                    log::debug!(
594                        "Received execution message: exec_type={:?}, order_id={}, \
595                        order_status={:?}, order_qty={:?}, cum_qty={:?}, last_qty={:?}",
596                        exec_data.exec_type,
597                        exec_data.order_id,
598                        exec_data.order_status,
599                        exec_data.order_qty,
600                        exec_data.cum_qty,
601                        exec_data.last_qty
602                    );
603
604                    // Cache order_qty for subsequent messages that may not include it
605                    if let Some(qty) = exec_data.order_qty {
606                        self.order_qty_cache
607                            .insert(VenueOrderId::new(&exec_data.order_id), qty);
608                    }
609
610                    // Resolve instrument and cached order info
611                    let (instrument, cached_info) = if let Some(ref symbol) = exec_data.symbol {
612                        let symbol_ustr = Ustr::from(symbol.as_str());
613                        let inst = self.instruments_cache.get(&symbol_ustr).cloned();
614                        if inst.is_none() {
615                            log::warn!(
616                                "No instrument found for symbol: symbol={symbol}, order_id={}",
617                                exec_data.order_id
618                            );
619                        }
620                        let cached = exec_data
621                            .cl_ord_id
622                            .as_ref()
623                            .filter(|id| !id.is_empty())
624                            .and_then(|id| {
625                                self.client_order_cache
626                                    .get(&ClientOrderId::new(id))
627                                    .cloned()
628                            });
629                        (inst, cached)
630                    } else if let Some(ref cl_ord_id) =
631                        exec_data.cl_ord_id.as_ref().filter(|id| !id.is_empty())
632                    {
633                        let cached = self
634                            .client_order_cache
635                            .get(&ClientOrderId::new(cl_ord_id))
636                            .cloned();
637                        let inst = cached.as_ref().and_then(|info| {
638                            self.instruments_cache
639                                .iter()
640                                .find(|(_, inst)| inst.id() == info.instrument_id)
641                                .map(|(_, inst)| inst.clone())
642                        });
643                        (inst, cached)
644                    } else {
645                        (None, None)
646                    };
647
648                    let Some(instrument) = instrument else {
649                        log::debug!(
650                            "Execution missing symbol and order not in cache (external order): \
651                            order_id={}, cl_ord_id={:?}, exec_type={:?}",
652                            exec_data.order_id,
653                            exec_data.cl_ord_id,
654                            exec_data.exec_type
655                        );
656                        continue;
657                    };
658
659                    let cached_order_qty = self
660                        .order_qty_cache
661                        .get(&VenueOrderId::new(&exec_data.order_id))
662                        .copied();
663                    let ts_event = chrono::DateTime::parse_from_rfc3339(&exec_data.timestamp)
664                        .map_or(ts_init, |t| {
665                            UnixNanos::from(t.timestamp_nanos_opt().unwrap_or(0) as u64)
666                        });
667
668                    // Emit proper order events when we have cached info, otherwise fall back
669                    // to OrderStatusReport for external orders or reconciliation
670                    if let Some(ref info) = cached_info {
671                        let client_order_id = exec_data
672                            .cl_ord_id
673                            .as_ref()
674                            .map(ClientOrderId::new)
675                            .expect("cl_ord_id should exist if cached");
676                        let venue_order_id = VenueOrderId::new(&exec_data.order_id);
677
678                        match exec_data.exec_type {
679                            KrakenExecType::PendingNew => {
680                                // Order received and validated - emit accepted
681                                let accepted = OrderAccepted::new(
682                                    info.trader_id,
683                                    info.strategy_id,
684                                    instrument.id(),
685                                    client_order_id,
686                                    venue_order_id,
687                                    account_id,
688                                    UUID4::new(),
689                                    ts_event,
690                                    ts_init,
691                                    false,
692                                );
693                                self.pending_messages
694                                    .push_back(NautilusWsMessage::OrderAccepted(accepted));
695                            }
696                            KrakenExecType::New => {
697                                // Order is now live - already accepted, skip
698                            }
699                            KrakenExecType::Canceled => {
700                                // Check if this is a post-only rejection based on reason
701                                // Kraken sends reason="Post only order" for post-only rejections
702                                let is_post_only_rejection = exec_data
703                                    .reason
704                                    .as_ref()
705                                    .is_some_and(|r| r.eq_ignore_ascii_case("Post only order"));
706
707                                if is_post_only_rejection {
708                                    let reason = exec_data
709                                        .reason
710                                        .as_deref()
711                                        .unwrap_or("Post-only order would have crossed");
712                                    let rejected = OrderRejected::new(
713                                        info.trader_id,
714                                        info.strategy_id,
715                                        instrument.id(),
716                                        client_order_id,
717                                        account_id,
718                                        Ustr::from(reason),
719                                        UUID4::new(),
720                                        ts_event,
721                                        ts_init,
722                                        false,
723                                        true, // due_post_only
724                                    );
725                                    self.pending_messages
726                                        .push_back(NautilusWsMessage::OrderRejected(rejected));
727                                } else {
728                                    let canceled = OrderCanceled::new(
729                                        info.trader_id,
730                                        info.strategy_id,
731                                        instrument.id(),
732                                        client_order_id,
733                                        UUID4::new(),
734                                        ts_event,
735                                        ts_init,
736                                        false,
737                                        Some(venue_order_id),
738                                        Some(account_id),
739                                    );
740                                    self.pending_messages
741                                        .push_back(NautilusWsMessage::OrderCanceled(canceled));
742                                }
743                            }
744                            KrakenExecType::Expired => {
745                                let expired = OrderExpired::new(
746                                    info.trader_id,
747                                    info.strategy_id,
748                                    instrument.id(),
749                                    client_order_id,
750                                    UUID4::new(),
751                                    ts_event,
752                                    ts_init,
753                                    false,
754                                    Some(venue_order_id),
755                                    Some(account_id),
756                                );
757                                self.pending_messages
758                                    .push_back(NautilusWsMessage::OrderExpired(expired));
759                            }
760                            KrakenExecType::Amended | KrakenExecType::Restated => {
761                                // For modifications, emit OrderUpdated
762                                if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
763                                    let updated = OrderUpdated::new(
764                                        info.trader_id,
765                                        info.strategy_id,
766                                        instrument.id(),
767                                        client_order_id,
768                                        Quantity::new(order_qty, instrument.size_precision()),
769                                        UUID4::new(),
770                                        ts_event,
771                                        ts_init,
772                                        false,
773                                        Some(venue_order_id),
774                                        Some(account_id),
775                                        None, // price
776                                        None, // trigger_price
777                                        None, // protection_price
778                                    );
779                                    self.pending_messages
780                                        .push_back(NautilusWsMessage::OrderUpdated(updated));
781                                }
782                            }
783                            KrakenExecType::Trade | KrakenExecType::Filled => {
784                                // Trades use OrderStatusReport + FillReport
785                                let has_complete_trade_data =
786                                    exec_data.last_qty.is_some_and(|q| q > 0.0)
787                                        && exec_data.last_price.is_some_and(|p| p > 0.0);
788
789                                if let Ok(status_report) = parse_ws_order_status_report(
790                                    &exec_data,
791                                    &instrument,
792                                    account_id,
793                                    cached_order_qty,
794                                    ts_init,
795                                ) {
796                                    self.pending_messages.push_back(
797                                        NautilusWsMessage::OrderStatusReport(Box::new(
798                                            status_report,
799                                        )),
800                                    );
801                                }
802
803                                if has_complete_trade_data
804                                    && let Ok(fill_report) = parse_ws_fill_report(
805                                        &exec_data,
806                                        &instrument,
807                                        account_id,
808                                        ts_init,
809                                    )
810                                {
811                                    self.pending_messages
812                                        .push_back(NautilusWsMessage::FillReport(Box::new(
813                                            fill_report,
814                                        )));
815                                }
816                            }
817                            KrakenExecType::IcebergRefill => {
818                                // Iceberg order refill - treat similar to order update
819                                if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
820                                    let updated = OrderUpdated::new(
821                                        info.trader_id,
822                                        info.strategy_id,
823                                        instrument.id(),
824                                        client_order_id,
825                                        Quantity::new(order_qty, instrument.size_precision()),
826                                        UUID4::new(),
827                                        ts_event,
828                                        ts_init,
829                                        false,
830                                        Some(venue_order_id),
831                                        Some(account_id),
832                                        None,
833                                        None,
834                                        None,
835                                    );
836                                    self.pending_messages
837                                        .push_back(NautilusWsMessage::OrderUpdated(updated));
838                                }
839                            }
840                            KrakenExecType::Status => {
841                                // Status update without state change - emit OrderStatusReport
842                                if let Ok(status_report) = parse_ws_order_status_report(
843                                    &exec_data,
844                                    &instrument,
845                                    account_id,
846                                    cached_order_qty,
847                                    ts_init,
848                                ) {
849                                    self.pending_messages.push_back(
850                                        NautilusWsMessage::OrderStatusReport(Box::new(
851                                            status_report,
852                                        )),
853                                    );
854                                }
855                            }
856                        }
857                    } else {
858                        // No cached info - external order or reconciliation, use OrderStatusReport
859                        if exec_data.exec_type == KrakenExecType::Trade
860                            || exec_data.exec_type == KrakenExecType::Filled
861                        {
862                            let has_order_data = exec_data.order_qty.is_some()
863                                || cached_order_qty.is_some()
864                                || exec_data.cum_qty.is_some();
865
866                            let has_complete_trade_data =
867                                exec_data.last_qty.is_some_and(|q| q > 0.0)
868                                    && exec_data.last_price.is_some_and(|p| p > 0.0);
869
870                            if has_order_data
871                                && let Ok(status_report) = parse_ws_order_status_report(
872                                    &exec_data,
873                                    &instrument,
874                                    account_id,
875                                    cached_order_qty,
876                                    ts_init,
877                                )
878                            {
879                                self.pending_messages.push_back(
880                                    NautilusWsMessage::OrderStatusReport(Box::new(status_report)),
881                                );
882                            }
883
884                            if has_complete_trade_data
885                                && let Ok(fill_report) = parse_ws_fill_report(
886                                    &exec_data,
887                                    &instrument,
888                                    account_id,
889                                    ts_init,
890                                )
891                            {
892                                self.pending_messages
893                                    .push_back(NautilusWsMessage::FillReport(Box::new(
894                                        fill_report,
895                                    )));
896                            }
897                        } else if let Ok(report) = parse_ws_order_status_report(
898                            &exec_data,
899                            &instrument,
900                            account_id,
901                            cached_order_qty,
902                            ts_init,
903                        ) {
904                            self.pending_messages
905                                .push_back(NautilusWsMessage::OrderStatusReport(Box::new(report)));
906                        }
907                    }
908                }
909                Err(e) => {
910                    log::error!("Failed to deserialize execution data: {e}");
911                }
912            }
913        }
914
915        // Return first queued message (rest returned via next() pending check)
916        self.pending_messages.pop_front()
917    }
918}
919
920#[cfg(test)]
921mod tests {
922    use nautilus_model::{
923        identifiers::{InstrumentId, Symbol, Venue},
924        instruments::{InstrumentAny, currency_pair::CurrencyPair},
925        types::{Currency, Price, Quantity},
926    };
927    use rstest::rstest;
928
929    use super::*;
930
931    fn create_test_handler() -> SpotFeedHandler {
932        let signal = Arc::new(AtomicBool::new(false));
933        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
934        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
935        let subscriptions = SubscriptionState::new(':');
936
937        SpotFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
938    }
939
940    fn create_test_instrument(symbol: &str) -> InstrumentAny {
941        let instrument_id = InstrumentId::new(Symbol::new(symbol), Venue::new("KRAKEN"));
942        InstrumentAny::CurrencyPair(CurrencyPair::new(
943            instrument_id,
944            Symbol::new(symbol),
945            Currency::BTC(),
946            Currency::USD(),
947            2,
948            8,
949            Price::from("0.01"),
950            Quantity::from("0.00000001"),
951            None,
952            None,
953            None,
954            None,
955            None,
956            None,
957            None,
958            None,
959            None,
960            None,
961            None,
962            None,
963            UnixNanos::default(),
964            UnixNanos::default(),
965        ))
966    }
967
968    #[rstest]
969    fn test_ticker_message_filtered_without_quotes_subscription() {
970        let mut handler = create_test_handler();
971        let instrument = create_test_instrument("BTC/USD");
972        handler
973            .instruments_cache
974            .insert(Ustr::from("BTC/USD"), instrument);
975
976        let json = r#"{
977            "channel": "ticker",
978            "type": "snapshot",
979            "data": [{
980                "symbol": "BTC/USD",
981                "bid": 105944.20,
982                "bid_qty": 2.5,
983                "ask": 105944.30,
984                "ask_qty": 3.2,
985                "last": 105899.40,
986                "volume": 163.28908096,
987                "vwap": 105904.39279,
988                "low": 104711.00,
989                "high": 106613.10,
990                "change": 250.00,
991                "change_pct": 0.24
992            }]
993        }"#;
994
995        let ts_init = UnixNanos::from(1_000_000_000);
996        let result = handler.parse_message(json, ts_init);
997
998        assert!(
999            result.is_none(),
1000            "Ticker message should be filtered when no quotes subscription exists"
1001        );
1002    }
1003
1004    #[rstest]
1005    fn test_ticker_message_passes_with_quotes_subscription() {
1006        let mut handler = create_test_handler();
1007        let instrument = create_test_instrument("BTC/USD");
1008        handler
1009            .instruments_cache
1010            .insert(Ustr::from("BTC/USD"), instrument);
1011
1012        handler.subscriptions.mark_subscribe("quotes:BTC/USD");
1013        handler.subscriptions.confirm_subscribe("quotes:BTC/USD");
1014
1015        let json = r#"{
1016            "channel": "ticker",
1017            "type": "snapshot",
1018            "data": [{
1019                "symbol": "BTC/USD",
1020                "bid": 105944.20,
1021                "bid_qty": 2.5,
1022                "ask": 105944.30,
1023                "ask_qty": 3.2,
1024                "last": 105899.40,
1025                "volume": 163.28908096,
1026                "vwap": 105904.39279,
1027                "low": 104711.00,
1028                "high": 106613.10,
1029                "change": 250.00,
1030                "change_pct": 0.24
1031            }]
1032        }"#;
1033
1034        let ts_init = UnixNanos::from(1_000_000_000);
1035        let result = handler.parse_message(json, ts_init);
1036
1037        assert!(
1038            result.is_some(),
1039            "Ticker message should pass with quotes subscription"
1040        );
1041        match result.unwrap() {
1042            NautilusWsMessage::Data(data) => {
1043                assert!(!data.is_empty(), "Should have quote data");
1044            }
1045            _ => panic!("Expected Data message with quote"),
1046        }
1047    }
1048
1049    #[rstest]
1050    fn test_ticker_message_passes_with_ticker_subscription() {
1051        let mut handler = create_test_handler();
1052        let instrument = create_test_instrument("BTC/USD");
1053        handler
1054            .instruments_cache
1055            .insert(Ustr::from("BTC/USD"), instrument);
1056
1057        // Direct ticker subscription via subscribe(Ticker, ...) API
1058        handler.subscriptions.mark_subscribe("ticker:BTC/USD");
1059        handler.subscriptions.confirm_subscribe("ticker:BTC/USD");
1060
1061        let json = r#"{
1062            "channel": "ticker",
1063            "type": "snapshot",
1064            "data": [{
1065                "symbol": "BTC/USD",
1066                "bid": 105944.20,
1067                "bid_qty": 2.5,
1068                "ask": 105944.30,
1069                "ask_qty": 3.2,
1070                "last": 105899.40,
1071                "volume": 163.28908096,
1072                "vwap": 105904.39279,
1073                "low": 104711.00,
1074                "high": 106613.10,
1075                "change": 250.00,
1076                "change_pct": 0.24
1077            }]
1078        }"#;
1079
1080        let ts_init = UnixNanos::from(1_000_000_000);
1081        let result = handler.parse_message(json, ts_init);
1082
1083        assert!(
1084            result.is_some(),
1085            "Ticker message should pass with ticker: subscription"
1086        );
1087        match result.unwrap() {
1088            NautilusWsMessage::Data(data) => {
1089                assert!(!data.is_empty(), "Should have quote data");
1090            }
1091            _ => panic!("Expected Data message with quote"),
1092        }
1093    }
1094
1095    #[rstest]
1096    fn test_book_message_filtered_without_book_subscription() {
1097        let mut handler = create_test_handler();
1098        let instrument = create_test_instrument("BTC/USD");
1099        handler
1100            .instruments_cache
1101            .insert(Ustr::from("BTC/USD"), instrument);
1102
1103        let json = r#"{
1104            "channel": "book",
1105            "type": "snapshot",
1106            "data": [{
1107                "symbol": "BTC/USD",
1108                "bids": [{"price": 105944.20, "qty": 2.5}],
1109                "asks": [{"price": 105944.30, "qty": 3.2}],
1110                "checksum": 12345
1111            }]
1112        }"#;
1113
1114        let ts_init = UnixNanos::from(1_000_000_000);
1115        let result = handler.parse_message(json, ts_init);
1116
1117        assert!(
1118            result.is_none(),
1119            "Book message should be filtered when no book subscription exists"
1120        );
1121    }
1122
1123    #[rstest]
1124    fn test_book_message_passes_with_book_subscription() {
1125        let mut handler = create_test_handler();
1126        let instrument = create_test_instrument("BTC/USD");
1127        handler
1128            .instruments_cache
1129            .insert(Ustr::from("BTC/USD"), instrument);
1130
1131        handler.subscriptions.mark_subscribe("book:BTC/USD");
1132        handler.subscriptions.confirm_subscribe("book:BTC/USD");
1133
1134        let json = r#"{
1135            "channel": "book",
1136            "type": "snapshot",
1137            "data": [{
1138                "symbol": "BTC/USD",
1139                "bids": [{"price": 105944.20, "qty": 2.5}],
1140                "asks": [{"price": 105944.30, "qty": 3.2}],
1141                "checksum": 12345
1142            }]
1143        }"#;
1144
1145        let ts_init = UnixNanos::from(1_000_000_000);
1146        let result = handler.parse_message(json, ts_init);
1147
1148        assert!(
1149            result.is_some(),
1150            "Book message should pass with book subscription"
1151        );
1152        match result.unwrap() {
1153            NautilusWsMessage::Deltas(_) => {}
1154            _ => panic!("Expected Deltas message"),
1155        }
1156    }
1157
1158    #[rstest]
1159    fn test_quotes_and_book_subscriptions_independent() {
1160        let mut handler = create_test_handler();
1161        let instrument = create_test_instrument("BTC/USD");
1162        handler
1163            .instruments_cache
1164            .insert(Ustr::from("BTC/USD"), instrument);
1165
1166        handler.subscriptions.mark_subscribe("quotes:BTC/USD");
1167        handler.subscriptions.confirm_subscribe("quotes:BTC/USD");
1168
1169        let book_json = r#"{
1170            "channel": "book",
1171            "type": "snapshot",
1172            "data": [{
1173                "symbol": "BTC/USD",
1174                "bids": [{"price": 105944.20, "qty": 2.5}],
1175                "asks": [{"price": 105944.30, "qty": 3.2}],
1176                "checksum": 12345
1177            }]
1178        }"#;
1179
1180        let ts_init = UnixNanos::from(1_000_000_000);
1181        let book_result = handler.parse_message(book_json, ts_init);
1182        assert!(
1183            book_result.is_none(),
1184            "Book message should be filtered without book: subscription"
1185        );
1186
1187        let ticker_json = r#"{
1188            "channel": "ticker",
1189            "type": "snapshot",
1190            "data": [{
1191                "symbol": "BTC/USD",
1192                "bid": 105944.20,
1193                "bid_qty": 2.5,
1194                "ask": 105944.30,
1195                "ask_qty": 3.2,
1196                "last": 105899.40,
1197                "volume": 163.28908096,
1198                "vwap": 105904.39279,
1199                "low": 104711.00,
1200                "high": 106613.10,
1201                "change": 250.00,
1202                "change_pct": 0.24
1203            }]
1204        }"#;
1205
1206        let ticker_result = handler.parse_message(ticker_json, ts_init);
1207        assert!(
1208            ticker_result.is_some(),
1209            "Ticker should pass with quotes subscription"
1210        );
1211    }
1212}