nautilus_kraken/websocket/spot_v2/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Spot v2.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_common::cache::quote::QuoteCache;
28use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30    data::{Bar, Data, OrderBookDeltas, QuoteTick},
31    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderUpdated},
32    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
33    instruments::{Instrument, InstrumentAny},
34    types::{Price, Quantity},
35};
36use nautilus_network::{
37    RECONNECTED,
38    websocket::{SubscriptionState, WebSocketClient},
39};
40use serde_json::Value;
41use tokio_tungstenite::tungstenite::Message;
42use ustr::Ustr;
43
44use super::{
45    enums::{KrakenExecType, KrakenWsChannel},
46    messages::{
47        KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage, KrakenWsOhlcData,
48        KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData, NautilusWsMessage,
49    },
50    parse::{
51        parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar, parse_ws_fill_report,
52        parse_ws_order_status_report,
53    },
54};
55
56/// Cached information about a client order needed for event generation.
57#[derive(Debug, Clone)]
58struct CachedOrderInfo {
59    instrument_id: InstrumentId,
60    trader_id: TraderId,
61    strategy_id: StrategyId,
62}
63
64/// Commands sent from the outer client to the inner message handler.
65#[derive(Debug)]
66#[allow(
67    clippy::large_enum_variant,
68    reason = "Commands are ephemeral and immediately consumed"
69)]
70pub enum SpotHandlerCommand {
71    SetClient(WebSocketClient),
72    Disconnect,
73    SendText {
74        payload: String,
75    },
76    InitializeInstruments(Vec<InstrumentAny>),
77    UpdateInstrument(InstrumentAny),
78    SetAccountId(AccountId),
79    CacheClientOrder {
80        client_order_id: ClientOrderId,
81        instrument_id: InstrumentId,
82        trader_id: TraderId,
83        strategy_id: StrategyId,
84    },
85}
86
87/// Key for buffering OHLC bars: (symbol, interval).
88type OhlcBufferKey = (Ustr, u32);
89
90/// Buffered OHLC bar with its interval start time for period detection.
91type OhlcBufferEntry = (Bar, UnixNanos);
92
93/// WebSocket message handler for Kraken.
94pub(super) struct SpotFeedHandler {
95    clock: &'static AtomicTime,
96    signal: Arc<AtomicBool>,
97    client: Option<WebSocketClient>,
98    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
99    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
100    subscriptions: SubscriptionState,
101    instruments_cache: AHashMap<Ustr, InstrumentAny>,
102    client_order_cache: AHashMap<String, CachedOrderInfo>,
103    order_qty_cache: AHashMap<String, f64>,
104    quote_cache: QuoteCache,
105    book_sequence: u64,
106    pending_quotes: Vec<QuoteTick>,
107    pending_messages: VecDeque<NautilusWsMessage>,
108    account_id: Option<AccountId>,
109    ohlc_buffer: AHashMap<OhlcBufferKey, OhlcBufferEntry>,
110}
111
112impl SpotFeedHandler {
113    /// Creates a new [`SpotFeedHandler`] instance.
114    pub(super) fn new(
115        signal: Arc<AtomicBool>,
116        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
117        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
118        subscriptions: SubscriptionState,
119    ) -> Self {
120        Self {
121            clock: get_atomic_clock_realtime(),
122            signal,
123            client: None,
124            cmd_rx,
125            raw_rx,
126            subscriptions,
127            instruments_cache: AHashMap::new(),
128            client_order_cache: AHashMap::new(),
129            order_qty_cache: AHashMap::new(),
130            quote_cache: QuoteCache::new(),
131            book_sequence: 0,
132            pending_quotes: Vec::new(),
133            pending_messages: VecDeque::new(),
134            account_id: None,
135            ohlc_buffer: AHashMap::new(),
136        }
137    }
138
139    pub(super) fn is_stopped(&self) -> bool {
140        self.signal.load(Ordering::Relaxed)
141    }
142
143    /// Checks if a topic is active (confirmed or pending subscribe).
144    fn is_subscribed(&self, topic: &str) -> bool {
145        self.subscriptions.all_topics().iter().any(|t| t == topic)
146    }
147
148    fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
149        self.instruments_cache.get(symbol).cloned()
150    }
151
152    /// Flushes all buffered OHLC bars to pending messages.
153    ///
154    /// Called when the stream ends to ensure the last bar for each symbol/interval
155    /// is not lost.
156    fn flush_ohlc_buffer(&mut self) {
157        if self.ohlc_buffer.is_empty() {
158            return;
159        }
160
161        let bars: Vec<Data> = self
162            .ohlc_buffer
163            .drain()
164            .map(|(_, (bar, _))| Data::Bar(bar))
165            .collect();
166
167        if !bars.is_empty() {
168            tracing::debug!("Flushing {} buffered OHLC bars on stream end", bars.len());
169            self.pending_messages
170                .push_back(NautilusWsMessage::Data(bars));
171        }
172    }
173
174    /// Processes messages and commands, returning when stopped or stream ends.
175    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
176        // Check for pending messages first (e.g., from multi-message scenarios like trades)
177        if let Some(msg) = self.pending_messages.pop_front() {
178            return Some(msg);
179        }
180
181        if let Some(quote) = self.pending_quotes.pop() {
182            return Some(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
183        }
184
185        loop {
186            tokio::select! {
187                Some(cmd) = self.cmd_rx.recv() => {
188                    match cmd {
189                        SpotHandlerCommand::SetClient(client) => {
190                            tracing::debug!("WebSocketClient received by handler");
191                            self.client = Some(client);
192                        }
193                        SpotHandlerCommand::Disconnect => {
194                            tracing::debug!("Disconnect command received");
195                            if let Some(client) = self.client.take() {
196                                client.disconnect().await;
197                            }
198                        }
199                        SpotHandlerCommand::SendText { payload } => {
200                            if let Some(client) = &self.client
201                                && let Err(e) = client.send_text(payload.clone(), None).await
202                            {
203                                tracing::error!(error = %e, "Failed to send text");
204                            }
205                        }
206                        SpotHandlerCommand::InitializeInstruments(instruments) => {
207                            for inst in instruments {
208                                // Cache by symbol (ISO 4217-A3 format like "ETH/USD")
209                                // which matches what v2 WebSocket messages use
210                                self.instruments_cache.insert(inst.symbol().inner(), inst);
211                            }
212                        }
213                        SpotHandlerCommand::UpdateInstrument(inst) => {
214                            self.instruments_cache.insert(inst.symbol().inner(), inst);
215                        }
216                        SpotHandlerCommand::SetAccountId(account_id) => {
217                            tracing::debug!(%account_id, "Account ID set for execution reports");
218                            self.account_id = Some(account_id);
219                        }
220                        SpotHandlerCommand::CacheClientOrder {
221                            client_order_id,
222                            instrument_id,
223                            trader_id,
224                            strategy_id,
225                        } => {
226                            tracing::debug!(
227                                %client_order_id,
228                                %instrument_id,
229                                "Cached client order info"
230                            );
231                            self.client_order_cache.insert(
232                                client_order_id.to_string(),
233                                CachedOrderInfo {
234                                    instrument_id,
235                                    trader_id,
236                                    strategy_id,
237                                },
238                            );
239                        }
240                    }
241                    continue;
242                }
243
244                msg = self.raw_rx.recv() => {
245                    let msg = match msg {
246                        Some(msg) => msg,
247                        None => {
248                            tracing::debug!("WebSocket stream closed");
249                            self.flush_ohlc_buffer();
250                            return self.pending_messages.pop_front();
251                        }
252                    };
253
254                    if let Message::Ping(data) = &msg {
255                        tracing::trace!("Received ping frame with {} bytes", data.len());
256                        if let Some(client) = &self.client
257                            && let Err(e) = client.send_pong(data.to_vec()).await
258                        {
259                            tracing::warn!(error = %e, "Failed to send pong frame");
260                        }
261                        continue;
262                    }
263
264                    if self.signal.load(Ordering::Relaxed) {
265                        tracing::debug!("Stop signal received");
266                        self.flush_ohlc_buffer();
267                        return self.pending_messages.pop_front();
268                    }
269
270                    let text = match msg {
271                        Message::Text(text) => text.to_string(),
272                        Message::Binary(data) => {
273                            match String::from_utf8(data.to_vec()) {
274                                Ok(text) => text,
275                                Err(e) => {
276                                    tracing::warn!("Failed to decode binary message: {e}");
277                                    continue;
278                                }
279                            }
280                        }
281                        Message::Pong(_) => {
282                            tracing::trace!("Received pong");
283                            continue;
284                        }
285                        Message::Close(_) => {
286                            tracing::info!("WebSocket connection closed");
287                            self.flush_ohlc_buffer();
288                            return self.pending_messages.pop_front();
289                        }
290                        Message::Frame(_) => {
291                            tracing::trace!("Received raw frame");
292                            continue;
293                        }
294                        _ => continue,
295                    };
296
297                    if text == RECONNECTED {
298                        tracing::info!("Received WebSocket reconnected signal");
299                        self.quote_cache.clear();
300                        return Some(NautilusWsMessage::Reconnected);
301                    }
302
303                    let ts_init = self.clock.get_time_ns();
304
305                    if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
306                        return Some(nautilus_msg);
307                    }
308
309                    continue;
310                }
311            }
312        }
313    }
314
315    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
316        // Fast pre-filter for high-frequency control messages (no JSON parsing)
317        // Heartbeats and status messages are short and share common prefix
318        if text.len() < 50 && text.starts_with("{\"channel\":\"") {
319            if text.contains("heartbeat") {
320                tracing::trace!("Received heartbeat");
321                return None;
322            }
323            if text.contains("status") {
324                tracing::debug!("Received status message");
325                return None;
326            }
327        }
328
329        let value: Value = match serde_json::from_str(text) {
330            Ok(v) => v,
331            Err(e) => {
332                tracing::warn!("Failed to parse message: {e}");
333                return None;
334            }
335        };
336
337        // Control messages have "method" field
338        if value.get("method").is_some() {
339            self.handle_control_message(value);
340            return None;
341        }
342
343        // Data messages have "channel" and "data" fields
344        if value.get("channel").is_some() && value.get("data").is_some() {
345            match serde_json::from_value::<KrakenWsMessage>(value) {
346                Ok(msg) => return self.handle_data_message(msg, ts_init),
347                Err(e) => {
348                    tracing::debug!("Failed to parse data message: {e}");
349                    return None;
350                }
351            }
352        }
353
354        tracing::debug!("Unhandled message structure: {text}");
355        None
356    }
357
358    fn handle_control_message(&self, value: Value) {
359        match serde_json::from_value::<KrakenWsResponse>(value) {
360            Ok(response) => match response {
361                KrakenWsResponse::Subscribe(sub) => {
362                    if sub.success {
363                        if let Some(result) = &sub.result {
364                            tracing::debug!(
365                                channel = ?result.channel,
366                                req_id = ?sub.req_id,
367                                "Subscription confirmed"
368                            );
369                        } else {
370                            tracing::debug!(req_id = ?sub.req_id, "Subscription confirmed");
371                        }
372                    } else {
373                        tracing::warn!(
374                            error = ?sub.error,
375                            req_id = ?sub.req_id,
376                            "Subscription failed"
377                        );
378                    }
379                }
380                KrakenWsResponse::Unsubscribe(unsub) => {
381                    if unsub.success {
382                        tracing::debug!(req_id = ?unsub.req_id, "Unsubscription confirmed");
383                    } else {
384                        tracing::warn!(
385                            error = ?unsub.error,
386                            req_id = ?unsub.req_id,
387                            "Unsubscription failed"
388                        );
389                    }
390                }
391                KrakenWsResponse::Pong(pong) => {
392                    tracing::trace!(req_id = ?pong.req_id, "Received pong");
393                }
394                KrakenWsResponse::Other => {
395                    tracing::debug!("Received unknown control response");
396                }
397            },
398            Err(_) => {
399                tracing::debug!("Received control message (failed to parse details)");
400            }
401        }
402    }
403
404    fn handle_data_message(
405        &mut self,
406        msg: KrakenWsMessage,
407        ts_init: UnixNanos,
408    ) -> Option<NautilusWsMessage> {
409        match msg.channel {
410            KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
411            KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
412            KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
413            KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
414            KrakenWsChannel::Executions => self.handle_executions_message(msg, ts_init),
415            _ => {
416                tracing::warn!("Unhandled channel: {:?}", msg.channel);
417                None
418            }
419        }
420    }
421
422    fn handle_book_message(
423        &mut self,
424        msg: KrakenWsMessage,
425        ts_init: UnixNanos,
426    ) -> Option<NautilusWsMessage> {
427        let mut all_deltas = Vec::new();
428        let mut instrument_id = None;
429
430        for data in msg.data {
431            match serde_json::from_value::<KrakenWsBookData>(data) {
432                Ok(book_data) => {
433                    let symbol = &book_data.symbol;
434                    let instrument = self.get_instrument(symbol)?;
435                    instrument_id = Some(instrument.id());
436
437                    let price_precision = instrument.price_precision();
438                    let size_precision = instrument.size_precision();
439
440                    let has_book = self.is_subscribed(&format!("book:{symbol}"));
441                    let has_quotes = self.is_subscribed(&format!("quotes:{symbol}"));
442
443                    if has_quotes {
444                        let best_bid = book_data.bids.as_ref().and_then(|bids| bids.first());
445                        let best_ask = book_data.asks.as_ref().and_then(|asks| asks.first());
446
447                        let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
448                        let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
449                        let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
450                        let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
451
452                        if let Ok(quote) = self.quote_cache.process(
453                            instrument.id(),
454                            bid_price,
455                            ask_price,
456                            bid_size,
457                            ask_size,
458                            ts_init,
459                            ts_init,
460                        ) {
461                            self.pending_quotes.push(quote);
462                        }
463                    }
464
465                    if has_book {
466                        match parse_book_deltas(
467                            &book_data,
468                            &instrument,
469                            self.book_sequence,
470                            ts_init,
471                        ) {
472                            Ok(mut deltas) => {
473                                self.book_sequence += deltas.len() as u64;
474                                all_deltas.append(&mut deltas);
475                            }
476                            Err(e) => {
477                                tracing::error!("Failed to parse book deltas: {e}");
478                            }
479                        }
480                    }
481                }
482                Err(e) => {
483                    tracing::error!("Failed to deserialize book data: {e}");
484                }
485            }
486        }
487
488        if all_deltas.is_empty() {
489            if let Some(quote) = self.pending_quotes.pop() {
490                return Some(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
491            }
492            None
493        } else {
494            let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
495            Some(NautilusWsMessage::Deltas(deltas))
496        }
497    }
498
499    fn handle_ticker_message(
500        &self,
501        msg: KrakenWsMessage,
502        ts_init: UnixNanos,
503    ) -> Option<NautilusWsMessage> {
504        let mut quotes = Vec::new();
505
506        for data in msg.data {
507            match serde_json::from_value::<KrakenWsTickerData>(data) {
508                Ok(ticker_data) => {
509                    let instrument = self.get_instrument(&ticker_data.symbol)?;
510
511                    match parse_quote_tick(&ticker_data, &instrument, ts_init) {
512                        Ok(quote) => quotes.push(Data::Quote(quote)),
513                        Err(e) => {
514                            tracing::error!("Failed to parse quote tick: {e}");
515                        }
516                    }
517                }
518                Err(e) => {
519                    tracing::error!("Failed to deserialize ticker data: {e}");
520                }
521            }
522        }
523
524        if quotes.is_empty() {
525            None
526        } else {
527            Some(NautilusWsMessage::Data(quotes))
528        }
529    }
530
531    fn handle_trade_message(
532        &self,
533        msg: KrakenWsMessage,
534        ts_init: UnixNanos,
535    ) -> Option<NautilusWsMessage> {
536        let mut trades = Vec::new();
537
538        for data in msg.data {
539            match serde_json::from_value::<KrakenWsTradeData>(data) {
540                Ok(trade_data) => {
541                    let instrument = self.get_instrument(&trade_data.symbol)?;
542
543                    match parse_trade_tick(&trade_data, &instrument, ts_init) {
544                        Ok(trade) => trades.push(Data::Trade(trade)),
545                        Err(e) => {
546                            tracing::error!("Failed to parse trade tick: {e}");
547                        }
548                    }
549                }
550                Err(e) => {
551                    tracing::error!("Failed to deserialize trade data: {e}");
552                }
553            }
554        }
555
556        if trades.is_empty() {
557            None
558        } else {
559            Some(NautilusWsMessage::Data(trades))
560        }
561    }
562
563    fn handle_ohlc_message(
564        &mut self,
565        msg: KrakenWsMessage,
566        ts_init: UnixNanos,
567    ) -> Option<NautilusWsMessage> {
568        let mut closed_bars = Vec::new();
569
570        for data in msg.data {
571            match serde_json::from_value::<KrakenWsOhlcData>(data) {
572                Ok(ohlc_data) => {
573                    let instrument = self.get_instrument(&ohlc_data.symbol)?;
574
575                    match parse_ws_bar(&ohlc_data, &instrument, ts_init) {
576                        Ok(new_bar) => {
577                            let key = (ohlc_data.symbol, ohlc_data.interval);
578                            let new_interval_begin = UnixNanos::from(
579                                ohlc_data.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
580                            );
581
582                            // Check if we have a buffered bar for this symbol/interval
583                            if let Some((buffered_bar, buffered_interval_begin)) =
584                                self.ohlc_buffer.get(&key)
585                            {
586                                // If interval_begin changed, the buffered bar is closed
587                                if new_interval_begin != *buffered_interval_begin {
588                                    closed_bars.push(Data::Bar(*buffered_bar));
589                                }
590                            }
591
592                            // Update buffer with the new (potentially incomplete) bar
593                            self.ohlc_buffer.insert(key, (new_bar, new_interval_begin));
594                        }
595                        Err(e) => {
596                            tracing::error!("Failed to parse bar: {e}");
597                        }
598                    }
599                }
600                Err(e) => {
601                    tracing::error!("Failed to deserialize OHLC data: {e}");
602                }
603            }
604        }
605
606        if closed_bars.is_empty() {
607            None
608        } else {
609            Some(NautilusWsMessage::Data(closed_bars))
610        }
611    }
612
613    fn handle_executions_message(
614        &mut self,
615        msg: KrakenWsMessage,
616        ts_init: UnixNanos,
617    ) -> Option<NautilusWsMessage> {
618        let Some(account_id) = self.account_id else {
619            tracing::warn!("Cannot process execution message: account_id not set");
620            return None;
621        };
622
623        // Process all executions in batch and queue them (snapshots can have many records)
624        for data in msg.data {
625            match serde_json::from_value::<KrakenWsExecutionData>(data) {
626                Ok(exec_data) => {
627                    tracing::debug!(
628                        exec_type = ?exec_data.exec_type,
629                        order_id = %exec_data.order_id,
630                        order_status = ?exec_data.order_status,
631                        order_qty = ?exec_data.order_qty,
632                        cum_qty = ?exec_data.cum_qty,
633                        last_qty = ?exec_data.last_qty,
634                        "Received execution message"
635                    );
636
637                    // Cache order_qty for subsequent messages that may not include it
638                    if let Some(qty) = exec_data.order_qty {
639                        self.order_qty_cache.insert(exec_data.order_id.clone(), qty);
640                    }
641
642                    // Resolve instrument and cached order info
643                    let (instrument, cached_info) = if let Some(ref symbol) = exec_data.symbol {
644                        let symbol_ustr = Ustr::from(symbol.as_str());
645                        let inst = self.instruments_cache.get(&symbol_ustr).cloned();
646                        if inst.is_none() {
647                            tracing::warn!(
648                                symbol = %symbol,
649                                order_id = %exec_data.order_id,
650                                "No instrument found for symbol"
651                            );
652                        }
653                        let cached = exec_data
654                            .cl_ord_id
655                            .as_ref()
656                            .and_then(|id| self.client_order_cache.get(id).cloned());
657                        (inst, cached)
658                    } else if let Some(ref cl_ord_id) = exec_data.cl_ord_id {
659                        let cached = self.client_order_cache.get(cl_ord_id).cloned();
660                        let inst = cached.as_ref().and_then(|info| {
661                            self.instruments_cache
662                                .iter()
663                                .find(|(_, inst)| inst.id() == info.instrument_id)
664                                .map(|(_, inst)| inst.clone())
665                        });
666                        (inst, cached)
667                    } else {
668                        (None, None)
669                    };
670
671                    let Some(instrument) = instrument else {
672                        tracing::debug!(
673                            order_id = %exec_data.order_id,
674                            cl_ord_id = ?exec_data.cl_ord_id,
675                            exec_type = ?exec_data.exec_type,
676                            "Execution missing symbol and order not in cache (external order)"
677                        );
678                        continue;
679                    };
680
681                    let cached_order_qty = self.order_qty_cache.get(&exec_data.order_id).copied();
682                    let ts_event = chrono::DateTime::parse_from_rfc3339(&exec_data.timestamp)
683                        .map(|t| UnixNanos::from(t.timestamp_nanos_opt().unwrap_or(0) as u64))
684                        .unwrap_or(ts_init);
685
686                    // Emit proper order events when we have cached info, otherwise fall back
687                    // to OrderStatusReport for external orders or reconciliation
688                    if let Some(ref info) = cached_info {
689                        let client_order_id = exec_data
690                            .cl_ord_id
691                            .as_ref()
692                            .map(ClientOrderId::new)
693                            .expect("cl_ord_id should exist if cached");
694                        let venue_order_id = VenueOrderId::new(&exec_data.order_id);
695
696                        match exec_data.exec_type {
697                            KrakenExecType::PendingNew => {
698                                // Order received and validated - emit accepted
699                                let accepted = OrderAccepted::new(
700                                    info.trader_id,
701                                    info.strategy_id,
702                                    instrument.id(),
703                                    client_order_id,
704                                    venue_order_id,
705                                    account_id,
706                                    UUID4::new(),
707                                    ts_event,
708                                    ts_init,
709                                    false,
710                                );
711                                self.pending_messages
712                                    .push_back(NautilusWsMessage::OrderAccepted(accepted));
713                            }
714                            KrakenExecType::New => {
715                                // Order is now live - already accepted, skip
716                            }
717                            KrakenExecType::Canceled => {
718                                // Check if this is a post-only rejection based on reason
719                                // Kraken sends reason="Post only order" for post-only rejections
720                                let is_post_only_rejection = exec_data
721                                    .reason
722                                    .as_ref()
723                                    .is_some_and(|r| r.eq_ignore_ascii_case("Post only order"));
724
725                                if is_post_only_rejection {
726                                    let reason = exec_data
727                                        .reason
728                                        .as_deref()
729                                        .unwrap_or("Post-only order would have crossed");
730                                    let rejected = OrderRejected::new(
731                                        info.trader_id,
732                                        info.strategy_id,
733                                        instrument.id(),
734                                        client_order_id,
735                                        account_id,
736                                        Ustr::from(reason),
737                                        UUID4::new(),
738                                        ts_event,
739                                        ts_init,
740                                        false,
741                                        true, // due_post_only
742                                    );
743                                    self.pending_messages
744                                        .push_back(NautilusWsMessage::OrderRejected(rejected));
745                                } else {
746                                    let canceled = OrderCanceled::new(
747                                        info.trader_id,
748                                        info.strategy_id,
749                                        instrument.id(),
750                                        client_order_id,
751                                        UUID4::new(),
752                                        ts_event,
753                                        ts_init,
754                                        false,
755                                        Some(venue_order_id),
756                                        Some(account_id),
757                                    );
758                                    self.pending_messages
759                                        .push_back(NautilusWsMessage::OrderCanceled(canceled));
760                                }
761                            }
762                            KrakenExecType::Expired => {
763                                let expired = OrderExpired::new(
764                                    info.trader_id,
765                                    info.strategy_id,
766                                    instrument.id(),
767                                    client_order_id,
768                                    UUID4::new(),
769                                    ts_event,
770                                    ts_init,
771                                    false,
772                                    Some(venue_order_id),
773                                    Some(account_id),
774                                );
775                                self.pending_messages
776                                    .push_back(NautilusWsMessage::OrderExpired(expired));
777                            }
778                            KrakenExecType::Amended | KrakenExecType::Restated => {
779                                // For modifications, emit OrderUpdated
780                                if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
781                                    let updated = OrderUpdated::new(
782                                        info.trader_id,
783                                        info.strategy_id,
784                                        instrument.id(),
785                                        client_order_id,
786                                        Quantity::new(order_qty, instrument.size_precision()),
787                                        UUID4::new(),
788                                        ts_event,
789                                        ts_init,
790                                        false,
791                                        Some(venue_order_id),
792                                        Some(account_id),
793                                        None, // price
794                                        None, // trigger_price
795                                        None, // protection_price
796                                    );
797                                    self.pending_messages
798                                        .push_back(NautilusWsMessage::OrderUpdated(updated));
799                                }
800                            }
801                            KrakenExecType::Trade | KrakenExecType::Filled => {
802                                // Trades use OrderStatusReport + FillReport
803                                let has_complete_trade_data =
804                                    exec_data.last_qty.is_some_and(|q| q > 0.0)
805                                        && exec_data.last_price.is_some_and(|p| p > 0.0);
806
807                                if let Ok(status_report) = parse_ws_order_status_report(
808                                    &exec_data,
809                                    &instrument,
810                                    account_id,
811                                    cached_order_qty,
812                                    ts_init,
813                                ) {
814                                    self.pending_messages.push_back(
815                                        NautilusWsMessage::OrderStatusReport(Box::new(
816                                            status_report,
817                                        )),
818                                    );
819                                }
820
821                                if has_complete_trade_data
822                                    && let Ok(fill_report) = parse_ws_fill_report(
823                                        &exec_data,
824                                        &instrument,
825                                        account_id,
826                                        ts_init,
827                                    )
828                                {
829                                    self.pending_messages
830                                        .push_back(NautilusWsMessage::FillReport(Box::new(
831                                            fill_report,
832                                        )));
833                                }
834                            }
835                            KrakenExecType::IcebergRefill => {
836                                // Iceberg order refill - treat similar to order update
837                                if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
838                                    let updated = OrderUpdated::new(
839                                        info.trader_id,
840                                        info.strategy_id,
841                                        instrument.id(),
842                                        client_order_id,
843                                        Quantity::new(order_qty, instrument.size_precision()),
844                                        UUID4::new(),
845                                        ts_event,
846                                        ts_init,
847                                        false,
848                                        Some(venue_order_id),
849                                        Some(account_id),
850                                        None,
851                                        None,
852                                        None,
853                                    );
854                                    self.pending_messages
855                                        .push_back(NautilusWsMessage::OrderUpdated(updated));
856                                }
857                            }
858                            KrakenExecType::Status => {
859                                // Status update without state change - emit OrderStatusReport
860                                if let Ok(status_report) = parse_ws_order_status_report(
861                                    &exec_data,
862                                    &instrument,
863                                    account_id,
864                                    cached_order_qty,
865                                    ts_init,
866                                ) {
867                                    self.pending_messages.push_back(
868                                        NautilusWsMessage::OrderStatusReport(Box::new(
869                                            status_report,
870                                        )),
871                                    );
872                                }
873                            }
874                        }
875                    } else {
876                        // No cached info - external order or reconciliation, use OrderStatusReport
877                        if exec_data.exec_type == KrakenExecType::Trade
878                            || exec_data.exec_type == KrakenExecType::Filled
879                        {
880                            let has_order_data = exec_data.order_qty.is_some()
881                                || cached_order_qty.is_some()
882                                || exec_data.cum_qty.is_some();
883
884                            let has_complete_trade_data =
885                                exec_data.last_qty.is_some_and(|q| q > 0.0)
886                                    && exec_data.last_price.is_some_and(|p| p > 0.0);
887
888                            if has_order_data
889                                && let Ok(status_report) = parse_ws_order_status_report(
890                                    &exec_data,
891                                    &instrument,
892                                    account_id,
893                                    cached_order_qty,
894                                    ts_init,
895                                )
896                            {
897                                self.pending_messages.push_back(
898                                    NautilusWsMessage::OrderStatusReport(Box::new(status_report)),
899                                );
900                            }
901
902                            if has_complete_trade_data
903                                && let Ok(fill_report) = parse_ws_fill_report(
904                                    &exec_data,
905                                    &instrument,
906                                    account_id,
907                                    ts_init,
908                                )
909                            {
910                                self.pending_messages
911                                    .push_back(NautilusWsMessage::FillReport(Box::new(
912                                        fill_report,
913                                    )));
914                            }
915                        } else if let Ok(report) = parse_ws_order_status_report(
916                            &exec_data,
917                            &instrument,
918                            account_id,
919                            cached_order_qty,
920                            ts_init,
921                        ) {
922                            self.pending_messages
923                                .push_back(NautilusWsMessage::OrderStatusReport(Box::new(report)));
924                        }
925                    }
926                }
927                Err(e) => {
928                    tracing::error!("Failed to deserialize execution data: {e}");
929                }
930            }
931        }
932
933        // Return first queued message (rest returned via next() pending check)
934        self.pending_messages.pop_front()
935    }
936}