nautilus_kraken/websocket/futures/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Futures.
17
18use std::{
19    collections::VecDeque,
20    fmt::Debug,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31    data::{
32        BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
33    },
34    enums::{
35        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
36    },
37    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
38    identifiers::{
39        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
40    },
41    instruments::{Instrument, InstrumentAny},
42    reports::{FillReport, OrderStatusReport},
43    types::{Money, Price, Quantity},
44};
45use nautilus_network::{
46    RECONNECTED,
47    websocket::{SubscriptionState, WebSocketClient},
48};
49use serde::Deserialize;
50use serde_json::Value;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::messages::{
55    KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
56    KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
57    KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
58    KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
59    KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
60    KrakenFuturesTradeSnapshot, KrakenFuturesWsMessage, classify_futures_message,
61};
62use crate::common::enums::KrakenOrderSide;
63
64/// Parsed order event from a Kraken Futures WebSocket message.
65#[derive(Debug, Clone)]
66pub enum ParsedOrderEvent {
67    Accepted(OrderAccepted),
68    Canceled(OrderCanceled),
69    Expired(OrderExpired),
70    Updated(OrderUpdated),
71    StatusOnly(Box<OrderStatusReport>),
72}
73
74/// Cached order info for proper event generation.
75#[derive(Debug, Clone)]
76struct CachedOrderInfo {
77    instrument_id: InstrumentId,
78    trader_id: TraderId,
79    strategy_id: StrategyId,
80}
81
82/// Commands sent from the outer client to the inner message handler.
83#[allow(
84    clippy::large_enum_variant,
85    reason = "Commands are ephemeral and immediately consumed"
86)]
87pub enum HandlerCommand {
88    SetClient(WebSocketClient),
89    SubscribeTicker(Symbol),
90    UnsubscribeTicker(Symbol),
91    SubscribeTrade(Symbol),
92    UnsubscribeTrade(Symbol),
93    SubscribeBook(Symbol),
94    UnsubscribeBook(Symbol),
95    Disconnect,
96    InitializeInstruments(Vec<InstrumentAny>),
97    UpdateInstrument(InstrumentAny),
98    SetAccountId(AccountId),
99    RequestChallenge {
100        api_key: String,
101        response_tx: tokio::sync::oneshot::Sender<String>,
102    },
103    SetAuthCredentials {
104        api_key: String,
105        original_challenge: String,
106        signed_challenge: String,
107    },
108    SubscribeOpenOrders,
109    SubscribeFills,
110    CacheClientOrder {
111        client_order_id: ClientOrderId,
112        venue_order_id: Option<VenueOrderId>,
113        instrument_id: InstrumentId,
114        trader_id: TraderId,
115        strategy_id: StrategyId,
116    },
117}
118
119impl Debug for HandlerCommand {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        match self {
122            Self::SetClient(_) => f.debug_struct("SetClient").finish(),
123            Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
124            Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
125            Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
126            Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
127            Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
128            Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
129            Self::Disconnect => write!(f, "Disconnect"),
130            Self::InitializeInstruments(v) => f
131                .debug_tuple("InitializeInstruments")
132                .field(&v.len())
133                .finish(),
134            Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
135            Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
136            Self::RequestChallenge { api_key, .. } => {
137                let masked = &api_key[..4.min(api_key.len())];
138                f.debug_struct("RequestChallenge")
139                    .field("api_key", &format!("{masked}..."))
140                    .finish()
141            }
142            Self::SetAuthCredentials { api_key, .. } => {
143                let masked = &api_key[..4.min(api_key.len())];
144                f.debug_struct("SetAuthCredentials")
145                    .field("api_key", &format!("{masked}..."))
146                    .finish()
147            }
148            Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
149            Self::SubscribeFills => write!(f, "SubscribeFills"),
150            Self::CacheClientOrder {
151                client_order_id,
152                instrument_id,
153                ..
154            } => f
155                .debug_struct("CacheClientOrder")
156                .field("client_order_id", client_order_id)
157                .field("instrument_id", instrument_id)
158                .finish(),
159        }
160    }
161}
162
163/// WebSocket message handler for Kraken Futures.
164pub struct FuturesFeedHandler {
165    clock: &'static AtomicTime,
166    signal: Arc<AtomicBool>,
167    client: Option<WebSocketClient>,
168    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
169    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
170    subscriptions: SubscriptionState,
171    instruments_cache: AHashMap<Ustr, InstrumentAny>,
172    quote_cache: QuoteCache,
173    pending_messages: VecDeque<KrakenFuturesWsMessage>,
174    account_id: Option<AccountId>,
175    api_key: Option<String>,
176    original_challenge: Option<String>,
177    signed_challenge: Option<String>,
178    client_order_cache: AHashMap<String, CachedOrderInfo>,
179    venue_order_cache: AHashMap<String, String>,
180    pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
181}
182
183impl FuturesFeedHandler {
184    /// Creates a new [`FuturesFeedHandler`] instance.
185    pub fn new(
186        signal: Arc<AtomicBool>,
187        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
188        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
189        subscriptions: SubscriptionState,
190    ) -> Self {
191        Self {
192            clock: get_atomic_clock_realtime(),
193            signal,
194            client: None,
195            cmd_rx,
196            raw_rx,
197            subscriptions,
198            instruments_cache: AHashMap::new(),
199            quote_cache: QuoteCache::new(),
200            pending_messages: VecDeque::new(),
201            account_id: None,
202            api_key: None,
203            original_challenge: None,
204            signed_challenge: None,
205            client_order_cache: AHashMap::new(),
206            venue_order_cache: AHashMap::new(),
207            pending_challenge_tx: None,
208        }
209    }
210
211    pub fn is_stopped(&self) -> bool {
212        self.signal.load(Ordering::Relaxed)
213    }
214
215    fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
216        let channel_ustr = Ustr::from(channel.as_ref());
217        self.subscriptions.is_subscribed(&channel_ustr, symbol)
218    }
219
220    fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
221        self.instruments_cache.get(symbol)
222    }
223
224    /// Processes messages and commands, returning when stopped or stream ends.
225    pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
226        // First drain any pending messages from previous ticker processing
227        if let Some(msg) = self.pending_messages.pop_front() {
228            return Some(msg);
229        }
230
231        loop {
232            tokio::select! {
233                Some(cmd) = self.cmd_rx.recv() => {
234                    match cmd {
235                        HandlerCommand::SetClient(client) => {
236                            tracing::debug!("WebSocketClient received by futures handler");
237                            self.client = Some(client);
238                        }
239                        HandlerCommand::SubscribeTicker(symbol) => {
240                            self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
241                        }
242                        HandlerCommand::UnsubscribeTicker(symbol) => {
243                            self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
244                        }
245                        HandlerCommand::SubscribeTrade(symbol) => {
246                            self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
247                        }
248                        HandlerCommand::UnsubscribeTrade(symbol) => {
249                            self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
250                        }
251                        HandlerCommand::SubscribeBook(symbol) => {
252                            self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
253                        }
254                        HandlerCommand::UnsubscribeBook(symbol) => {
255                            self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
256                        }
257                        HandlerCommand::Disconnect => {
258                            tracing::debug!("Disconnect command received");
259                            if let Some(client) = self.client.take() {
260                                client.disconnect().await;
261                            }
262                            return None;
263                        }
264                        HandlerCommand::InitializeInstruments(instruments) => {
265                            for inst in instruments {
266                                self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
267                            }
268                            let count = self.instruments_cache.len();
269                            tracing::debug!("Initialized {count} instruments in futures handler cache");
270                        }
271                        HandlerCommand::UpdateInstrument(inst) => {
272                            self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
273                        }
274                        HandlerCommand::SetAccountId(account_id) => {
275                            tracing::debug!("Setting account_id for futures handler: {account_id}");
276                            self.account_id = Some(account_id);
277                        }
278                        HandlerCommand::RequestChallenge { api_key, response_tx } => {
279                            tracing::debug!("Requesting challenge for authentication");
280                            self.pending_challenge_tx = Some(response_tx);
281                            self.send_challenge_request(&api_key).await;
282                        }
283                        HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
284                            tracing::debug!("Setting auth credentials for futures handler");
285                            self.api_key = Some(api_key);
286                            self.original_challenge = Some(original_challenge);
287                            self.signed_challenge = Some(signed_challenge);
288                        }
289                        HandlerCommand::SubscribeOpenOrders => {
290                            self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
291                        }
292                        HandlerCommand::SubscribeFills => {
293                            self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
294                        }
295                        HandlerCommand::CacheClientOrder {
296                            client_order_id,
297                            venue_order_id,
298                            instrument_id,
299                            trader_id,
300                            strategy_id,
301                        } => {
302                            let client_order_id_str = client_order_id.to_string();
303                            self.client_order_cache.insert(
304                                client_order_id_str.clone(),
305                                CachedOrderInfo {
306                                    instrument_id,
307                                    trader_id,
308                                    strategy_id,
309                                },
310                            );
311                            if let Some(venue_id) = venue_order_id {
312                                self.venue_order_cache
313                                    .insert(venue_id.to_string(), client_order_id_str);
314                            }
315                        }
316                    }
317                    continue;
318                }
319
320                msg = self.raw_rx.recv() => {
321                    let msg = match msg {
322                        Some(msg) => msg,
323                        None => {
324                            tracing::debug!("WebSocket stream closed");
325                            return None;
326                        }
327                    };
328
329                    if self.signal.load(Ordering::Relaxed) {
330                        tracing::debug!("Stop signal received");
331                        return None;
332                    }
333
334                    // Handle control frames first (borrow msg to avoid moving)
335                    match &msg {
336                        Message::Ping(data) => {
337                            let len = data.len();
338                            tracing::trace!("Received ping frame with {len} bytes");
339                            if let Some(client) = &self.client
340                                && let Err(e) = client.send_pong(data.to_vec()).await
341                            {
342                                tracing::warn!(error = %e, "Failed to send pong frame");
343                            }
344                            continue;
345                        }
346                        Message::Pong(_) => {
347                            tracing::trace!("Received pong");
348                            continue;
349                        }
350                        Message::Close(_) => {
351                            tracing::info!("WebSocket connection closed");
352                            return None;
353                        }
354                        Message::Frame(_) => {
355                            tracing::trace!("Received raw frame");
356                            continue;
357                        }
358                        _ => {}
359                    }
360
361                    // Extract text without allocation (Utf8Bytes derefs to &str)
362                    let text: &str = match &msg {
363                        Message::Text(text) => text,
364                        Message::Binary(data) => match std::str::from_utf8(data) {
365                            Ok(s) => s,
366                            Err(_) => continue,
367                        },
368                        _ => continue,
369                    };
370
371                    if text == RECONNECTED {
372                        tracing::info!("Received WebSocket reconnected signal");
373                        self.quote_cache.clear();
374                        return Some(KrakenFuturesWsMessage::Reconnected);
375                    }
376
377                    let ts_init = self.clock.get_time_ns();
378                    self.parse_message(text, ts_init);
379
380                    // Return first pending message if any were produced
381                    if let Some(msg) = self.pending_messages.pop_front() {
382                        return Some(msg);
383                    }
384
385                    continue;
386                }
387            }
388        }
389    }
390
391    async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392        if let Some(ref client) = self.client {
393            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394            let feed_str = feed_str.trim_matches('"');
395            let msg = format!(
396                r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397            );
398            if let Err(e) = client.send_text(msg, None).await {
399                tracing::error!("Failed to send {feed:?} subscribe: {e}");
400            }
401        }
402    }
403
404    async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405        if let Some(ref client) = self.client {
406            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407            let feed_str = feed_str.trim_matches('"');
408            let msg = format!(
409                r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410            );
411            if let Err(e) = client.send_text(msg, None).await {
412                tracing::error!("Failed to send {feed:?} unsubscribe: {e}");
413            }
414        }
415    }
416
417    async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418        let Some(ref client) = self.client else {
419            tracing::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420            return;
421        };
422
423        let Some(ref api_key) = self.api_key else {
424            tracing::error!("Cannot subscribe to {feed:?}: no API key set");
425            return;
426        };
427
428        let Some(ref original_challenge) = self.original_challenge else {
429            tracing::error!("Cannot subscribe to {feed:?}: no challenge set");
430            return;
431        };
432
433        let Some(ref signed_challenge) = self.signed_challenge else {
434            tracing::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435            return;
436        };
437
438        let request = KrakenFuturesPrivateSubscribeRequest {
439            event: KrakenFuturesEvent::Subscribe,
440            feed,
441            api_key: api_key.clone(),
442            original_challenge: original_challenge.clone(),
443            signed_challenge: signed_challenge.clone(),
444        };
445
446        let msg = match serde_json::to_string(&request) {
447            Ok(m) => m,
448            Err(e) => {
449                tracing::error!("Failed to serialize {feed:?} subscribe request: {e}");
450                return;
451            }
452        };
453
454        if let Err(e) = client.send_text(msg, None).await {
455            tracing::error!("Failed to send {feed:?} subscribe: {e}");
456        } else {
457            tracing::debug!("Sent private subscribe request for {feed:?}");
458        }
459    }
460
461    async fn send_challenge_request(&self, api_key: &str) {
462        let Some(ref client) = self.client else {
463            tracing::error!("Cannot request challenge: no WebSocket client");
464            return;
465        };
466
467        let request = KrakenFuturesChallengeRequest {
468            event: KrakenFuturesEvent::Challenge,
469            api_key: api_key.to_string(),
470        };
471
472        let msg = match serde_json::to_string(&request) {
473            Ok(m) => m,
474            Err(e) => {
475                tracing::error!("Failed to serialize challenge request: {e}");
476                return;
477            }
478        };
479
480        if let Err(e) = client.send_text(msg, None).await {
481            tracing::error!("Failed to send challenge request: {e}");
482        } else {
483            tracing::debug!("Sent challenge request for authentication");
484        }
485    }
486
487    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488        let value: Value = match serde_json::from_str(text) {
489            Ok(v) => v,
490            Err(e) => {
491                tracing::debug!("Failed to parse message as JSON: {e}");
492                return;
493            }
494        };
495
496        match classify_futures_message(&value) {
497            // Private feeds (execution)
498            KrakenFuturesMessageType::OpenOrdersSnapshot => {
499                tracing::debug!(
500                    "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501                );
502            }
503            KrakenFuturesMessageType::OpenOrdersCancel => {
504                self.handle_open_orders_cancel_value(value, ts_init);
505            }
506            KrakenFuturesMessageType::OpenOrdersDelta => {
507                self.handle_open_orders_delta_value(value, ts_init);
508            }
509            KrakenFuturesMessageType::FillsSnapshot => {
510                tracing::debug!(
511                    "Skipping fills_snapshot (REST reconciliation handles initial state)"
512                );
513            }
514            KrakenFuturesMessageType::FillsDelta => {
515                self.handle_fills_delta_value(value, ts_init);
516            }
517            // Public feeds (market data)
518            KrakenFuturesMessageType::Ticker => {
519                self.handle_ticker_message_value(value, ts_init);
520            }
521            KrakenFuturesMessageType::TradeSnapshot => {
522                self.handle_trade_snapshot_value(value, ts_init);
523            }
524            KrakenFuturesMessageType::Trade => {
525                self.handle_trade_message_value(value, ts_init);
526            }
527            KrakenFuturesMessageType::BookSnapshot => {
528                self.handle_book_snapshot_value(value, ts_init);
529            }
530            KrakenFuturesMessageType::BookDelta => {
531                self.handle_book_delta_value(value, ts_init);
532            }
533            // Control messages
534            KrakenFuturesMessageType::Info => {
535                tracing::debug!("Received info message: {text}");
536            }
537            KrakenFuturesMessageType::Pong => {
538                tracing::trace!("Received pong response");
539            }
540            KrakenFuturesMessageType::Subscribed => {
541                tracing::debug!("Subscription confirmed: {text}");
542            }
543            KrakenFuturesMessageType::Unsubscribed => {
544                tracing::debug!("Unsubscription confirmed: {text}");
545            }
546            KrakenFuturesMessageType::Challenge => {
547                self.handle_challenge_response_value(value);
548            }
549            KrakenFuturesMessageType::Heartbeat => {
550                tracing::trace!("Heartbeat received");
551            }
552            KrakenFuturesMessageType::Unknown => {
553                tracing::debug!("Unhandled message: {text}");
554            }
555        }
556    }
557
558    fn handle_challenge_response_value(&mut self, value: Value) {
559        #[derive(Deserialize)]
560        struct ChallengeResponse {
561            message: String,
562        }
563
564        match serde_json::from_value::<ChallengeResponse>(value) {
565            Ok(response) => {
566                let len = response.message.len();
567                tracing::debug!("Challenge received, length: {len}");
568
569                if let Some(tx) = self.pending_challenge_tx.take() {
570                    if tx.send(response.message).is_err() {
571                        tracing::warn!("Failed to send challenge response - receiver dropped");
572                    }
573                } else {
574                    tracing::warn!("Received challenge but no pending request");
575                }
576            }
577            Err(e) => {
578                tracing::error!("Failed to parse challenge response: {e}");
579            }
580        }
581    }
582
583    fn emit_order_event(&mut self, event: ParsedOrderEvent) {
584        match event {
585            ParsedOrderEvent::Accepted(accepted) => {
586                self.pending_messages
587                    .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
588            }
589            ParsedOrderEvent::Canceled(canceled) => {
590                self.pending_messages
591                    .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
592            }
593            ParsedOrderEvent::Expired(expired) => {
594                self.pending_messages
595                    .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
596            }
597            ParsedOrderEvent::Updated(updated) => {
598                self.pending_messages
599                    .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
600            }
601            ParsedOrderEvent::StatusOnly(report) => {
602                self.pending_messages
603                    .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
604            }
605        }
606    }
607
608    fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
609        let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
610            Ok(t) => t,
611            Err(e) => {
612                tracing::debug!("Failed to parse ticker: {e}");
613                return;
614            }
615        };
616
617        let (instrument_id, price_precision) = {
618            let Some(instrument) = self.get_instrument(&ticker.product_id) else {
619                let product_id = &ticker.product_id;
620                tracing::debug!("Instrument not found for product_id: {product_id}");
621                return;
622            };
623            (instrument.id(), instrument.price_precision())
624        };
625
626        let ts_event = ticker
627            .time
628            .map(|t| UnixNanos::from((t as u64) * 1_000_000))
629            .unwrap_or(ts_init);
630
631        let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
632        let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
633
634        if let Some(mark_price) = ticker.mark_price
635            && has_mark
636        {
637            let update = MarkPriceUpdate::new(
638                instrument_id,
639                Price::new(mark_price, price_precision),
640                ts_event,
641                ts_init,
642            );
643            self.pending_messages
644                .push_back(KrakenFuturesWsMessage::MarkPrice(update));
645        }
646
647        if let Some(index_price) = ticker.index
648            && has_index
649        {
650            let update = IndexPriceUpdate::new(
651                instrument_id,
652                Price::new(index_price, price_precision),
653                ts_event,
654                ts_init,
655            );
656            self.pending_messages
657                .push_back(KrakenFuturesWsMessage::IndexPrice(update));
658        }
659    }
660
661    fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
662        let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
663            Ok(t) => t,
664            Err(e) => {
665                tracing::trace!("Failed to parse trade: {e}");
666                return;
667            }
668        };
669
670        if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
671            return;
672        }
673
674        let (instrument_id, price_precision, size_precision) = {
675            let Some(instrument) = self.get_instrument(&trade.product_id) else {
676                return;
677            };
678            (
679                instrument.id(),
680                instrument.price_precision(),
681                instrument.size_precision(),
682            )
683        };
684
685        let size = Quantity::new(trade.qty, size_precision);
686        if size.is_zero() {
687            let product_id = trade.product_id;
688            let raw_qty = trade.qty;
689            tracing::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
690            return;
691        }
692
693        let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
694        let aggressor_side = match trade.side {
695            KrakenOrderSide::Buy => AggressorSide::Buyer,
696            KrakenOrderSide::Sell => AggressorSide::Seller,
697        };
698        let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
699
700        let trade_tick = TradeTick::new(
701            instrument_id,
702            Price::new(trade.price, price_precision),
703            size,
704            aggressor_side,
705            TradeId::new(&trade_id),
706            ts_event,
707            ts_init,
708        );
709
710        self.pending_messages
711            .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
712    }
713
714    fn handle_trade_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
715        let snapshot = match serde_json::from_value::<KrakenFuturesTradeSnapshot>(value) {
716            Ok(s) => s,
717            Err(e) => {
718                tracing::trace!("Failed to parse trade snapshot: {e}");
719                return;
720            }
721        };
722
723        if !self.is_subscribed(KrakenFuturesChannel::Trades, &snapshot.product_id) {
724            return;
725        }
726
727        let (instrument_id, price_precision, size_precision) = {
728            let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
729                return;
730            };
731            (
732                instrument.id(),
733                instrument.price_precision(),
734                instrument.size_precision(),
735            )
736        };
737
738        for trade in snapshot.trades {
739            let size = Quantity::new(trade.qty, size_precision);
740            if size.is_zero() {
741                let product_id = snapshot.product_id;
742                let raw_qty = trade.qty;
743                tracing::warn!(
744                    "Skipping zero quantity trade in snapshot for {product_id} (raw qty: {raw_qty})"
745                );
746                continue;
747            }
748
749            let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
750            let aggressor_side = match trade.side {
751                KrakenOrderSide::Buy => AggressorSide::Buyer,
752                KrakenOrderSide::Sell => AggressorSide::Seller,
753            };
754            let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
755
756            let trade_tick = TradeTick::new(
757                instrument_id,
758                Price::new(trade.price, price_precision),
759                size,
760                aggressor_side,
761                TradeId::new(&trade_id),
762                ts_event,
763                ts_init,
764            );
765
766            self.pending_messages
767                .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
768        }
769    }
770
771    fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
772        let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
773            Ok(s) => s,
774            Err(e) => {
775                tracing::trace!("Failed to parse book snapshot: {e}");
776                return;
777            }
778        };
779
780        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
781        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
782
783        if !has_book && !has_quotes {
784            return;
785        }
786
787        let (instrument_id, price_precision, size_precision) = {
788            let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
789                return;
790            };
791            (
792                instrument.id(),
793                instrument.price_precision(),
794                instrument.size_precision(),
795            )
796        };
797
798        let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
799
800        let best_bid = snapshot
801            .bids
802            .iter()
803            .filter(|l| l.qty > 0.0)
804            .max_by(|a, b| a.price.total_cmp(&b.price));
805        let best_ask = snapshot
806            .asks
807            .iter()
808            .filter(|l| l.qty > 0.0)
809            .min_by(|a, b| a.price.total_cmp(&b.price));
810
811        if has_quotes {
812            let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
813            let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
814            let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
815            let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
816
817            match self.quote_cache.process(
818                instrument_id,
819                bid_price,
820                ask_price,
821                bid_size,
822                ask_size,
823                ts_event,
824                ts_init,
825            ) {
826                Ok(quote) => {
827                    self.pending_messages
828                        .push_back(KrakenFuturesWsMessage::Quote(quote));
829                }
830                Err(e) => {
831                    tracing::trace!("Quote cache process error: {e}");
832                }
833            }
834        }
835
836        if has_book {
837            let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
838
839            deltas.push(OrderBookDelta::clear(
840                instrument_id,
841                snapshot.seq as u64,
842                ts_event,
843                ts_init,
844            ));
845
846            for bid in &snapshot.bids {
847                let size = Quantity::new(bid.qty, size_precision);
848                if size.is_zero() {
849                    continue;
850                }
851                let order = BookOrder::new(
852                    OrderSide::Buy,
853                    Price::new(bid.price, price_precision),
854                    size,
855                    0,
856                );
857                deltas.push(OrderBookDelta::new(
858                    instrument_id,
859                    BookAction::Add,
860                    order,
861                    0,
862                    snapshot.seq as u64,
863                    ts_event,
864                    ts_init,
865                ));
866            }
867
868            for ask in &snapshot.asks {
869                let size = Quantity::new(ask.qty, size_precision);
870                if size.is_zero() {
871                    continue;
872                }
873                let order = BookOrder::new(
874                    OrderSide::Sell,
875                    Price::new(ask.price, price_precision),
876                    size,
877                    0,
878                );
879                deltas.push(OrderBookDelta::new(
880                    instrument_id,
881                    BookAction::Add,
882                    order,
883                    0,
884                    snapshot.seq as u64,
885                    ts_event,
886                    ts_init,
887                ));
888            }
889
890            let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
891            self.pending_messages
892                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
893        }
894    }
895
896    fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
897        let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
898            Ok(d) => d,
899            Err(e) => {
900                tracing::trace!("Failed to parse book delta: {e}");
901                return;
902            }
903        };
904
905        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
906        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
907
908        if !has_book && !has_quotes {
909            return;
910        }
911
912        let Some(instrument) = self.get_instrument(&delta.product_id) else {
913            return;
914        };
915
916        let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
917        let instrument_id = instrument.id();
918        let price_precision = instrument.price_precision();
919        let size_precision = instrument.size_precision();
920
921        let side: OrderSide = delta.side.into();
922
923        if has_quotes && delta.qty > 0.0 {
924            let price = Price::new(delta.price, price_precision);
925            let size = Quantity::new(delta.qty, size_precision);
926
927            let (bid_price, ask_price, bid_size, ask_size) = match side {
928                OrderSide::Buy => (Some(price), None, Some(size), None),
929                OrderSide::Sell => (None, Some(price), None, Some(size)),
930                _ => (None, None, None, None),
931            };
932
933            if let Ok(quote) = self.quote_cache.process(
934                instrument_id,
935                bid_price,
936                ask_price,
937                bid_size,
938                ask_size,
939                ts_event,
940                ts_init,
941            ) {
942                self.pending_messages
943                    .push_back(KrakenFuturesWsMessage::Quote(quote));
944            }
945        }
946
947        if has_book {
948            let size = Quantity::new(delta.qty, size_precision);
949            let action = if size.is_zero() {
950                BookAction::Delete
951            } else {
952                BookAction::Update
953            };
954
955            let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
956
957            let book_delta = OrderBookDelta::new(
958                instrument_id,
959                action,
960                order,
961                0,
962                delta.seq as u64,
963                ts_event,
964                ts_init,
965            );
966
967            let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
968            self.pending_messages
969                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
970        }
971    }
972
973    fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
974        let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
975            Ok(d) => d,
976            Err(e) => {
977                tracing::error!("Failed to parse open_orders delta: {e}");
978                return;
979            }
980        };
981
982        tracing::debug!(
983            order_id = %delta.order.order_id,
984            is_cancel = delta.is_cancel,
985            reason = ?delta.reason,
986            "Received open_orders delta"
987        );
988
989        if let Some(event) = self.parse_order_event(
990            &delta.order,
991            ts_init,
992            delta.is_cancel,
993            delta.reason.as_deref(),
994        ) {
995            self.emit_order_event(event);
996        }
997    }
998
999    fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
1000        // Already classified - we know it's a cancel with is_cancel=true and no "order" field
1001        // Check if this is a fill-related cancel (skip those - fills feed handles them)
1002        if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1003            && (reason == "full_fill" || reason == "partial_fill")
1004        {
1005            tracing::debug!(
1006                reason = %reason,
1007                "Skipping open_orders cancel for fill (handled by fills feed)"
1008            );
1009            return;
1010        }
1011
1012        let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1013            Ok(c) => c,
1014            Err(e) => {
1015                tracing::error!("Failed to parse open_orders cancel: {e}");
1016                return;
1017            }
1018        };
1019
1020        tracing::debug!(
1021            order_id = %cancel.order_id,
1022            cli_ord_id = ?cancel.cli_ord_id,
1023            reason = ?cancel.reason,
1024            "Received open_orders cancel"
1025        );
1026
1027        let Some(account_id) = self.account_id else {
1028            tracing::warn!("Cannot process cancel: account_id not set");
1029            return;
1030        };
1031
1032        let (client_order_id, info) = if let Some(cli_ord_id) = cancel.cli_ord_id.as_ref() {
1033            if let Some(info) = self.client_order_cache.get(cli_ord_id) {
1034                (ClientOrderId::new(cli_ord_id), info.clone())
1035            } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1036                if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1037                    (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1038                } else {
1039                    tracing::debug!(
1040                        order_id = %cancel.order_id,
1041                        cli_ord_id = %cli_ord_id,
1042                        "Cancel received for unknown order (not in cache)"
1043                    );
1044                    return;
1045                }
1046            } else {
1047                tracing::debug!(
1048                    order_id = %cancel.order_id,
1049                    cli_ord_id = %cli_ord_id,
1050                    "Cancel received for unknown order (not in cache)"
1051                );
1052                return;
1053            }
1054        } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1055            if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1056                (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1057            } else {
1058                tracing::debug!(
1059                    order_id = %cancel.order_id,
1060                    "Cancel received but mapped order not in cache"
1061                );
1062                return;
1063            }
1064        } else {
1065            tracing::debug!(
1066                order_id = %cancel.order_id,
1067                "Cancel received without cli_ord_id and no venue mapping (external order)"
1068            );
1069            return;
1070        };
1071
1072        let venue_order_id = VenueOrderId::new(&cancel.order_id);
1073
1074        let canceled = OrderCanceled::new(
1075            info.trader_id,
1076            info.strategy_id,
1077            info.instrument_id,
1078            client_order_id,
1079            UUID4::new(),
1080            ts_init,
1081            ts_init,
1082            false,
1083            Some(venue_order_id),
1084            Some(account_id),
1085        );
1086
1087        self.pending_messages
1088            .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1089    }
1090
1091    fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1092        let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1093            Ok(d) => d,
1094            Err(e) => {
1095                tracing::error!("Failed to parse fills delta: {e}");
1096                return;
1097            }
1098        };
1099
1100        tracing::debug!(fill_count = delta.fills.len(), "Received fills delta");
1101
1102        for fill in &delta.fills {
1103            tracing::debug!(
1104                fill_id = %fill.fill_id,
1105                order_id = %fill.order_id,
1106                "Processing fill"
1107            );
1108
1109            if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1110                self.pending_messages
1111                    .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1112            }
1113        }
1114    }
1115
1116    /// Parses a Kraken Futures order message into a proper order event.
1117    ///
1118    /// Returns the appropriate event type based on order status:
1119    /// - New orders with no fills -> `OrderAccepted`
1120    /// - Canceled orders -> `OrderCanceled` or `OrderExpired` (based on reason)
1121    /// - Orders without cached info -> `StatusOnly` (for reconciliation)
1122    fn parse_order_event(
1123        &self,
1124        order: &KrakenFuturesOpenOrder,
1125        ts_init: UnixNanos,
1126        is_cancel: bool,
1127        cancel_reason: Option<&str>,
1128    ) -> Option<ParsedOrderEvent> {
1129        let Some(account_id) = self.account_id else {
1130            tracing::warn!("Cannot process order: account_id not set");
1131            return None;
1132        };
1133
1134        let instrument = self
1135            .instruments_cache
1136            .get(&Ustr::from(order.instrument.as_str()))?;
1137
1138        let instrument_id = instrument.id();
1139
1140        if order.qty <= 0.0 {
1141            tracing::warn!(
1142                order_id = %order.order_id,
1143                "Skipping order with invalid quantity: {}",
1144                order.qty
1145            );
1146            return None;
1147        }
1148
1149        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1150        let venue_order_id = VenueOrderId::new(&order.order_id);
1151
1152        let client_order_id = order
1153            .cli_ord_id
1154            .as_ref()
1155            .map(|s| ClientOrderId::new(s.as_str()));
1156
1157        let cached_info = order
1158            .cli_ord_id
1159            .as_ref()
1160            .and_then(|id| self.client_order_cache.get(id));
1161
1162        // External orders or snapshots fall back to OrderStatusReport for reconciliation
1163        let Some(info) = cached_info else {
1164            return self
1165                .parse_order_to_status_report(order, ts_init, is_cancel)
1166                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1167        };
1168
1169        let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1170
1171        let status = if is_cancel {
1172            OrderStatus::Canceled
1173        } else if order.filled >= order.qty {
1174            OrderStatus::Filled
1175        } else if order.filled > 0.0 {
1176            OrderStatus::PartiallyFilled
1177        } else {
1178            OrderStatus::Accepted
1179        };
1180
1181        match status {
1182            OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1183                info.trader_id,
1184                info.strategy_id,
1185                instrument_id,
1186                client_order_id,
1187                venue_order_id,
1188                account_id,
1189                UUID4::new(),
1190                ts_event,
1191                ts_init,
1192                false,
1193            ))),
1194            OrderStatus::Canceled => {
1195                // Detect expiry by cancel reason keywords
1196                let is_expired = cancel_reason
1197                    .map(|r| {
1198                        let r_lower = r.to_lowercase();
1199                        r_lower.contains("expir")
1200                            || r_lower.contains("gtd")
1201                            || r_lower.contains("timeout")
1202                    })
1203                    .unwrap_or(false);
1204
1205                if is_expired {
1206                    Some(ParsedOrderEvent::Expired(OrderExpired::new(
1207                        info.trader_id,
1208                        info.strategy_id,
1209                        instrument_id,
1210                        client_order_id,
1211                        UUID4::new(),
1212                        ts_event,
1213                        ts_init,
1214                        false,
1215                        Some(venue_order_id),
1216                        Some(account_id),
1217                    )))
1218                } else {
1219                    Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1220                        info.trader_id,
1221                        info.strategy_id,
1222                        instrument_id,
1223                        client_order_id,
1224                        UUID4::new(),
1225                        ts_event,
1226                        ts_init,
1227                        false,
1228                        Some(venue_order_id),
1229                        Some(account_id),
1230                    )))
1231                }
1232            }
1233
1234            // Fill events are handled separately via the fills feed
1235            OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1236                .parse_order_to_status_report(order, ts_init, is_cancel)
1237                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1238            _ => self
1239                .parse_order_to_status_report(order, ts_init, is_cancel)
1240                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1241        }
1242    }
1243
1244    /// Parses a Kraken Futures order into an `OrderStatusReport`.
1245    ///
1246    /// Used for snapshots (reconciliation) and external orders.
1247    fn parse_order_to_status_report(
1248        &self,
1249        order: &KrakenFuturesOpenOrder,
1250        ts_init: UnixNanos,
1251        is_cancel: bool,
1252    ) -> Option<OrderStatusReport> {
1253        let Some(account_id) = self.account_id else {
1254            tracing::warn!("Cannot process order: account_id not set");
1255            return None;
1256        };
1257
1258        let instrument = self
1259            .instruments_cache
1260            .get(&Ustr::from(order.instrument.as_str()))?;
1261
1262        let instrument_id = instrument.id();
1263        let size_precision = instrument.size_precision();
1264
1265        let side = if order.direction == 0 {
1266            OrderSide::Buy
1267        } else {
1268            OrderSide::Sell
1269        };
1270
1271        let order_type = match order.order_type.as_str() {
1272            "limit" | "lmt" => OrderType::Limit,
1273            "stop" | "stp" => OrderType::StopLimit,
1274            "take_profit" => OrderType::LimitIfTouched,
1275            "market" | "mkt" => OrderType::Market,
1276            _ => OrderType::Limit,
1277        };
1278
1279        let status = if is_cancel {
1280            OrderStatus::Canceled
1281        } else if order.filled >= order.qty {
1282            OrderStatus::Filled
1283        } else if order.filled > 0.0 {
1284            OrderStatus::PartiallyFilled
1285        } else {
1286            OrderStatus::Accepted
1287        };
1288
1289        if order.qty <= 0.0 {
1290            let qty = order.qty;
1291            tracing::warn!(order_id = %order.order_id, "Skipping order with invalid quantity: {qty}");
1292            return None;
1293        }
1294
1295        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1296
1297        let client_order_id = order
1298            .cli_ord_id
1299            .as_ref()
1300            .map(|s| ClientOrderId::new(s.as_str()));
1301
1302        let filled_qty = if order.filled <= 0.0 {
1303            Quantity::zero(size_precision)
1304        } else {
1305            Quantity::new(order.filled, size_precision)
1306        };
1307
1308        Some(OrderStatusReport::new(
1309            account_id,
1310            instrument_id,
1311            client_order_id,
1312            VenueOrderId::new(&order.order_id),
1313            side,
1314            order_type,
1315            TimeInForce::Gtc,
1316            status,
1317            Quantity::new(order.qty, size_precision),
1318            filled_qty,
1319            ts_event, // ts_accepted
1320            ts_event, // ts_last
1321            ts_init,
1322            Some(UUID4::new()),
1323        ))
1324    }
1325
1326    fn parse_fill_to_report(
1327        &self,
1328        fill: &KrakenFuturesFill,
1329        ts_init: UnixNanos,
1330    ) -> Option<FillReport> {
1331        let Some(account_id) = self.account_id else {
1332            tracing::warn!("Cannot process fill: account_id not set");
1333            return None;
1334        };
1335
1336        // Resolve instrument: try message field first, then fall back to cache
1337        let instrument = if let Some(ref symbol) = fill.instrument {
1338            self.instruments_cache.get(symbol).cloned()
1339        } else if let Some(ref cli_ord_id) = fill.cli_ord_id {
1340            // Fall back to client order cache
1341            self.client_order_cache.get(cli_ord_id).and_then(|info| {
1342                self.instruments_cache
1343                    .iter()
1344                    .find(|(_, inst)| inst.id() == info.instrument_id)
1345                    .map(|(_, inst)| inst.clone())
1346            })
1347        } else {
1348            None
1349        };
1350
1351        let Some(instrument) = instrument else {
1352            tracing::warn!(
1353                fill_id = %fill.fill_id,
1354                order_id = %fill.order_id,
1355                cli_ord_id = ?fill.cli_ord_id,
1356                "Cannot resolve instrument for fill"
1357            );
1358            return None;
1359        };
1360
1361        let instrument_id = instrument.id();
1362        let price_precision = instrument.price_precision();
1363        let size_precision = instrument.size_precision();
1364
1365        if fill.qty <= 0.0 {
1366            let qty = fill.qty;
1367            tracing::warn!(fill_id = %fill.fill_id, "Skipping fill with invalid quantity: {qty}");
1368            return None;
1369        }
1370
1371        let side = if fill.buy {
1372            OrderSide::Buy
1373        } else {
1374            OrderSide::Sell
1375        };
1376
1377        let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1378
1379        let client_order_id = fill
1380            .cli_ord_id
1381            .as_ref()
1382            .map(|s| ClientOrderId::new(s.as_str()));
1383
1384        let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1385
1386        Some(FillReport::new(
1387            account_id,
1388            instrument_id,
1389            VenueOrderId::new(&fill.order_id),
1390            TradeId::new(&fill.fill_id),
1391            side,
1392            Quantity::new(fill.qty, size_precision),
1393            Price::new(fill.price, price_precision),
1394            commission,
1395            LiquiditySide::NoLiquiditySide, // Not provided
1396            client_order_id,
1397            None, // venue_position_id
1398            ts_event,
1399            ts_init,
1400            Some(UUID4::new()),
1401        ))
1402    }
1403}
1404
1405#[cfg(test)]
1406mod tests {
1407    use nautilus_model::{
1408        instruments::{CryptoFuture, InstrumentAny},
1409        types::Currency,
1410    };
1411    use rstest::rstest;
1412
1413    use super::*;
1414
1415    fn create_test_handler() -> FuturesFeedHandler {
1416        let signal = Arc::new(AtomicBool::new(false));
1417        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1418        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1419        let subscriptions = SubscriptionState::new(':');
1420
1421        FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1422    }
1423
1424    fn create_test_instrument() -> InstrumentAny {
1425        InstrumentAny::CryptoFuture(CryptoFuture::new(
1426            InstrumentId::from("PI_XBTUSD.KRAKEN"),
1427            Symbol::from("PI_XBTUSD"),
1428            Currency::BTC(),
1429            Currency::USD(),
1430            Currency::USD(),
1431            false,
1432            UnixNanos::default(),
1433            UnixNanos::default(),
1434            1, // price_precision
1435            0, // size_precision
1436            Price::from("0.5"),
1437            Quantity::from(1),
1438            None,
1439            None,
1440            None,
1441            None,
1442            None,
1443            None,
1444            None,
1445            None,
1446            None,
1447            None,
1448            None,
1449            None,
1450            UnixNanos::default(),
1451            UnixNanos::default(),
1452        ))
1453    }
1454
1455    #[rstest]
1456    fn test_book_snapshot_filters_zero_quantity_bids() {
1457        let mut handler = create_test_handler();
1458        let instrument = create_test_instrument();
1459        handler
1460            .instruments_cache
1461            .insert(Ustr::from("PI_XBTUSD"), instrument);
1462
1463        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1464        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1465
1466        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1467        let ts_init = UnixNanos::from(1_000_000_000);
1468
1469        handler.parse_message(json, ts_init);
1470
1471        assert_eq!(handler.pending_messages.len(), 1);
1472
1473        let msg = handler.pending_messages.pop_front().unwrap();
1474        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1475            panic!("Expected BookDeltas message");
1476        };
1477
1478        // Fixture has 3 bids (1 zero qty) + 2 asks (1 zero qty) = 3 valid + 1 clear = 4
1479        assert_eq!(deltas.deltas.len(), 4);
1480        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1481
1482        for delta in &deltas.deltas[1..] {
1483            assert!(
1484                !delta.order.size.is_zero(),
1485                "Found zero-quantity delta that should have been filtered: {:?}",
1486                delta
1487            );
1488        }
1489    }
1490
1491    #[rstest]
1492    fn test_book_snapshot_filters_zero_quantity_asks() {
1493        let mut handler = create_test_handler();
1494        let instrument = create_test_instrument();
1495        handler
1496            .instruments_cache
1497            .insert(Ustr::from("PI_XBTUSD"), instrument);
1498
1499        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1500        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1501
1502        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1503        let ts_init = UnixNanos::from(1_000_000_000);
1504
1505        handler.parse_message(json, ts_init);
1506
1507        let msg = handler.pending_messages.pop_front().unwrap();
1508        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1509            panic!("Expected BookDeltas message");
1510        };
1511
1512        // Only 1 ask should remain (the one with qty 2300, not the zero qty one)
1513        let sell_deltas: Vec<_> = deltas
1514            .deltas
1515            .iter()
1516            .filter(|d| d.order.side == OrderSide::Sell)
1517            .collect();
1518
1519        assert_eq!(sell_deltas.len(), 1);
1520        assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1521    }
1522
1523    #[rstest]
1524    fn test_trade_filters_zero_quantity() {
1525        let mut handler = create_test_handler();
1526        let instrument = create_test_instrument();
1527        handler
1528            .instruments_cache
1529            .insert(Ustr::from("PI_XBTUSD"), instrument);
1530
1531        handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1532        handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1533
1534        let json = r#"{
1535            "feed": "trade",
1536            "product_id": "PI_XBTUSD",
1537            "time": 1612269825817,
1538            "side": "buy",
1539            "qty": 0.0,
1540            "price": 34900.0,
1541            "seq": 12345
1542        }"#;
1543        let ts_init = UnixNanos::from(1_000_000_000);
1544
1545        handler.parse_message(json, ts_init);
1546
1547        assert!(
1548            handler.pending_messages.is_empty(),
1549            "Zero quantity trade should be filtered out"
1550        );
1551    }
1552}