1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_common::cache::quote::QuoteCache;
28use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
29use nautilus_model::{
30 data::{Bar, Data, OrderBookDeltas, QuoteTick},
31 events::{OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderUpdated},
32 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
33 instruments::{Instrument, InstrumentAny},
34 types::{Price, Quantity},
35};
36use nautilus_network::{
37 RECONNECTED,
38 websocket::{SubscriptionState, WebSocketClient},
39};
40use serde_json::Value;
41use tokio_tungstenite::tungstenite::Message;
42use ustr::Ustr;
43
44use super::{
45 enums::{KrakenExecType, KrakenWsChannel},
46 messages::{
47 KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage, KrakenWsOhlcData,
48 KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData, NautilusWsMessage,
49 },
50 parse::{
51 parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar, parse_ws_fill_report,
52 parse_ws_order_status_report,
53 },
54};
55
56#[derive(Debug, Clone)]
58struct CachedOrderInfo {
59 instrument_id: InstrumentId,
60 trader_id: TraderId,
61 strategy_id: StrategyId,
62}
63
64#[derive(Debug)]
66#[allow(
67 clippy::large_enum_variant,
68 reason = "Commands are ephemeral and immediately consumed"
69)]
70pub enum SpotHandlerCommand {
71 SetClient(WebSocketClient),
72 Disconnect,
73 SendText {
74 payload: String,
75 },
76 InitializeInstruments(Vec<InstrumentAny>),
77 UpdateInstrument(InstrumentAny),
78 SetAccountId(AccountId),
79 CacheClientOrder {
80 client_order_id: ClientOrderId,
81 instrument_id: InstrumentId,
82 trader_id: TraderId,
83 strategy_id: StrategyId,
84 },
85}
86
87type OhlcBufferKey = (Ustr, u32);
89
90type OhlcBufferEntry = (Bar, UnixNanos);
92
93pub(super) struct SpotFeedHandler {
95 clock: &'static AtomicTime,
96 signal: Arc<AtomicBool>,
97 client: Option<WebSocketClient>,
98 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
99 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
100 subscriptions: SubscriptionState,
101 instruments_cache: AHashMap<Ustr, InstrumentAny>,
102 client_order_cache: AHashMap<String, CachedOrderInfo>,
103 order_qty_cache: AHashMap<String, f64>,
104 quote_cache: QuoteCache,
105 book_sequence: u64,
106 pending_quotes: Vec<QuoteTick>,
107 pending_messages: VecDeque<NautilusWsMessage>,
108 account_id: Option<AccountId>,
109 ohlc_buffer: AHashMap<OhlcBufferKey, OhlcBufferEntry>,
110}
111
112impl SpotFeedHandler {
113 pub(super) fn new(
115 signal: Arc<AtomicBool>,
116 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
117 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
118 subscriptions: SubscriptionState,
119 ) -> Self {
120 Self {
121 clock: get_atomic_clock_realtime(),
122 signal,
123 client: None,
124 cmd_rx,
125 raw_rx,
126 subscriptions,
127 instruments_cache: AHashMap::new(),
128 client_order_cache: AHashMap::new(),
129 order_qty_cache: AHashMap::new(),
130 quote_cache: QuoteCache::new(),
131 book_sequence: 0,
132 pending_quotes: Vec::new(),
133 pending_messages: VecDeque::new(),
134 account_id: None,
135 ohlc_buffer: AHashMap::new(),
136 }
137 }
138
139 pub(super) fn is_stopped(&self) -> bool {
140 self.signal.load(Ordering::Relaxed)
141 }
142
143 fn is_subscribed(&self, topic: &str) -> bool {
145 self.subscriptions.all_topics().iter().any(|t| t == topic)
146 }
147
148 fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
149 self.instruments_cache.get(symbol).cloned()
150 }
151
152 fn flush_ohlc_buffer(&mut self) {
157 if self.ohlc_buffer.is_empty() {
158 return;
159 }
160
161 let bars: Vec<Data> = self
162 .ohlc_buffer
163 .drain()
164 .map(|(_, (bar, _))| Data::Bar(bar))
165 .collect();
166
167 if !bars.is_empty() {
168 tracing::debug!("Flushing {} buffered OHLC bars on stream end", bars.len());
169 self.pending_messages
170 .push_back(NautilusWsMessage::Data(bars));
171 }
172 }
173
174 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
176 if let Some(msg) = self.pending_messages.pop_front() {
178 return Some(msg);
179 }
180
181 if let Some(quote) = self.pending_quotes.pop() {
182 return Some(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
183 }
184
185 loop {
186 tokio::select! {
187 Some(cmd) = self.cmd_rx.recv() => {
188 match cmd {
189 SpotHandlerCommand::SetClient(client) => {
190 tracing::debug!("WebSocketClient received by handler");
191 self.client = Some(client);
192 }
193 SpotHandlerCommand::Disconnect => {
194 tracing::debug!("Disconnect command received");
195 if let Some(client) = self.client.take() {
196 client.disconnect().await;
197 }
198 }
199 SpotHandlerCommand::SendText { payload } => {
200 if let Some(client) = &self.client
201 && let Err(e) = client.send_text(payload.clone(), None).await
202 {
203 tracing::error!(error = %e, "Failed to send text");
204 }
205 }
206 SpotHandlerCommand::InitializeInstruments(instruments) => {
207 for inst in instruments {
208 self.instruments_cache.insert(inst.symbol().inner(), inst);
211 }
212 }
213 SpotHandlerCommand::UpdateInstrument(inst) => {
214 self.instruments_cache.insert(inst.symbol().inner(), inst);
215 }
216 SpotHandlerCommand::SetAccountId(account_id) => {
217 tracing::debug!(%account_id, "Account ID set for execution reports");
218 self.account_id = Some(account_id);
219 }
220 SpotHandlerCommand::CacheClientOrder {
221 client_order_id,
222 instrument_id,
223 trader_id,
224 strategy_id,
225 } => {
226 tracing::debug!(
227 %client_order_id,
228 %instrument_id,
229 "Cached client order info"
230 );
231 self.client_order_cache.insert(
232 client_order_id.to_string(),
233 CachedOrderInfo {
234 instrument_id,
235 trader_id,
236 strategy_id,
237 },
238 );
239 }
240 }
241 continue;
242 }
243
244 msg = self.raw_rx.recv() => {
245 let msg = match msg {
246 Some(msg) => msg,
247 None => {
248 tracing::debug!("WebSocket stream closed");
249 self.flush_ohlc_buffer();
250 return self.pending_messages.pop_front();
251 }
252 };
253
254 if let Message::Ping(data) = &msg {
255 tracing::trace!("Received ping frame with {} bytes", data.len());
256 if let Some(client) = &self.client
257 && let Err(e) = client.send_pong(data.to_vec()).await
258 {
259 tracing::warn!(error = %e, "Failed to send pong frame");
260 }
261 continue;
262 }
263
264 if self.signal.load(Ordering::Relaxed) {
265 tracing::debug!("Stop signal received");
266 self.flush_ohlc_buffer();
267 return self.pending_messages.pop_front();
268 }
269
270 let text = match msg {
271 Message::Text(text) => text.to_string(),
272 Message::Binary(data) => {
273 match String::from_utf8(data.to_vec()) {
274 Ok(text) => text,
275 Err(e) => {
276 tracing::warn!("Failed to decode binary message: {e}");
277 continue;
278 }
279 }
280 }
281 Message::Pong(_) => {
282 tracing::trace!("Received pong");
283 continue;
284 }
285 Message::Close(_) => {
286 tracing::info!("WebSocket connection closed");
287 self.flush_ohlc_buffer();
288 return self.pending_messages.pop_front();
289 }
290 Message::Frame(_) => {
291 tracing::trace!("Received raw frame");
292 continue;
293 }
294 _ => continue,
295 };
296
297 if text == RECONNECTED {
298 tracing::info!("Received WebSocket reconnected signal");
299 self.quote_cache.clear();
300 return Some(NautilusWsMessage::Reconnected);
301 }
302
303 let ts_init = self.clock.get_time_ns();
304
305 if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
306 return Some(nautilus_msg);
307 }
308
309 continue;
310 }
311 }
312 }
313 }
314
315 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
316 if text.len() < 50 && text.starts_with("{\"channel\":\"") {
319 if text.contains("heartbeat") {
320 tracing::trace!("Received heartbeat");
321 return None;
322 }
323 if text.contains("status") {
324 tracing::debug!("Received status message");
325 return None;
326 }
327 }
328
329 let value: Value = match serde_json::from_str(text) {
330 Ok(v) => v,
331 Err(e) => {
332 tracing::warn!("Failed to parse message: {e}");
333 return None;
334 }
335 };
336
337 if value.get("method").is_some() {
339 self.handle_control_message(value);
340 return None;
341 }
342
343 if value.get("channel").is_some() && value.get("data").is_some() {
345 match serde_json::from_value::<KrakenWsMessage>(value) {
346 Ok(msg) => return self.handle_data_message(msg, ts_init),
347 Err(e) => {
348 tracing::debug!("Failed to parse data message: {e}");
349 return None;
350 }
351 }
352 }
353
354 tracing::debug!("Unhandled message structure: {text}");
355 None
356 }
357
358 fn handle_control_message(&self, value: Value) {
359 match serde_json::from_value::<KrakenWsResponse>(value) {
360 Ok(response) => match response {
361 KrakenWsResponse::Subscribe(sub) => {
362 if sub.success {
363 if let Some(result) = &sub.result {
364 tracing::debug!(
365 channel = ?result.channel,
366 req_id = ?sub.req_id,
367 "Subscription confirmed"
368 );
369 } else {
370 tracing::debug!(req_id = ?sub.req_id, "Subscription confirmed");
371 }
372 } else {
373 tracing::warn!(
374 error = ?sub.error,
375 req_id = ?sub.req_id,
376 "Subscription failed"
377 );
378 }
379 }
380 KrakenWsResponse::Unsubscribe(unsub) => {
381 if unsub.success {
382 tracing::debug!(req_id = ?unsub.req_id, "Unsubscription confirmed");
383 } else {
384 tracing::warn!(
385 error = ?unsub.error,
386 req_id = ?unsub.req_id,
387 "Unsubscription failed"
388 );
389 }
390 }
391 KrakenWsResponse::Pong(pong) => {
392 tracing::trace!(req_id = ?pong.req_id, "Received pong");
393 }
394 KrakenWsResponse::Other => {
395 tracing::debug!("Received unknown control response");
396 }
397 },
398 Err(_) => {
399 tracing::debug!("Received control message (failed to parse details)");
400 }
401 }
402 }
403
404 fn handle_data_message(
405 &mut self,
406 msg: KrakenWsMessage,
407 ts_init: UnixNanos,
408 ) -> Option<NautilusWsMessage> {
409 match msg.channel {
410 KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
411 KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
412 KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
413 KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
414 KrakenWsChannel::Executions => self.handle_executions_message(msg, ts_init),
415 _ => {
416 tracing::warn!("Unhandled channel: {:?}", msg.channel);
417 None
418 }
419 }
420 }
421
422 fn handle_book_message(
423 &mut self,
424 msg: KrakenWsMessage,
425 ts_init: UnixNanos,
426 ) -> Option<NautilusWsMessage> {
427 let mut all_deltas = Vec::new();
428 let mut instrument_id = None;
429
430 for data in msg.data {
431 match serde_json::from_value::<KrakenWsBookData>(data) {
432 Ok(book_data) => {
433 let symbol = &book_data.symbol;
434 let instrument = self.get_instrument(symbol)?;
435 instrument_id = Some(instrument.id());
436
437 let price_precision = instrument.price_precision();
438 let size_precision = instrument.size_precision();
439
440 let has_book = self.is_subscribed(&format!("book:{symbol}"));
441 let has_quotes = self.is_subscribed(&format!("quotes:{symbol}"));
442
443 if has_quotes {
444 let best_bid = book_data.bids.as_ref().and_then(|bids| bids.first());
445 let best_ask = book_data.asks.as_ref().and_then(|asks| asks.first());
446
447 let bid_price = best_bid.map(|b| Price::new(b.price, price_precision));
448 let ask_price = best_ask.map(|a| Price::new(a.price, price_precision));
449 let bid_size = best_bid.map(|b| Quantity::new(b.qty, size_precision));
450 let ask_size = best_ask.map(|a| Quantity::new(a.qty, size_precision));
451
452 if let Ok(quote) = self.quote_cache.process(
453 instrument.id(),
454 bid_price,
455 ask_price,
456 bid_size,
457 ask_size,
458 ts_init,
459 ts_init,
460 ) {
461 self.pending_quotes.push(quote);
462 }
463 }
464
465 if has_book {
466 match parse_book_deltas(
467 &book_data,
468 &instrument,
469 self.book_sequence,
470 ts_init,
471 ) {
472 Ok(mut deltas) => {
473 self.book_sequence += deltas.len() as u64;
474 all_deltas.append(&mut deltas);
475 }
476 Err(e) => {
477 tracing::error!("Failed to parse book deltas: {e}");
478 }
479 }
480 }
481 }
482 Err(e) => {
483 tracing::error!("Failed to deserialize book data: {e}");
484 }
485 }
486 }
487
488 if all_deltas.is_empty() {
489 if let Some(quote) = self.pending_quotes.pop() {
490 return Some(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
491 }
492 None
493 } else {
494 let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
495 Some(NautilusWsMessage::Deltas(deltas))
496 }
497 }
498
499 fn handle_ticker_message(
500 &self,
501 msg: KrakenWsMessage,
502 ts_init: UnixNanos,
503 ) -> Option<NautilusWsMessage> {
504 let mut quotes = Vec::new();
505
506 for data in msg.data {
507 match serde_json::from_value::<KrakenWsTickerData>(data) {
508 Ok(ticker_data) => {
509 let instrument = self.get_instrument(&ticker_data.symbol)?;
510
511 match parse_quote_tick(&ticker_data, &instrument, ts_init) {
512 Ok(quote) => quotes.push(Data::Quote(quote)),
513 Err(e) => {
514 tracing::error!("Failed to parse quote tick: {e}");
515 }
516 }
517 }
518 Err(e) => {
519 tracing::error!("Failed to deserialize ticker data: {e}");
520 }
521 }
522 }
523
524 if quotes.is_empty() {
525 None
526 } else {
527 Some(NautilusWsMessage::Data(quotes))
528 }
529 }
530
531 fn handle_trade_message(
532 &self,
533 msg: KrakenWsMessage,
534 ts_init: UnixNanos,
535 ) -> Option<NautilusWsMessage> {
536 let mut trades = Vec::new();
537
538 for data in msg.data {
539 match serde_json::from_value::<KrakenWsTradeData>(data) {
540 Ok(trade_data) => {
541 let instrument = self.get_instrument(&trade_data.symbol)?;
542
543 match parse_trade_tick(&trade_data, &instrument, ts_init) {
544 Ok(trade) => trades.push(Data::Trade(trade)),
545 Err(e) => {
546 tracing::error!("Failed to parse trade tick: {e}");
547 }
548 }
549 }
550 Err(e) => {
551 tracing::error!("Failed to deserialize trade data: {e}");
552 }
553 }
554 }
555
556 if trades.is_empty() {
557 None
558 } else {
559 Some(NautilusWsMessage::Data(trades))
560 }
561 }
562
563 fn handle_ohlc_message(
564 &mut self,
565 msg: KrakenWsMessage,
566 ts_init: UnixNanos,
567 ) -> Option<NautilusWsMessage> {
568 let mut closed_bars = Vec::new();
569
570 for data in msg.data {
571 match serde_json::from_value::<KrakenWsOhlcData>(data) {
572 Ok(ohlc_data) => {
573 let instrument = self.get_instrument(&ohlc_data.symbol)?;
574
575 match parse_ws_bar(&ohlc_data, &instrument, ts_init) {
576 Ok(new_bar) => {
577 let key = (ohlc_data.symbol, ohlc_data.interval);
578 let new_interval_begin = UnixNanos::from(
579 ohlc_data.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
580 );
581
582 if let Some((buffered_bar, buffered_interval_begin)) =
584 self.ohlc_buffer.get(&key)
585 {
586 if new_interval_begin != *buffered_interval_begin {
588 closed_bars.push(Data::Bar(*buffered_bar));
589 }
590 }
591
592 self.ohlc_buffer.insert(key, (new_bar, new_interval_begin));
594 }
595 Err(e) => {
596 tracing::error!("Failed to parse bar: {e}");
597 }
598 }
599 }
600 Err(e) => {
601 tracing::error!("Failed to deserialize OHLC data: {e}");
602 }
603 }
604 }
605
606 if closed_bars.is_empty() {
607 None
608 } else {
609 Some(NautilusWsMessage::Data(closed_bars))
610 }
611 }
612
613 fn handle_executions_message(
614 &mut self,
615 msg: KrakenWsMessage,
616 ts_init: UnixNanos,
617 ) -> Option<NautilusWsMessage> {
618 let Some(account_id) = self.account_id else {
619 tracing::warn!("Cannot process execution message: account_id not set");
620 return None;
621 };
622
623 for data in msg.data {
625 match serde_json::from_value::<KrakenWsExecutionData>(data) {
626 Ok(exec_data) => {
627 tracing::debug!(
628 exec_type = ?exec_data.exec_type,
629 order_id = %exec_data.order_id,
630 order_status = ?exec_data.order_status,
631 order_qty = ?exec_data.order_qty,
632 cum_qty = ?exec_data.cum_qty,
633 last_qty = ?exec_data.last_qty,
634 "Received execution message"
635 );
636
637 if let Some(qty) = exec_data.order_qty {
639 self.order_qty_cache.insert(exec_data.order_id.clone(), qty);
640 }
641
642 let (instrument, cached_info) = if let Some(ref symbol) = exec_data.symbol {
644 let symbol_ustr = Ustr::from(symbol.as_str());
645 let inst = self.instruments_cache.get(&symbol_ustr).cloned();
646 if inst.is_none() {
647 tracing::warn!(
648 symbol = %symbol,
649 order_id = %exec_data.order_id,
650 "No instrument found for symbol"
651 );
652 }
653 let cached = exec_data
654 .cl_ord_id
655 .as_ref()
656 .and_then(|id| self.client_order_cache.get(id).cloned());
657 (inst, cached)
658 } else if let Some(ref cl_ord_id) = exec_data.cl_ord_id {
659 let cached = self.client_order_cache.get(cl_ord_id).cloned();
660 let inst = cached.as_ref().and_then(|info| {
661 self.instruments_cache
662 .iter()
663 .find(|(_, inst)| inst.id() == info.instrument_id)
664 .map(|(_, inst)| inst.clone())
665 });
666 (inst, cached)
667 } else {
668 (None, None)
669 };
670
671 let Some(instrument) = instrument else {
672 tracing::debug!(
673 order_id = %exec_data.order_id,
674 cl_ord_id = ?exec_data.cl_ord_id,
675 exec_type = ?exec_data.exec_type,
676 "Execution missing symbol and order not in cache (external order)"
677 );
678 continue;
679 };
680
681 let cached_order_qty = self.order_qty_cache.get(&exec_data.order_id).copied();
682 let ts_event = chrono::DateTime::parse_from_rfc3339(&exec_data.timestamp)
683 .map(|t| UnixNanos::from(t.timestamp_nanos_opt().unwrap_or(0) as u64))
684 .unwrap_or(ts_init);
685
686 if let Some(ref info) = cached_info {
689 let client_order_id = exec_data
690 .cl_ord_id
691 .as_ref()
692 .map(ClientOrderId::new)
693 .expect("cl_ord_id should exist if cached");
694 let venue_order_id = VenueOrderId::new(&exec_data.order_id);
695
696 match exec_data.exec_type {
697 KrakenExecType::PendingNew => {
698 let accepted = OrderAccepted::new(
700 info.trader_id,
701 info.strategy_id,
702 instrument.id(),
703 client_order_id,
704 venue_order_id,
705 account_id,
706 UUID4::new(),
707 ts_event,
708 ts_init,
709 false,
710 );
711 self.pending_messages
712 .push_back(NautilusWsMessage::OrderAccepted(accepted));
713 }
714 KrakenExecType::New => {
715 }
717 KrakenExecType::Canceled => {
718 let is_post_only_rejection = exec_data
721 .reason
722 .as_ref()
723 .is_some_and(|r| r.eq_ignore_ascii_case("Post only order"));
724
725 if is_post_only_rejection {
726 let reason = exec_data
727 .reason
728 .as_deref()
729 .unwrap_or("Post-only order would have crossed");
730 let rejected = OrderRejected::new(
731 info.trader_id,
732 info.strategy_id,
733 instrument.id(),
734 client_order_id,
735 account_id,
736 Ustr::from(reason),
737 UUID4::new(),
738 ts_event,
739 ts_init,
740 false,
741 true, );
743 self.pending_messages
744 .push_back(NautilusWsMessage::OrderRejected(rejected));
745 } else {
746 let canceled = OrderCanceled::new(
747 info.trader_id,
748 info.strategy_id,
749 instrument.id(),
750 client_order_id,
751 UUID4::new(),
752 ts_event,
753 ts_init,
754 false,
755 Some(venue_order_id),
756 Some(account_id),
757 );
758 self.pending_messages
759 .push_back(NautilusWsMessage::OrderCanceled(canceled));
760 }
761 }
762 KrakenExecType::Expired => {
763 let expired = OrderExpired::new(
764 info.trader_id,
765 info.strategy_id,
766 instrument.id(),
767 client_order_id,
768 UUID4::new(),
769 ts_event,
770 ts_init,
771 false,
772 Some(venue_order_id),
773 Some(account_id),
774 );
775 self.pending_messages
776 .push_back(NautilusWsMessage::OrderExpired(expired));
777 }
778 KrakenExecType::Amended | KrakenExecType::Restated => {
779 if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
781 let updated = OrderUpdated::new(
782 info.trader_id,
783 info.strategy_id,
784 instrument.id(),
785 client_order_id,
786 Quantity::new(order_qty, instrument.size_precision()),
787 UUID4::new(),
788 ts_event,
789 ts_init,
790 false,
791 Some(venue_order_id),
792 Some(account_id),
793 None, None, None, );
797 self.pending_messages
798 .push_back(NautilusWsMessage::OrderUpdated(updated));
799 }
800 }
801 KrakenExecType::Trade | KrakenExecType::Filled => {
802 let has_complete_trade_data =
804 exec_data.last_qty.is_some_and(|q| q > 0.0)
805 && exec_data.last_price.is_some_and(|p| p > 0.0);
806
807 if let Ok(status_report) = parse_ws_order_status_report(
808 &exec_data,
809 &instrument,
810 account_id,
811 cached_order_qty,
812 ts_init,
813 ) {
814 self.pending_messages.push_back(
815 NautilusWsMessage::OrderStatusReport(Box::new(
816 status_report,
817 )),
818 );
819 }
820
821 if has_complete_trade_data
822 && let Ok(fill_report) = parse_ws_fill_report(
823 &exec_data,
824 &instrument,
825 account_id,
826 ts_init,
827 )
828 {
829 self.pending_messages
830 .push_back(NautilusWsMessage::FillReport(Box::new(
831 fill_report,
832 )));
833 }
834 }
835 KrakenExecType::IcebergRefill => {
836 if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
838 let updated = OrderUpdated::new(
839 info.trader_id,
840 info.strategy_id,
841 instrument.id(),
842 client_order_id,
843 Quantity::new(order_qty, instrument.size_precision()),
844 UUID4::new(),
845 ts_event,
846 ts_init,
847 false,
848 Some(venue_order_id),
849 Some(account_id),
850 None,
851 None,
852 None,
853 );
854 self.pending_messages
855 .push_back(NautilusWsMessage::OrderUpdated(updated));
856 }
857 }
858 KrakenExecType::Status => {
859 if let Ok(status_report) = parse_ws_order_status_report(
861 &exec_data,
862 &instrument,
863 account_id,
864 cached_order_qty,
865 ts_init,
866 ) {
867 self.pending_messages.push_back(
868 NautilusWsMessage::OrderStatusReport(Box::new(
869 status_report,
870 )),
871 );
872 }
873 }
874 }
875 } else {
876 if exec_data.exec_type == KrakenExecType::Trade
878 || exec_data.exec_type == KrakenExecType::Filled
879 {
880 let has_order_data = exec_data.order_qty.is_some()
881 || cached_order_qty.is_some()
882 || exec_data.cum_qty.is_some();
883
884 let has_complete_trade_data =
885 exec_data.last_qty.is_some_and(|q| q > 0.0)
886 && exec_data.last_price.is_some_and(|p| p > 0.0);
887
888 if has_order_data
889 && let Ok(status_report) = parse_ws_order_status_report(
890 &exec_data,
891 &instrument,
892 account_id,
893 cached_order_qty,
894 ts_init,
895 )
896 {
897 self.pending_messages.push_back(
898 NautilusWsMessage::OrderStatusReport(Box::new(status_report)),
899 );
900 }
901
902 if has_complete_trade_data
903 && let Ok(fill_report) = parse_ws_fill_report(
904 &exec_data,
905 &instrument,
906 account_id,
907 ts_init,
908 )
909 {
910 self.pending_messages
911 .push_back(NautilusWsMessage::FillReport(Box::new(
912 fill_report,
913 )));
914 }
915 } else if let Ok(report) = parse_ws_order_status_report(
916 &exec_data,
917 &instrument,
918 account_id,
919 cached_order_qty,
920 ts_init,
921 ) {
922 self.pending_messages
923 .push_back(NautilusWsMessage::OrderStatusReport(Box::new(report)));
924 }
925 }
926 }
927 Err(e) => {
928 tracing::error!("Failed to deserialize execution data: {e}");
929 }
930 }
931 }
932
933 self.pending_messages.pop_front()
935 }
936}