1use std::{
19 collections::VecDeque,
20 fmt::Debug,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::{
32 BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
33 },
34 enums::{
35 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
36 },
37 events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
38 identifiers::{
39 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
40 },
41 instruments::{Instrument, InstrumentAny},
42 reports::{FillReport, OrderStatusReport},
43 types::{Money, Price, Quantity},
44};
45use nautilus_network::{
46 RECONNECTED,
47 websocket::{SubscriptionState, WebSocketClient},
48};
49use serde::Deserialize;
50use serde_json::Value;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::messages::{
55 KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
56 KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
57 KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
58 KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
59 KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
60 KrakenFuturesTradeSnapshot, KrakenFuturesWsMessage, classify_futures_message,
61};
62use crate::common::enums::KrakenOrderSide;
63
64#[derive(Debug, Clone)]
66pub enum ParsedOrderEvent {
67 Accepted(OrderAccepted),
68 Canceled(OrderCanceled),
69 Expired(OrderExpired),
70 Updated(OrderUpdated),
71 StatusOnly(Box<OrderStatusReport>),
72}
73
74#[derive(Debug, Clone)]
76struct CachedOrderInfo {
77 instrument_id: InstrumentId,
78 trader_id: TraderId,
79 strategy_id: StrategyId,
80}
81
82#[allow(
84 clippy::large_enum_variant,
85 reason = "Commands are ephemeral and immediately consumed"
86)]
87pub enum HandlerCommand {
88 SetClient(WebSocketClient),
89 SubscribeTicker(Symbol),
90 UnsubscribeTicker(Symbol),
91 SubscribeTrade(Symbol),
92 UnsubscribeTrade(Symbol),
93 SubscribeBook(Symbol),
94 UnsubscribeBook(Symbol),
95 Disconnect,
96 InitializeInstruments(Vec<InstrumentAny>),
97 UpdateInstrument(InstrumentAny),
98 SetAccountId(AccountId),
99 RequestChallenge {
100 api_key: String,
101 response_tx: tokio::sync::oneshot::Sender<String>,
102 },
103 SetAuthCredentials {
104 api_key: String,
105 original_challenge: String,
106 signed_challenge: String,
107 },
108 SubscribeOpenOrders,
109 SubscribeFills,
110 CacheClientOrder {
111 client_order_id: ClientOrderId,
112 venue_order_id: Option<VenueOrderId>,
113 instrument_id: InstrumentId,
114 trader_id: TraderId,
115 strategy_id: StrategyId,
116 },
117}
118
119impl Debug for HandlerCommand {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 match self {
122 Self::SetClient(_) => f.debug_struct(stringify!(SetClient)).finish(),
123 Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
124 Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
125 Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
126 Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
127 Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
128 Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
129 Self::Disconnect => write!(f, "Disconnect"),
130 Self::InitializeInstruments(v) => f
131 .debug_tuple("InitializeInstruments")
132 .field(&v.len())
133 .finish(),
134 Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
135 Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
136 Self::RequestChallenge { api_key, .. } => {
137 let masked = &api_key[..4.min(api_key.len())];
138 f.debug_struct(stringify!(RequestChallenge))
139 .field("api_key", &format!("{masked}..."))
140 .finish()
141 }
142 Self::SetAuthCredentials { api_key, .. } => {
143 let masked = &api_key[..4.min(api_key.len())];
144 f.debug_struct(stringify!(SetAuthCredentials))
145 .field("api_key", &format!("{masked}..."))
146 .finish()
147 }
148 Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
149 Self::SubscribeFills => write!(f, "SubscribeFills"),
150 Self::CacheClientOrder {
151 client_order_id,
152 instrument_id,
153 ..
154 } => f
155 .debug_struct(stringify!(CacheClientOrder))
156 .field("client_order_id", client_order_id)
157 .field("instrument_id", instrument_id)
158 .finish(),
159 }
160 }
161}
162
163pub struct FuturesFeedHandler {
165 clock: &'static AtomicTime,
166 signal: Arc<AtomicBool>,
167 client: Option<WebSocketClient>,
168 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
169 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
170 subscriptions: SubscriptionState,
171 instruments_cache: AHashMap<Ustr, InstrumentAny>,
172 quote_cache: QuoteCache,
173 pending_messages: VecDeque<KrakenFuturesWsMessage>,
174 account_id: Option<AccountId>,
175 api_key: Option<String>,
176 original_challenge: Option<String>,
177 signed_challenge: Option<String>,
178 client_order_cache: AHashMap<String, CachedOrderInfo>,
179 venue_order_cache: AHashMap<String, String>,
180 pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
181}
182
183impl FuturesFeedHandler {
184 pub fn new(
186 signal: Arc<AtomicBool>,
187 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
188 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
189 subscriptions: SubscriptionState,
190 ) -> Self {
191 Self {
192 clock: get_atomic_clock_realtime(),
193 signal,
194 client: None,
195 cmd_rx,
196 raw_rx,
197 subscriptions,
198 instruments_cache: AHashMap::new(),
199 quote_cache: QuoteCache::new(),
200 pending_messages: VecDeque::new(),
201 account_id: None,
202 api_key: None,
203 original_challenge: None,
204 signed_challenge: None,
205 client_order_cache: AHashMap::new(),
206 venue_order_cache: AHashMap::new(),
207 pending_challenge_tx: None,
208 }
209 }
210
211 pub fn is_stopped(&self) -> bool {
212 self.signal.load(Ordering::Relaxed)
213 }
214
215 fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
216 let channel_ustr = Ustr::from(channel.as_ref());
217 self.subscriptions.is_subscribed(&channel_ustr, symbol)
218 }
219
220 fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
221 self.instruments_cache.get(symbol)
222 }
223
224 pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
226 if let Some(msg) = self.pending_messages.pop_front() {
228 return Some(msg);
229 }
230
231 loop {
232 tokio::select! {
233 Some(cmd) = self.cmd_rx.recv() => {
234 match cmd {
235 HandlerCommand::SetClient(client) => {
236 log::debug!("WebSocketClient received by futures handler");
237 self.client = Some(client);
238 }
239 HandlerCommand::SubscribeTicker(symbol) => {
240 self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
241 }
242 HandlerCommand::UnsubscribeTicker(symbol) => {
243 self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
244 }
245 HandlerCommand::SubscribeTrade(symbol) => {
246 self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
247 }
248 HandlerCommand::UnsubscribeTrade(symbol) => {
249 self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
250 }
251 HandlerCommand::SubscribeBook(symbol) => {
252 self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
253 }
254 HandlerCommand::UnsubscribeBook(symbol) => {
255 self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
256 }
257 HandlerCommand::Disconnect => {
258 log::debug!("Disconnect command received");
259 if let Some(client) = self.client.take() {
260 client.disconnect().await;
261 }
262 return None;
263 }
264 HandlerCommand::InitializeInstruments(instruments) => {
265 for inst in instruments {
266 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
267 }
268 let count = self.instruments_cache.len();
269 log::debug!("Initialized {count} instruments in futures handler cache");
270 }
271 HandlerCommand::UpdateInstrument(inst) => {
272 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
273 }
274 HandlerCommand::SetAccountId(account_id) => {
275 log::debug!("Setting account_id for futures handler: {account_id}");
276 self.account_id = Some(account_id);
277 }
278 HandlerCommand::RequestChallenge { api_key, response_tx } => {
279 log::debug!("Requesting challenge for authentication");
280 self.pending_challenge_tx = Some(response_tx);
281 self.send_challenge_request(&api_key).await;
282 }
283 HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
284 log::debug!("Setting auth credentials for futures handler");
285 self.api_key = Some(api_key);
286 self.original_challenge = Some(original_challenge);
287 self.signed_challenge = Some(signed_challenge);
288 }
289 HandlerCommand::SubscribeOpenOrders => {
290 self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
291 }
292 HandlerCommand::SubscribeFills => {
293 self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
294 }
295 HandlerCommand::CacheClientOrder {
296 client_order_id,
297 venue_order_id,
298 instrument_id,
299 trader_id,
300 strategy_id,
301 } => {
302 let client_order_id_str = client_order_id.to_string();
303 self.client_order_cache.insert(
304 client_order_id_str.clone(),
305 CachedOrderInfo {
306 instrument_id,
307 trader_id,
308 strategy_id,
309 },
310 );
311 if let Some(venue_id) = venue_order_id {
312 self.venue_order_cache
313 .insert(venue_id.to_string(), client_order_id_str);
314 }
315 }
316 }
317 continue;
318 }
319
320 msg = self.raw_rx.recv() => {
321 let msg = match msg {
322 Some(msg) => msg,
323 None => {
324 log::debug!("WebSocket stream closed");
325 return None;
326 }
327 };
328
329 if self.signal.load(Ordering::Relaxed) {
330 log::debug!("Stop signal received");
331 return None;
332 }
333
334 match &msg {
336 Message::Ping(data) => {
337 let len = data.len();
338 log::trace!("Received ping frame with {len} bytes");
339 if let Some(client) = &self.client
340 && let Err(e) = client.send_pong(data.to_vec()).await
341 {
342 log::warn!("Failed to send pong frame: {e}");
343 }
344 continue;
345 }
346 Message::Pong(_) => {
347 log::trace!("Received pong");
348 continue;
349 }
350 Message::Close(_) => {
351 log::info!("WebSocket connection closed");
352 return None;
353 }
354 Message::Frame(_) => {
355 log::trace!("Received raw frame");
356 continue;
357 }
358 _ => {}
359 }
360
361 let text: &str = match &msg {
363 Message::Text(text) => text,
364 Message::Binary(data) => match std::str::from_utf8(data) {
365 Ok(s) => s,
366 Err(_) => continue,
367 },
368 _ => continue,
369 };
370
371 if text == RECONNECTED {
372 log::info!("Received WebSocket reconnected signal");
373 self.quote_cache.clear();
374 return Some(KrakenFuturesWsMessage::Reconnected);
375 }
376
377 let ts_init = self.clock.get_time_ns();
378 self.parse_message(text, ts_init);
379
380 if let Some(msg) = self.pending_messages.pop_front() {
382 return Some(msg);
383 }
384
385 continue;
386 }
387 }
388 }
389 }
390
391 async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392 if let Some(ref client) = self.client {
393 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394 let feed_str = feed_str.trim_matches('"');
395 let msg = format!(
396 r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397 );
398 if let Err(e) = client.send_text(msg, None).await {
399 log::error!("Failed to send {feed:?} subscribe: {e}");
400 }
401 }
402 }
403
404 async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405 if let Some(ref client) = self.client {
406 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407 let feed_str = feed_str.trim_matches('"');
408 let msg = format!(
409 r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410 );
411 if let Err(e) = client.send_text(msg, None).await {
412 log::error!("Failed to send {feed:?} unsubscribe: {e}");
413 }
414 }
415 }
416
417 async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418 let Some(ref client) = self.client else {
419 log::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420 return;
421 };
422
423 let Some(ref api_key) = self.api_key else {
424 log::error!("Cannot subscribe to {feed:?}: no API key set");
425 return;
426 };
427
428 let Some(ref original_challenge) = self.original_challenge else {
429 log::error!("Cannot subscribe to {feed:?}: no challenge set");
430 return;
431 };
432
433 let Some(ref signed_challenge) = self.signed_challenge else {
434 log::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435 return;
436 };
437
438 let request = KrakenFuturesPrivateSubscribeRequest {
439 event: KrakenFuturesEvent::Subscribe,
440 feed,
441 api_key: api_key.clone(),
442 original_challenge: original_challenge.clone(),
443 signed_challenge: signed_challenge.clone(),
444 };
445
446 let msg = match serde_json::to_string(&request) {
447 Ok(m) => m,
448 Err(e) => {
449 log::error!("Failed to serialize {feed:?} subscribe request: {e}");
450 return;
451 }
452 };
453
454 if let Err(e) = client.send_text(msg, None).await {
455 log::error!("Failed to send {feed:?} subscribe: {e}");
456 } else {
457 log::debug!("Sent private subscribe request for {feed:?}");
458 }
459 }
460
461 async fn send_challenge_request(&self, api_key: &str) {
462 let Some(ref client) = self.client else {
463 log::error!("Cannot request challenge: no WebSocket client");
464 return;
465 };
466
467 let request = KrakenFuturesChallengeRequest {
468 event: KrakenFuturesEvent::Challenge,
469 api_key: api_key.to_string(),
470 };
471
472 let msg = match serde_json::to_string(&request) {
473 Ok(m) => m,
474 Err(e) => {
475 log::error!("Failed to serialize challenge request: {e}");
476 return;
477 }
478 };
479
480 if let Err(e) = client.send_text(msg, None).await {
481 log::error!("Failed to send challenge request: {e}");
482 } else {
483 log::debug!("Sent challenge request for authentication");
484 }
485 }
486
487 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488 let value: Value = match serde_json::from_str(text) {
489 Ok(v) => v,
490 Err(e) => {
491 log::debug!("Failed to parse message as JSON: {e}");
492 return;
493 }
494 };
495
496 match classify_futures_message(&value) {
497 KrakenFuturesMessageType::OpenOrdersSnapshot => {
499 log::debug!(
500 "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501 );
502 }
503 KrakenFuturesMessageType::OpenOrdersCancel => {
504 self.handle_open_orders_cancel_value(value, ts_init);
505 }
506 KrakenFuturesMessageType::OpenOrdersDelta => {
507 self.handle_open_orders_delta_value(value, ts_init);
508 }
509 KrakenFuturesMessageType::FillsSnapshot => {
510 log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
511 }
512 KrakenFuturesMessageType::FillsDelta => {
513 self.handle_fills_delta_value(value, ts_init);
514 }
515 KrakenFuturesMessageType::Ticker => {
517 self.handle_ticker_message_value(value, ts_init);
518 }
519 KrakenFuturesMessageType::TradeSnapshot => {
520 self.handle_trade_snapshot_value(value, ts_init);
521 }
522 KrakenFuturesMessageType::Trade => {
523 self.handle_trade_message_value(value, ts_init);
524 }
525 KrakenFuturesMessageType::BookSnapshot => {
526 self.handle_book_snapshot_value(value, ts_init);
527 }
528 KrakenFuturesMessageType::BookDelta => {
529 self.handle_book_delta_value(value, ts_init);
530 }
531 KrakenFuturesMessageType::Info => {
533 log::debug!("Received info message: {text}");
534 }
535 KrakenFuturesMessageType::Pong => {
536 log::trace!("Received pong response");
537 }
538 KrakenFuturesMessageType::Subscribed => {
539 log::debug!("Subscription confirmed: {text}");
540 }
541 KrakenFuturesMessageType::Unsubscribed => {
542 log::debug!("Unsubscription confirmed: {text}");
543 }
544 KrakenFuturesMessageType::Challenge => {
545 self.handle_challenge_response_value(value);
546 }
547 KrakenFuturesMessageType::Heartbeat => {
548 log::trace!("Heartbeat received");
549 }
550 KrakenFuturesMessageType::Unknown => {
551 log::debug!("Unhandled message: {text}");
552 }
553 }
554 }
555
556 fn handle_challenge_response_value(&mut self, value: Value) {
557 #[derive(Deserialize)]
558 struct ChallengeResponse {
559 message: String,
560 }
561
562 match serde_json::from_value::<ChallengeResponse>(value) {
563 Ok(response) => {
564 let len = response.message.len();
565 log::debug!("Challenge received, length: {len}");
566
567 if let Some(tx) = self.pending_challenge_tx.take() {
568 if tx.send(response.message).is_err() {
569 log::warn!("Failed to send challenge response - receiver dropped");
570 }
571 } else {
572 log::warn!("Received challenge but no pending request");
573 }
574 }
575 Err(e) => {
576 log::error!("Failed to parse challenge response: {e}");
577 }
578 }
579 }
580
581 fn emit_order_event(&mut self, event: ParsedOrderEvent) {
582 match event {
583 ParsedOrderEvent::Accepted(accepted) => {
584 self.pending_messages
585 .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
586 }
587 ParsedOrderEvent::Canceled(canceled) => {
588 self.pending_messages
589 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
590 }
591 ParsedOrderEvent::Expired(expired) => {
592 self.pending_messages
593 .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
594 }
595 ParsedOrderEvent::Updated(updated) => {
596 self.pending_messages
597 .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
598 }
599 ParsedOrderEvent::StatusOnly(report) => {
600 self.pending_messages
601 .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
602 }
603 }
604 }
605
606 fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
607 let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
608 Ok(t) => t,
609 Err(e) => {
610 log::debug!("Failed to parse ticker: {e}");
611 return;
612 }
613 };
614
615 let (instrument_id, price_precision) = {
616 let Some(instrument) = self.get_instrument(&ticker.product_id) else {
617 let product_id = &ticker.product_id;
618 log::debug!("Instrument not found for product_id: {product_id}");
619 return;
620 };
621 (instrument.id(), instrument.price_precision())
622 };
623
624 let ts_event = ticker
625 .time
626 .map_or(ts_init, |t| UnixNanos::from((t as u64) * 1_000_000));
627
628 let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
629 let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
630
631 if let Some(mark_price) = ticker.mark_price
632 && has_mark
633 {
634 let update = MarkPriceUpdate::new(
635 instrument_id,
636 Price::new(mark_price, price_precision),
637 ts_event,
638 ts_init,
639 );
640 self.pending_messages
641 .push_back(KrakenFuturesWsMessage::MarkPrice(update));
642 }
643
644 if let Some(index_price) = ticker.index
645 && has_index
646 {
647 let update = IndexPriceUpdate::new(
648 instrument_id,
649 Price::new(index_price, price_precision),
650 ts_event,
651 ts_init,
652 );
653 self.pending_messages
654 .push_back(KrakenFuturesWsMessage::IndexPrice(update));
655 }
656 }
657
658 fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
659 let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
660 Ok(t) => t,
661 Err(e) => {
662 log::trace!("Failed to parse trade: {e}");
663 return;
664 }
665 };
666
667 if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
668 return;
669 }
670
671 let (instrument_id, price_precision, size_precision) = {
672 let Some(instrument) = self.get_instrument(&trade.product_id) else {
673 return;
674 };
675 (
676 instrument.id(),
677 instrument.price_precision(),
678 instrument.size_precision(),
679 )
680 };
681
682 let size = Quantity::new(trade.qty, size_precision);
683 if size.is_zero() {
684 let product_id = trade.product_id;
685 let raw_qty = trade.qty;
686 log::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
687 return;
688 }
689
690 let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
691 let aggressor_side = match trade.side {
692 KrakenOrderSide::Buy => AggressorSide::Buyer,
693 KrakenOrderSide::Sell => AggressorSide::Seller,
694 };
695 let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
696
697 let trade_tick = TradeTick::new(
698 instrument_id,
699 Price::new(trade.price, price_precision),
700 size,
701 aggressor_side,
702 TradeId::new(&trade_id),
703 ts_event,
704 ts_init,
705 );
706
707 self.pending_messages
708 .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
709 }
710
711 fn handle_trade_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
712 let snapshot = match serde_json::from_value::<KrakenFuturesTradeSnapshot>(value) {
713 Ok(s) => s,
714 Err(e) => {
715 log::trace!("Failed to parse trade snapshot: {e}");
716 return;
717 }
718 };
719
720 if !self.is_subscribed(KrakenFuturesChannel::Trades, &snapshot.product_id) {
721 return;
722 }
723
724 let (instrument_id, price_precision, size_precision) = {
725 let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
726 return;
727 };
728 (
729 instrument.id(),
730 instrument.price_precision(),
731 instrument.size_precision(),
732 )
733 };
734
735 for trade in snapshot.trades {
736 let size = Quantity::new(trade.qty, size_precision);
737 if size.is_zero() {
738 let product_id = snapshot.product_id;
739 let raw_qty = trade.qty;
740 log::warn!(
741 "Skipping zero quantity trade in snapshot for {product_id} (raw qty: {raw_qty})"
742 );
743 continue;
744 }
745
746 let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
747 let aggressor_side = match trade.side {
748 KrakenOrderSide::Buy => AggressorSide::Buyer,
749 KrakenOrderSide::Sell => AggressorSide::Seller,
750 };
751 let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
752
753 let trade_tick = TradeTick::new(
754 instrument_id,
755 Price::new(trade.price, price_precision),
756 size,
757 aggressor_side,
758 TradeId::new(&trade_id),
759 ts_event,
760 ts_init,
761 );
762
763 self.pending_messages
764 .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
765 }
766 }
767
768 fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
769 let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
770 Ok(s) => s,
771 Err(e) => {
772 log::trace!("Failed to parse book snapshot: {e}");
773 return;
774 }
775 };
776
777 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
778 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
779
780 if !has_book && !has_quotes {
781 return;
782 }
783
784 let (instrument_id, price_precision, size_precision) = {
785 let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
786 return;
787 };
788 (
789 instrument.id(),
790 instrument.price_precision(),
791 instrument.size_precision(),
792 )
793 };
794
795 let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
796
797 let best_bid = snapshot
798 .bids
799 .iter()
800 .filter(|l| l.qty > 0.0)
801 .max_by(|a, b| a.price.total_cmp(&b.price));
802 let best_ask = snapshot
803 .asks
804 .iter()
805 .filter(|l| l.qty > 0.0)
806 .min_by(|a, b| a.price.total_cmp(&b.price));
807
808 if has_quotes {
809 let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
810 let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
811 let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
812 let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
813
814 match self.quote_cache.process(
815 instrument_id,
816 bid_price,
817 ask_price,
818 bid_size,
819 ask_size,
820 ts_event,
821 ts_init,
822 ) {
823 Ok(quote) => {
824 self.pending_messages
825 .push_back(KrakenFuturesWsMessage::Quote(quote));
826 }
827 Err(e) => {
828 log::trace!("Quote cache process error: {e}");
829 }
830 }
831 }
832
833 if has_book {
834 let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
835
836 deltas.push(OrderBookDelta::clear(
837 instrument_id,
838 snapshot.seq as u64,
839 ts_event,
840 ts_init,
841 ));
842
843 for bid in &snapshot.bids {
844 let size = Quantity::new(bid.qty, size_precision);
845 if size.is_zero() {
846 continue;
847 }
848 let order = BookOrder::new(
849 OrderSide::Buy,
850 Price::new(bid.price, price_precision),
851 size,
852 0,
853 );
854 deltas.push(OrderBookDelta::new(
855 instrument_id,
856 BookAction::Add,
857 order,
858 0,
859 snapshot.seq as u64,
860 ts_event,
861 ts_init,
862 ));
863 }
864
865 for ask in &snapshot.asks {
866 let size = Quantity::new(ask.qty, size_precision);
867 if size.is_zero() {
868 continue;
869 }
870 let order = BookOrder::new(
871 OrderSide::Sell,
872 Price::new(ask.price, price_precision),
873 size,
874 0,
875 );
876 deltas.push(OrderBookDelta::new(
877 instrument_id,
878 BookAction::Add,
879 order,
880 0,
881 snapshot.seq as u64,
882 ts_event,
883 ts_init,
884 ));
885 }
886
887 let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
888 self.pending_messages
889 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
890 }
891 }
892
893 fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
894 let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
895 Ok(d) => d,
896 Err(e) => {
897 log::trace!("Failed to parse book delta: {e}");
898 return;
899 }
900 };
901
902 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
903 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
904
905 if !has_book && !has_quotes {
906 return;
907 }
908
909 let Some(instrument) = self.get_instrument(&delta.product_id) else {
910 return;
911 };
912
913 let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
914 let instrument_id = instrument.id();
915 let price_precision = instrument.price_precision();
916 let size_precision = instrument.size_precision();
917
918 let side: OrderSide = delta.side.into();
919
920 if has_quotes && delta.qty > 0.0 {
921 let price = Price::new(delta.price, price_precision);
922 let size = Quantity::new(delta.qty, size_precision);
923
924 let (bid_price, ask_price, bid_size, ask_size) = match side {
925 OrderSide::Buy => (Some(price), None, Some(size), None),
926 OrderSide::Sell => (None, Some(price), None, Some(size)),
927 _ => (None, None, None, None),
928 };
929
930 if let Ok(quote) = self.quote_cache.process(
931 instrument_id,
932 bid_price,
933 ask_price,
934 bid_size,
935 ask_size,
936 ts_event,
937 ts_init,
938 ) {
939 self.pending_messages
940 .push_back(KrakenFuturesWsMessage::Quote(quote));
941 }
942 }
943
944 if has_book {
945 let size = Quantity::new(delta.qty, size_precision);
946 let action = if size.is_zero() {
947 BookAction::Delete
948 } else {
949 BookAction::Update
950 };
951
952 let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
953
954 let book_delta = OrderBookDelta::new(
955 instrument_id,
956 action,
957 order,
958 0,
959 delta.seq as u64,
960 ts_event,
961 ts_init,
962 );
963
964 let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
965 self.pending_messages
966 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
967 }
968 }
969
970 fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
971 let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
972 Ok(d) => d,
973 Err(e) => {
974 log::error!("Failed to parse open_orders delta: {e}");
975 return;
976 }
977 };
978
979 log::debug!(
980 "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
981 delta.order.order_id,
982 delta.is_cancel,
983 delta.reason
984 );
985
986 if let Some(event) = self.parse_order_event(
987 &delta.order,
988 ts_init,
989 delta.is_cancel,
990 delta.reason.as_deref(),
991 ) {
992 self.emit_order_event(event);
993 }
994 }
995
996 fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
997 if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1000 && (reason == "full_fill" || reason == "partial_fill")
1001 {
1002 log::debug!(
1003 "Skipping open_orders cancel for fill (handled by fills feed): reason={reason}"
1004 );
1005 return;
1006 }
1007
1008 let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1009 Ok(c) => c,
1010 Err(e) => {
1011 log::error!("Failed to parse open_orders cancel: {e}");
1012 return;
1013 }
1014 };
1015
1016 log::debug!(
1017 "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
1018 cancel.order_id,
1019 cancel.cli_ord_id,
1020 cancel.reason
1021 );
1022
1023 let Some(account_id) = self.account_id else {
1024 log::warn!("Cannot process cancel: account_id not set");
1025 return;
1026 };
1027
1028 let (client_order_id, info) = if let Some(cli_ord_id) = cancel.cli_ord_id.as_ref() {
1029 if let Some(info) = self.client_order_cache.get(cli_ord_id) {
1030 (ClientOrderId::new(cli_ord_id), info.clone())
1031 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1032 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1033 (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1034 } else {
1035 log::debug!(
1036 "Cancel received for unknown order (not in cache): \
1037 order_id={}, cli_ord_id={cli_ord_id}",
1038 cancel.order_id
1039 );
1040 return;
1041 }
1042 } else {
1043 log::debug!(
1044 "Cancel received for unknown order (not in cache): \
1045 order_id={}, cli_ord_id={cli_ord_id}",
1046 cancel.order_id
1047 );
1048 return;
1049 }
1050 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1051 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1052 (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1053 } else {
1054 log::debug!(
1055 "Cancel received but mapped order not in cache: order_id={}",
1056 cancel.order_id
1057 );
1058 return;
1059 }
1060 } else {
1061 log::debug!(
1062 "Cancel received without cli_ord_id and no venue mapping (external order): \
1063 order_id={}",
1064 cancel.order_id
1065 );
1066 return;
1067 };
1068
1069 let venue_order_id = VenueOrderId::new(&cancel.order_id);
1070
1071 let canceled = OrderCanceled::new(
1072 info.trader_id,
1073 info.strategy_id,
1074 info.instrument_id,
1075 client_order_id,
1076 UUID4::new(),
1077 ts_init,
1078 ts_init,
1079 false,
1080 Some(venue_order_id),
1081 Some(account_id),
1082 );
1083
1084 self.pending_messages
1085 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1086 }
1087
1088 fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1089 let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1090 Ok(d) => d,
1091 Err(e) => {
1092 log::error!("Failed to parse fills delta: {e}");
1093 return;
1094 }
1095 };
1096
1097 log::debug!("Received fills delta: fill_count={}", delta.fills.len());
1098
1099 for fill in &delta.fills {
1100 log::debug!(
1101 "Processing fill: fill_id={}, order_id={}",
1102 fill.fill_id,
1103 fill.order_id
1104 );
1105
1106 if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1107 self.pending_messages
1108 .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1109 }
1110 }
1111 }
1112
1113 fn parse_order_event(
1120 &self,
1121 order: &KrakenFuturesOpenOrder,
1122 ts_init: UnixNanos,
1123 is_cancel: bool,
1124 cancel_reason: Option<&str>,
1125 ) -> Option<ParsedOrderEvent> {
1126 let Some(account_id) = self.account_id else {
1127 log::warn!("Cannot process order: account_id not set");
1128 return None;
1129 };
1130
1131 let instrument = self
1132 .instruments_cache
1133 .get(&Ustr::from(order.instrument.as_str()))?;
1134
1135 let instrument_id = instrument.id();
1136
1137 if order.qty <= 0.0 {
1138 log::warn!(
1139 "Skipping order with invalid quantity: order_id={}, qty={}",
1140 order.order_id,
1141 order.qty
1142 );
1143 return None;
1144 }
1145
1146 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1147 let venue_order_id = VenueOrderId::new(&order.order_id);
1148
1149 let client_order_id = order
1150 .cli_ord_id
1151 .as_ref()
1152 .map(|s| ClientOrderId::new(s.as_str()));
1153
1154 let cached_info = order
1155 .cli_ord_id
1156 .as_ref()
1157 .and_then(|id| self.client_order_cache.get(id));
1158
1159 let Some(info) = cached_info else {
1161 return self
1162 .parse_order_to_status_report(order, ts_init, is_cancel)
1163 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1164 };
1165
1166 let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1167
1168 let status = if is_cancel {
1169 OrderStatus::Canceled
1170 } else if order.filled >= order.qty {
1171 OrderStatus::Filled
1172 } else if order.filled > 0.0 {
1173 OrderStatus::PartiallyFilled
1174 } else {
1175 OrderStatus::Accepted
1176 };
1177
1178 match status {
1179 OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1180 info.trader_id,
1181 info.strategy_id,
1182 instrument_id,
1183 client_order_id,
1184 venue_order_id,
1185 account_id,
1186 UUID4::new(),
1187 ts_event,
1188 ts_init,
1189 false,
1190 ))),
1191 OrderStatus::Canceled => {
1192 let is_expired = cancel_reason.is_some_and(|r| {
1194 let r_lower = r.to_lowercase();
1195 r_lower.contains("expir")
1196 || r_lower.contains("gtd")
1197 || r_lower.contains("timeout")
1198 });
1199
1200 if is_expired {
1201 Some(ParsedOrderEvent::Expired(OrderExpired::new(
1202 info.trader_id,
1203 info.strategy_id,
1204 instrument_id,
1205 client_order_id,
1206 UUID4::new(),
1207 ts_event,
1208 ts_init,
1209 false,
1210 Some(venue_order_id),
1211 Some(account_id),
1212 )))
1213 } else {
1214 Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1215 info.trader_id,
1216 info.strategy_id,
1217 instrument_id,
1218 client_order_id,
1219 UUID4::new(),
1220 ts_event,
1221 ts_init,
1222 false,
1223 Some(venue_order_id),
1224 Some(account_id),
1225 )))
1226 }
1227 }
1228
1229 OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1231 .parse_order_to_status_report(order, ts_init, is_cancel)
1232 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1233 _ => self
1234 .parse_order_to_status_report(order, ts_init, is_cancel)
1235 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1236 }
1237 }
1238
1239 fn parse_order_to_status_report(
1243 &self,
1244 order: &KrakenFuturesOpenOrder,
1245 ts_init: UnixNanos,
1246 is_cancel: bool,
1247 ) -> Option<OrderStatusReport> {
1248 let Some(account_id) = self.account_id else {
1249 log::warn!("Cannot process order: account_id not set");
1250 return None;
1251 };
1252
1253 let instrument = self
1254 .instruments_cache
1255 .get(&Ustr::from(order.instrument.as_str()))?;
1256
1257 let instrument_id = instrument.id();
1258 let size_precision = instrument.size_precision();
1259
1260 let side = if order.direction == 0 {
1261 OrderSide::Buy
1262 } else {
1263 OrderSide::Sell
1264 };
1265
1266 let order_type = match order.order_type.as_str() {
1267 "limit" | "lmt" => OrderType::Limit,
1268 "stop" | "stp" => OrderType::StopLimit,
1269 "take_profit" => OrderType::LimitIfTouched,
1270 "market" | "mkt" => OrderType::Market,
1271 _ => OrderType::Limit,
1272 };
1273
1274 let status = if is_cancel {
1275 OrderStatus::Canceled
1276 } else if order.filled >= order.qty {
1277 OrderStatus::Filled
1278 } else if order.filled > 0.0 {
1279 OrderStatus::PartiallyFilled
1280 } else {
1281 OrderStatus::Accepted
1282 };
1283
1284 if order.qty <= 0.0 {
1285 log::warn!(
1286 "Skipping order with invalid quantity: order_id={}, qty={}",
1287 order.order_id,
1288 order.qty
1289 );
1290 return None;
1291 }
1292
1293 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1294
1295 let client_order_id = order
1296 .cli_ord_id
1297 .as_ref()
1298 .map(|s| ClientOrderId::new(s.as_str()));
1299
1300 let filled_qty = if order.filled <= 0.0 {
1301 Quantity::zero(size_precision)
1302 } else {
1303 Quantity::new(order.filled, size_precision)
1304 };
1305
1306 Some(OrderStatusReport::new(
1307 account_id,
1308 instrument_id,
1309 client_order_id,
1310 VenueOrderId::new(&order.order_id),
1311 side,
1312 order_type,
1313 TimeInForce::Gtc,
1314 status,
1315 Quantity::new(order.qty, size_precision),
1316 filled_qty,
1317 ts_event, ts_event, ts_init,
1320 Some(UUID4::new()),
1321 ))
1322 }
1323
1324 fn parse_fill_to_report(
1325 &self,
1326 fill: &KrakenFuturesFill,
1327 ts_init: UnixNanos,
1328 ) -> Option<FillReport> {
1329 let Some(account_id) = self.account_id else {
1330 log::warn!("Cannot process fill: account_id not set");
1331 return None;
1332 };
1333
1334 let instrument = if let Some(ref symbol) = fill.instrument {
1336 self.instruments_cache.get(symbol).cloned()
1337 } else if let Some(ref cli_ord_id) = fill.cli_ord_id {
1338 self.client_order_cache.get(cli_ord_id).and_then(|info| {
1340 self.instruments_cache
1341 .iter()
1342 .find(|(_, inst)| inst.id() == info.instrument_id)
1343 .map(|(_, inst)| inst.clone())
1344 })
1345 } else {
1346 None
1347 };
1348
1349 let Some(instrument) = instrument else {
1350 log::warn!(
1351 "Cannot resolve instrument for fill: fill_id={}, order_id={}, cli_ord_id={:?}",
1352 fill.fill_id,
1353 fill.order_id,
1354 fill.cli_ord_id
1355 );
1356 return None;
1357 };
1358
1359 let instrument_id = instrument.id();
1360 let price_precision = instrument.price_precision();
1361 let size_precision = instrument.size_precision();
1362
1363 if fill.qty <= 0.0 {
1364 log::warn!(
1365 "Skipping fill with invalid quantity: fill_id={}, qty={}",
1366 fill.fill_id,
1367 fill.qty
1368 );
1369 return None;
1370 }
1371
1372 let side = if fill.buy {
1373 OrderSide::Buy
1374 } else {
1375 OrderSide::Sell
1376 };
1377
1378 let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1379
1380 let client_order_id = fill
1381 .cli_ord_id
1382 .as_ref()
1383 .map(|s| ClientOrderId::new(s.as_str()));
1384
1385 let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1386
1387 Some(FillReport::new(
1388 account_id,
1389 instrument_id,
1390 VenueOrderId::new(&fill.order_id),
1391 TradeId::new(&fill.fill_id),
1392 side,
1393 Quantity::new(fill.qty, size_precision),
1394 Price::new(fill.price, price_precision),
1395 commission,
1396 LiquiditySide::NoLiquiditySide, client_order_id,
1398 None, ts_event,
1400 ts_init,
1401 Some(UUID4::new()),
1402 ))
1403 }
1404}
1405
1406#[cfg(test)]
1407mod tests {
1408 use nautilus_model::{
1409 instruments::{CryptoFuture, InstrumentAny},
1410 types::Currency,
1411 };
1412 use rstest::rstest;
1413
1414 use super::*;
1415
1416 fn create_test_handler() -> FuturesFeedHandler {
1417 let signal = Arc::new(AtomicBool::new(false));
1418 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1419 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1420 let subscriptions = SubscriptionState::new(':');
1421
1422 FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1423 }
1424
1425 fn create_test_instrument() -> InstrumentAny {
1426 InstrumentAny::CryptoFuture(CryptoFuture::new(
1427 InstrumentId::from("PI_XBTUSD.KRAKEN"),
1428 Symbol::from("PI_XBTUSD"),
1429 Currency::BTC(),
1430 Currency::USD(),
1431 Currency::USD(),
1432 false,
1433 UnixNanos::default(),
1434 UnixNanos::default(),
1435 1, 0, Price::from("0.5"),
1438 Quantity::from(1),
1439 None,
1440 None,
1441 None,
1442 None,
1443 None,
1444 None,
1445 None,
1446 None,
1447 None,
1448 None,
1449 None,
1450 None,
1451 UnixNanos::default(),
1452 UnixNanos::default(),
1453 ))
1454 }
1455
1456 #[rstest]
1457 fn test_book_snapshot_filters_zero_quantity_bids() {
1458 let mut handler = create_test_handler();
1459 let instrument = create_test_instrument();
1460 handler
1461 .instruments_cache
1462 .insert(Ustr::from("PI_XBTUSD"), instrument);
1463
1464 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1465 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1466
1467 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1468 let ts_init = UnixNanos::from(1_000_000_000);
1469
1470 handler.parse_message(json, ts_init);
1471
1472 assert_eq!(handler.pending_messages.len(), 1);
1473
1474 let msg = handler.pending_messages.pop_front().unwrap();
1475 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1476 panic!("Expected BookDeltas message");
1477 };
1478
1479 assert_eq!(deltas.deltas.len(), 4);
1481 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1482
1483 for delta in &deltas.deltas[1..] {
1484 assert!(
1485 !delta.order.size.is_zero(),
1486 "Found zero-quantity delta that should have been filtered: {delta:?}"
1487 );
1488 }
1489 }
1490
1491 #[rstest]
1492 fn test_book_snapshot_filters_zero_quantity_asks() {
1493 let mut handler = create_test_handler();
1494 let instrument = create_test_instrument();
1495 handler
1496 .instruments_cache
1497 .insert(Ustr::from("PI_XBTUSD"), instrument);
1498
1499 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1500 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1501
1502 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1503 let ts_init = UnixNanos::from(1_000_000_000);
1504
1505 handler.parse_message(json, ts_init);
1506
1507 let msg = handler.pending_messages.pop_front().unwrap();
1508 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1509 panic!("Expected BookDeltas message");
1510 };
1511
1512 let sell_deltas: Vec<_> = deltas
1514 .deltas
1515 .iter()
1516 .filter(|d| d.order.side == OrderSide::Sell)
1517 .collect();
1518
1519 assert_eq!(sell_deltas.len(), 1);
1520 assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1521 }
1522
1523 #[rstest]
1524 fn test_trade_filters_zero_quantity() {
1525 let mut handler = create_test_handler();
1526 let instrument = create_test_instrument();
1527 handler
1528 .instruments_cache
1529 .insert(Ustr::from("PI_XBTUSD"), instrument);
1530
1531 handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1532 handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1533
1534 let json = r#"{
1535 "feed": "trade",
1536 "product_id": "PI_XBTUSD",
1537 "time": 1612269825817,
1538 "side": "buy",
1539 "qty": 0.0,
1540 "price": 34900.0,
1541 "seq": 12345
1542 }"#;
1543 let ts_init = UnixNanos::from(1_000_000_000);
1544
1545 handler.parse_message(json, ts_init);
1546
1547 assert!(
1548 handler.pending_messages.is_empty(),
1549 "Zero quantity trade should be filtered out"
1550 );
1551 }
1552}