nautilus_kraken/websocket/futures/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Futures.
17
18use std::{
19    collections::VecDeque,
20    fmt::Debug,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31    data::{
32        BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
33    },
34    enums::{
35        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
36    },
37    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
38    identifiers::{
39        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
40    },
41    instruments::{Instrument, InstrumentAny},
42    reports::{FillReport, OrderStatusReport},
43    types::{Money, Price, Quantity},
44};
45use nautilus_network::{
46    RECONNECTED,
47    websocket::{SubscriptionState, WebSocketClient},
48};
49use serde::Deserialize;
50use serde_json::Value;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::messages::{
55    KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
56    KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
57    KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
58    KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
59    KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
60    KrakenFuturesTradeSnapshot, KrakenFuturesWsMessage, classify_futures_message,
61};
62use crate::common::enums::KrakenOrderSide;
63
64/// Parsed order event from a Kraken Futures WebSocket message.
65#[derive(Debug, Clone)]
66pub enum ParsedOrderEvent {
67    Accepted(OrderAccepted),
68    Canceled(OrderCanceled),
69    Expired(OrderExpired),
70    Updated(OrderUpdated),
71    StatusOnly(Box<OrderStatusReport>),
72}
73
74/// Cached order info for proper event generation.
75#[derive(Debug, Clone)]
76struct CachedOrderInfo {
77    instrument_id: InstrumentId,
78    trader_id: TraderId,
79    strategy_id: StrategyId,
80}
81
82/// Commands sent from the outer client to the inner message handler.
83#[allow(
84    clippy::large_enum_variant,
85    reason = "Commands are ephemeral and immediately consumed"
86)]
87pub enum HandlerCommand {
88    SetClient(WebSocketClient),
89    SubscribeTicker(Symbol),
90    UnsubscribeTicker(Symbol),
91    SubscribeTrade(Symbol),
92    UnsubscribeTrade(Symbol),
93    SubscribeBook(Symbol),
94    UnsubscribeBook(Symbol),
95    Disconnect,
96    InitializeInstruments(Vec<InstrumentAny>),
97    UpdateInstrument(InstrumentAny),
98    SetAccountId(AccountId),
99    RequestChallenge {
100        api_key: String,
101        response_tx: tokio::sync::oneshot::Sender<String>,
102    },
103    SetAuthCredentials {
104        api_key: String,
105        original_challenge: String,
106        signed_challenge: String,
107    },
108    SubscribeOpenOrders,
109    SubscribeFills,
110    CacheClientOrder {
111        client_order_id: ClientOrderId,
112        venue_order_id: Option<VenueOrderId>,
113        instrument_id: InstrumentId,
114        trader_id: TraderId,
115        strategy_id: StrategyId,
116    },
117}
118
119impl Debug for HandlerCommand {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        match self {
122            Self::SetClient(_) => f.debug_struct(stringify!(SetClient)).finish(),
123            Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
124            Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
125            Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
126            Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
127            Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
128            Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
129            Self::Disconnect => write!(f, "Disconnect"),
130            Self::InitializeInstruments(v) => f
131                .debug_tuple("InitializeInstruments")
132                .field(&v.len())
133                .finish(),
134            Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
135            Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
136            Self::RequestChallenge { api_key, .. } => {
137                let masked = &api_key[..4.min(api_key.len())];
138                f.debug_struct(stringify!(RequestChallenge))
139                    .field("api_key", &format!("{masked}..."))
140                    .finish()
141            }
142            Self::SetAuthCredentials { api_key, .. } => {
143                let masked = &api_key[..4.min(api_key.len())];
144                f.debug_struct(stringify!(SetAuthCredentials))
145                    .field("api_key", &format!("{masked}..."))
146                    .finish()
147            }
148            Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
149            Self::SubscribeFills => write!(f, "SubscribeFills"),
150            Self::CacheClientOrder {
151                client_order_id,
152                instrument_id,
153                ..
154            } => f
155                .debug_struct(stringify!(CacheClientOrder))
156                .field("client_order_id", client_order_id)
157                .field("instrument_id", instrument_id)
158                .finish(),
159        }
160    }
161}
162
163/// WebSocket message handler for Kraken Futures.
164pub struct FuturesFeedHandler {
165    clock: &'static AtomicTime,
166    signal: Arc<AtomicBool>,
167    client: Option<WebSocketClient>,
168    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
169    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
170    subscriptions: SubscriptionState,
171    instruments_cache: AHashMap<Ustr, InstrumentAny>,
172    quote_cache: QuoteCache,
173    pending_messages: VecDeque<KrakenFuturesWsMessage>,
174    account_id: Option<AccountId>,
175    api_key: Option<String>,
176    original_challenge: Option<String>,
177    signed_challenge: Option<String>,
178    client_order_cache: AHashMap<String, CachedOrderInfo>,
179    venue_order_cache: AHashMap<String, String>,
180    pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
181}
182
183impl FuturesFeedHandler {
184    /// Creates a new [`FuturesFeedHandler`] instance.
185    pub fn new(
186        signal: Arc<AtomicBool>,
187        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
188        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
189        subscriptions: SubscriptionState,
190    ) -> Self {
191        Self {
192            clock: get_atomic_clock_realtime(),
193            signal,
194            client: None,
195            cmd_rx,
196            raw_rx,
197            subscriptions,
198            instruments_cache: AHashMap::new(),
199            quote_cache: QuoteCache::new(),
200            pending_messages: VecDeque::new(),
201            account_id: None,
202            api_key: None,
203            original_challenge: None,
204            signed_challenge: None,
205            client_order_cache: AHashMap::new(),
206            venue_order_cache: AHashMap::new(),
207            pending_challenge_tx: None,
208        }
209    }
210
211    pub fn is_stopped(&self) -> bool {
212        self.signal.load(Ordering::Relaxed)
213    }
214
215    fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
216        let channel_ustr = Ustr::from(channel.as_ref());
217        self.subscriptions.is_subscribed(&channel_ustr, symbol)
218    }
219
220    fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
221        self.instruments_cache.get(symbol)
222    }
223
224    /// Processes messages and commands, returning when stopped or stream ends.
225    pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
226        // First drain any pending messages from previous ticker processing
227        if let Some(msg) = self.pending_messages.pop_front() {
228            return Some(msg);
229        }
230
231        loop {
232            tokio::select! {
233                Some(cmd) = self.cmd_rx.recv() => {
234                    match cmd {
235                        HandlerCommand::SetClient(client) => {
236                            log::debug!("WebSocketClient received by futures handler");
237                            self.client = Some(client);
238                        }
239                        HandlerCommand::SubscribeTicker(symbol) => {
240                            self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
241                        }
242                        HandlerCommand::UnsubscribeTicker(symbol) => {
243                            self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
244                        }
245                        HandlerCommand::SubscribeTrade(symbol) => {
246                            self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
247                        }
248                        HandlerCommand::UnsubscribeTrade(symbol) => {
249                            self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
250                        }
251                        HandlerCommand::SubscribeBook(symbol) => {
252                            self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
253                        }
254                        HandlerCommand::UnsubscribeBook(symbol) => {
255                            self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
256                        }
257                        HandlerCommand::Disconnect => {
258                            log::debug!("Disconnect command received");
259                            if let Some(client) = self.client.take() {
260                                client.disconnect().await;
261                            }
262                            return None;
263                        }
264                        HandlerCommand::InitializeInstruments(instruments) => {
265                            for inst in instruments {
266                                self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
267                            }
268                            let count = self.instruments_cache.len();
269                            log::debug!("Initialized {count} instruments in futures handler cache");
270                        }
271                        HandlerCommand::UpdateInstrument(inst) => {
272                            self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
273                        }
274                        HandlerCommand::SetAccountId(account_id) => {
275                            log::debug!("Setting account_id for futures handler: {account_id}");
276                            self.account_id = Some(account_id);
277                        }
278                        HandlerCommand::RequestChallenge { api_key, response_tx } => {
279                            log::debug!("Requesting challenge for authentication");
280                            self.pending_challenge_tx = Some(response_tx);
281                            self.send_challenge_request(&api_key).await;
282                        }
283                        HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
284                            log::debug!("Setting auth credentials for futures handler");
285                            self.api_key = Some(api_key);
286                            self.original_challenge = Some(original_challenge);
287                            self.signed_challenge = Some(signed_challenge);
288                        }
289                        HandlerCommand::SubscribeOpenOrders => {
290                            self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
291                        }
292                        HandlerCommand::SubscribeFills => {
293                            self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
294                        }
295                        HandlerCommand::CacheClientOrder {
296                            client_order_id,
297                            venue_order_id,
298                            instrument_id,
299                            trader_id,
300                            strategy_id,
301                        } => {
302                            let client_order_id_str = client_order_id.to_string();
303                            self.client_order_cache.insert(
304                                client_order_id_str.clone(),
305                                CachedOrderInfo {
306                                    instrument_id,
307                                    trader_id,
308                                    strategy_id,
309                                },
310                            );
311                            if let Some(venue_id) = venue_order_id {
312                                self.venue_order_cache
313                                    .insert(venue_id.to_string(), client_order_id_str);
314                            }
315                        }
316                    }
317                    continue;
318                }
319
320                msg = self.raw_rx.recv() => {
321                    let msg = match msg {
322                        Some(msg) => msg,
323                        None => {
324                            log::debug!("WebSocket stream closed");
325                            return None;
326                        }
327                    };
328
329                    if self.signal.load(Ordering::Relaxed) {
330                        log::debug!("Stop signal received");
331                        return None;
332                    }
333
334                    // Handle control frames first (borrow msg to avoid moving)
335                    match &msg {
336                        Message::Ping(data) => {
337                            let len = data.len();
338                            log::trace!("Received ping frame with {len} bytes");
339                            if let Some(client) = &self.client
340                                && let Err(e) = client.send_pong(data.to_vec()).await
341                            {
342                                log::warn!("Failed to send pong frame: {e}");
343                            }
344                            continue;
345                        }
346                        Message::Pong(_) => {
347                            log::trace!("Received pong");
348                            continue;
349                        }
350                        Message::Close(_) => {
351                            log::info!("WebSocket connection closed");
352                            return None;
353                        }
354                        Message::Frame(_) => {
355                            log::trace!("Received raw frame");
356                            continue;
357                        }
358                        _ => {}
359                    }
360
361                    // Extract text without allocation (Utf8Bytes derefs to &str)
362                    let text: &str = match &msg {
363                        Message::Text(text) => text,
364                        Message::Binary(data) => match std::str::from_utf8(data) {
365                            Ok(s) => s,
366                            Err(_) => continue,
367                        },
368                        _ => continue,
369                    };
370
371                    if text == RECONNECTED {
372                        log::info!("Received WebSocket reconnected signal");
373                        self.quote_cache.clear();
374                        return Some(KrakenFuturesWsMessage::Reconnected);
375                    }
376
377                    let ts_init = self.clock.get_time_ns();
378                    self.parse_message(text, ts_init);
379
380                    // Return first pending message if any were produced
381                    if let Some(msg) = self.pending_messages.pop_front() {
382                        return Some(msg);
383                    }
384
385                    continue;
386                }
387            }
388        }
389    }
390
391    async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392        if let Some(ref client) = self.client {
393            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394            let feed_str = feed_str.trim_matches('"');
395            let msg = format!(
396                r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397            );
398            if let Err(e) = client.send_text(msg, None).await {
399                log::error!("Failed to send {feed:?} subscribe: {e}");
400            }
401        }
402    }
403
404    async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405        if let Some(ref client) = self.client {
406            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407            let feed_str = feed_str.trim_matches('"');
408            let msg = format!(
409                r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410            );
411            if let Err(e) = client.send_text(msg, None).await {
412                log::error!("Failed to send {feed:?} unsubscribe: {e}");
413            }
414        }
415    }
416
417    async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418        let Some(ref client) = self.client else {
419            log::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420            return;
421        };
422
423        let Some(ref api_key) = self.api_key else {
424            log::error!("Cannot subscribe to {feed:?}: no API key set");
425            return;
426        };
427
428        let Some(ref original_challenge) = self.original_challenge else {
429            log::error!("Cannot subscribe to {feed:?}: no challenge set");
430            return;
431        };
432
433        let Some(ref signed_challenge) = self.signed_challenge else {
434            log::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435            return;
436        };
437
438        let request = KrakenFuturesPrivateSubscribeRequest {
439            event: KrakenFuturesEvent::Subscribe,
440            feed,
441            api_key: api_key.clone(),
442            original_challenge: original_challenge.clone(),
443            signed_challenge: signed_challenge.clone(),
444        };
445
446        let msg = match serde_json::to_string(&request) {
447            Ok(m) => m,
448            Err(e) => {
449                log::error!("Failed to serialize {feed:?} subscribe request: {e}");
450                return;
451            }
452        };
453
454        if let Err(e) = client.send_text(msg, None).await {
455            log::error!("Failed to send {feed:?} subscribe: {e}");
456        } else {
457            log::debug!("Sent private subscribe request for {feed:?}");
458        }
459    }
460
461    async fn send_challenge_request(&self, api_key: &str) {
462        let Some(ref client) = self.client else {
463            log::error!("Cannot request challenge: no WebSocket client");
464            return;
465        };
466
467        let request = KrakenFuturesChallengeRequest {
468            event: KrakenFuturesEvent::Challenge,
469            api_key: api_key.to_string(),
470        };
471
472        let msg = match serde_json::to_string(&request) {
473            Ok(m) => m,
474            Err(e) => {
475                log::error!("Failed to serialize challenge request: {e}");
476                return;
477            }
478        };
479
480        if let Err(e) = client.send_text(msg, None).await {
481            log::error!("Failed to send challenge request: {e}");
482        } else {
483            log::debug!("Sent challenge request for authentication");
484        }
485    }
486
487    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488        let value: Value = match serde_json::from_str(text) {
489            Ok(v) => v,
490            Err(e) => {
491                log::debug!("Failed to parse message as JSON: {e}");
492                return;
493            }
494        };
495
496        match classify_futures_message(&value) {
497            // Private feeds (execution)
498            KrakenFuturesMessageType::OpenOrdersSnapshot => {
499                log::debug!(
500                    "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501                );
502            }
503            KrakenFuturesMessageType::OpenOrdersCancel => {
504                self.handle_open_orders_cancel_value(value, ts_init);
505            }
506            KrakenFuturesMessageType::OpenOrdersDelta => {
507                self.handle_open_orders_delta_value(value, ts_init);
508            }
509            KrakenFuturesMessageType::FillsSnapshot => {
510                log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
511            }
512            KrakenFuturesMessageType::FillsDelta => {
513                self.handle_fills_delta_value(value, ts_init);
514            }
515            // Public feeds (market data)
516            KrakenFuturesMessageType::Ticker => {
517                self.handle_ticker_message_value(value, ts_init);
518            }
519            KrakenFuturesMessageType::TradeSnapshot => {
520                self.handle_trade_snapshot_value(value, ts_init);
521            }
522            KrakenFuturesMessageType::Trade => {
523                self.handle_trade_message_value(value, ts_init);
524            }
525            KrakenFuturesMessageType::BookSnapshot => {
526                self.handle_book_snapshot_value(value, ts_init);
527            }
528            KrakenFuturesMessageType::BookDelta => {
529                self.handle_book_delta_value(value, ts_init);
530            }
531            // Control messages
532            KrakenFuturesMessageType::Info => {
533                log::debug!("Received info message: {text}");
534            }
535            KrakenFuturesMessageType::Pong => {
536                log::trace!("Received pong response");
537            }
538            KrakenFuturesMessageType::Subscribed => {
539                log::debug!("Subscription confirmed: {text}");
540            }
541            KrakenFuturesMessageType::Unsubscribed => {
542                log::debug!("Unsubscription confirmed: {text}");
543            }
544            KrakenFuturesMessageType::Challenge => {
545                self.handle_challenge_response_value(value);
546            }
547            KrakenFuturesMessageType::Heartbeat => {
548                log::trace!("Heartbeat received");
549            }
550            KrakenFuturesMessageType::Unknown => {
551                log::debug!("Unhandled message: {text}");
552            }
553        }
554    }
555
556    fn handle_challenge_response_value(&mut self, value: Value) {
557        #[derive(Deserialize)]
558        struct ChallengeResponse {
559            message: String,
560        }
561
562        match serde_json::from_value::<ChallengeResponse>(value) {
563            Ok(response) => {
564                let len = response.message.len();
565                log::debug!("Challenge received, length: {len}");
566
567                if let Some(tx) = self.pending_challenge_tx.take() {
568                    if tx.send(response.message).is_err() {
569                        log::warn!("Failed to send challenge response - receiver dropped");
570                    }
571                } else {
572                    log::warn!("Received challenge but no pending request");
573                }
574            }
575            Err(e) => {
576                log::error!("Failed to parse challenge response: {e}");
577            }
578        }
579    }
580
581    fn emit_order_event(&mut self, event: ParsedOrderEvent) {
582        match event {
583            ParsedOrderEvent::Accepted(accepted) => {
584                self.pending_messages
585                    .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
586            }
587            ParsedOrderEvent::Canceled(canceled) => {
588                self.pending_messages
589                    .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
590            }
591            ParsedOrderEvent::Expired(expired) => {
592                self.pending_messages
593                    .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
594            }
595            ParsedOrderEvent::Updated(updated) => {
596                self.pending_messages
597                    .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
598            }
599            ParsedOrderEvent::StatusOnly(report) => {
600                self.pending_messages
601                    .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
602            }
603        }
604    }
605
606    fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
607        let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
608            Ok(t) => t,
609            Err(e) => {
610                log::debug!("Failed to parse ticker: {e}");
611                return;
612            }
613        };
614
615        let (instrument_id, price_precision) = {
616            let Some(instrument) = self.get_instrument(&ticker.product_id) else {
617                let product_id = &ticker.product_id;
618                log::debug!("Instrument not found for product_id: {product_id}");
619                return;
620            };
621            (instrument.id(), instrument.price_precision())
622        };
623
624        let ts_event = ticker
625            .time
626            .map_or(ts_init, |t| UnixNanos::from((t as u64) * 1_000_000));
627
628        let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
629        let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
630
631        if let Some(mark_price) = ticker.mark_price
632            && has_mark
633        {
634            let update = MarkPriceUpdate::new(
635                instrument_id,
636                Price::new(mark_price, price_precision),
637                ts_event,
638                ts_init,
639            );
640            self.pending_messages
641                .push_back(KrakenFuturesWsMessage::MarkPrice(update));
642        }
643
644        if let Some(index_price) = ticker.index
645            && has_index
646        {
647            let update = IndexPriceUpdate::new(
648                instrument_id,
649                Price::new(index_price, price_precision),
650                ts_event,
651                ts_init,
652            );
653            self.pending_messages
654                .push_back(KrakenFuturesWsMessage::IndexPrice(update));
655        }
656    }
657
658    fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
659        let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
660            Ok(t) => t,
661            Err(e) => {
662                log::trace!("Failed to parse trade: {e}");
663                return;
664            }
665        };
666
667        if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
668            return;
669        }
670
671        let (instrument_id, price_precision, size_precision) = {
672            let Some(instrument) = self.get_instrument(&trade.product_id) else {
673                return;
674            };
675            (
676                instrument.id(),
677                instrument.price_precision(),
678                instrument.size_precision(),
679            )
680        };
681
682        let size = Quantity::new(trade.qty, size_precision);
683        if size.is_zero() {
684            let product_id = trade.product_id;
685            let raw_qty = trade.qty;
686            log::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
687            return;
688        }
689
690        let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
691        let aggressor_side = match trade.side {
692            KrakenOrderSide::Buy => AggressorSide::Buyer,
693            KrakenOrderSide::Sell => AggressorSide::Seller,
694        };
695        let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
696
697        let trade_tick = TradeTick::new(
698            instrument_id,
699            Price::new(trade.price, price_precision),
700            size,
701            aggressor_side,
702            TradeId::new(&trade_id),
703            ts_event,
704            ts_init,
705        );
706
707        self.pending_messages
708            .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
709    }
710
711    fn handle_trade_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
712        let snapshot = match serde_json::from_value::<KrakenFuturesTradeSnapshot>(value) {
713            Ok(s) => s,
714            Err(e) => {
715                log::trace!("Failed to parse trade snapshot: {e}");
716                return;
717            }
718        };
719
720        if !self.is_subscribed(KrakenFuturesChannel::Trades, &snapshot.product_id) {
721            return;
722        }
723
724        let (instrument_id, price_precision, size_precision) = {
725            let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
726                return;
727            };
728            (
729                instrument.id(),
730                instrument.price_precision(),
731                instrument.size_precision(),
732            )
733        };
734
735        for trade in snapshot.trades {
736            let size = Quantity::new(trade.qty, size_precision);
737            if size.is_zero() {
738                let product_id = snapshot.product_id;
739                let raw_qty = trade.qty;
740                log::warn!(
741                    "Skipping zero quantity trade in snapshot for {product_id} (raw qty: {raw_qty})"
742                );
743                continue;
744            }
745
746            let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
747            let aggressor_side = match trade.side {
748                KrakenOrderSide::Buy => AggressorSide::Buyer,
749                KrakenOrderSide::Sell => AggressorSide::Seller,
750            };
751            let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
752
753            let trade_tick = TradeTick::new(
754                instrument_id,
755                Price::new(trade.price, price_precision),
756                size,
757                aggressor_side,
758                TradeId::new(&trade_id),
759                ts_event,
760                ts_init,
761            );
762
763            self.pending_messages
764                .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
765        }
766    }
767
768    fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
769        let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
770            Ok(s) => s,
771            Err(e) => {
772                log::trace!("Failed to parse book snapshot: {e}");
773                return;
774            }
775        };
776
777        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
778        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
779
780        if !has_book && !has_quotes {
781            return;
782        }
783
784        let (instrument_id, price_precision, size_precision) = {
785            let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
786                return;
787            };
788            (
789                instrument.id(),
790                instrument.price_precision(),
791                instrument.size_precision(),
792            )
793        };
794
795        let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
796
797        let best_bid = snapshot
798            .bids
799            .iter()
800            .filter(|l| l.qty > 0.0)
801            .max_by(|a, b| a.price.total_cmp(&b.price));
802        let best_ask = snapshot
803            .asks
804            .iter()
805            .filter(|l| l.qty > 0.0)
806            .min_by(|a, b| a.price.total_cmp(&b.price));
807
808        if has_quotes {
809            let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
810            let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
811            let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
812            let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
813
814            match self.quote_cache.process(
815                instrument_id,
816                bid_price,
817                ask_price,
818                bid_size,
819                ask_size,
820                ts_event,
821                ts_init,
822            ) {
823                Ok(quote) => {
824                    self.pending_messages
825                        .push_back(KrakenFuturesWsMessage::Quote(quote));
826                }
827                Err(e) => {
828                    log::trace!("Quote cache process error: {e}");
829                }
830            }
831        }
832
833        if has_book {
834            let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
835
836            deltas.push(OrderBookDelta::clear(
837                instrument_id,
838                snapshot.seq as u64,
839                ts_event,
840                ts_init,
841            ));
842
843            for bid in &snapshot.bids {
844                let size = Quantity::new(bid.qty, size_precision);
845                if size.is_zero() {
846                    continue;
847                }
848                let order = BookOrder::new(
849                    OrderSide::Buy,
850                    Price::new(bid.price, price_precision),
851                    size,
852                    0,
853                );
854                deltas.push(OrderBookDelta::new(
855                    instrument_id,
856                    BookAction::Add,
857                    order,
858                    0,
859                    snapshot.seq as u64,
860                    ts_event,
861                    ts_init,
862                ));
863            }
864
865            for ask in &snapshot.asks {
866                let size = Quantity::new(ask.qty, size_precision);
867                if size.is_zero() {
868                    continue;
869                }
870                let order = BookOrder::new(
871                    OrderSide::Sell,
872                    Price::new(ask.price, price_precision),
873                    size,
874                    0,
875                );
876                deltas.push(OrderBookDelta::new(
877                    instrument_id,
878                    BookAction::Add,
879                    order,
880                    0,
881                    snapshot.seq as u64,
882                    ts_event,
883                    ts_init,
884                ));
885            }
886
887            let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
888            self.pending_messages
889                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
890        }
891    }
892
893    fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
894        let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
895            Ok(d) => d,
896            Err(e) => {
897                log::trace!("Failed to parse book delta: {e}");
898                return;
899            }
900        };
901
902        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
903        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
904
905        if !has_book && !has_quotes {
906            return;
907        }
908
909        let Some(instrument) = self.get_instrument(&delta.product_id) else {
910            return;
911        };
912
913        let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
914        let instrument_id = instrument.id();
915        let price_precision = instrument.price_precision();
916        let size_precision = instrument.size_precision();
917
918        let side: OrderSide = delta.side.into();
919
920        if has_quotes && delta.qty > 0.0 {
921            let price = Price::new(delta.price, price_precision);
922            let size = Quantity::new(delta.qty, size_precision);
923
924            let (bid_price, ask_price, bid_size, ask_size) = match side {
925                OrderSide::Buy => (Some(price), None, Some(size), None),
926                OrderSide::Sell => (None, Some(price), None, Some(size)),
927                _ => (None, None, None, None),
928            };
929
930            if let Ok(quote) = self.quote_cache.process(
931                instrument_id,
932                bid_price,
933                ask_price,
934                bid_size,
935                ask_size,
936                ts_event,
937                ts_init,
938            ) {
939                self.pending_messages
940                    .push_back(KrakenFuturesWsMessage::Quote(quote));
941            }
942        }
943
944        if has_book {
945            let size = Quantity::new(delta.qty, size_precision);
946            let action = if size.is_zero() {
947                BookAction::Delete
948            } else {
949                BookAction::Update
950            };
951
952            let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
953
954            let book_delta = OrderBookDelta::new(
955                instrument_id,
956                action,
957                order,
958                0,
959                delta.seq as u64,
960                ts_event,
961                ts_init,
962            );
963
964            let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
965            self.pending_messages
966                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
967        }
968    }
969
970    fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
971        let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
972            Ok(d) => d,
973            Err(e) => {
974                log::error!("Failed to parse open_orders delta: {e}");
975                return;
976            }
977        };
978
979        log::debug!(
980            "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
981            delta.order.order_id,
982            delta.is_cancel,
983            delta.reason
984        );
985
986        if let Some(event) = self.parse_order_event(
987            &delta.order,
988            ts_init,
989            delta.is_cancel,
990            delta.reason.as_deref(),
991        ) {
992            self.emit_order_event(event);
993        }
994    }
995
996    fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
997        // Already classified - we know it's a cancel with is_cancel=true and no "order" field
998        // Check if this is a fill-related cancel (skip those - fills feed handles them)
999        if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1000            && (reason == "full_fill" || reason == "partial_fill")
1001        {
1002            log::debug!(
1003                "Skipping open_orders cancel for fill (handled by fills feed): reason={reason}"
1004            );
1005            return;
1006        }
1007
1008        let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1009            Ok(c) => c,
1010            Err(e) => {
1011                log::error!("Failed to parse open_orders cancel: {e}");
1012                return;
1013            }
1014        };
1015
1016        log::debug!(
1017            "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
1018            cancel.order_id,
1019            cancel.cli_ord_id,
1020            cancel.reason
1021        );
1022
1023        let Some(account_id) = self.account_id else {
1024            log::warn!("Cannot process cancel: account_id not set");
1025            return;
1026        };
1027
1028        let (client_order_id, info) = if let Some(cli_ord_id) = cancel.cli_ord_id.as_ref() {
1029            if let Some(info) = self.client_order_cache.get(cli_ord_id) {
1030                (ClientOrderId::new(cli_ord_id), info.clone())
1031            } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1032                if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1033                    (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1034                } else {
1035                    log::debug!(
1036                        "Cancel received for unknown order (not in cache): \
1037                        order_id={}, cli_ord_id={cli_ord_id}",
1038                        cancel.order_id
1039                    );
1040                    return;
1041                }
1042            } else {
1043                log::debug!(
1044                    "Cancel received for unknown order (not in cache): \
1045                    order_id={}, cli_ord_id={cli_ord_id}",
1046                    cancel.order_id
1047                );
1048                return;
1049            }
1050        } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1051            if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1052                (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1053            } else {
1054                log::debug!(
1055                    "Cancel received but mapped order not in cache: order_id={}",
1056                    cancel.order_id
1057                );
1058                return;
1059            }
1060        } else {
1061            log::debug!(
1062                "Cancel received without cli_ord_id and no venue mapping (external order): \
1063                order_id={}",
1064                cancel.order_id
1065            );
1066            return;
1067        };
1068
1069        let venue_order_id = VenueOrderId::new(&cancel.order_id);
1070
1071        let canceled = OrderCanceled::new(
1072            info.trader_id,
1073            info.strategy_id,
1074            info.instrument_id,
1075            client_order_id,
1076            UUID4::new(),
1077            ts_init,
1078            ts_init,
1079            false,
1080            Some(venue_order_id),
1081            Some(account_id),
1082        );
1083
1084        self.pending_messages
1085            .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1086    }
1087
1088    fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1089        let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1090            Ok(d) => d,
1091            Err(e) => {
1092                log::error!("Failed to parse fills delta: {e}");
1093                return;
1094            }
1095        };
1096
1097        log::debug!("Received fills delta: fill_count={}", delta.fills.len());
1098
1099        for fill in &delta.fills {
1100            log::debug!(
1101                "Processing fill: fill_id={}, order_id={}",
1102                fill.fill_id,
1103                fill.order_id
1104            );
1105
1106            if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1107                self.pending_messages
1108                    .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1109            }
1110        }
1111    }
1112
1113    /// Parses a Kraken Futures order message into a proper order event.
1114    ///
1115    /// Returns the appropriate event type based on order status:
1116    /// - New orders with no fills -> `OrderAccepted`
1117    /// - Canceled orders -> `OrderCanceled` or `OrderExpired` (based on reason)
1118    /// - Orders without cached info -> `StatusOnly` (for reconciliation)
1119    fn parse_order_event(
1120        &self,
1121        order: &KrakenFuturesOpenOrder,
1122        ts_init: UnixNanos,
1123        is_cancel: bool,
1124        cancel_reason: Option<&str>,
1125    ) -> Option<ParsedOrderEvent> {
1126        let Some(account_id) = self.account_id else {
1127            log::warn!("Cannot process order: account_id not set");
1128            return None;
1129        };
1130
1131        let instrument = self
1132            .instruments_cache
1133            .get(&Ustr::from(order.instrument.as_str()))?;
1134
1135        let instrument_id = instrument.id();
1136
1137        if order.qty <= 0.0 {
1138            log::warn!(
1139                "Skipping order with invalid quantity: order_id={}, qty={}",
1140                order.order_id,
1141                order.qty
1142            );
1143            return None;
1144        }
1145
1146        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1147        let venue_order_id = VenueOrderId::new(&order.order_id);
1148
1149        let client_order_id = order
1150            .cli_ord_id
1151            .as_ref()
1152            .map(|s| ClientOrderId::new(s.as_str()));
1153
1154        let cached_info = order
1155            .cli_ord_id
1156            .as_ref()
1157            .and_then(|id| self.client_order_cache.get(id));
1158
1159        // External orders or snapshots fall back to OrderStatusReport for reconciliation
1160        let Some(info) = cached_info else {
1161            return self
1162                .parse_order_to_status_report(order, ts_init, is_cancel)
1163                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1164        };
1165
1166        let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1167
1168        let status = if is_cancel {
1169            OrderStatus::Canceled
1170        } else if order.filled >= order.qty {
1171            OrderStatus::Filled
1172        } else if order.filled > 0.0 {
1173            OrderStatus::PartiallyFilled
1174        } else {
1175            OrderStatus::Accepted
1176        };
1177
1178        match status {
1179            OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1180                info.trader_id,
1181                info.strategy_id,
1182                instrument_id,
1183                client_order_id,
1184                venue_order_id,
1185                account_id,
1186                UUID4::new(),
1187                ts_event,
1188                ts_init,
1189                false,
1190            ))),
1191            OrderStatus::Canceled => {
1192                // Detect expiry by cancel reason keywords
1193                let is_expired = cancel_reason.is_some_and(|r| {
1194                    let r_lower = r.to_lowercase();
1195                    r_lower.contains("expir")
1196                        || r_lower.contains("gtd")
1197                        || r_lower.contains("timeout")
1198                });
1199
1200                if is_expired {
1201                    Some(ParsedOrderEvent::Expired(OrderExpired::new(
1202                        info.trader_id,
1203                        info.strategy_id,
1204                        instrument_id,
1205                        client_order_id,
1206                        UUID4::new(),
1207                        ts_event,
1208                        ts_init,
1209                        false,
1210                        Some(venue_order_id),
1211                        Some(account_id),
1212                    )))
1213                } else {
1214                    Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1215                        info.trader_id,
1216                        info.strategy_id,
1217                        instrument_id,
1218                        client_order_id,
1219                        UUID4::new(),
1220                        ts_event,
1221                        ts_init,
1222                        false,
1223                        Some(venue_order_id),
1224                        Some(account_id),
1225                    )))
1226                }
1227            }
1228
1229            // Fill events are handled separately via the fills feed
1230            OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1231                .parse_order_to_status_report(order, ts_init, is_cancel)
1232                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1233            _ => self
1234                .parse_order_to_status_report(order, ts_init, is_cancel)
1235                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1236        }
1237    }
1238
1239    /// Parses a Kraken Futures order into an `OrderStatusReport`.
1240    ///
1241    /// Used for snapshots (reconciliation) and external orders.
1242    fn parse_order_to_status_report(
1243        &self,
1244        order: &KrakenFuturesOpenOrder,
1245        ts_init: UnixNanos,
1246        is_cancel: bool,
1247    ) -> Option<OrderStatusReport> {
1248        let Some(account_id) = self.account_id else {
1249            log::warn!("Cannot process order: account_id not set");
1250            return None;
1251        };
1252
1253        let instrument = self
1254            .instruments_cache
1255            .get(&Ustr::from(order.instrument.as_str()))?;
1256
1257        let instrument_id = instrument.id();
1258        let size_precision = instrument.size_precision();
1259
1260        let side = if order.direction == 0 {
1261            OrderSide::Buy
1262        } else {
1263            OrderSide::Sell
1264        };
1265
1266        let order_type = match order.order_type.as_str() {
1267            "limit" | "lmt" => OrderType::Limit,
1268            "stop" | "stp" => OrderType::StopLimit,
1269            "take_profit" => OrderType::LimitIfTouched,
1270            "market" | "mkt" => OrderType::Market,
1271            _ => OrderType::Limit,
1272        };
1273
1274        let status = if is_cancel {
1275            OrderStatus::Canceled
1276        } else if order.filled >= order.qty {
1277            OrderStatus::Filled
1278        } else if order.filled > 0.0 {
1279            OrderStatus::PartiallyFilled
1280        } else {
1281            OrderStatus::Accepted
1282        };
1283
1284        if order.qty <= 0.0 {
1285            log::warn!(
1286                "Skipping order with invalid quantity: order_id={}, qty={}",
1287                order.order_id,
1288                order.qty
1289            );
1290            return None;
1291        }
1292
1293        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1294
1295        let client_order_id = order
1296            .cli_ord_id
1297            .as_ref()
1298            .map(|s| ClientOrderId::new(s.as_str()));
1299
1300        let filled_qty = if order.filled <= 0.0 {
1301            Quantity::zero(size_precision)
1302        } else {
1303            Quantity::new(order.filled, size_precision)
1304        };
1305
1306        Some(OrderStatusReport::new(
1307            account_id,
1308            instrument_id,
1309            client_order_id,
1310            VenueOrderId::new(&order.order_id),
1311            side,
1312            order_type,
1313            TimeInForce::Gtc,
1314            status,
1315            Quantity::new(order.qty, size_precision),
1316            filled_qty,
1317            ts_event, // ts_accepted
1318            ts_event, // ts_last
1319            ts_init,
1320            Some(UUID4::new()),
1321        ))
1322    }
1323
1324    fn parse_fill_to_report(
1325        &self,
1326        fill: &KrakenFuturesFill,
1327        ts_init: UnixNanos,
1328    ) -> Option<FillReport> {
1329        let Some(account_id) = self.account_id else {
1330            log::warn!("Cannot process fill: account_id not set");
1331            return None;
1332        };
1333
1334        // Resolve instrument: try message field first, then fall back to cache
1335        let instrument = if let Some(ref symbol) = fill.instrument {
1336            self.instruments_cache.get(symbol).cloned()
1337        } else if let Some(ref cli_ord_id) = fill.cli_ord_id {
1338            // Fall back to client order cache
1339            self.client_order_cache.get(cli_ord_id).and_then(|info| {
1340                self.instruments_cache
1341                    .iter()
1342                    .find(|(_, inst)| inst.id() == info.instrument_id)
1343                    .map(|(_, inst)| inst.clone())
1344            })
1345        } else {
1346            None
1347        };
1348
1349        let Some(instrument) = instrument else {
1350            log::warn!(
1351                "Cannot resolve instrument for fill: fill_id={}, order_id={}, cli_ord_id={:?}",
1352                fill.fill_id,
1353                fill.order_id,
1354                fill.cli_ord_id
1355            );
1356            return None;
1357        };
1358
1359        let instrument_id = instrument.id();
1360        let price_precision = instrument.price_precision();
1361        let size_precision = instrument.size_precision();
1362
1363        if fill.qty <= 0.0 {
1364            log::warn!(
1365                "Skipping fill with invalid quantity: fill_id={}, qty={}",
1366                fill.fill_id,
1367                fill.qty
1368            );
1369            return None;
1370        }
1371
1372        let side = if fill.buy {
1373            OrderSide::Buy
1374        } else {
1375            OrderSide::Sell
1376        };
1377
1378        let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1379
1380        let client_order_id = fill
1381            .cli_ord_id
1382            .as_ref()
1383            .map(|s| ClientOrderId::new(s.as_str()));
1384
1385        let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1386
1387        Some(FillReport::new(
1388            account_id,
1389            instrument_id,
1390            VenueOrderId::new(&fill.order_id),
1391            TradeId::new(&fill.fill_id),
1392            side,
1393            Quantity::new(fill.qty, size_precision),
1394            Price::new(fill.price, price_precision),
1395            commission,
1396            LiquiditySide::NoLiquiditySide, // Not provided
1397            client_order_id,
1398            None, // venue_position_id
1399            ts_event,
1400            ts_init,
1401            Some(UUID4::new()),
1402        ))
1403    }
1404}
1405
1406#[cfg(test)]
1407mod tests {
1408    use nautilus_model::{
1409        instruments::{CryptoFuture, InstrumentAny},
1410        types::Currency,
1411    };
1412    use rstest::rstest;
1413
1414    use super::*;
1415
1416    fn create_test_handler() -> FuturesFeedHandler {
1417        let signal = Arc::new(AtomicBool::new(false));
1418        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1419        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1420        let subscriptions = SubscriptionState::new(':');
1421
1422        FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1423    }
1424
1425    fn create_test_instrument() -> InstrumentAny {
1426        InstrumentAny::CryptoFuture(CryptoFuture::new(
1427            InstrumentId::from("PI_XBTUSD.KRAKEN"),
1428            Symbol::from("PI_XBTUSD"),
1429            Currency::BTC(),
1430            Currency::USD(),
1431            Currency::USD(),
1432            false,
1433            UnixNanos::default(),
1434            UnixNanos::default(),
1435            1, // price_precision
1436            0, // size_precision
1437            Price::from("0.5"),
1438            Quantity::from(1),
1439            None,
1440            None,
1441            None,
1442            None,
1443            None,
1444            None,
1445            None,
1446            None,
1447            None,
1448            None,
1449            None,
1450            None,
1451            UnixNanos::default(),
1452            UnixNanos::default(),
1453        ))
1454    }
1455
1456    #[rstest]
1457    fn test_book_snapshot_filters_zero_quantity_bids() {
1458        let mut handler = create_test_handler();
1459        let instrument = create_test_instrument();
1460        handler
1461            .instruments_cache
1462            .insert(Ustr::from("PI_XBTUSD"), instrument);
1463
1464        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1465        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1466
1467        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1468        let ts_init = UnixNanos::from(1_000_000_000);
1469
1470        handler.parse_message(json, ts_init);
1471
1472        assert_eq!(handler.pending_messages.len(), 1);
1473
1474        let msg = handler.pending_messages.pop_front().unwrap();
1475        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1476            panic!("Expected BookDeltas message");
1477        };
1478
1479        // Fixture has 3 bids (1 zero qty) + 2 asks (1 zero qty) = 3 valid + 1 clear = 4
1480        assert_eq!(deltas.deltas.len(), 4);
1481        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1482
1483        for delta in &deltas.deltas[1..] {
1484            assert!(
1485                !delta.order.size.is_zero(),
1486                "Found zero-quantity delta that should have been filtered: {delta:?}"
1487            );
1488        }
1489    }
1490
1491    #[rstest]
1492    fn test_book_snapshot_filters_zero_quantity_asks() {
1493        let mut handler = create_test_handler();
1494        let instrument = create_test_instrument();
1495        handler
1496            .instruments_cache
1497            .insert(Ustr::from("PI_XBTUSD"), instrument);
1498
1499        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1500        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1501
1502        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1503        let ts_init = UnixNanos::from(1_000_000_000);
1504
1505        handler.parse_message(json, ts_init);
1506
1507        let msg = handler.pending_messages.pop_front().unwrap();
1508        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1509            panic!("Expected BookDeltas message");
1510        };
1511
1512        // Only 1 ask should remain (the one with qty 2300, not the zero qty one)
1513        let sell_deltas: Vec<_> = deltas
1514            .deltas
1515            .iter()
1516            .filter(|d| d.order.side == OrderSide::Sell)
1517            .collect();
1518
1519        assert_eq!(sell_deltas.len(), 1);
1520        assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1521    }
1522
1523    #[rstest]
1524    fn test_trade_filters_zero_quantity() {
1525        let mut handler = create_test_handler();
1526        let instrument = create_test_instrument();
1527        handler
1528            .instruments_cache
1529            .insert(Ustr::from("PI_XBTUSD"), instrument);
1530
1531        handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1532        handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1533
1534        let json = r#"{
1535            "feed": "trade",
1536            "product_id": "PI_XBTUSD",
1537            "time": 1612269825817,
1538            "side": "buy",
1539            "qty": 0.0,
1540            "price": 34900.0,
1541            "seq": 12345
1542        }"#;
1543        let ts_init = UnixNanos::from(1_000_000_000);
1544
1545        handler.parse_message(json, ts_init);
1546
1547        assert!(
1548            handler.pending_messages.is_empty(),
1549            "Zero quantity trade should be filtered out"
1550        );
1551    }
1552}