Skip to main content

nautilus_kraken/websocket/futures/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Futures.
17
18use std::{
19    collections::VecDeque,
20    fmt::Debug,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31    data::{
32        BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
33        OrderBookDeltas, TradeTick,
34    },
35    enums::{
36        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
37    },
38    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
39    identifiers::{
40        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
41    },
42    instruments::{Instrument, InstrumentAny},
43    reports::{FillReport, OrderStatusReport},
44    types::{Money, Price, Quantity},
45};
46use nautilus_network::{
47    RECONNECTED,
48    websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use serde::Deserialize;
52use serde_json::Value;
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::messages::{
57    KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
58    KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
59    KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
60    KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
61    KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
62    KrakenFuturesWsMessage, classify_futures_message,
63};
64use crate::common::enums::KrakenOrderSide;
65
66/// Parsed order event from a Kraken Futures WebSocket message.
67#[derive(Debug, Clone)]
68pub enum ParsedOrderEvent {
69    Accepted(OrderAccepted),
70    Canceled(OrderCanceled),
71    Expired(OrderExpired),
72    Updated(OrderUpdated),
73    StatusOnly(Box<OrderStatusReport>),
74}
75
76/// Cached order info for proper event generation.
77#[derive(Debug, Clone)]
78struct CachedOrderInfo {
79    instrument_id: InstrumentId,
80    trader_id: TraderId,
81    strategy_id: StrategyId,
82}
83
84/// Commands sent from the outer client to the inner message handler.
85#[allow(
86    clippy::large_enum_variant,
87    reason = "Commands are ephemeral and immediately consumed"
88)]
89pub enum HandlerCommand {
90    SetClient(WebSocketClient),
91    SubscribeTicker(Symbol),
92    UnsubscribeTicker(Symbol),
93    SubscribeTrade(Symbol),
94    UnsubscribeTrade(Symbol),
95    SubscribeBook(Symbol),
96    UnsubscribeBook(Symbol),
97    Disconnect,
98    InitializeInstruments(Vec<InstrumentAny>),
99    UpdateInstrument(InstrumentAny),
100    SetAccountId(AccountId),
101    RequestChallenge {
102        api_key: String,
103        response_tx: tokio::sync::oneshot::Sender<String>,
104    },
105    SetAuthCredentials {
106        api_key: String,
107        original_challenge: String,
108        signed_challenge: String,
109    },
110    SubscribeOpenOrders,
111    SubscribeFills,
112    CacheClientOrder {
113        client_order_id: ClientOrderId,
114        venue_order_id: Option<VenueOrderId>,
115        instrument_id: InstrumentId,
116        trader_id: TraderId,
117        strategy_id: StrategyId,
118    },
119}
120
121impl Debug for HandlerCommand {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        match self {
124            Self::SetClient(_) => f.debug_struct(stringify!(SetClient)).finish(),
125            Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
126            Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
127            Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
128            Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
129            Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
130            Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
131            Self::Disconnect => write!(f, "Disconnect"),
132            Self::InitializeInstruments(v) => f
133                .debug_tuple("InitializeInstruments")
134                .field(&v.len())
135                .finish(),
136            Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
137            Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
138            Self::RequestChallenge { api_key, .. } => {
139                let masked = &api_key[..4.min(api_key.len())];
140                f.debug_struct(stringify!(RequestChallenge))
141                    .field("api_key", &format!("{masked}..."))
142                    .finish()
143            }
144            Self::SetAuthCredentials { api_key, .. } => {
145                let masked = &api_key[..4.min(api_key.len())];
146                f.debug_struct(stringify!(SetAuthCredentials))
147                    .field("api_key", &format!("{masked}..."))
148                    .finish()
149            }
150            Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
151            Self::SubscribeFills => write!(f, "SubscribeFills"),
152            Self::CacheClientOrder {
153                client_order_id,
154                instrument_id,
155                ..
156            } => f
157                .debug_struct(stringify!(CacheClientOrder))
158                .field("client_order_id", client_order_id)
159                .field("instrument_id", instrument_id)
160                .finish(),
161        }
162    }
163}
164
165/// WebSocket message handler for Kraken Futures.
166pub struct FuturesFeedHandler {
167    clock: &'static AtomicTime,
168    signal: Arc<AtomicBool>,
169    client: Option<WebSocketClient>,
170    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
171    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
172    subscriptions: SubscriptionState,
173    instruments_cache: AHashMap<Ustr, InstrumentAny>,
174    quote_cache: QuoteCache,
175    pending_messages: VecDeque<KrakenFuturesWsMessage>,
176    account_id: Option<AccountId>,
177    api_key: Option<String>,
178    original_challenge: Option<String>,
179    signed_challenge: Option<String>,
180    client_order_cache: AHashMap<ClientOrderId, CachedOrderInfo>,
181    venue_order_cache: AHashMap<VenueOrderId, ClientOrderId>,
182    pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
183}
184
185impl FuturesFeedHandler {
186    /// Creates a new [`FuturesFeedHandler`] instance.
187    pub fn new(
188        signal: Arc<AtomicBool>,
189        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
190        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
191        subscriptions: SubscriptionState,
192    ) -> Self {
193        Self {
194            clock: get_atomic_clock_realtime(),
195            signal,
196            client: None,
197            cmd_rx,
198            raw_rx,
199            subscriptions,
200            instruments_cache: AHashMap::new(),
201            quote_cache: QuoteCache::new(),
202            pending_messages: VecDeque::new(),
203            account_id: None,
204            api_key: None,
205            original_challenge: None,
206            signed_challenge: None,
207            client_order_cache: AHashMap::new(),
208            venue_order_cache: AHashMap::new(),
209            pending_challenge_tx: None,
210        }
211    }
212
213    pub fn is_stopped(&self) -> bool {
214        self.signal.load(Ordering::Relaxed)
215    }
216
217    fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
218        let channel_ustr = Ustr::from(channel.as_ref());
219        self.subscriptions.is_subscribed(&channel_ustr, symbol)
220    }
221
222    fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
223        self.instruments_cache.get(symbol)
224    }
225
226    /// Processes messages and commands, returning when stopped or stream ends.
227    pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
228        // First drain any pending messages from previous ticker processing
229        if let Some(msg) = self.pending_messages.pop_front() {
230            return Some(msg);
231        }
232
233        loop {
234            tokio::select! {
235                Some(cmd) = self.cmd_rx.recv() => {
236                    match cmd {
237                        HandlerCommand::SetClient(client) => {
238                            log::debug!("WebSocketClient received by futures handler");
239                            self.client = Some(client);
240                        }
241                        HandlerCommand::SubscribeTicker(symbol) => {
242                            self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
243                        }
244                        HandlerCommand::UnsubscribeTicker(symbol) => {
245                            self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
246                        }
247                        HandlerCommand::SubscribeTrade(symbol) => {
248                            self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
249                        }
250                        HandlerCommand::UnsubscribeTrade(symbol) => {
251                            self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
252                        }
253                        HandlerCommand::SubscribeBook(symbol) => {
254                            self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
255                        }
256                        HandlerCommand::UnsubscribeBook(symbol) => {
257                            self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
258                        }
259                        HandlerCommand::Disconnect => {
260                            log::debug!("Disconnect command received");
261                            if let Some(client) = self.client.take() {
262                                client.disconnect().await;
263                            }
264                            return None;
265                        }
266                        HandlerCommand::InitializeInstruments(instruments) => {
267                            for inst in instruments {
268                                self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
269                            }
270                            let count = self.instruments_cache.len();
271                            log::debug!("Initialized {count} instruments in futures handler cache");
272                        }
273                        HandlerCommand::UpdateInstrument(inst) => {
274                            self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
275                        }
276                        HandlerCommand::SetAccountId(account_id) => {
277                            log::debug!("Setting account_id for futures handler: {account_id}");
278                            self.account_id = Some(account_id);
279                        }
280                        HandlerCommand::RequestChallenge { api_key, response_tx } => {
281                            log::debug!("Requesting challenge for authentication");
282                            self.pending_challenge_tx = Some(response_tx);
283                            self.send_challenge_request(&api_key).await;
284                        }
285                        HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
286                            log::debug!("Setting auth credentials for futures handler");
287                            self.api_key = Some(api_key);
288                            self.original_challenge = Some(original_challenge);
289                            self.signed_challenge = Some(signed_challenge);
290                        }
291                        HandlerCommand::SubscribeOpenOrders => {
292                            self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
293                        }
294                        HandlerCommand::SubscribeFills => {
295                            self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
296                        }
297                        HandlerCommand::CacheClientOrder {
298                            client_order_id,
299                            venue_order_id,
300                            instrument_id,
301                            trader_id,
302                            strategy_id,
303                        } => {
304                            self.client_order_cache.insert(
305                                client_order_id,
306                                CachedOrderInfo {
307                                    instrument_id,
308                                    trader_id,
309                                    strategy_id,
310                                },
311                            );
312                            if let Some(venue_id) = venue_order_id {
313                                self.venue_order_cache.insert(venue_id, client_order_id);
314                            }
315                        }
316                    }
317                    continue;
318                }
319
320                msg = self.raw_rx.recv() => {
321                    let msg = match msg {
322                        Some(msg) => msg,
323                        None => {
324                            log::debug!("WebSocket stream closed");
325                            return None;
326                        }
327                    };
328
329                    if self.signal.load(Ordering::Relaxed) {
330                        log::debug!("Stop signal received");
331                        return None;
332                    }
333
334                    // Handle control frames first (borrow msg to avoid moving)
335                    match &msg {
336                        Message::Ping(data) => {
337                            let len = data.len();
338                            log::trace!("Received ping frame with {len} bytes");
339                            if let Some(client) = &self.client
340                                && let Err(e) = client.send_pong(data.to_vec()).await
341                            {
342                                log::warn!("Failed to send pong frame: {e}");
343                            }
344                            continue;
345                        }
346                        Message::Pong(_) => {
347                            log::debug!("Received pong from server");
348                            continue;
349                        }
350                        Message::Close(_) => {
351                            log::info!("WebSocket connection closed");
352                            return None;
353                        }
354                        Message::Frame(_) => {
355                            log::trace!("Received raw frame");
356                            continue;
357                        }
358                        _ => {}
359                    }
360
361                    // Extract text without allocation (Utf8Bytes derefs to &str)
362                    let text: &str = match &msg {
363                        Message::Text(text) => text,
364                        Message::Binary(data) => match std::str::from_utf8(data) {
365                            Ok(s) => s,
366                            Err(_) => continue,
367                        },
368                        _ => continue,
369                    };
370
371                    if text == RECONNECTED {
372                        log::info!("Received WebSocket reconnected signal");
373                        self.quote_cache.clear();
374                        return Some(KrakenFuturesWsMessage::Reconnected);
375                    }
376
377                    let ts_init = self.clock.get_time_ns();
378                    self.parse_message(text, ts_init);
379
380                    // Return first pending message if any were produced
381                    if let Some(msg) = self.pending_messages.pop_front() {
382                        return Some(msg);
383                    }
384
385                    continue;
386                }
387            }
388        }
389    }
390
391    async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392        if let Some(ref client) = self.client {
393            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394            let feed_str = feed_str.trim_matches('"');
395            let msg = format!(
396                r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397            );
398            if let Err(e) = client.send_text(msg, None).await {
399                log::error!("Failed to send {feed:?} subscribe: {e}");
400            }
401        }
402    }
403
404    async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405        if let Some(ref client) = self.client {
406            let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407            let feed_str = feed_str.trim_matches('"');
408            let msg = format!(
409                r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410            );
411            if let Err(e) = client.send_text(msg, None).await {
412                log::error!("Failed to send {feed:?} unsubscribe: {e}");
413            }
414        }
415    }
416
417    async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418        let Some(ref client) = self.client else {
419            log::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420            return;
421        };
422
423        let Some(ref api_key) = self.api_key else {
424            log::error!("Cannot subscribe to {feed:?}: no API key set");
425            return;
426        };
427
428        let Some(ref original_challenge) = self.original_challenge else {
429            log::error!("Cannot subscribe to {feed:?}: no challenge set");
430            return;
431        };
432
433        let Some(ref signed_challenge) = self.signed_challenge else {
434            log::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435            return;
436        };
437
438        let request = KrakenFuturesPrivateSubscribeRequest {
439            event: KrakenFuturesEvent::Subscribe,
440            feed,
441            api_key: api_key.clone(),
442            original_challenge: original_challenge.clone(),
443            signed_challenge: signed_challenge.clone(),
444        };
445
446        let msg = match serde_json::to_string(&request) {
447            Ok(m) => m,
448            Err(e) => {
449                log::error!("Failed to serialize {feed:?} subscribe request: {e}");
450                return;
451            }
452        };
453
454        if let Err(e) = client.send_text(msg, None).await {
455            log::error!("Failed to send {feed:?} subscribe: {e}");
456        } else {
457            log::debug!("Sent private subscribe request for {feed:?}");
458        }
459    }
460
461    async fn send_challenge_request(&self, api_key: &str) {
462        let Some(ref client) = self.client else {
463            log::error!("Cannot request challenge: no WebSocket client");
464            return;
465        };
466
467        let request = KrakenFuturesChallengeRequest {
468            event: KrakenFuturesEvent::Challenge,
469            api_key: api_key.to_string(),
470        };
471
472        let msg = match serde_json::to_string(&request) {
473            Ok(m) => m,
474            Err(e) => {
475                log::error!("Failed to serialize challenge request: {e}");
476                return;
477            }
478        };
479
480        if let Err(e) = client.send_text(msg, None).await {
481            log::error!("Failed to send challenge request: {e}");
482        } else {
483            log::debug!("Sent challenge request for authentication");
484        }
485    }
486
487    fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488        let value: Value = match serde_json::from_str(text) {
489            Ok(v) => v,
490            Err(e) => {
491                log::debug!("Failed to parse message as JSON: {e}");
492                return;
493            }
494        };
495
496        match classify_futures_message(&value) {
497            // Private feeds (execution)
498            KrakenFuturesMessageType::OpenOrdersSnapshot => {
499                log::debug!(
500                    "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501                );
502            }
503            KrakenFuturesMessageType::OpenOrdersCancel => {
504                self.handle_open_orders_cancel_value(value, ts_init);
505            }
506            KrakenFuturesMessageType::OpenOrdersDelta => {
507                self.handle_open_orders_delta_value(value, ts_init);
508            }
509            KrakenFuturesMessageType::FillsSnapshot => {
510                log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
511            }
512            KrakenFuturesMessageType::FillsDelta => {
513                self.handle_fills_delta_value(value, ts_init);
514            }
515            // Public feeds (market data)
516            KrakenFuturesMessageType::Ticker => {
517                self.handle_ticker_message_value(value, ts_init);
518            }
519            KrakenFuturesMessageType::TradeSnapshot => {
520                log::debug!("Skipping trade_snapshot (only streaming live trades)");
521            }
522            KrakenFuturesMessageType::Trade => {
523                self.handle_trade_message_value(value, ts_init);
524            }
525            KrakenFuturesMessageType::BookSnapshot => {
526                self.handle_book_snapshot_value(value, ts_init);
527            }
528            KrakenFuturesMessageType::BookDelta => {
529                self.handle_book_delta_value(value, ts_init);
530            }
531            // Control messages
532            KrakenFuturesMessageType::Info => {
533                log::debug!("Received info message: {text}");
534            }
535            KrakenFuturesMessageType::Pong => {
536                log::debug!("Received text pong response");
537            }
538            KrakenFuturesMessageType::Subscribed => {
539                log::debug!("Subscription confirmed: {text}");
540            }
541            KrakenFuturesMessageType::Unsubscribed => {
542                log::debug!("Unsubscription confirmed: {text}");
543            }
544            KrakenFuturesMessageType::Challenge => {
545                self.handle_challenge_response_value(value);
546            }
547            KrakenFuturesMessageType::Heartbeat => {
548                log::trace!("Heartbeat received");
549            }
550            KrakenFuturesMessageType::Error => {
551                let message = value
552                    .get("message")
553                    .and_then(|v| v.as_str())
554                    .unwrap_or("Unknown error");
555                log::error!("Kraken Futures WebSocket error: {message}");
556            }
557            KrakenFuturesMessageType::Alert => {
558                let message = value
559                    .get("message")
560                    .and_then(|v| v.as_str())
561                    .unwrap_or("Unknown alert");
562                log::warn!("Kraken Futures WebSocket alert: {message}");
563            }
564            KrakenFuturesMessageType::Unknown => {
565                log::warn!("Unhandled futures message: {text}");
566            }
567        }
568    }
569
570    fn handle_challenge_response_value(&mut self, value: Value) {
571        #[derive(Deserialize)]
572        struct ChallengeResponse {
573            message: String,
574        }
575
576        match serde_json::from_value::<ChallengeResponse>(value) {
577            Ok(response) => {
578                let len = response.message.len();
579                log::debug!("Challenge received, length: {len}");
580
581                if let Some(tx) = self.pending_challenge_tx.take() {
582                    if tx.send(response.message).is_err() {
583                        log::warn!("Failed to send challenge response - receiver dropped");
584                    }
585                } else {
586                    log::warn!("Received challenge but no pending request");
587                }
588            }
589            Err(e) => {
590                log::error!("Failed to parse challenge response: {e}");
591            }
592        }
593    }
594
595    fn emit_order_event(&mut self, event: ParsedOrderEvent) {
596        match event {
597            ParsedOrderEvent::Accepted(accepted) => {
598                self.pending_messages
599                    .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
600            }
601            ParsedOrderEvent::Canceled(canceled) => {
602                self.pending_messages
603                    .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
604            }
605            ParsedOrderEvent::Expired(expired) => {
606                self.pending_messages
607                    .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
608            }
609            ParsedOrderEvent::Updated(updated) => {
610                self.pending_messages
611                    .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
612            }
613            ParsedOrderEvent::StatusOnly(report) => {
614                self.pending_messages
615                    .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
616            }
617        }
618    }
619
620    fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
621        let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
622            Ok(t) => t,
623            Err(e) => {
624                log::debug!("Failed to parse ticker: {e}");
625                return;
626            }
627        };
628
629        let (instrument_id, price_precision) = {
630            let Some(instrument) = self.get_instrument(&ticker.product_id) else {
631                let product_id = &ticker.product_id;
632                log::debug!("Instrument not found for product_id: {product_id}");
633                return;
634            };
635            (instrument.id(), instrument.price_precision())
636        };
637
638        let ts_event = ticker
639            .time
640            .map_or(ts_init, |t| UnixNanos::from((t as u64) * 1_000_000));
641
642        let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
643        let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
644
645        if let Some(mark_price) = ticker.mark_price
646            && has_mark
647        {
648            let update = MarkPriceUpdate::new(
649                instrument_id,
650                Price::new(mark_price, price_precision),
651                ts_event,
652                ts_init,
653            );
654            self.pending_messages
655                .push_back(KrakenFuturesWsMessage::MarkPrice(update));
656        }
657
658        if let Some(index_price) = ticker.index
659            && has_index
660        {
661            let update = IndexPriceUpdate::new(
662                instrument_id,
663                Price::new(index_price, price_precision),
664                ts_event,
665                ts_init,
666            );
667            self.pending_messages
668                .push_back(KrakenFuturesWsMessage::IndexPrice(update));
669        }
670
671        let has_funding = self.is_subscribed(KrakenFuturesChannel::Funding, &ticker.product_id);
672
673        if let Some(funding_rate) = ticker.funding_rate
674            && has_funding
675        {
676            let next_funding_ns = ticker
677                .next_funding_rate_time
678                .map(|t| UnixNanos::from((t as u64) * 1_000_000));
679            let update = FundingRateUpdate::new(
680                instrument_id,
681                Decimal::from_f64_retain(funding_rate).unwrap_or_default(),
682                next_funding_ns,
683                ts_event,
684                ts_init,
685            );
686            self.pending_messages
687                .push_back(KrakenFuturesWsMessage::FundingRate(update));
688        }
689    }
690
691    fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
692        let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
693            Ok(t) => t,
694            Err(e) => {
695                log::warn!("Failed to parse trade: {e}");
696                return;
697            }
698        };
699
700        if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
701            log::warn!(
702                "Received trade for unsubscribed product: {}",
703                trade.product_id
704            );
705            return;
706        }
707
708        let (instrument_id, price_precision, size_precision) = {
709            let Some(instrument) = self.get_instrument(&trade.product_id) else {
710                log::warn!(
711                    "No instrument found for trade product: {}",
712                    trade.product_id
713                );
714                return;
715            };
716            (
717                instrument.id(),
718                instrument.price_precision(),
719                instrument.size_precision(),
720            )
721        };
722
723        let size = Quantity::new(trade.qty, size_precision);
724        if size.is_zero() {
725            let product_id = trade.product_id;
726            let raw_qty = trade.qty;
727            log::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
728            return;
729        }
730
731        let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
732        let aggressor_side = match trade.side {
733            KrakenOrderSide::Buy => AggressorSide::Buyer,
734            KrakenOrderSide::Sell => AggressorSide::Seller,
735        };
736        let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
737
738        let trade_tick = TradeTick::new(
739            instrument_id,
740            Price::new(trade.price, price_precision),
741            size,
742            aggressor_side,
743            TradeId::new(&trade_id),
744            ts_event,
745            ts_init,
746        );
747
748        self.pending_messages
749            .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
750    }
751
752    fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
753        let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
754            Ok(s) => s,
755            Err(e) => {
756                log::warn!("Failed to parse book snapshot: {e}");
757                return;
758            }
759        };
760
761        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
762        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
763
764        if !has_book && !has_quotes {
765            log::warn!(
766                "Received book snapshot for unsubscribed product: {}",
767                snapshot.product_id
768            );
769            return;
770        }
771
772        let (instrument_id, price_precision, size_precision) = {
773            let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
774                log::warn!(
775                    "No instrument found for book snapshot product: {}",
776                    snapshot.product_id
777                );
778                return;
779            };
780            (
781                instrument.id(),
782                instrument.price_precision(),
783                instrument.size_precision(),
784            )
785        };
786
787        let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
788
789        let best_bid = snapshot
790            .bids
791            .iter()
792            .filter(|l| l.qty > 0.0)
793            .max_by(|a, b| a.price.total_cmp(&b.price));
794        let best_ask = snapshot
795            .asks
796            .iter()
797            .filter(|l| l.qty > 0.0)
798            .min_by(|a, b| a.price.total_cmp(&b.price));
799
800        if has_quotes {
801            let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
802            let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
803            let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
804            let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
805
806            match self.quote_cache.process(
807                instrument_id,
808                bid_price,
809                ask_price,
810                bid_size,
811                ask_size,
812                ts_event,
813                ts_init,
814            ) {
815                Ok(quote) => {
816                    self.pending_messages
817                        .push_back(KrakenFuturesWsMessage::Quote(quote));
818                }
819                Err(e) => {
820                    log::trace!("Quote cache process error: {e}");
821                }
822            }
823        }
824
825        if has_book {
826            let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
827
828            deltas.push(OrderBookDelta::clear(
829                instrument_id,
830                snapshot.seq as u64,
831                ts_event,
832                ts_init,
833            ));
834
835            for bid in &snapshot.bids {
836                let size = Quantity::new(bid.qty, size_precision);
837                if size.is_zero() {
838                    continue;
839                }
840                let order = BookOrder::new(
841                    OrderSide::Buy,
842                    Price::new(bid.price, price_precision),
843                    size,
844                    0,
845                );
846                deltas.push(OrderBookDelta::new(
847                    instrument_id,
848                    BookAction::Add,
849                    order,
850                    0,
851                    snapshot.seq as u64,
852                    ts_event,
853                    ts_init,
854                ));
855            }
856
857            for ask in &snapshot.asks {
858                let size = Quantity::new(ask.qty, size_precision);
859                if size.is_zero() {
860                    continue;
861                }
862                let order = BookOrder::new(
863                    OrderSide::Sell,
864                    Price::new(ask.price, price_precision),
865                    size,
866                    0,
867                );
868                deltas.push(OrderBookDelta::new(
869                    instrument_id,
870                    BookAction::Add,
871                    order,
872                    0,
873                    snapshot.seq as u64,
874                    ts_event,
875                    ts_init,
876                ));
877            }
878
879            let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
880            self.pending_messages
881                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
882        }
883    }
884
885    fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
886        let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
887            Ok(d) => d,
888            Err(e) => {
889                log::warn!("Failed to parse book delta: {e}");
890                return;
891            }
892        };
893
894        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
895        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
896
897        if !has_book && !has_quotes {
898            log::warn!(
899                "Received book delta for unsubscribed product: {}",
900                delta.product_id
901            );
902            return;
903        }
904
905        let Some(instrument) = self.get_instrument(&delta.product_id) else {
906            log::warn!(
907                "No instrument found for book delta product: {}",
908                delta.product_id
909            );
910            return;
911        };
912
913        let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
914        let instrument_id = instrument.id();
915        let price_precision = instrument.price_precision();
916        let size_precision = instrument.size_precision();
917
918        let side: OrderSide = delta.side.into();
919
920        if has_quotes && delta.qty > 0.0 {
921            let price = Price::new(delta.price, price_precision);
922            let size = Quantity::new(delta.qty, size_precision);
923
924            let (bid_price, ask_price, bid_size, ask_size) = match side {
925                OrderSide::Buy => (Some(price), None, Some(size), None),
926                OrderSide::Sell => (None, Some(price), None, Some(size)),
927                _ => (None, None, None, None),
928            };
929
930            if let Ok(quote) = self.quote_cache.process(
931                instrument_id,
932                bid_price,
933                ask_price,
934                bid_size,
935                ask_size,
936                ts_event,
937                ts_init,
938            ) {
939                self.pending_messages
940                    .push_back(KrakenFuturesWsMessage::Quote(quote));
941            }
942        }
943
944        if has_book {
945            let size = Quantity::new(delta.qty, size_precision);
946            let action = if size.is_zero() {
947                BookAction::Delete
948            } else {
949                BookAction::Update
950            };
951
952            let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
953
954            let book_delta = OrderBookDelta::new(
955                instrument_id,
956                action,
957                order,
958                0,
959                delta.seq as u64,
960                ts_event,
961                ts_init,
962            );
963
964            let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
965            self.pending_messages
966                .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
967        }
968    }
969
970    fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
971        let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
972            Ok(d) => d,
973            Err(e) => {
974                log::error!("Failed to parse open_orders delta: {e}");
975                return;
976            }
977        };
978
979        log::debug!(
980            "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
981            delta.order.order_id,
982            delta.is_cancel,
983            delta.reason
984        );
985
986        if let Some(event) = self.parse_order_event(
987            &delta.order,
988            ts_init,
989            delta.is_cancel,
990            delta.reason.as_deref(),
991        ) {
992            self.emit_order_event(event);
993        }
994    }
995
996    fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
997        // Already classified - we know it's a cancel with is_cancel=true and no "order" field
998        // Check if this is a fill-related cancel (skip those - fills feed handles them)
999        if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1000            && (reason == "full_fill" || reason == "partial_fill")
1001        {
1002            log::debug!(
1003                "Skipping open_orders cancel for fill (handled by fills feed): reason={reason}"
1004            );
1005            return;
1006        }
1007
1008        let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1009            Ok(c) => c,
1010            Err(e) => {
1011                log::error!("Failed to parse open_orders cancel: {e}");
1012                return;
1013            }
1014        };
1015
1016        log::debug!(
1017            "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
1018            cancel.order_id,
1019            cancel.cli_ord_id,
1020            cancel.reason
1021        );
1022
1023        let Some(account_id) = self.account_id else {
1024            log::warn!("Cannot process cancel: account_id not set");
1025            return;
1026        };
1027
1028        let venue_order_id_key = VenueOrderId::new(&cancel.order_id);
1029
1030        let (client_order_id, info) = if let Some(cli_ord_id) =
1031            cancel.cli_ord_id.as_ref().filter(|id| !id.is_empty())
1032        {
1033            let client_order_id_key = ClientOrderId::new(cli_ord_id);
1034            if let Some(info) = self.client_order_cache.get(&client_order_id_key) {
1035                (client_order_id_key, info.clone())
1036            } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&venue_order_id_key)
1037            {
1038                if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1039                    (*mapped_cli_ord_id, info.clone())
1040                } else {
1041                    log::debug!(
1042                        "Cancel received for unknown order (not in cache): \
1043                        order_id={}, cli_ord_id={cli_ord_id}",
1044                        cancel.order_id
1045                    );
1046                    return;
1047                }
1048            } else {
1049                log::debug!(
1050                    "Cancel received for unknown order (not in cache): \
1051                    order_id={}, cli_ord_id={cli_ord_id}",
1052                    cancel.order_id
1053                );
1054                return;
1055            }
1056        } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&venue_order_id_key) {
1057            if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1058                (*mapped_cli_ord_id, info.clone())
1059            } else {
1060                log::debug!(
1061                    "Cancel received but mapped order not in cache: order_id={}",
1062                    cancel.order_id
1063                );
1064                return;
1065            }
1066        } else {
1067            log::debug!(
1068                "Cancel received without cli_ord_id and no venue mapping (external order): \
1069                order_id={}",
1070                cancel.order_id
1071            );
1072            return;
1073        };
1074
1075        let venue_order_id = VenueOrderId::new(&cancel.order_id);
1076
1077        let canceled = OrderCanceled::new(
1078            info.trader_id,
1079            info.strategy_id,
1080            info.instrument_id,
1081            client_order_id,
1082            UUID4::new(),
1083            ts_init,
1084            ts_init,
1085            false,
1086            Some(venue_order_id),
1087            Some(account_id),
1088        );
1089
1090        self.pending_messages
1091            .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1092    }
1093
1094    fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1095        let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1096            Ok(d) => d,
1097            Err(e) => {
1098                log::error!("Failed to parse fills delta: {e}");
1099                return;
1100            }
1101        };
1102
1103        log::debug!("Received fills delta: fill_count={}", delta.fills.len());
1104
1105        for fill in &delta.fills {
1106            log::debug!(
1107                "Processing fill: fill_id={}, order_id={}",
1108                fill.fill_id,
1109                fill.order_id
1110            );
1111
1112            if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1113                self.pending_messages
1114                    .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1115            }
1116        }
1117    }
1118
1119    /// Parses a Kraken Futures order message into a proper order event.
1120    ///
1121    /// Returns the appropriate event type based on order status:
1122    /// - New orders with no fills -> `OrderAccepted`
1123    /// - Canceled orders -> `OrderCanceled` or `OrderExpired` (based on reason)
1124    /// - Orders without cached info -> `StatusOnly` (for reconciliation)
1125    fn parse_order_event(
1126        &self,
1127        order: &KrakenFuturesOpenOrder,
1128        ts_init: UnixNanos,
1129        is_cancel: bool,
1130        cancel_reason: Option<&str>,
1131    ) -> Option<ParsedOrderEvent> {
1132        let Some(account_id) = self.account_id else {
1133            log::warn!("Cannot process order: account_id not set");
1134            return None;
1135        };
1136
1137        let instrument = self.instruments_cache.get(&order.instrument)?;
1138
1139        let instrument_id = instrument.id();
1140
1141        if order.qty <= 0.0 {
1142            log::warn!(
1143                "Skipping order with invalid quantity: order_id={}, qty={}",
1144                order.order_id,
1145                order.qty
1146            );
1147            return None;
1148        }
1149
1150        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1151        let venue_order_id = VenueOrderId::new(&order.order_id);
1152
1153        let client_order_id = order
1154            .cli_ord_id
1155            .as_ref()
1156            .filter(|s| !s.is_empty())
1157            .map(|s| ClientOrderId::new(s.as_str()));
1158
1159        let cached_info = order
1160            .cli_ord_id
1161            .as_ref()
1162            .filter(|id| !id.is_empty())
1163            .and_then(|id| self.client_order_cache.get(&ClientOrderId::new(id)));
1164
1165        // External orders or snapshots fall back to OrderStatusReport for reconciliation
1166        let Some(info) = cached_info else {
1167            return self
1168                .parse_order_to_status_report(order, ts_init, is_cancel)
1169                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1170        };
1171
1172        let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1173
1174        // Check fill status first - a fully filled order should be Filled even if is_cancel=true
1175        // (Kraken sends is_cancel=true with reason=full_fill for completed orders)
1176        let status = if order.filled >= order.qty && order.qty > 0.0 {
1177            OrderStatus::Filled
1178        } else if order.filled > 0.0 {
1179            OrderStatus::PartiallyFilled
1180        } else if is_cancel {
1181            OrderStatus::Canceled
1182        } else {
1183            OrderStatus::Accepted
1184        };
1185
1186        match status {
1187            OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1188                info.trader_id,
1189                info.strategy_id,
1190                instrument_id,
1191                client_order_id,
1192                venue_order_id,
1193                account_id,
1194                UUID4::new(),
1195                ts_event,
1196                ts_init,
1197                false,
1198            ))),
1199            OrderStatus::Canceled => {
1200                // Detect expiry by cancel reason keywords
1201                let is_expired = cancel_reason.is_some_and(|r| {
1202                    let r_lower = r.to_lowercase();
1203                    r_lower.contains("expir")
1204                        || r_lower.contains("gtd")
1205                        || r_lower.contains("timeout")
1206                });
1207
1208                if is_expired {
1209                    Some(ParsedOrderEvent::Expired(OrderExpired::new(
1210                        info.trader_id,
1211                        info.strategy_id,
1212                        instrument_id,
1213                        client_order_id,
1214                        UUID4::new(),
1215                        ts_event,
1216                        ts_init,
1217                        false,
1218                        Some(venue_order_id),
1219                        Some(account_id),
1220                    )))
1221                } else {
1222                    Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1223                        info.trader_id,
1224                        info.strategy_id,
1225                        instrument_id,
1226                        client_order_id,
1227                        UUID4::new(),
1228                        ts_event,
1229                        ts_init,
1230                        false,
1231                        Some(venue_order_id),
1232                        Some(account_id),
1233                    )))
1234                }
1235            }
1236
1237            // Fill events are handled separately via the fills feed
1238            OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1239                .parse_order_to_status_report(order, ts_init, is_cancel)
1240                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1241            _ => self
1242                .parse_order_to_status_report(order, ts_init, is_cancel)
1243                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1244        }
1245    }
1246
1247    /// Parses a Kraken Futures order into an `OrderStatusReport`.
1248    ///
1249    /// Used for snapshots (reconciliation) and external orders.
1250    fn parse_order_to_status_report(
1251        &self,
1252        order: &KrakenFuturesOpenOrder,
1253        ts_init: UnixNanos,
1254        is_cancel: bool,
1255    ) -> Option<OrderStatusReport> {
1256        let Some(account_id) = self.account_id else {
1257            log::warn!("Cannot process order: account_id not set");
1258            return None;
1259        };
1260
1261        let instrument = self.instruments_cache.get(&order.instrument)?;
1262
1263        let instrument_id = instrument.id();
1264        let size_precision = instrument.size_precision();
1265
1266        let side = if order.direction == 0 {
1267            OrderSide::Buy
1268        } else {
1269            OrderSide::Sell
1270        };
1271
1272        let order_type = match order.order_type.as_str() {
1273            "limit" | "lmt" => OrderType::Limit,
1274            "stop" | "stp" => OrderType::StopLimit,
1275            "take_profit" => OrderType::LimitIfTouched,
1276            "market" | "mkt" => OrderType::Market,
1277            _ => OrderType::Limit,
1278        };
1279
1280        if order.qty <= 0.0 {
1281            log::warn!(
1282                "Skipping order with invalid quantity: order_id={}, qty={}",
1283                order.order_id,
1284                order.qty
1285            );
1286            return None;
1287        }
1288
1289        // Check fill status first - a fully filled order should be Filled even if is_cancel=true
1290        let status = if order.filled >= order.qty {
1291            OrderStatus::Filled
1292        } else if order.filled > 0.0 {
1293            OrderStatus::PartiallyFilled
1294        } else if is_cancel {
1295            OrderStatus::Canceled
1296        } else {
1297            OrderStatus::Accepted
1298        };
1299
1300        let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1301
1302        let client_order_id = order
1303            .cli_ord_id
1304            .as_ref()
1305            .filter(|s| !s.is_empty())
1306            .map(|s| ClientOrderId::new(s.as_str()));
1307
1308        let filled_qty = if order.filled <= 0.0 {
1309            Quantity::zero(size_precision)
1310        } else {
1311            Quantity::new(order.filled, size_precision)
1312        };
1313
1314        Some(OrderStatusReport::new(
1315            account_id,
1316            instrument_id,
1317            client_order_id,
1318            VenueOrderId::new(&order.order_id),
1319            side,
1320            order_type,
1321            TimeInForce::Gtc,
1322            status,
1323            Quantity::new(order.qty, size_precision),
1324            filled_qty,
1325            ts_event, // ts_accepted
1326            ts_event, // ts_last
1327            ts_init,
1328            Some(UUID4::new()),
1329        ))
1330    }
1331
1332    fn parse_fill_to_report(
1333        &self,
1334        fill: &KrakenFuturesFill,
1335        ts_init: UnixNanos,
1336    ) -> Option<FillReport> {
1337        let Some(account_id) = self.account_id else {
1338            log::warn!("Cannot process fill: account_id not set");
1339            return None;
1340        };
1341
1342        // Resolve instrument: try message field first, then fall back to cache
1343        let instrument = if let Some(ref symbol) = fill.instrument {
1344            self.instruments_cache.get(symbol).cloned()
1345        } else if let Some(ref cli_ord_id) = fill.cli_ord_id.as_ref().filter(|id| !id.is_empty()) {
1346            // Fall back to client order cache
1347            self.client_order_cache
1348                .get(&ClientOrderId::new(cli_ord_id))
1349                .and_then(|info| {
1350                    self.instruments_cache
1351                        .iter()
1352                        .find(|(_, inst)| inst.id() == info.instrument_id)
1353                        .map(|(_, inst)| inst.clone())
1354                })
1355        } else {
1356            None
1357        };
1358
1359        let Some(instrument) = instrument else {
1360            log::warn!(
1361                "Cannot resolve instrument for fill: fill_id={}, order_id={}, cli_ord_id={:?}",
1362                fill.fill_id,
1363                fill.order_id,
1364                fill.cli_ord_id
1365            );
1366            return None;
1367        };
1368
1369        let instrument_id = instrument.id();
1370        let price_precision = instrument.price_precision();
1371        let size_precision = instrument.size_precision();
1372
1373        if fill.qty <= 0.0 {
1374            log::warn!(
1375                "Skipping fill with invalid quantity: fill_id={}, qty={}",
1376                fill.fill_id,
1377                fill.qty
1378            );
1379            return None;
1380        }
1381
1382        let side = if fill.buy {
1383            OrderSide::Buy
1384        } else {
1385            OrderSide::Sell
1386        };
1387
1388        let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1389
1390        let client_order_id = fill
1391            .cli_ord_id
1392            .as_ref()
1393            .filter(|s| !s.is_empty())
1394            .map(|s| ClientOrderId::new(s.as_str()));
1395
1396        let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1397
1398        Some(FillReport::new(
1399            account_id,
1400            instrument_id,
1401            VenueOrderId::new(&fill.order_id),
1402            TradeId::new(&fill.fill_id),
1403            side,
1404            Quantity::new(fill.qty, size_precision),
1405            Price::new(fill.price, price_precision),
1406            commission,
1407            LiquiditySide::NoLiquiditySide, // Not provided
1408            client_order_id,
1409            None, // venue_position_id
1410            ts_event,
1411            ts_init,
1412            Some(UUID4::new()),
1413        ))
1414    }
1415}
1416
1417#[cfg(test)]
1418mod tests {
1419    use nautilus_model::{
1420        instruments::{CryptoFuture, InstrumentAny},
1421        types::Currency,
1422    };
1423    use rstest::rstest;
1424
1425    use super::*;
1426
1427    fn create_test_handler() -> FuturesFeedHandler {
1428        let signal = Arc::new(AtomicBool::new(false));
1429        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1430        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1431        let subscriptions = SubscriptionState::new(':');
1432
1433        FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1434    }
1435
1436    fn create_test_instrument() -> InstrumentAny {
1437        InstrumentAny::CryptoFuture(CryptoFuture::new(
1438            InstrumentId::from("PI_XBTUSD.KRAKEN"),
1439            Symbol::from("PI_XBTUSD"),
1440            Currency::BTC(),
1441            Currency::USD(),
1442            Currency::USD(),
1443            false,
1444            UnixNanos::default(),
1445            UnixNanos::default(),
1446            1, // price_precision
1447            0, // size_precision
1448            Price::from("0.5"),
1449            Quantity::from(1),
1450            None,
1451            None,
1452            None,
1453            None,
1454            None,
1455            None,
1456            None,
1457            None,
1458            None,
1459            None,
1460            None,
1461            None,
1462            UnixNanos::default(),
1463            UnixNanos::default(),
1464        ))
1465    }
1466
1467    #[rstest]
1468    fn test_book_snapshot_filters_zero_quantity_bids() {
1469        let mut handler = create_test_handler();
1470        let instrument = create_test_instrument();
1471        handler
1472            .instruments_cache
1473            .insert(Ustr::from("PI_XBTUSD"), instrument);
1474
1475        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1476        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1477
1478        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1479        let ts_init = UnixNanos::from(1_000_000_000);
1480
1481        handler.parse_message(json, ts_init);
1482
1483        assert_eq!(handler.pending_messages.len(), 1);
1484
1485        let msg = handler.pending_messages.pop_front().unwrap();
1486        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1487            panic!("Expected BookDeltas message");
1488        };
1489
1490        // Fixture has 3 bids (1 zero qty) + 2 asks (1 zero qty) = 3 valid + 1 clear = 4
1491        assert_eq!(deltas.deltas.len(), 4);
1492        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1493
1494        for delta in &deltas.deltas[1..] {
1495            assert!(
1496                !delta.order.size.is_zero(),
1497                "Found zero-quantity delta that should have been filtered: {delta:?}"
1498            );
1499        }
1500    }
1501
1502    #[rstest]
1503    fn test_book_snapshot_filters_zero_quantity_asks() {
1504        let mut handler = create_test_handler();
1505        let instrument = create_test_instrument();
1506        handler
1507            .instruments_cache
1508            .insert(Ustr::from("PI_XBTUSD"), instrument);
1509
1510        handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1511        handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1512
1513        let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1514        let ts_init = UnixNanos::from(1_000_000_000);
1515
1516        handler.parse_message(json, ts_init);
1517
1518        let msg = handler.pending_messages.pop_front().unwrap();
1519        let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1520            panic!("Expected BookDeltas message");
1521        };
1522
1523        // Only 1 ask should remain (the one with qty 2300, not the zero qty one)
1524        let sell_deltas: Vec<_> = deltas
1525            .deltas
1526            .iter()
1527            .filter(|d| d.order.side == OrderSide::Sell)
1528            .collect();
1529
1530        assert_eq!(sell_deltas.len(), 1);
1531        assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1532    }
1533
1534    #[rstest]
1535    fn test_trade_filters_zero_quantity() {
1536        let mut handler = create_test_handler();
1537        let instrument = create_test_instrument();
1538        handler
1539            .instruments_cache
1540            .insert(Ustr::from("PI_XBTUSD"), instrument);
1541
1542        handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1543        handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1544
1545        let json = r#"{
1546            "feed": "trade",
1547            "product_id": "PI_XBTUSD",
1548            "time": 1612269825817,
1549            "side": "buy",
1550            "qty": 0.0,
1551            "price": 34900.0,
1552            "seq": 12345
1553        }"#;
1554        let ts_init = UnixNanos::from(1_000_000_000);
1555
1556        handler.parse_message(json, ts_init);
1557
1558        assert!(
1559            handler.pending_messages.is_empty(),
1560            "Zero quantity trade should be filtered out"
1561        );
1562    }
1563}