1use std::{
19 collections::VecDeque,
20 fmt::Debug,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::{
32 BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
33 OrderBookDeltas, TradeTick,
34 },
35 enums::{
36 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
37 },
38 events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
39 identifiers::{
40 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
41 },
42 instruments::{Instrument, InstrumentAny},
43 reports::{FillReport, OrderStatusReport},
44 types::{Money, Price, Quantity},
45};
46use nautilus_network::{
47 RECONNECTED,
48 websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use serde::Deserialize;
52use serde_json::Value;
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::messages::{
57 KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
58 KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
59 KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
60 KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
61 KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
62 KrakenFuturesWsMessage, classify_futures_message,
63};
64use crate::common::enums::KrakenOrderSide;
65
66#[derive(Debug, Clone)]
68pub enum ParsedOrderEvent {
69 Accepted(OrderAccepted),
70 Canceled(OrderCanceled),
71 Expired(OrderExpired),
72 Updated(OrderUpdated),
73 StatusOnly(Box<OrderStatusReport>),
74}
75
76#[derive(Debug, Clone)]
78struct CachedOrderInfo {
79 instrument_id: InstrumentId,
80 trader_id: TraderId,
81 strategy_id: StrategyId,
82}
83
84#[allow(
86 clippy::large_enum_variant,
87 reason = "Commands are ephemeral and immediately consumed"
88)]
89pub enum HandlerCommand {
90 SetClient(WebSocketClient),
91 SubscribeTicker(Symbol),
92 UnsubscribeTicker(Symbol),
93 SubscribeTrade(Symbol),
94 UnsubscribeTrade(Symbol),
95 SubscribeBook(Symbol),
96 UnsubscribeBook(Symbol),
97 Disconnect,
98 InitializeInstruments(Vec<InstrumentAny>),
99 UpdateInstrument(InstrumentAny),
100 SetAccountId(AccountId),
101 RequestChallenge {
102 api_key: String,
103 response_tx: tokio::sync::oneshot::Sender<String>,
104 },
105 SetAuthCredentials {
106 api_key: String,
107 original_challenge: String,
108 signed_challenge: String,
109 },
110 SubscribeOpenOrders,
111 SubscribeFills,
112 CacheClientOrder {
113 client_order_id: ClientOrderId,
114 venue_order_id: Option<VenueOrderId>,
115 instrument_id: InstrumentId,
116 trader_id: TraderId,
117 strategy_id: StrategyId,
118 },
119}
120
121impl Debug for HandlerCommand {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 match self {
124 Self::SetClient(_) => f.debug_struct(stringify!(SetClient)).finish(),
125 Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
126 Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
127 Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
128 Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
129 Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
130 Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
131 Self::Disconnect => write!(f, "Disconnect"),
132 Self::InitializeInstruments(v) => f
133 .debug_tuple("InitializeInstruments")
134 .field(&v.len())
135 .finish(),
136 Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
137 Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
138 Self::RequestChallenge { api_key, .. } => {
139 let masked = &api_key[..4.min(api_key.len())];
140 f.debug_struct(stringify!(RequestChallenge))
141 .field("api_key", &format!("{masked}..."))
142 .finish()
143 }
144 Self::SetAuthCredentials { api_key, .. } => {
145 let masked = &api_key[..4.min(api_key.len())];
146 f.debug_struct(stringify!(SetAuthCredentials))
147 .field("api_key", &format!("{masked}..."))
148 .finish()
149 }
150 Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
151 Self::SubscribeFills => write!(f, "SubscribeFills"),
152 Self::CacheClientOrder {
153 client_order_id,
154 instrument_id,
155 ..
156 } => f
157 .debug_struct(stringify!(CacheClientOrder))
158 .field("client_order_id", client_order_id)
159 .field("instrument_id", instrument_id)
160 .finish(),
161 }
162 }
163}
164
165pub struct FuturesFeedHandler {
167 clock: &'static AtomicTime,
168 signal: Arc<AtomicBool>,
169 client: Option<WebSocketClient>,
170 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
171 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
172 subscriptions: SubscriptionState,
173 instruments_cache: AHashMap<Ustr, InstrumentAny>,
174 quote_cache: QuoteCache,
175 pending_messages: VecDeque<KrakenFuturesWsMessage>,
176 account_id: Option<AccountId>,
177 api_key: Option<String>,
178 original_challenge: Option<String>,
179 signed_challenge: Option<String>,
180 client_order_cache: AHashMap<ClientOrderId, CachedOrderInfo>,
181 venue_order_cache: AHashMap<VenueOrderId, ClientOrderId>,
182 pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
183}
184
185impl FuturesFeedHandler {
186 pub fn new(
188 signal: Arc<AtomicBool>,
189 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
190 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
191 subscriptions: SubscriptionState,
192 ) -> Self {
193 Self {
194 clock: get_atomic_clock_realtime(),
195 signal,
196 client: None,
197 cmd_rx,
198 raw_rx,
199 subscriptions,
200 instruments_cache: AHashMap::new(),
201 quote_cache: QuoteCache::new(),
202 pending_messages: VecDeque::new(),
203 account_id: None,
204 api_key: None,
205 original_challenge: None,
206 signed_challenge: None,
207 client_order_cache: AHashMap::new(),
208 venue_order_cache: AHashMap::new(),
209 pending_challenge_tx: None,
210 }
211 }
212
213 pub fn is_stopped(&self) -> bool {
214 self.signal.load(Ordering::Relaxed)
215 }
216
217 fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
218 let channel_ustr = Ustr::from(channel.as_ref());
219 self.subscriptions.is_subscribed(&channel_ustr, symbol)
220 }
221
222 fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
223 self.instruments_cache.get(symbol)
224 }
225
226 pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
228 if let Some(msg) = self.pending_messages.pop_front() {
230 return Some(msg);
231 }
232
233 loop {
234 tokio::select! {
235 Some(cmd) = self.cmd_rx.recv() => {
236 match cmd {
237 HandlerCommand::SetClient(client) => {
238 log::debug!("WebSocketClient received by futures handler");
239 self.client = Some(client);
240 }
241 HandlerCommand::SubscribeTicker(symbol) => {
242 self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
243 }
244 HandlerCommand::UnsubscribeTicker(symbol) => {
245 self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
246 }
247 HandlerCommand::SubscribeTrade(symbol) => {
248 self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
249 }
250 HandlerCommand::UnsubscribeTrade(symbol) => {
251 self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
252 }
253 HandlerCommand::SubscribeBook(symbol) => {
254 self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
255 }
256 HandlerCommand::UnsubscribeBook(symbol) => {
257 self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
258 }
259 HandlerCommand::Disconnect => {
260 log::debug!("Disconnect command received");
261 if let Some(client) = self.client.take() {
262 client.disconnect().await;
263 }
264 return None;
265 }
266 HandlerCommand::InitializeInstruments(instruments) => {
267 for inst in instruments {
268 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
269 }
270 let count = self.instruments_cache.len();
271 log::debug!("Initialized {count} instruments in futures handler cache");
272 }
273 HandlerCommand::UpdateInstrument(inst) => {
274 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
275 }
276 HandlerCommand::SetAccountId(account_id) => {
277 log::debug!("Setting account_id for futures handler: {account_id}");
278 self.account_id = Some(account_id);
279 }
280 HandlerCommand::RequestChallenge { api_key, response_tx } => {
281 log::debug!("Requesting challenge for authentication");
282 self.pending_challenge_tx = Some(response_tx);
283 self.send_challenge_request(&api_key).await;
284 }
285 HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
286 log::debug!("Setting auth credentials for futures handler");
287 self.api_key = Some(api_key);
288 self.original_challenge = Some(original_challenge);
289 self.signed_challenge = Some(signed_challenge);
290 }
291 HandlerCommand::SubscribeOpenOrders => {
292 self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
293 }
294 HandlerCommand::SubscribeFills => {
295 self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
296 }
297 HandlerCommand::CacheClientOrder {
298 client_order_id,
299 venue_order_id,
300 instrument_id,
301 trader_id,
302 strategy_id,
303 } => {
304 self.client_order_cache.insert(
305 client_order_id,
306 CachedOrderInfo {
307 instrument_id,
308 trader_id,
309 strategy_id,
310 },
311 );
312 if let Some(venue_id) = venue_order_id {
313 self.venue_order_cache.insert(venue_id, client_order_id);
314 }
315 }
316 }
317 continue;
318 }
319
320 msg = self.raw_rx.recv() => {
321 let msg = match msg {
322 Some(msg) => msg,
323 None => {
324 log::debug!("WebSocket stream closed");
325 return None;
326 }
327 };
328
329 if self.signal.load(Ordering::Relaxed) {
330 log::debug!("Stop signal received");
331 return None;
332 }
333
334 match &msg {
336 Message::Ping(data) => {
337 let len = data.len();
338 log::trace!("Received ping frame with {len} bytes");
339 if let Some(client) = &self.client
340 && let Err(e) = client.send_pong(data.to_vec()).await
341 {
342 log::warn!("Failed to send pong frame: {e}");
343 }
344 continue;
345 }
346 Message::Pong(_) => {
347 log::debug!("Received pong from server");
348 continue;
349 }
350 Message::Close(_) => {
351 log::info!("WebSocket connection closed");
352 return None;
353 }
354 Message::Frame(_) => {
355 log::trace!("Received raw frame");
356 continue;
357 }
358 _ => {}
359 }
360
361 let text: &str = match &msg {
363 Message::Text(text) => text,
364 Message::Binary(data) => match std::str::from_utf8(data) {
365 Ok(s) => s,
366 Err(_) => continue,
367 },
368 _ => continue,
369 };
370
371 if text == RECONNECTED {
372 log::info!("Received WebSocket reconnected signal");
373 self.quote_cache.clear();
374 return Some(KrakenFuturesWsMessage::Reconnected);
375 }
376
377 let ts_init = self.clock.get_time_ns();
378 self.parse_message(text, ts_init);
379
380 if let Some(msg) = self.pending_messages.pop_front() {
382 return Some(msg);
383 }
384
385 continue;
386 }
387 }
388 }
389 }
390
391 async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392 if let Some(ref client) = self.client {
393 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394 let feed_str = feed_str.trim_matches('"');
395 let msg = format!(
396 r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397 );
398 if let Err(e) = client.send_text(msg, None).await {
399 log::error!("Failed to send {feed:?} subscribe: {e}");
400 }
401 }
402 }
403
404 async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405 if let Some(ref client) = self.client {
406 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407 let feed_str = feed_str.trim_matches('"');
408 let msg = format!(
409 r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410 );
411 if let Err(e) = client.send_text(msg, None).await {
412 log::error!("Failed to send {feed:?} unsubscribe: {e}");
413 }
414 }
415 }
416
417 async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418 let Some(ref client) = self.client else {
419 log::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420 return;
421 };
422
423 let Some(ref api_key) = self.api_key else {
424 log::error!("Cannot subscribe to {feed:?}: no API key set");
425 return;
426 };
427
428 let Some(ref original_challenge) = self.original_challenge else {
429 log::error!("Cannot subscribe to {feed:?}: no challenge set");
430 return;
431 };
432
433 let Some(ref signed_challenge) = self.signed_challenge else {
434 log::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435 return;
436 };
437
438 let request = KrakenFuturesPrivateSubscribeRequest {
439 event: KrakenFuturesEvent::Subscribe,
440 feed,
441 api_key: api_key.clone(),
442 original_challenge: original_challenge.clone(),
443 signed_challenge: signed_challenge.clone(),
444 };
445
446 let msg = match serde_json::to_string(&request) {
447 Ok(m) => m,
448 Err(e) => {
449 log::error!("Failed to serialize {feed:?} subscribe request: {e}");
450 return;
451 }
452 };
453
454 if let Err(e) = client.send_text(msg, None).await {
455 log::error!("Failed to send {feed:?} subscribe: {e}");
456 } else {
457 log::debug!("Sent private subscribe request for {feed:?}");
458 }
459 }
460
461 async fn send_challenge_request(&self, api_key: &str) {
462 let Some(ref client) = self.client else {
463 log::error!("Cannot request challenge: no WebSocket client");
464 return;
465 };
466
467 let request = KrakenFuturesChallengeRequest {
468 event: KrakenFuturesEvent::Challenge,
469 api_key: api_key.to_string(),
470 };
471
472 let msg = match serde_json::to_string(&request) {
473 Ok(m) => m,
474 Err(e) => {
475 log::error!("Failed to serialize challenge request: {e}");
476 return;
477 }
478 };
479
480 if let Err(e) = client.send_text(msg, None).await {
481 log::error!("Failed to send challenge request: {e}");
482 } else {
483 log::debug!("Sent challenge request for authentication");
484 }
485 }
486
487 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488 let value: Value = match serde_json::from_str(text) {
489 Ok(v) => v,
490 Err(e) => {
491 log::debug!("Failed to parse message as JSON: {e}");
492 return;
493 }
494 };
495
496 match classify_futures_message(&value) {
497 KrakenFuturesMessageType::OpenOrdersSnapshot => {
499 log::debug!(
500 "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501 );
502 }
503 KrakenFuturesMessageType::OpenOrdersCancel => {
504 self.handle_open_orders_cancel_value(value, ts_init);
505 }
506 KrakenFuturesMessageType::OpenOrdersDelta => {
507 self.handle_open_orders_delta_value(value, ts_init);
508 }
509 KrakenFuturesMessageType::FillsSnapshot => {
510 log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
511 }
512 KrakenFuturesMessageType::FillsDelta => {
513 self.handle_fills_delta_value(value, ts_init);
514 }
515 KrakenFuturesMessageType::Ticker => {
517 self.handle_ticker_message_value(value, ts_init);
518 }
519 KrakenFuturesMessageType::TradeSnapshot => {
520 log::debug!("Skipping trade_snapshot (only streaming live trades)");
521 }
522 KrakenFuturesMessageType::Trade => {
523 self.handle_trade_message_value(value, ts_init);
524 }
525 KrakenFuturesMessageType::BookSnapshot => {
526 self.handle_book_snapshot_value(value, ts_init);
527 }
528 KrakenFuturesMessageType::BookDelta => {
529 self.handle_book_delta_value(value, ts_init);
530 }
531 KrakenFuturesMessageType::Info => {
533 log::debug!("Received info message: {text}");
534 }
535 KrakenFuturesMessageType::Pong => {
536 log::debug!("Received text pong response");
537 }
538 KrakenFuturesMessageType::Subscribed => {
539 log::debug!("Subscription confirmed: {text}");
540 }
541 KrakenFuturesMessageType::Unsubscribed => {
542 log::debug!("Unsubscription confirmed: {text}");
543 }
544 KrakenFuturesMessageType::Challenge => {
545 self.handle_challenge_response_value(value);
546 }
547 KrakenFuturesMessageType::Heartbeat => {
548 log::trace!("Heartbeat received");
549 }
550 KrakenFuturesMessageType::Error => {
551 let message = value
552 .get("message")
553 .and_then(|v| v.as_str())
554 .unwrap_or("Unknown error");
555 log::error!("Kraken Futures WebSocket error: {message}");
556 }
557 KrakenFuturesMessageType::Alert => {
558 let message = value
559 .get("message")
560 .and_then(|v| v.as_str())
561 .unwrap_or("Unknown alert");
562 log::warn!("Kraken Futures WebSocket alert: {message}");
563 }
564 KrakenFuturesMessageType::Unknown => {
565 log::warn!("Unhandled futures message: {text}");
566 }
567 }
568 }
569
570 fn handle_challenge_response_value(&mut self, value: Value) {
571 #[derive(Deserialize)]
572 struct ChallengeResponse {
573 message: String,
574 }
575
576 match serde_json::from_value::<ChallengeResponse>(value) {
577 Ok(response) => {
578 let len = response.message.len();
579 log::debug!("Challenge received, length: {len}");
580
581 if let Some(tx) = self.pending_challenge_tx.take() {
582 if tx.send(response.message).is_err() {
583 log::warn!("Failed to send challenge response - receiver dropped");
584 }
585 } else {
586 log::warn!("Received challenge but no pending request");
587 }
588 }
589 Err(e) => {
590 log::error!("Failed to parse challenge response: {e}");
591 }
592 }
593 }
594
595 fn emit_order_event(&mut self, event: ParsedOrderEvent) {
596 match event {
597 ParsedOrderEvent::Accepted(accepted) => {
598 self.pending_messages
599 .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
600 }
601 ParsedOrderEvent::Canceled(canceled) => {
602 self.pending_messages
603 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
604 }
605 ParsedOrderEvent::Expired(expired) => {
606 self.pending_messages
607 .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
608 }
609 ParsedOrderEvent::Updated(updated) => {
610 self.pending_messages
611 .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
612 }
613 ParsedOrderEvent::StatusOnly(report) => {
614 self.pending_messages
615 .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
616 }
617 }
618 }
619
620 fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
621 let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
622 Ok(t) => t,
623 Err(e) => {
624 log::debug!("Failed to parse ticker: {e}");
625 return;
626 }
627 };
628
629 let (instrument_id, price_precision) = {
630 let Some(instrument) = self.get_instrument(&ticker.product_id) else {
631 let product_id = &ticker.product_id;
632 log::debug!("Instrument not found for product_id: {product_id}");
633 return;
634 };
635 (instrument.id(), instrument.price_precision())
636 };
637
638 let ts_event = ticker
639 .time
640 .map_or(ts_init, |t| UnixNanos::from((t as u64) * 1_000_000));
641
642 let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
643 let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
644
645 if let Some(mark_price) = ticker.mark_price
646 && has_mark
647 {
648 let update = MarkPriceUpdate::new(
649 instrument_id,
650 Price::new(mark_price, price_precision),
651 ts_event,
652 ts_init,
653 );
654 self.pending_messages
655 .push_back(KrakenFuturesWsMessage::MarkPrice(update));
656 }
657
658 if let Some(index_price) = ticker.index
659 && has_index
660 {
661 let update = IndexPriceUpdate::new(
662 instrument_id,
663 Price::new(index_price, price_precision),
664 ts_event,
665 ts_init,
666 );
667 self.pending_messages
668 .push_back(KrakenFuturesWsMessage::IndexPrice(update));
669 }
670
671 let has_funding = self.is_subscribed(KrakenFuturesChannel::Funding, &ticker.product_id);
672
673 if let Some(funding_rate) = ticker.funding_rate
674 && has_funding
675 {
676 let next_funding_ns = ticker
677 .next_funding_rate_time
678 .map(|t| UnixNanos::from((t as u64) * 1_000_000));
679 let update = FundingRateUpdate::new(
680 instrument_id,
681 Decimal::from_f64_retain(funding_rate).unwrap_or_default(),
682 next_funding_ns,
683 ts_event,
684 ts_init,
685 );
686 self.pending_messages
687 .push_back(KrakenFuturesWsMessage::FundingRate(update));
688 }
689 }
690
691 fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
692 let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
693 Ok(t) => t,
694 Err(e) => {
695 log::warn!("Failed to parse trade: {e}");
696 return;
697 }
698 };
699
700 if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
701 log::warn!(
702 "Received trade for unsubscribed product: {}",
703 trade.product_id
704 );
705 return;
706 }
707
708 let (instrument_id, price_precision, size_precision) = {
709 let Some(instrument) = self.get_instrument(&trade.product_id) else {
710 log::warn!(
711 "No instrument found for trade product: {}",
712 trade.product_id
713 );
714 return;
715 };
716 (
717 instrument.id(),
718 instrument.price_precision(),
719 instrument.size_precision(),
720 )
721 };
722
723 let size = Quantity::new(trade.qty, size_precision);
724 if size.is_zero() {
725 let product_id = trade.product_id;
726 let raw_qty = trade.qty;
727 log::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
728 return;
729 }
730
731 let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
732 let aggressor_side = match trade.side {
733 KrakenOrderSide::Buy => AggressorSide::Buyer,
734 KrakenOrderSide::Sell => AggressorSide::Seller,
735 };
736 let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
737
738 let trade_tick = TradeTick::new(
739 instrument_id,
740 Price::new(trade.price, price_precision),
741 size,
742 aggressor_side,
743 TradeId::new(&trade_id),
744 ts_event,
745 ts_init,
746 );
747
748 self.pending_messages
749 .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
750 }
751
752 fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
753 let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
754 Ok(s) => s,
755 Err(e) => {
756 log::warn!("Failed to parse book snapshot: {e}");
757 return;
758 }
759 };
760
761 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
762 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
763
764 if !has_book && !has_quotes {
765 log::warn!(
766 "Received book snapshot for unsubscribed product: {}",
767 snapshot.product_id
768 );
769 return;
770 }
771
772 let (instrument_id, price_precision, size_precision) = {
773 let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
774 log::warn!(
775 "No instrument found for book snapshot product: {}",
776 snapshot.product_id
777 );
778 return;
779 };
780 (
781 instrument.id(),
782 instrument.price_precision(),
783 instrument.size_precision(),
784 )
785 };
786
787 let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
788
789 let best_bid = snapshot
790 .bids
791 .iter()
792 .filter(|l| l.qty > 0.0)
793 .max_by(|a, b| a.price.total_cmp(&b.price));
794 let best_ask = snapshot
795 .asks
796 .iter()
797 .filter(|l| l.qty > 0.0)
798 .min_by(|a, b| a.price.total_cmp(&b.price));
799
800 if has_quotes {
801 let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
802 let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
803 let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
804 let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
805
806 match self.quote_cache.process(
807 instrument_id,
808 bid_price,
809 ask_price,
810 bid_size,
811 ask_size,
812 ts_event,
813 ts_init,
814 ) {
815 Ok(quote) => {
816 self.pending_messages
817 .push_back(KrakenFuturesWsMessage::Quote(quote));
818 }
819 Err(e) => {
820 log::trace!("Quote cache process error: {e}");
821 }
822 }
823 }
824
825 if has_book {
826 let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
827
828 deltas.push(OrderBookDelta::clear(
829 instrument_id,
830 snapshot.seq as u64,
831 ts_event,
832 ts_init,
833 ));
834
835 for bid in &snapshot.bids {
836 let size = Quantity::new(bid.qty, size_precision);
837 if size.is_zero() {
838 continue;
839 }
840 let order = BookOrder::new(
841 OrderSide::Buy,
842 Price::new(bid.price, price_precision),
843 size,
844 0,
845 );
846 deltas.push(OrderBookDelta::new(
847 instrument_id,
848 BookAction::Add,
849 order,
850 0,
851 snapshot.seq as u64,
852 ts_event,
853 ts_init,
854 ));
855 }
856
857 for ask in &snapshot.asks {
858 let size = Quantity::new(ask.qty, size_precision);
859 if size.is_zero() {
860 continue;
861 }
862 let order = BookOrder::new(
863 OrderSide::Sell,
864 Price::new(ask.price, price_precision),
865 size,
866 0,
867 );
868 deltas.push(OrderBookDelta::new(
869 instrument_id,
870 BookAction::Add,
871 order,
872 0,
873 snapshot.seq as u64,
874 ts_event,
875 ts_init,
876 ));
877 }
878
879 let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
880 self.pending_messages
881 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
882 }
883 }
884
885 fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
886 let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
887 Ok(d) => d,
888 Err(e) => {
889 log::warn!("Failed to parse book delta: {e}");
890 return;
891 }
892 };
893
894 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
895 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
896
897 if !has_book && !has_quotes {
898 log::warn!(
899 "Received book delta for unsubscribed product: {}",
900 delta.product_id
901 );
902 return;
903 }
904
905 let Some(instrument) = self.get_instrument(&delta.product_id) else {
906 log::warn!(
907 "No instrument found for book delta product: {}",
908 delta.product_id
909 );
910 return;
911 };
912
913 let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
914 let instrument_id = instrument.id();
915 let price_precision = instrument.price_precision();
916 let size_precision = instrument.size_precision();
917
918 let side: OrderSide = delta.side.into();
919
920 if has_quotes && delta.qty > 0.0 {
921 let price = Price::new(delta.price, price_precision);
922 let size = Quantity::new(delta.qty, size_precision);
923
924 let (bid_price, ask_price, bid_size, ask_size) = match side {
925 OrderSide::Buy => (Some(price), None, Some(size), None),
926 OrderSide::Sell => (None, Some(price), None, Some(size)),
927 _ => (None, None, None, None),
928 };
929
930 if let Ok(quote) = self.quote_cache.process(
931 instrument_id,
932 bid_price,
933 ask_price,
934 bid_size,
935 ask_size,
936 ts_event,
937 ts_init,
938 ) {
939 self.pending_messages
940 .push_back(KrakenFuturesWsMessage::Quote(quote));
941 }
942 }
943
944 if has_book {
945 let size = Quantity::new(delta.qty, size_precision);
946 let action = if size.is_zero() {
947 BookAction::Delete
948 } else {
949 BookAction::Update
950 };
951
952 let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
953
954 let book_delta = OrderBookDelta::new(
955 instrument_id,
956 action,
957 order,
958 0,
959 delta.seq as u64,
960 ts_event,
961 ts_init,
962 );
963
964 let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
965 self.pending_messages
966 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
967 }
968 }
969
970 fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
971 let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
972 Ok(d) => d,
973 Err(e) => {
974 log::error!("Failed to parse open_orders delta: {e}");
975 return;
976 }
977 };
978
979 log::debug!(
980 "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
981 delta.order.order_id,
982 delta.is_cancel,
983 delta.reason
984 );
985
986 if let Some(event) = self.parse_order_event(
987 &delta.order,
988 ts_init,
989 delta.is_cancel,
990 delta.reason.as_deref(),
991 ) {
992 self.emit_order_event(event);
993 }
994 }
995
996 fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
997 if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1000 && (reason == "full_fill" || reason == "partial_fill")
1001 {
1002 log::debug!(
1003 "Skipping open_orders cancel for fill (handled by fills feed): reason={reason}"
1004 );
1005 return;
1006 }
1007
1008 let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1009 Ok(c) => c,
1010 Err(e) => {
1011 log::error!("Failed to parse open_orders cancel: {e}");
1012 return;
1013 }
1014 };
1015
1016 log::debug!(
1017 "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
1018 cancel.order_id,
1019 cancel.cli_ord_id,
1020 cancel.reason
1021 );
1022
1023 let Some(account_id) = self.account_id else {
1024 log::warn!("Cannot process cancel: account_id not set");
1025 return;
1026 };
1027
1028 let venue_order_id_key = VenueOrderId::new(&cancel.order_id);
1029
1030 let (client_order_id, info) = if let Some(cli_ord_id) =
1031 cancel.cli_ord_id.as_ref().filter(|id| !id.is_empty())
1032 {
1033 let client_order_id_key = ClientOrderId::new(cli_ord_id);
1034 if let Some(info) = self.client_order_cache.get(&client_order_id_key) {
1035 (client_order_id_key, info.clone())
1036 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&venue_order_id_key)
1037 {
1038 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1039 (*mapped_cli_ord_id, info.clone())
1040 } else {
1041 log::debug!(
1042 "Cancel received for unknown order (not in cache): \
1043 order_id={}, cli_ord_id={cli_ord_id}",
1044 cancel.order_id
1045 );
1046 return;
1047 }
1048 } else {
1049 log::debug!(
1050 "Cancel received for unknown order (not in cache): \
1051 order_id={}, cli_ord_id={cli_ord_id}",
1052 cancel.order_id
1053 );
1054 return;
1055 }
1056 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&venue_order_id_key) {
1057 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1058 (*mapped_cli_ord_id, info.clone())
1059 } else {
1060 log::debug!(
1061 "Cancel received but mapped order not in cache: order_id={}",
1062 cancel.order_id
1063 );
1064 return;
1065 }
1066 } else {
1067 log::debug!(
1068 "Cancel received without cli_ord_id and no venue mapping (external order): \
1069 order_id={}",
1070 cancel.order_id
1071 );
1072 return;
1073 };
1074
1075 let venue_order_id = VenueOrderId::new(&cancel.order_id);
1076
1077 let canceled = OrderCanceled::new(
1078 info.trader_id,
1079 info.strategy_id,
1080 info.instrument_id,
1081 client_order_id,
1082 UUID4::new(),
1083 ts_init,
1084 ts_init,
1085 false,
1086 Some(venue_order_id),
1087 Some(account_id),
1088 );
1089
1090 self.pending_messages
1091 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1092 }
1093
1094 fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1095 let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1096 Ok(d) => d,
1097 Err(e) => {
1098 log::error!("Failed to parse fills delta: {e}");
1099 return;
1100 }
1101 };
1102
1103 log::debug!("Received fills delta: fill_count={}", delta.fills.len());
1104
1105 for fill in &delta.fills {
1106 log::debug!(
1107 "Processing fill: fill_id={}, order_id={}",
1108 fill.fill_id,
1109 fill.order_id
1110 );
1111
1112 if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1113 self.pending_messages
1114 .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1115 }
1116 }
1117 }
1118
1119 fn parse_order_event(
1126 &self,
1127 order: &KrakenFuturesOpenOrder,
1128 ts_init: UnixNanos,
1129 is_cancel: bool,
1130 cancel_reason: Option<&str>,
1131 ) -> Option<ParsedOrderEvent> {
1132 let Some(account_id) = self.account_id else {
1133 log::warn!("Cannot process order: account_id not set");
1134 return None;
1135 };
1136
1137 let instrument = self.instruments_cache.get(&order.instrument)?;
1138
1139 let instrument_id = instrument.id();
1140
1141 if order.qty <= 0.0 {
1142 log::warn!(
1143 "Skipping order with invalid quantity: order_id={}, qty={}",
1144 order.order_id,
1145 order.qty
1146 );
1147 return None;
1148 }
1149
1150 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1151 let venue_order_id = VenueOrderId::new(&order.order_id);
1152
1153 let client_order_id = order
1154 .cli_ord_id
1155 .as_ref()
1156 .filter(|s| !s.is_empty())
1157 .map(|s| ClientOrderId::new(s.as_str()));
1158
1159 let cached_info = order
1160 .cli_ord_id
1161 .as_ref()
1162 .filter(|id| !id.is_empty())
1163 .and_then(|id| self.client_order_cache.get(&ClientOrderId::new(id)));
1164
1165 let Some(info) = cached_info else {
1167 return self
1168 .parse_order_to_status_report(order, ts_init, is_cancel)
1169 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1170 };
1171
1172 let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1173
1174 let status = if order.filled >= order.qty && order.qty > 0.0 {
1177 OrderStatus::Filled
1178 } else if order.filled > 0.0 {
1179 OrderStatus::PartiallyFilled
1180 } else if is_cancel {
1181 OrderStatus::Canceled
1182 } else {
1183 OrderStatus::Accepted
1184 };
1185
1186 match status {
1187 OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1188 info.trader_id,
1189 info.strategy_id,
1190 instrument_id,
1191 client_order_id,
1192 venue_order_id,
1193 account_id,
1194 UUID4::new(),
1195 ts_event,
1196 ts_init,
1197 false,
1198 ))),
1199 OrderStatus::Canceled => {
1200 let is_expired = cancel_reason.is_some_and(|r| {
1202 let r_lower = r.to_lowercase();
1203 r_lower.contains("expir")
1204 || r_lower.contains("gtd")
1205 || r_lower.contains("timeout")
1206 });
1207
1208 if is_expired {
1209 Some(ParsedOrderEvent::Expired(OrderExpired::new(
1210 info.trader_id,
1211 info.strategy_id,
1212 instrument_id,
1213 client_order_id,
1214 UUID4::new(),
1215 ts_event,
1216 ts_init,
1217 false,
1218 Some(venue_order_id),
1219 Some(account_id),
1220 )))
1221 } else {
1222 Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1223 info.trader_id,
1224 info.strategy_id,
1225 instrument_id,
1226 client_order_id,
1227 UUID4::new(),
1228 ts_event,
1229 ts_init,
1230 false,
1231 Some(venue_order_id),
1232 Some(account_id),
1233 )))
1234 }
1235 }
1236
1237 OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1239 .parse_order_to_status_report(order, ts_init, is_cancel)
1240 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1241 _ => self
1242 .parse_order_to_status_report(order, ts_init, is_cancel)
1243 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1244 }
1245 }
1246
1247 fn parse_order_to_status_report(
1251 &self,
1252 order: &KrakenFuturesOpenOrder,
1253 ts_init: UnixNanos,
1254 is_cancel: bool,
1255 ) -> Option<OrderStatusReport> {
1256 let Some(account_id) = self.account_id else {
1257 log::warn!("Cannot process order: account_id not set");
1258 return None;
1259 };
1260
1261 let instrument = self.instruments_cache.get(&order.instrument)?;
1262
1263 let instrument_id = instrument.id();
1264 let size_precision = instrument.size_precision();
1265
1266 let side = if order.direction == 0 {
1267 OrderSide::Buy
1268 } else {
1269 OrderSide::Sell
1270 };
1271
1272 let order_type = match order.order_type.as_str() {
1273 "limit" | "lmt" => OrderType::Limit,
1274 "stop" | "stp" => OrderType::StopLimit,
1275 "take_profit" => OrderType::LimitIfTouched,
1276 "market" | "mkt" => OrderType::Market,
1277 _ => OrderType::Limit,
1278 };
1279
1280 if order.qty <= 0.0 {
1281 log::warn!(
1282 "Skipping order with invalid quantity: order_id={}, qty={}",
1283 order.order_id,
1284 order.qty
1285 );
1286 return None;
1287 }
1288
1289 let status = if order.filled >= order.qty {
1291 OrderStatus::Filled
1292 } else if order.filled > 0.0 {
1293 OrderStatus::PartiallyFilled
1294 } else if is_cancel {
1295 OrderStatus::Canceled
1296 } else {
1297 OrderStatus::Accepted
1298 };
1299
1300 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1301
1302 let client_order_id = order
1303 .cli_ord_id
1304 .as_ref()
1305 .filter(|s| !s.is_empty())
1306 .map(|s| ClientOrderId::new(s.as_str()));
1307
1308 let filled_qty = if order.filled <= 0.0 {
1309 Quantity::zero(size_precision)
1310 } else {
1311 Quantity::new(order.filled, size_precision)
1312 };
1313
1314 Some(OrderStatusReport::new(
1315 account_id,
1316 instrument_id,
1317 client_order_id,
1318 VenueOrderId::new(&order.order_id),
1319 side,
1320 order_type,
1321 TimeInForce::Gtc,
1322 status,
1323 Quantity::new(order.qty, size_precision),
1324 filled_qty,
1325 ts_event, ts_event, ts_init,
1328 Some(UUID4::new()),
1329 ))
1330 }
1331
1332 fn parse_fill_to_report(
1333 &self,
1334 fill: &KrakenFuturesFill,
1335 ts_init: UnixNanos,
1336 ) -> Option<FillReport> {
1337 let Some(account_id) = self.account_id else {
1338 log::warn!("Cannot process fill: account_id not set");
1339 return None;
1340 };
1341
1342 let instrument = if let Some(ref symbol) = fill.instrument {
1344 self.instruments_cache.get(symbol).cloned()
1345 } else if let Some(ref cli_ord_id) = fill.cli_ord_id.as_ref().filter(|id| !id.is_empty()) {
1346 self.client_order_cache
1348 .get(&ClientOrderId::new(cli_ord_id))
1349 .and_then(|info| {
1350 self.instruments_cache
1351 .iter()
1352 .find(|(_, inst)| inst.id() == info.instrument_id)
1353 .map(|(_, inst)| inst.clone())
1354 })
1355 } else {
1356 None
1357 };
1358
1359 let Some(instrument) = instrument else {
1360 log::warn!(
1361 "Cannot resolve instrument for fill: fill_id={}, order_id={}, cli_ord_id={:?}",
1362 fill.fill_id,
1363 fill.order_id,
1364 fill.cli_ord_id
1365 );
1366 return None;
1367 };
1368
1369 let instrument_id = instrument.id();
1370 let price_precision = instrument.price_precision();
1371 let size_precision = instrument.size_precision();
1372
1373 if fill.qty <= 0.0 {
1374 log::warn!(
1375 "Skipping fill with invalid quantity: fill_id={}, qty={}",
1376 fill.fill_id,
1377 fill.qty
1378 );
1379 return None;
1380 }
1381
1382 let side = if fill.buy {
1383 OrderSide::Buy
1384 } else {
1385 OrderSide::Sell
1386 };
1387
1388 let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1389
1390 let client_order_id = fill
1391 .cli_ord_id
1392 .as_ref()
1393 .filter(|s| !s.is_empty())
1394 .map(|s| ClientOrderId::new(s.as_str()));
1395
1396 let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1397
1398 Some(FillReport::new(
1399 account_id,
1400 instrument_id,
1401 VenueOrderId::new(&fill.order_id),
1402 TradeId::new(&fill.fill_id),
1403 side,
1404 Quantity::new(fill.qty, size_precision),
1405 Price::new(fill.price, price_precision),
1406 commission,
1407 LiquiditySide::NoLiquiditySide, client_order_id,
1409 None, ts_event,
1411 ts_init,
1412 Some(UUID4::new()),
1413 ))
1414 }
1415}
1416
1417#[cfg(test)]
1418mod tests {
1419 use nautilus_model::{
1420 instruments::{CryptoFuture, InstrumentAny},
1421 types::Currency,
1422 };
1423 use rstest::rstest;
1424
1425 use super::*;
1426
1427 fn create_test_handler() -> FuturesFeedHandler {
1428 let signal = Arc::new(AtomicBool::new(false));
1429 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1430 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1431 let subscriptions = SubscriptionState::new(':');
1432
1433 FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1434 }
1435
1436 fn create_test_instrument() -> InstrumentAny {
1437 InstrumentAny::CryptoFuture(CryptoFuture::new(
1438 InstrumentId::from("PI_XBTUSD.KRAKEN"),
1439 Symbol::from("PI_XBTUSD"),
1440 Currency::BTC(),
1441 Currency::USD(),
1442 Currency::USD(),
1443 false,
1444 UnixNanos::default(),
1445 UnixNanos::default(),
1446 1, 0, Price::from("0.5"),
1449 Quantity::from(1),
1450 None,
1451 None,
1452 None,
1453 None,
1454 None,
1455 None,
1456 None,
1457 None,
1458 None,
1459 None,
1460 None,
1461 None,
1462 UnixNanos::default(),
1463 UnixNanos::default(),
1464 ))
1465 }
1466
1467 #[rstest]
1468 fn test_book_snapshot_filters_zero_quantity_bids() {
1469 let mut handler = create_test_handler();
1470 let instrument = create_test_instrument();
1471 handler
1472 .instruments_cache
1473 .insert(Ustr::from("PI_XBTUSD"), instrument);
1474
1475 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1476 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1477
1478 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1479 let ts_init = UnixNanos::from(1_000_000_000);
1480
1481 handler.parse_message(json, ts_init);
1482
1483 assert_eq!(handler.pending_messages.len(), 1);
1484
1485 let msg = handler.pending_messages.pop_front().unwrap();
1486 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1487 panic!("Expected BookDeltas message");
1488 };
1489
1490 assert_eq!(deltas.deltas.len(), 4);
1492 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1493
1494 for delta in &deltas.deltas[1..] {
1495 assert!(
1496 !delta.order.size.is_zero(),
1497 "Found zero-quantity delta that should have been filtered: {delta:?}"
1498 );
1499 }
1500 }
1501
1502 #[rstest]
1503 fn test_book_snapshot_filters_zero_quantity_asks() {
1504 let mut handler = create_test_handler();
1505 let instrument = create_test_instrument();
1506 handler
1507 .instruments_cache
1508 .insert(Ustr::from("PI_XBTUSD"), instrument);
1509
1510 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1511 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1512
1513 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1514 let ts_init = UnixNanos::from(1_000_000_000);
1515
1516 handler.parse_message(json, ts_init);
1517
1518 let msg = handler.pending_messages.pop_front().unwrap();
1519 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1520 panic!("Expected BookDeltas message");
1521 };
1522
1523 let sell_deltas: Vec<_> = deltas
1525 .deltas
1526 .iter()
1527 .filter(|d| d.order.side == OrderSide::Sell)
1528 .collect();
1529
1530 assert_eq!(sell_deltas.len(), 1);
1531 assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1532 }
1533
1534 #[rstest]
1535 fn test_trade_filters_zero_quantity() {
1536 let mut handler = create_test_handler();
1537 let instrument = create_test_instrument();
1538 handler
1539 .instruments_cache
1540 .insert(Ustr::from("PI_XBTUSD"), instrument);
1541
1542 handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1543 handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1544
1545 let json = r#"{
1546 "feed": "trade",
1547 "product_id": "PI_XBTUSD",
1548 "time": 1612269825817,
1549 "side": "buy",
1550 "qty": 0.0,
1551 "price": 34900.0,
1552 "seq": 12345
1553 }"#;
1554 let ts_init = UnixNanos::from(1_000_000_000);
1555
1556 handler.parse_message(json, ts_init);
1557
1558 assert!(
1559 handler.pending_messages.is_empty(),
1560 "Zero quantity trade should be filtered out"
1561 );
1562 }
1563}