1use std::{
19 collections::VecDeque,
20 fmt::Debug,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::{
32 BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
33 },
34 enums::{
35 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce,
36 },
37 events::{OrderAccepted, OrderCanceled, OrderExpired, OrderUpdated},
38 identifiers::{
39 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
40 },
41 instruments::{Instrument, InstrumentAny},
42 reports::{FillReport, OrderStatusReport},
43 types::{Money, Price, Quantity},
44};
45use nautilus_network::{
46 RECONNECTED,
47 websocket::{SubscriptionState, WebSocketClient},
48};
49use serde::Deserialize;
50use serde_json::Value;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::messages::{
55 KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChallengeRequest,
56 KrakenFuturesChannel, KrakenFuturesEvent, KrakenFuturesFeed, KrakenFuturesFill,
57 KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrder,
58 KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta,
59 KrakenFuturesPrivateSubscribeRequest, KrakenFuturesTickerData, KrakenFuturesTradeData,
60 KrakenFuturesTradeSnapshot, KrakenFuturesWsMessage, classify_futures_message,
61};
62use crate::common::enums::KrakenOrderSide;
63
/// Result of interpreting one open-orders update from the private feed.
///
/// `emit_order_event` maps each variant onto the corresponding
/// `KrakenFuturesWsMessage` variant and queues it for delivery.
#[derive(Debug, Clone)]
pub enum ParsedOrderEvent {
    /// Carries an `OrderAccepted` event.
    Accepted(OrderAccepted),
    /// Carries an `OrderCanceled` event.
    Canceled(OrderCanceled),
    /// Carries an `OrderExpired` event.
    Expired(OrderExpired),
    /// Carries an `OrderUpdated` event.
    Updated(OrderUpdated),
    /// Carries only a status report; `parse_order_event` produces this when the
    /// order has no entry in the client order cache.
    StatusOnly(Box<OrderStatusReport>),
}
73
/// Identifiers remembered for an order registered through
/// `HandlerCommand::CacheClientOrder`, used when building order events.
#[derive(Debug, Clone)]
struct CachedOrderInfo {
    /// Instrument the order was registered for.
    instrument_id: InstrumentId,
    /// Trader the order was registered for.
    trader_id: TraderId,
    /// Strategy the order was registered for.
    strategy_id: StrategyId,
}
81
/// Commands consumed by `FuturesFeedHandler::next` from its command channel.
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Installs the WebSocket client used for all outgoing frames.
    SetClient(WebSocketClient),
    /// Sends a subscribe request on the ticker feed for the symbol.
    SubscribeTicker(Symbol),
    /// Sends an unsubscribe request on the ticker feed for the symbol.
    UnsubscribeTicker(Symbol),
    /// Sends a subscribe request on the trade feed for the symbol.
    SubscribeTrade(Symbol),
    /// Sends an unsubscribe request on the trade feed for the symbol.
    UnsubscribeTrade(Symbol),
    /// Sends a subscribe request on the book feed for the symbol.
    SubscribeBook(Symbol),
    /// Sends an unsubscribe request on the book feed for the symbol.
    UnsubscribeBook(Symbol),
    /// Disconnects the client (if any) and makes `next` return `None`.
    Disconnect,
    /// Inserts every instrument into the cache, keyed by its raw symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts or replaces a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Stores the account ID attached to order events.
    SetAccountId(AccountId),
    /// Sends a challenge request; the received challenge string is delivered
    /// through `response_tx`.
    RequestChallenge {
        api_key: String,
        response_tx: tokio::sync::oneshot::Sender<String>,
    },
    /// Stores the credentials used by private feed subscriptions.
    SetAuthCredentials {
        api_key: String,
        original_challenge: String,
        signed_challenge: String,
    },
    /// Sends a private subscribe request for the open-orders feed.
    SubscribeOpenOrders,
    /// Sends a private subscribe request for the fills feed.
    SubscribeFills,
    /// Registers an order so later venue updates can be mapped back to it.
    /// When `venue_order_id` is present it is also mapped to the client order ID.
    CacheClientOrder {
        client_order_id: ClientOrderId,
        venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
}
118
119impl Debug for HandlerCommand {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 match self {
122 Self::SetClient(_) => f.debug_struct("SetClient").finish(),
123 Self::SubscribeTicker(s) => f.debug_tuple("SubscribeTicker").field(s).finish(),
124 Self::UnsubscribeTicker(s) => f.debug_tuple("UnsubscribeTicker").field(s).finish(),
125 Self::SubscribeTrade(s) => f.debug_tuple("SubscribeTrade").field(s).finish(),
126 Self::UnsubscribeTrade(s) => f.debug_tuple("UnsubscribeTrade").field(s).finish(),
127 Self::SubscribeBook(s) => f.debug_tuple("SubscribeBook").field(s).finish(),
128 Self::UnsubscribeBook(s) => f.debug_tuple("UnsubscribeBook").field(s).finish(),
129 Self::Disconnect => write!(f, "Disconnect"),
130 Self::InitializeInstruments(v) => f
131 .debug_tuple("InitializeInstruments")
132 .field(&v.len())
133 .finish(),
134 Self::UpdateInstrument(i) => f.debug_tuple("UpdateInstrument").field(&i.id()).finish(),
135 Self::SetAccountId(id) => f.debug_tuple("SetAccountId").field(id).finish(),
136 Self::RequestChallenge { api_key, .. } => {
137 let masked = &api_key[..4.min(api_key.len())];
138 f.debug_struct("RequestChallenge")
139 .field("api_key", &format!("{masked}..."))
140 .finish()
141 }
142 Self::SetAuthCredentials { api_key, .. } => {
143 let masked = &api_key[..4.min(api_key.len())];
144 f.debug_struct("SetAuthCredentials")
145 .field("api_key", &format!("{masked}..."))
146 .finish()
147 }
148 Self::SubscribeOpenOrders => write!(f, "SubscribeOpenOrders"),
149 Self::SubscribeFills => write!(f, "SubscribeFills"),
150 Self::CacheClientOrder {
151 client_order_id,
152 instrument_id,
153 ..
154 } => f
155 .debug_struct("CacheClientOrder")
156 .field("client_order_id", client_order_id)
157 .field("instrument_id", instrument_id)
158 .finish(),
159 }
160 }
161}
162
/// Message pump for the Kraken Futures WebSocket feed.
///
/// `next` applies incoming [`HandlerCommand`]s, parses raw WebSocket frames and
/// yields the resulting `KrakenFuturesWsMessage` values one at a time.
pub struct FuturesFeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Stop flag; once set, `next` returns `None` on the next raw frame.
    signal: Arc<AtomicBool>,
    /// Client for outgoing frames; installed by `SetClient`, taken on `Disconnect`.
    client: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Subscription state consulted to decide which parsed data is emitted.
    subscriptions: SubscriptionState,
    /// Instruments keyed by raw symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Quote state used to build quotes from book data; cleared on reconnect.
    quote_cache: QuoteCache,
    /// Parsed messages waiting to be returned by `next`.
    pending_messages: VecDeque<KrakenFuturesWsMessage>,
    /// Account ID attached to order events; order updates are skipped while unset.
    account_id: Option<AccountId>,
    /// API key sent with private subscribe requests.
    api_key: Option<String>,
    /// Challenge string sent with private subscribe requests.
    original_challenge: Option<String>,
    /// Signed challenge sent with private subscribe requests.
    signed_challenge: Option<String>,
    /// Client order ID (as string) -> identifiers registered for that order.
    client_order_cache: AHashMap<String, CachedOrderInfo>,
    /// Venue order ID (as string) -> client order ID (as string).
    venue_order_cache: AHashMap<String, String>,
    /// Receiver side of the in-flight challenge request, if one is pending.
    pending_challenge_tx: Option<tokio::sync::oneshot::Sender<String>>,
}
182
183impl FuturesFeedHandler {
184 pub fn new(
186 signal: Arc<AtomicBool>,
187 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
188 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
189 subscriptions: SubscriptionState,
190 ) -> Self {
191 Self {
192 clock: get_atomic_clock_realtime(),
193 signal,
194 client: None,
195 cmd_rx,
196 raw_rx,
197 subscriptions,
198 instruments_cache: AHashMap::new(),
199 quote_cache: QuoteCache::new(),
200 pending_messages: VecDeque::new(),
201 account_id: None,
202 api_key: None,
203 original_challenge: None,
204 signed_challenge: None,
205 client_order_cache: AHashMap::new(),
206 venue_order_cache: AHashMap::new(),
207 pending_challenge_tx: None,
208 }
209 }
210
211 pub fn is_stopped(&self) -> bool {
212 self.signal.load(Ordering::Relaxed)
213 }
214
215 fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
216 let channel_ustr = Ustr::from(channel.as_ref());
217 self.subscriptions.is_subscribed(&channel_ustr, symbol)
218 }
219
220 fn get_instrument(&self, symbol: &Ustr) -> Option<&InstrumentAny> {
221 self.instruments_cache.get(symbol)
222 }
223
224 pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
226 if let Some(msg) = self.pending_messages.pop_front() {
228 return Some(msg);
229 }
230
231 loop {
232 tokio::select! {
233 Some(cmd) = self.cmd_rx.recv() => {
234 match cmd {
235 HandlerCommand::SetClient(client) => {
236 tracing::debug!("WebSocketClient received by futures handler");
237 self.client = Some(client);
238 }
239 HandlerCommand::SubscribeTicker(symbol) => {
240 self.send_subscribe(KrakenFuturesFeed::Ticker, &symbol).await;
241 }
242 HandlerCommand::UnsubscribeTicker(symbol) => {
243 self.send_unsubscribe(KrakenFuturesFeed::Ticker, &symbol).await;
244 }
245 HandlerCommand::SubscribeTrade(symbol) => {
246 self.send_subscribe(KrakenFuturesFeed::Trade, &symbol).await;
247 }
248 HandlerCommand::UnsubscribeTrade(symbol) => {
249 self.send_unsubscribe(KrakenFuturesFeed::Trade, &symbol).await;
250 }
251 HandlerCommand::SubscribeBook(symbol) => {
252 self.send_subscribe(KrakenFuturesFeed::Book, &symbol).await;
253 }
254 HandlerCommand::UnsubscribeBook(symbol) => {
255 self.send_unsubscribe(KrakenFuturesFeed::Book, &symbol).await;
256 }
257 HandlerCommand::Disconnect => {
258 tracing::debug!("Disconnect command received");
259 if let Some(client) = self.client.take() {
260 client.disconnect().await;
261 }
262 return None;
263 }
264 HandlerCommand::InitializeInstruments(instruments) => {
265 for inst in instruments {
266 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
267 }
268 let count = self.instruments_cache.len();
269 tracing::debug!("Initialized {count} instruments in futures handler cache");
270 }
271 HandlerCommand::UpdateInstrument(inst) => {
272 self.instruments_cache.insert(inst.raw_symbol().inner(), inst);
273 }
274 HandlerCommand::SetAccountId(account_id) => {
275 tracing::debug!("Setting account_id for futures handler: {account_id}");
276 self.account_id = Some(account_id);
277 }
278 HandlerCommand::RequestChallenge { api_key, response_tx } => {
279 tracing::debug!("Requesting challenge for authentication");
280 self.pending_challenge_tx = Some(response_tx);
281 self.send_challenge_request(&api_key).await;
282 }
283 HandlerCommand::SetAuthCredentials { api_key, original_challenge, signed_challenge } => {
284 tracing::debug!("Setting auth credentials for futures handler");
285 self.api_key = Some(api_key);
286 self.original_challenge = Some(original_challenge);
287 self.signed_challenge = Some(signed_challenge);
288 }
289 HandlerCommand::SubscribeOpenOrders => {
290 self.send_private_subscribe(KrakenFuturesFeed::OpenOrders).await;
291 }
292 HandlerCommand::SubscribeFills => {
293 self.send_private_subscribe(KrakenFuturesFeed::Fills).await;
294 }
295 HandlerCommand::CacheClientOrder {
296 client_order_id,
297 venue_order_id,
298 instrument_id,
299 trader_id,
300 strategy_id,
301 } => {
302 let client_order_id_str = client_order_id.to_string();
303 self.client_order_cache.insert(
304 client_order_id_str.clone(),
305 CachedOrderInfo {
306 instrument_id,
307 trader_id,
308 strategy_id,
309 },
310 );
311 if let Some(venue_id) = venue_order_id {
312 self.venue_order_cache
313 .insert(venue_id.to_string(), client_order_id_str);
314 }
315 }
316 }
317 continue;
318 }
319
320 msg = self.raw_rx.recv() => {
321 let msg = match msg {
322 Some(msg) => msg,
323 None => {
324 tracing::debug!("WebSocket stream closed");
325 return None;
326 }
327 };
328
329 if self.signal.load(Ordering::Relaxed) {
330 tracing::debug!("Stop signal received");
331 return None;
332 }
333
334 match &msg {
336 Message::Ping(data) => {
337 let len = data.len();
338 tracing::trace!("Received ping frame with {len} bytes");
339 if let Some(client) = &self.client
340 && let Err(e) = client.send_pong(data.to_vec()).await
341 {
342 tracing::warn!(error = %e, "Failed to send pong frame");
343 }
344 continue;
345 }
346 Message::Pong(_) => {
347 tracing::trace!("Received pong");
348 continue;
349 }
350 Message::Close(_) => {
351 tracing::info!("WebSocket connection closed");
352 return None;
353 }
354 Message::Frame(_) => {
355 tracing::trace!("Received raw frame");
356 continue;
357 }
358 _ => {}
359 }
360
361 let text: &str = match &msg {
363 Message::Text(text) => text,
364 Message::Binary(data) => match std::str::from_utf8(data) {
365 Ok(s) => s,
366 Err(_) => continue,
367 },
368 _ => continue,
369 };
370
371 if text == RECONNECTED {
372 tracing::info!("Received WebSocket reconnected signal");
373 self.quote_cache.clear();
374 return Some(KrakenFuturesWsMessage::Reconnected);
375 }
376
377 let ts_init = self.clock.get_time_ns();
378 self.parse_message(text, ts_init);
379
380 if let Some(msg) = self.pending_messages.pop_front() {
382 return Some(msg);
383 }
384
385 continue;
386 }
387 }
388 }
389 }
390
391 async fn send_subscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
392 if let Some(ref client) = self.client {
393 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
394 let feed_str = feed_str.trim_matches('"');
395 let msg = format!(
396 r#"{{"event":"subscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
397 );
398 if let Err(e) = client.send_text(msg, None).await {
399 tracing::error!("Failed to send {feed:?} subscribe: {e}");
400 }
401 }
402 }
403
404 async fn send_unsubscribe(&self, feed: KrakenFuturesFeed, symbol: &Symbol) {
405 if let Some(ref client) = self.client {
406 let feed_str = serde_json::to_string(&feed).unwrap_or_default();
407 let feed_str = feed_str.trim_matches('"');
408 let msg = format!(
409 r#"{{"event":"unsubscribe","feed":"{feed_str}","product_ids":["{symbol}"]}}"#
410 );
411 if let Err(e) = client.send_text(msg, None).await {
412 tracing::error!("Failed to send {feed:?} unsubscribe: {e}");
413 }
414 }
415 }
416
417 async fn send_private_subscribe(&self, feed: KrakenFuturesFeed) {
418 let Some(ref client) = self.client else {
419 tracing::error!("Cannot subscribe to {feed:?}: no WebSocket client");
420 return;
421 };
422
423 let Some(ref api_key) = self.api_key else {
424 tracing::error!("Cannot subscribe to {feed:?}: no API key set");
425 return;
426 };
427
428 let Some(ref original_challenge) = self.original_challenge else {
429 tracing::error!("Cannot subscribe to {feed:?}: no challenge set");
430 return;
431 };
432
433 let Some(ref signed_challenge) = self.signed_challenge else {
434 tracing::error!("Cannot subscribe to {feed:?}: no signed challenge set");
435 return;
436 };
437
438 let request = KrakenFuturesPrivateSubscribeRequest {
439 event: KrakenFuturesEvent::Subscribe,
440 feed,
441 api_key: api_key.clone(),
442 original_challenge: original_challenge.clone(),
443 signed_challenge: signed_challenge.clone(),
444 };
445
446 let msg = match serde_json::to_string(&request) {
447 Ok(m) => m,
448 Err(e) => {
449 tracing::error!("Failed to serialize {feed:?} subscribe request: {e}");
450 return;
451 }
452 };
453
454 if let Err(e) = client.send_text(msg, None).await {
455 tracing::error!("Failed to send {feed:?} subscribe: {e}");
456 } else {
457 tracing::debug!("Sent private subscribe request for {feed:?}");
458 }
459 }
460
461 async fn send_challenge_request(&self, api_key: &str) {
462 let Some(ref client) = self.client else {
463 tracing::error!("Cannot request challenge: no WebSocket client");
464 return;
465 };
466
467 let request = KrakenFuturesChallengeRequest {
468 event: KrakenFuturesEvent::Challenge,
469 api_key: api_key.to_string(),
470 };
471
472 let msg = match serde_json::to_string(&request) {
473 Ok(m) => m,
474 Err(e) => {
475 tracing::error!("Failed to serialize challenge request: {e}");
476 return;
477 }
478 };
479
480 if let Err(e) = client.send_text(msg, None).await {
481 tracing::error!("Failed to send challenge request: {e}");
482 } else {
483 tracing::debug!("Sent challenge request for authentication");
484 }
485 }
486
487 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) {
488 let value: Value = match serde_json::from_str(text) {
489 Ok(v) => v,
490 Err(e) => {
491 tracing::debug!("Failed to parse message as JSON: {e}");
492 return;
493 }
494 };
495
496 match classify_futures_message(&value) {
497 KrakenFuturesMessageType::OpenOrdersSnapshot => {
499 tracing::debug!(
500 "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
501 );
502 }
503 KrakenFuturesMessageType::OpenOrdersCancel => {
504 self.handle_open_orders_cancel_value(value, ts_init);
505 }
506 KrakenFuturesMessageType::OpenOrdersDelta => {
507 self.handle_open_orders_delta_value(value, ts_init);
508 }
509 KrakenFuturesMessageType::FillsSnapshot => {
510 tracing::debug!(
511 "Skipping fills_snapshot (REST reconciliation handles initial state)"
512 );
513 }
514 KrakenFuturesMessageType::FillsDelta => {
515 self.handle_fills_delta_value(value, ts_init);
516 }
517 KrakenFuturesMessageType::Ticker => {
519 self.handle_ticker_message_value(value, ts_init);
520 }
521 KrakenFuturesMessageType::TradeSnapshot => {
522 self.handle_trade_snapshot_value(value, ts_init);
523 }
524 KrakenFuturesMessageType::Trade => {
525 self.handle_trade_message_value(value, ts_init);
526 }
527 KrakenFuturesMessageType::BookSnapshot => {
528 self.handle_book_snapshot_value(value, ts_init);
529 }
530 KrakenFuturesMessageType::BookDelta => {
531 self.handle_book_delta_value(value, ts_init);
532 }
533 KrakenFuturesMessageType::Info => {
535 tracing::debug!("Received info message: {text}");
536 }
537 KrakenFuturesMessageType::Pong => {
538 tracing::trace!("Received pong response");
539 }
540 KrakenFuturesMessageType::Subscribed => {
541 tracing::debug!("Subscription confirmed: {text}");
542 }
543 KrakenFuturesMessageType::Unsubscribed => {
544 tracing::debug!("Unsubscription confirmed: {text}");
545 }
546 KrakenFuturesMessageType::Challenge => {
547 self.handle_challenge_response_value(value);
548 }
549 KrakenFuturesMessageType::Heartbeat => {
550 tracing::trace!("Heartbeat received");
551 }
552 KrakenFuturesMessageType::Unknown => {
553 tracing::debug!("Unhandled message: {text}");
554 }
555 }
556 }
557
558 fn handle_challenge_response_value(&mut self, value: Value) {
559 #[derive(Deserialize)]
560 struct ChallengeResponse {
561 message: String,
562 }
563
564 match serde_json::from_value::<ChallengeResponse>(value) {
565 Ok(response) => {
566 let len = response.message.len();
567 tracing::debug!("Challenge received, length: {len}");
568
569 if let Some(tx) = self.pending_challenge_tx.take() {
570 if tx.send(response.message).is_err() {
571 tracing::warn!("Failed to send challenge response - receiver dropped");
572 }
573 } else {
574 tracing::warn!("Received challenge but no pending request");
575 }
576 }
577 Err(e) => {
578 tracing::error!("Failed to parse challenge response: {e}");
579 }
580 }
581 }
582
583 fn emit_order_event(&mut self, event: ParsedOrderEvent) {
584 match event {
585 ParsedOrderEvent::Accepted(accepted) => {
586 self.pending_messages
587 .push_back(KrakenFuturesWsMessage::OrderAccepted(accepted));
588 }
589 ParsedOrderEvent::Canceled(canceled) => {
590 self.pending_messages
591 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
592 }
593 ParsedOrderEvent::Expired(expired) => {
594 self.pending_messages
595 .push_back(KrakenFuturesWsMessage::OrderExpired(expired));
596 }
597 ParsedOrderEvent::Updated(updated) => {
598 self.pending_messages
599 .push_back(KrakenFuturesWsMessage::OrderUpdated(updated));
600 }
601 ParsedOrderEvent::StatusOnly(report) => {
602 self.pending_messages
603 .push_back(KrakenFuturesWsMessage::OrderStatusReport(report));
604 }
605 }
606 }
607
608 fn handle_ticker_message_value(&mut self, value: Value, ts_init: UnixNanos) {
609 let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
610 Ok(t) => t,
611 Err(e) => {
612 tracing::debug!("Failed to parse ticker: {e}");
613 return;
614 }
615 };
616
617 let (instrument_id, price_precision) = {
618 let Some(instrument) = self.get_instrument(&ticker.product_id) else {
619 let product_id = &ticker.product_id;
620 tracing::debug!("Instrument not found for product_id: {product_id}");
621 return;
622 };
623 (instrument.id(), instrument.price_precision())
624 };
625
626 let ts_event = ticker
627 .time
628 .map(|t| UnixNanos::from((t as u64) * 1_000_000))
629 .unwrap_or(ts_init);
630
631 let has_mark = self.is_subscribed(KrakenFuturesChannel::Mark, &ticker.product_id);
632 let has_index = self.is_subscribed(KrakenFuturesChannel::Index, &ticker.product_id);
633
634 if let Some(mark_price) = ticker.mark_price
635 && has_mark
636 {
637 let update = MarkPriceUpdate::new(
638 instrument_id,
639 Price::new(mark_price, price_precision),
640 ts_event,
641 ts_init,
642 );
643 self.pending_messages
644 .push_back(KrakenFuturesWsMessage::MarkPrice(update));
645 }
646
647 if let Some(index_price) = ticker.index
648 && has_index
649 {
650 let update = IndexPriceUpdate::new(
651 instrument_id,
652 Price::new(index_price, price_precision),
653 ts_event,
654 ts_init,
655 );
656 self.pending_messages
657 .push_back(KrakenFuturesWsMessage::IndexPrice(update));
658 }
659 }
660
661 fn handle_trade_message_value(&mut self, value: Value, ts_init: UnixNanos) {
662 let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
663 Ok(t) => t,
664 Err(e) => {
665 tracing::trace!("Failed to parse trade: {e}");
666 return;
667 }
668 };
669
670 if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
671 return;
672 }
673
674 let (instrument_id, price_precision, size_precision) = {
675 let Some(instrument) = self.get_instrument(&trade.product_id) else {
676 return;
677 };
678 (
679 instrument.id(),
680 instrument.price_precision(),
681 instrument.size_precision(),
682 )
683 };
684
685 let size = Quantity::new(trade.qty, size_precision);
686 if size.is_zero() {
687 let product_id = trade.product_id;
688 let raw_qty = trade.qty;
689 tracing::warn!("Skipping zero quantity trade for {product_id} (raw qty: {raw_qty})");
690 return;
691 }
692
693 let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
694 let aggressor_side = match trade.side {
695 KrakenOrderSide::Buy => AggressorSide::Buyer,
696 KrakenOrderSide::Sell => AggressorSide::Seller,
697 };
698 let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
699
700 let trade_tick = TradeTick::new(
701 instrument_id,
702 Price::new(trade.price, price_precision),
703 size,
704 aggressor_side,
705 TradeId::new(&trade_id),
706 ts_event,
707 ts_init,
708 );
709
710 self.pending_messages
711 .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
712 }
713
714 fn handle_trade_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
715 let snapshot = match serde_json::from_value::<KrakenFuturesTradeSnapshot>(value) {
716 Ok(s) => s,
717 Err(e) => {
718 tracing::trace!("Failed to parse trade snapshot: {e}");
719 return;
720 }
721 };
722
723 if !self.is_subscribed(KrakenFuturesChannel::Trades, &snapshot.product_id) {
724 return;
725 }
726
727 let (instrument_id, price_precision, size_precision) = {
728 let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
729 return;
730 };
731 (
732 instrument.id(),
733 instrument.price_precision(),
734 instrument.size_precision(),
735 )
736 };
737
738 for trade in snapshot.trades {
739 let size = Quantity::new(trade.qty, size_precision);
740 if size.is_zero() {
741 let product_id = snapshot.product_id;
742 let raw_qty = trade.qty;
743 tracing::warn!(
744 "Skipping zero quantity trade in snapshot for {product_id} (raw qty: {raw_qty})"
745 );
746 continue;
747 }
748
749 let ts_event = UnixNanos::from((trade.time as u64) * 1_000_000);
750 let aggressor_side = match trade.side {
751 KrakenOrderSide::Buy => AggressorSide::Buyer,
752 KrakenOrderSide::Sell => AggressorSide::Seller,
753 };
754 let trade_id = trade.uid.unwrap_or_else(|| trade.seq.to_string());
755
756 let trade_tick = TradeTick::new(
757 instrument_id,
758 Price::new(trade.price, price_precision),
759 size,
760 aggressor_side,
761 TradeId::new(&trade_id),
762 ts_event,
763 ts_init,
764 );
765
766 self.pending_messages
767 .push_back(KrakenFuturesWsMessage::Trade(trade_tick));
768 }
769 }
770
771 fn handle_book_snapshot_value(&mut self, value: Value, ts_init: UnixNanos) {
772 let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
773 Ok(s) => s,
774 Err(e) => {
775 tracing::trace!("Failed to parse book snapshot: {e}");
776 return;
777 }
778 };
779
780 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
781 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
782
783 if !has_book && !has_quotes {
784 return;
785 }
786
787 let (instrument_id, price_precision, size_precision) = {
788 let Some(instrument) = self.get_instrument(&snapshot.product_id) else {
789 return;
790 };
791 (
792 instrument.id(),
793 instrument.price_precision(),
794 instrument.size_precision(),
795 )
796 };
797
798 let ts_event = UnixNanos::from((snapshot.timestamp as u64) * 1_000_000);
799
800 let best_bid = snapshot
801 .bids
802 .iter()
803 .filter(|l| l.qty > 0.0)
804 .max_by(|a, b| a.price.total_cmp(&b.price));
805 let best_ask = snapshot
806 .asks
807 .iter()
808 .filter(|l| l.qty > 0.0)
809 .min_by(|a, b| a.price.total_cmp(&b.price));
810
811 if has_quotes {
812 let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
813 let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
814 let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
815 let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
816
817 match self.quote_cache.process(
818 instrument_id,
819 bid_price,
820 ask_price,
821 bid_size,
822 ask_size,
823 ts_event,
824 ts_init,
825 ) {
826 Ok(quote) => {
827 self.pending_messages
828 .push_back(KrakenFuturesWsMessage::Quote(quote));
829 }
830 Err(e) => {
831 tracing::trace!("Quote cache process error: {e}");
832 }
833 }
834 }
835
836 if has_book {
837 let mut deltas = Vec::with_capacity(snapshot.bids.len() + snapshot.asks.len() + 1);
838
839 deltas.push(OrderBookDelta::clear(
840 instrument_id,
841 snapshot.seq as u64,
842 ts_event,
843 ts_init,
844 ));
845
846 for bid in &snapshot.bids {
847 let size = Quantity::new(bid.qty, size_precision);
848 if size.is_zero() {
849 continue;
850 }
851 let order = BookOrder::new(
852 OrderSide::Buy,
853 Price::new(bid.price, price_precision),
854 size,
855 0,
856 );
857 deltas.push(OrderBookDelta::new(
858 instrument_id,
859 BookAction::Add,
860 order,
861 0,
862 snapshot.seq as u64,
863 ts_event,
864 ts_init,
865 ));
866 }
867
868 for ask in &snapshot.asks {
869 let size = Quantity::new(ask.qty, size_precision);
870 if size.is_zero() {
871 continue;
872 }
873 let order = BookOrder::new(
874 OrderSide::Sell,
875 Price::new(ask.price, price_precision),
876 size,
877 0,
878 );
879 deltas.push(OrderBookDelta::new(
880 instrument_id,
881 BookAction::Add,
882 order,
883 0,
884 snapshot.seq as u64,
885 ts_event,
886 ts_init,
887 ));
888 }
889
890 let book_deltas = OrderBookDeltas::new(instrument_id, deltas);
891 self.pending_messages
892 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
893 }
894 }
895
896 fn handle_book_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
897 let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
898 Ok(d) => d,
899 Err(e) => {
900 tracing::trace!("Failed to parse book delta: {e}");
901 return;
902 }
903 };
904
905 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
906 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
907
908 if !has_book && !has_quotes {
909 return;
910 }
911
912 let Some(instrument) = self.get_instrument(&delta.product_id) else {
913 return;
914 };
915
916 let ts_event = UnixNanos::from((delta.timestamp as u64) * 1_000_000);
917 let instrument_id = instrument.id();
918 let price_precision = instrument.price_precision();
919 let size_precision = instrument.size_precision();
920
921 let side: OrderSide = delta.side.into();
922
923 if has_quotes && delta.qty > 0.0 {
924 let price = Price::new(delta.price, price_precision);
925 let size = Quantity::new(delta.qty, size_precision);
926
927 let (bid_price, ask_price, bid_size, ask_size) = match side {
928 OrderSide::Buy => (Some(price), None, Some(size), None),
929 OrderSide::Sell => (None, Some(price), None, Some(size)),
930 _ => (None, None, None, None),
931 };
932
933 if let Ok(quote) = self.quote_cache.process(
934 instrument_id,
935 bid_price,
936 ask_price,
937 bid_size,
938 ask_size,
939 ts_event,
940 ts_init,
941 ) {
942 self.pending_messages
943 .push_back(KrakenFuturesWsMessage::Quote(quote));
944 }
945 }
946
947 if has_book {
948 let size = Quantity::new(delta.qty, size_precision);
949 let action = if size.is_zero() {
950 BookAction::Delete
951 } else {
952 BookAction::Update
953 };
954
955 let order = BookOrder::new(side, Price::new(delta.price, price_precision), size, 0);
956
957 let book_delta = OrderBookDelta::new(
958 instrument_id,
959 action,
960 order,
961 0,
962 delta.seq as u64,
963 ts_event,
964 ts_init,
965 );
966
967 let book_deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
968 self.pending_messages
969 .push_back(KrakenFuturesWsMessage::BookDeltas(book_deltas));
970 }
971 }
972
973 fn handle_open_orders_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
974 let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
975 Ok(d) => d,
976 Err(e) => {
977 tracing::error!("Failed to parse open_orders delta: {e}");
978 return;
979 }
980 };
981
982 tracing::debug!(
983 order_id = %delta.order.order_id,
984 is_cancel = delta.is_cancel,
985 reason = ?delta.reason,
986 "Received open_orders delta"
987 );
988
989 if let Some(event) = self.parse_order_event(
990 &delta.order,
991 ts_init,
992 delta.is_cancel,
993 delta.reason.as_deref(),
994 ) {
995 self.emit_order_event(event);
996 }
997 }
998
999 fn handle_open_orders_cancel_value(&mut self, value: Value, ts_init: UnixNanos) {
1000 if let Some(reason) = value.get("reason").and_then(|r| r.as_str())
1003 && (reason == "full_fill" || reason == "partial_fill")
1004 {
1005 tracing::debug!(
1006 reason = %reason,
1007 "Skipping open_orders cancel for fill (handled by fills feed)"
1008 );
1009 return;
1010 }
1011
1012 let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
1013 Ok(c) => c,
1014 Err(e) => {
1015 tracing::error!("Failed to parse open_orders cancel: {e}");
1016 return;
1017 }
1018 };
1019
1020 tracing::debug!(
1021 order_id = %cancel.order_id,
1022 cli_ord_id = ?cancel.cli_ord_id,
1023 reason = ?cancel.reason,
1024 "Received open_orders cancel"
1025 );
1026
1027 let Some(account_id) = self.account_id else {
1028 tracing::warn!("Cannot process cancel: account_id not set");
1029 return;
1030 };
1031
1032 let (client_order_id, info) = if let Some(cli_ord_id) = cancel.cli_ord_id.as_ref() {
1033 if let Some(info) = self.client_order_cache.get(cli_ord_id) {
1034 (ClientOrderId::new(cli_ord_id), info.clone())
1035 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1036 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1037 (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1038 } else {
1039 tracing::debug!(
1040 order_id = %cancel.order_id,
1041 cli_ord_id = %cli_ord_id,
1042 "Cancel received for unknown order (not in cache)"
1043 );
1044 return;
1045 }
1046 } else {
1047 tracing::debug!(
1048 order_id = %cancel.order_id,
1049 cli_ord_id = %cli_ord_id,
1050 "Cancel received for unknown order (not in cache)"
1051 );
1052 return;
1053 }
1054 } else if let Some(mapped_cli_ord_id) = self.venue_order_cache.get(&cancel.order_id) {
1055 if let Some(info) = self.client_order_cache.get(mapped_cli_ord_id) {
1056 (ClientOrderId::new(mapped_cli_ord_id), info.clone())
1057 } else {
1058 tracing::debug!(
1059 order_id = %cancel.order_id,
1060 "Cancel received but mapped order not in cache"
1061 );
1062 return;
1063 }
1064 } else {
1065 tracing::debug!(
1066 order_id = %cancel.order_id,
1067 "Cancel received without cli_ord_id and no venue mapping (external order)"
1068 );
1069 return;
1070 };
1071
1072 let venue_order_id = VenueOrderId::new(&cancel.order_id);
1073
1074 let canceled = OrderCanceled::new(
1075 info.trader_id,
1076 info.strategy_id,
1077 info.instrument_id,
1078 client_order_id,
1079 UUID4::new(),
1080 ts_init,
1081 ts_init,
1082 false,
1083 Some(venue_order_id),
1084 Some(account_id),
1085 );
1086
1087 self.pending_messages
1088 .push_back(KrakenFuturesWsMessage::OrderCanceled(canceled));
1089 }
1090
1091 fn handle_fills_delta_value(&mut self, value: Value, ts_init: UnixNanos) {
1092 let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
1093 Ok(d) => d,
1094 Err(e) => {
1095 tracing::error!("Failed to parse fills delta: {e}");
1096 return;
1097 }
1098 };
1099
1100 tracing::debug!(fill_count = delta.fills.len(), "Received fills delta");
1101
1102 for fill in &delta.fills {
1103 tracing::debug!(
1104 fill_id = %fill.fill_id,
1105 order_id = %fill.order_id,
1106 "Processing fill"
1107 );
1108
1109 if let Some(report) = self.parse_fill_to_report(fill, ts_init) {
1110 self.pending_messages
1111 .push_back(KrakenFuturesWsMessage::FillReport(Box::new(report)));
1112 }
1113 }
1114 }
1115
1116 fn parse_order_event(
1123 &self,
1124 order: &KrakenFuturesOpenOrder,
1125 ts_init: UnixNanos,
1126 is_cancel: bool,
1127 cancel_reason: Option<&str>,
1128 ) -> Option<ParsedOrderEvent> {
1129 let Some(account_id) = self.account_id else {
1130 tracing::warn!("Cannot process order: account_id not set");
1131 return None;
1132 };
1133
1134 let instrument = self
1135 .instruments_cache
1136 .get(&Ustr::from(order.instrument.as_str()))?;
1137
1138 let instrument_id = instrument.id();
1139
1140 if order.qty <= 0.0 {
1141 tracing::warn!(
1142 order_id = %order.order_id,
1143 "Skipping order with invalid quantity: {}",
1144 order.qty
1145 );
1146 return None;
1147 }
1148
1149 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1150 let venue_order_id = VenueOrderId::new(&order.order_id);
1151
1152 let client_order_id = order
1153 .cli_ord_id
1154 .as_ref()
1155 .map(|s| ClientOrderId::new(s.as_str()));
1156
1157 let cached_info = order
1158 .cli_ord_id
1159 .as_ref()
1160 .and_then(|id| self.client_order_cache.get(id));
1161
1162 let Some(info) = cached_info else {
1164 return self
1165 .parse_order_to_status_report(order, ts_init, is_cancel)
1166 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)));
1167 };
1168
1169 let client_order_id = client_order_id.expect("client_order_id should exist if cached");
1170
1171 let status = if is_cancel {
1172 OrderStatus::Canceled
1173 } else if order.filled >= order.qty {
1174 OrderStatus::Filled
1175 } else if order.filled > 0.0 {
1176 OrderStatus::PartiallyFilled
1177 } else {
1178 OrderStatus::Accepted
1179 };
1180
1181 match status {
1182 OrderStatus::Accepted => Some(ParsedOrderEvent::Accepted(OrderAccepted::new(
1183 info.trader_id,
1184 info.strategy_id,
1185 instrument_id,
1186 client_order_id,
1187 venue_order_id,
1188 account_id,
1189 UUID4::new(),
1190 ts_event,
1191 ts_init,
1192 false,
1193 ))),
1194 OrderStatus::Canceled => {
1195 let is_expired = cancel_reason
1197 .map(|r| {
1198 let r_lower = r.to_lowercase();
1199 r_lower.contains("expir")
1200 || r_lower.contains("gtd")
1201 || r_lower.contains("timeout")
1202 })
1203 .unwrap_or(false);
1204
1205 if is_expired {
1206 Some(ParsedOrderEvent::Expired(OrderExpired::new(
1207 info.trader_id,
1208 info.strategy_id,
1209 instrument_id,
1210 client_order_id,
1211 UUID4::new(),
1212 ts_event,
1213 ts_init,
1214 false,
1215 Some(venue_order_id),
1216 Some(account_id),
1217 )))
1218 } else {
1219 Some(ParsedOrderEvent::Canceled(OrderCanceled::new(
1220 info.trader_id,
1221 info.strategy_id,
1222 instrument_id,
1223 client_order_id,
1224 UUID4::new(),
1225 ts_event,
1226 ts_init,
1227 false,
1228 Some(venue_order_id),
1229 Some(account_id),
1230 )))
1231 }
1232 }
1233
1234 OrderStatus::PartiallyFilled | OrderStatus::Filled => self
1236 .parse_order_to_status_report(order, ts_init, is_cancel)
1237 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1238 _ => self
1239 .parse_order_to_status_report(order, ts_init, is_cancel)
1240 .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r))),
1241 }
1242 }
1243
1244 fn parse_order_to_status_report(
1248 &self,
1249 order: &KrakenFuturesOpenOrder,
1250 ts_init: UnixNanos,
1251 is_cancel: bool,
1252 ) -> Option<OrderStatusReport> {
1253 let Some(account_id) = self.account_id else {
1254 tracing::warn!("Cannot process order: account_id not set");
1255 return None;
1256 };
1257
1258 let instrument = self
1259 .instruments_cache
1260 .get(&Ustr::from(order.instrument.as_str()))?;
1261
1262 let instrument_id = instrument.id();
1263 let size_precision = instrument.size_precision();
1264
1265 let side = if order.direction == 0 {
1266 OrderSide::Buy
1267 } else {
1268 OrderSide::Sell
1269 };
1270
1271 let order_type = match order.order_type.as_str() {
1272 "limit" | "lmt" => OrderType::Limit,
1273 "stop" | "stp" => OrderType::StopLimit,
1274 "take_profit" => OrderType::LimitIfTouched,
1275 "market" | "mkt" => OrderType::Market,
1276 _ => OrderType::Limit,
1277 };
1278
1279 let status = if is_cancel {
1280 OrderStatus::Canceled
1281 } else if order.filled >= order.qty {
1282 OrderStatus::Filled
1283 } else if order.filled > 0.0 {
1284 OrderStatus::PartiallyFilled
1285 } else {
1286 OrderStatus::Accepted
1287 };
1288
1289 if order.qty <= 0.0 {
1290 let qty = order.qty;
1291 tracing::warn!(order_id = %order.order_id, "Skipping order with invalid quantity: {qty}");
1292 return None;
1293 }
1294
1295 let ts_event = UnixNanos::from((order.last_update_time as u64) * 1_000_000);
1296
1297 let client_order_id = order
1298 .cli_ord_id
1299 .as_ref()
1300 .map(|s| ClientOrderId::new(s.as_str()));
1301
1302 let filled_qty = if order.filled <= 0.0 {
1303 Quantity::zero(size_precision)
1304 } else {
1305 Quantity::new(order.filled, size_precision)
1306 };
1307
1308 Some(OrderStatusReport::new(
1309 account_id,
1310 instrument_id,
1311 client_order_id,
1312 VenueOrderId::new(&order.order_id),
1313 side,
1314 order_type,
1315 TimeInForce::Gtc,
1316 status,
1317 Quantity::new(order.qty, size_precision),
1318 filled_qty,
1319 ts_event, ts_event, ts_init,
1322 Some(UUID4::new()),
1323 ))
1324 }
1325
1326 fn parse_fill_to_report(
1327 &self,
1328 fill: &KrakenFuturesFill,
1329 ts_init: UnixNanos,
1330 ) -> Option<FillReport> {
1331 let Some(account_id) = self.account_id else {
1332 tracing::warn!("Cannot process fill: account_id not set");
1333 return None;
1334 };
1335
1336 let instrument = if let Some(ref symbol) = fill.instrument {
1338 self.instruments_cache.get(symbol).cloned()
1339 } else if let Some(ref cli_ord_id) = fill.cli_ord_id {
1340 self.client_order_cache.get(cli_ord_id).and_then(|info| {
1342 self.instruments_cache
1343 .iter()
1344 .find(|(_, inst)| inst.id() == info.instrument_id)
1345 .map(|(_, inst)| inst.clone())
1346 })
1347 } else {
1348 None
1349 };
1350
1351 let Some(instrument) = instrument else {
1352 tracing::warn!(
1353 fill_id = %fill.fill_id,
1354 order_id = %fill.order_id,
1355 cli_ord_id = ?fill.cli_ord_id,
1356 "Cannot resolve instrument for fill"
1357 );
1358 return None;
1359 };
1360
1361 let instrument_id = instrument.id();
1362 let price_precision = instrument.price_precision();
1363 let size_precision = instrument.size_precision();
1364
1365 if fill.qty <= 0.0 {
1366 let qty = fill.qty;
1367 tracing::warn!(fill_id = %fill.fill_id, "Skipping fill with invalid quantity: {qty}");
1368 return None;
1369 }
1370
1371 let side = if fill.buy {
1372 OrderSide::Buy
1373 } else {
1374 OrderSide::Sell
1375 };
1376
1377 let ts_event = UnixNanos::from((fill.time as u64) * 1_000_000);
1378
1379 let client_order_id = fill
1380 .cli_ord_id
1381 .as_ref()
1382 .map(|s| ClientOrderId::new(s.as_str()));
1383
1384 let commission = Money::new(fill.fee_paid.unwrap_or(0.0), instrument.quote_currency());
1385
1386 Some(FillReport::new(
1387 account_id,
1388 instrument_id,
1389 VenueOrderId::new(&fill.order_id),
1390 TradeId::new(&fill.fill_id),
1391 side,
1392 Quantity::new(fill.qty, size_precision),
1393 Price::new(fill.price, price_precision),
1394 commission,
1395 LiquiditySide::NoLiquiditySide, client_order_id,
1397 None, ts_event,
1399 ts_init,
1400 Some(UUID4::new()),
1401 ))
1402 }
1403}
1404
1405#[cfg(test)]
1406mod tests {
1407 use nautilus_model::{
1408 instruments::{CryptoFuture, InstrumentAny},
1409 types::Currency,
1410 };
1411 use rstest::rstest;
1412
1413 use super::*;
1414
1415 fn create_test_handler() -> FuturesFeedHandler {
1416 let signal = Arc::new(AtomicBool::new(false));
1417 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1418 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1419 let subscriptions = SubscriptionState::new(':');
1420
1421 FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
1422 }
1423
1424 fn create_test_instrument() -> InstrumentAny {
1425 InstrumentAny::CryptoFuture(CryptoFuture::new(
1426 InstrumentId::from("PI_XBTUSD.KRAKEN"),
1427 Symbol::from("PI_XBTUSD"),
1428 Currency::BTC(),
1429 Currency::USD(),
1430 Currency::USD(),
1431 false,
1432 UnixNanos::default(),
1433 UnixNanos::default(),
1434 1, 0, Price::from("0.5"),
1437 Quantity::from(1),
1438 None,
1439 None,
1440 None,
1441 None,
1442 None,
1443 None,
1444 None,
1445 None,
1446 None,
1447 None,
1448 None,
1449 None,
1450 UnixNanos::default(),
1451 UnixNanos::default(),
1452 ))
1453 }
1454
1455 #[rstest]
1456 fn test_book_snapshot_filters_zero_quantity_bids() {
1457 let mut handler = create_test_handler();
1458 let instrument = create_test_instrument();
1459 handler
1460 .instruments_cache
1461 .insert(Ustr::from("PI_XBTUSD"), instrument);
1462
1463 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1464 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1465
1466 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1467 let ts_init = UnixNanos::from(1_000_000_000);
1468
1469 handler.parse_message(json, ts_init);
1470
1471 assert_eq!(handler.pending_messages.len(), 1);
1472
1473 let msg = handler.pending_messages.pop_front().unwrap();
1474 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1475 panic!("Expected BookDeltas message");
1476 };
1477
1478 assert_eq!(deltas.deltas.len(), 4);
1480 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1481
1482 for delta in &deltas.deltas[1..] {
1483 assert!(
1484 !delta.order.size.is_zero(),
1485 "Found zero-quantity delta that should have been filtered: {:?}",
1486 delta
1487 );
1488 }
1489 }
1490
1491 #[rstest]
1492 fn test_book_snapshot_filters_zero_quantity_asks() {
1493 let mut handler = create_test_handler();
1494 let instrument = create_test_instrument();
1495 handler
1496 .instruments_cache
1497 .insert(Ustr::from("PI_XBTUSD"), instrument);
1498
1499 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
1500 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
1501
1502 let json = include_str!("../../../test_data/ws_futures_book_snapshot_with_zero_qty.json");
1503 let ts_init = UnixNanos::from(1_000_000_000);
1504
1505 handler.parse_message(json, ts_init);
1506
1507 let msg = handler.pending_messages.pop_front().unwrap();
1508 let KrakenFuturesWsMessage::BookDeltas(deltas) = msg else {
1509 panic!("Expected BookDeltas message");
1510 };
1511
1512 let sell_deltas: Vec<_> = deltas
1514 .deltas
1515 .iter()
1516 .filter(|d| d.order.side == OrderSide::Sell)
1517 .collect();
1518
1519 assert_eq!(sell_deltas.len(), 1);
1520 assert_eq!(sell_deltas[0].order.price.as_f64(), 34912.0);
1521 }
1522
1523 #[rstest]
1524 fn test_trade_filters_zero_quantity() {
1525 let mut handler = create_test_handler();
1526 let instrument = create_test_instrument();
1527 handler
1528 .instruments_cache
1529 .insert(Ustr::from("PI_XBTUSD"), instrument);
1530
1531 handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
1532 handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
1533
1534 let json = r#"{
1535 "feed": "trade",
1536 "product_id": "PI_XBTUSD",
1537 "time": 1612269825817,
1538 "side": "buy",
1539 "qty": 0.0,
1540 "price": 34900.0,
1541 "seq": 12345
1542 }"#;
1543 let ts_init = UnixNanos::from(1_000_000_000);
1544
1545 handler.parse_message(json, ts_init);
1546
1547 assert!(
1548 handler.pending_messages.is_empty(),
1549 "Zero quantity trade should be filtered out"
1550 );
1551 }
1552}