1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29 data::{Bar, Data, OrderBookDeltas},
30 events::{OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderUpdated},
31 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 types::Quantity,
34};
35use nautilus_network::{
36 RECONNECTED,
37 websocket::{SubscriptionState, WebSocketClient},
38};
39use serde_json::Value;
40use tokio_tungstenite::tungstenite::Message;
41use ustr::Ustr;
42
43use super::{
44 enums::{KrakenExecType, KrakenWsChannel},
45 messages::{
46 KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage, KrakenWsOhlcData,
47 KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData, NautilusWsMessage,
48 },
49 parse::{
50 parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar, parse_ws_fill_report,
51 parse_ws_order_status_report,
52 },
53};
54
/// Order metadata remembered per client order ID.
///
/// An entry is stored when a `SpotHandlerCommand::CacheClientOrder` command is
/// processed and is read back while handling execution messages, where it supplies
/// the identifiers needed to build order events.
#[derive(Debug, Clone)]
struct CachedOrderInfo {
    /// Instrument the order was registered against; used to look up the instrument
    /// when an execution message carries no symbol.
    instrument_id: InstrumentId,
    /// Trader ID copied into generated order events.
    trader_id: TraderId,
    /// Strategy ID copied into generated order events.
    strategy_id: StrategyId,
}
62
/// Commands accepted by [`SpotFeedHandler`] on its command channel.
///
/// Each command is applied inside `SpotFeedHandler::next` and never produces an
/// output message by itself.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum SpotHandlerCommand {
    /// Hands the WebSocket client to the handler, replacing any previous one.
    SetClient(WebSocketClient),
    /// Takes the current client (if any) out of the handler and disconnects it.
    Disconnect,
    /// Sends a text payload through the current client; a send failure is logged,
    /// and the command is a no-op when no client is set.
    SendText {
        payload: String,
    },
    /// Inserts every instrument into the instrument cache, keyed by its symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the instrument cache.
    UpdateInstrument(InstrumentAny),
    /// Sets the account ID required before execution messages can be processed.
    SetAccountId(AccountId),
    /// Registers order metadata under `client_order_id` so later execution messages
    /// for that order can be turned into order events.
    CacheClientOrder {
        client_order_id: ClientOrderId,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    },
}
85
/// Key of the OHLC buffer: the `symbol` and `interval` fields of an OHLC update.
// NOTE(review): the unit of the interval value is not visible here — confirm.
type OhlcBufferKey = (Ustr, u32);

/// Value of the OHLC buffer: the latest bar parsed for a key together with the
/// `interval_begin` timestamp of the update it came from.
type OhlcBufferEntry = (Bar, UnixNanos);
91
/// Consumes raw WebSocket frames and handler commands and turns them into
/// [`NautilusWsMessage`] values.
pub(super) struct SpotFeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Stop flag; when it reads `true` the handler flushes buffered bars and stops.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client, set by `SetClient` and taken by `Disconnect`.
    client: Option<WebSocketClient>,
    /// Incoming handler commands.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Subscription state consulted to filter book and ticker updates by topic.
    subscriptions: SubscriptionState,
    /// Instruments keyed by the inner `Ustr` of their symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Order metadata registered through `CacheClientOrder`.
    client_order_cache: AHashMap<ClientOrderId, CachedOrderInfo>,
    /// Last `order_qty` seen in an execution message, per venue order ID.
    order_qty_cache: AHashMap<VenueOrderId, f64>,
    /// Running counter passed to the book-delta parser and advanced by the number
    /// of deltas it returns.
    book_sequence: u64,
    /// Messages produced but not yet returned from `next`.
    pending_messages: VecDeque<NautilusWsMessage>,
    /// Account ID attached to execution reports and order events.
    account_id: Option<AccountId>,
    /// Most recent bar per (symbol, interval), held back until its interval changes.
    ohlc_buffer: AHashMap<OhlcBufferKey, OhlcBufferEntry>,
}
108
109impl SpotFeedHandler {
/// Creates a handler with empty caches, no client, and no account ID.
///
/// `signal` is the shared stop flag, `cmd_rx` and `raw_rx` are the command and raw
/// frame channels polled by `next`, and `subscriptions` is the subscription state
/// used for topic filtering. The clock is the real-time atomic clock.
pub(super) fn new(
    signal: Arc<AtomicBool>,
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    subscriptions: SubscriptionState,
) -> Self {
    Self {
        clock: get_atomic_clock_realtime(),
        signal,
        client: None,
        cmd_rx,
        raw_rx,
        subscriptions,
        instruments_cache: AHashMap::new(),
        client_order_cache: AHashMap::new(),
        order_qty_cache: AHashMap::new(),
        book_sequence: 0,
        pending_messages: VecDeque::new(),
        account_id: None,
        ohlc_buffer: AHashMap::new(),
    }
}
133
/// Returns the current value of the shared stop flag (relaxed atomic load).
pub(super) fn is_stopped(&self) -> bool {
    self.signal.load(Ordering::Relaxed)
}
137
138 fn is_subscribed(&self, topic: &str) -> bool {
140 self.subscriptions.all_topics().iter().any(|t| t == topic)
141 }
142
143 fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
144 self.instruments_cache.get(symbol).cloned()
145 }
146
147 fn flush_ohlc_buffer(&mut self) {
152 if self.ohlc_buffer.is_empty() {
153 return;
154 }
155
156 let bars: Vec<Data> = self
157 .ohlc_buffer
158 .drain()
159 .map(|(_, (bar, _))| Data::Bar(bar))
160 .collect();
161
162 if !bars.is_empty() {
163 log::debug!("Flushing {} buffered OHLC bars on stream end", bars.len());
164 self.pending_messages
165 .push_back(NautilusWsMessage::Data(bars));
166 }
167 }
168
169 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
171 if let Some(msg) = self.pending_messages.pop_front() {
173 return Some(msg);
174 }
175
176 loop {
177 tokio::select! {
178 Some(cmd) = self.cmd_rx.recv() => {
179 match cmd {
180 SpotHandlerCommand::SetClient(client) => {
181 log::debug!("WebSocketClient received by handler");
182 self.client = Some(client);
183 }
184 SpotHandlerCommand::Disconnect => {
185 log::debug!("Disconnect command received");
186 if let Some(client) = self.client.take() {
187 client.disconnect().await;
188 }
189 }
190 SpotHandlerCommand::SendText { payload } => {
191 if let Some(client) = &self.client
192 && let Err(e) = client.send_text(payload.clone(), None).await
193 {
194 log::error!("Failed to send text: {e}");
195 }
196 }
197 SpotHandlerCommand::InitializeInstruments(instruments) => {
198 for inst in instruments {
199 self.instruments_cache.insert(inst.symbol().inner(), inst);
202 }
203 }
204 SpotHandlerCommand::UpdateInstrument(inst) => {
205 self.instruments_cache.insert(inst.symbol().inner(), inst);
206 }
207 SpotHandlerCommand::SetAccountId(account_id) => {
208 log::debug!("Account ID set for execution reports: {account_id}");
209 self.account_id = Some(account_id);
210 }
211 SpotHandlerCommand::CacheClientOrder {
212 client_order_id,
213 instrument_id,
214 trader_id,
215 strategy_id,
216 } => {
217 log::debug!(
218 "Cached client order info: \
219 client_order_id={client_order_id}, instrument_id={instrument_id}"
220 );
221 self.client_order_cache.insert(
222 client_order_id,
223 CachedOrderInfo {
224 instrument_id,
225 trader_id,
226 strategy_id,
227 },
228 );
229 }
230 }
231 continue;
232 }
233
234 msg = self.raw_rx.recv() => {
235 let msg = match msg {
236 Some(msg) => msg,
237 None => {
238 log::debug!("WebSocket stream closed");
239 self.flush_ohlc_buffer();
240 return self.pending_messages.pop_front();
241 }
242 };
243
244 if let Message::Ping(data) = &msg {
245 log::trace!("Received ping frame with {} bytes", data.len());
246 if let Some(client) = &self.client
247 && let Err(e) = client.send_pong(data.to_vec()).await
248 {
249 log::warn!("Failed to send pong frame: {e}");
250 }
251 continue;
252 }
253
254 if self.signal.load(Ordering::Relaxed) {
255 log::debug!("Stop signal received");
256 self.flush_ohlc_buffer();
257 return self.pending_messages.pop_front();
258 }
259
260 let text = match msg {
261 Message::Text(text) => text.to_string(),
262 Message::Binary(data) => {
263 match String::from_utf8(data.to_vec()) {
264 Ok(text) => text,
265 Err(e) => {
266 log::warn!("Failed to decode binary message: {e}");
267 continue;
268 }
269 }
270 }
271 Message::Pong(_) => {
272 log::trace!("Received pong");
273 continue;
274 }
275 Message::Close(_) => {
276 log::info!("WebSocket connection closed");
277 self.flush_ohlc_buffer();
278 return self.pending_messages.pop_front();
279 }
280 Message::Frame(_) => {
281 log::trace!("Received raw frame");
282 continue;
283 }
284 _ => continue,
285 };
286
287 if text == RECONNECTED {
288 log::info!("Received WebSocket reconnected signal");
289 return Some(NautilusWsMessage::Reconnected);
290 }
291
292 let ts_init = self.clock.get_time_ns();
293
294 if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
295 return Some(nautilus_msg);
296 }
297
298 continue;
299 }
300 }
301 }
302 }
303
304 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
305 if text.len() < 50 && text.starts_with("{\"channel\":\"") {
308 if text.contains("heartbeat") {
309 log::trace!("Received heartbeat");
310 return None;
311 }
312 if text.contains("status") {
313 log::debug!("Received status message");
314 return None;
315 }
316 }
317
318 let value: Value = match serde_json::from_str(text) {
319 Ok(v) => v,
320 Err(e) => {
321 log::warn!("Failed to parse message: {e}");
322 return None;
323 }
324 };
325
326 if value.get("method").is_some() {
328 self.handle_control_message(value);
329 return None;
330 }
331
332 if value.get("channel").is_some() && value.get("data").is_some() {
334 match serde_json::from_value::<KrakenWsMessage>(value) {
335 Ok(msg) => return self.handle_data_message(msg, ts_init),
336 Err(e) => {
337 log::debug!("Failed to parse data message: {e}");
338 return None;
339 }
340 }
341 }
342
343 log::debug!("Unhandled message structure: {text}");
344 None
345 }
346
347 fn handle_control_message(&self, value: Value) {
348 match serde_json::from_value::<KrakenWsResponse>(value) {
349 Ok(response) => match response {
350 KrakenWsResponse::Subscribe(sub) => {
351 if sub.success {
352 if let Some(result) = &sub.result {
353 log::debug!(
354 "Subscription confirmed: channel={:?}, req_id={:?}",
355 result.channel,
356 sub.req_id
357 );
358 } else {
359 log::debug!("Subscription confirmed: req_id={:?}", sub.req_id);
360 }
361 } else {
362 log::warn!(
363 "Subscription failed: error={:?}, req_id={:?}",
364 sub.error,
365 sub.req_id
366 );
367 }
368 }
369 KrakenWsResponse::Unsubscribe(unsub) => {
370 if unsub.success {
371 log::debug!("Unsubscription confirmed: req_id={:?}", unsub.req_id);
372 } else {
373 log::warn!(
374 "Unsubscription failed: error={:?}, req_id={:?}",
375 unsub.error,
376 unsub.req_id
377 );
378 }
379 }
380 KrakenWsResponse::Pong(pong) => {
381 log::trace!("Received pong: req_id={:?}", pong.req_id);
382 }
383 KrakenWsResponse::Other => {
384 log::debug!("Received unknown control response");
385 }
386 },
387 Err(_) => {
388 log::debug!("Received control message (failed to parse details)");
389 }
390 }
391 }
392
/// Dispatches a data message to the handler for its channel.
///
/// Book, ticker, trade, OHLC and executions channels are supported; any other
/// channel is logged as a warning and yields `None`.
fn handle_data_message(
    &mut self,
    msg: KrakenWsMessage,
    ts_init: UnixNanos,
) -> Option<NautilusWsMessage> {
    match msg.channel {
        KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
        KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
        KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
        KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
        KrakenWsChannel::Executions => self.handle_executions_message(msg, ts_init),
        _ => {
            log::warn!("Unhandled channel: {:?}", msg.channel);
            None
        }
    }
}
410
411 fn handle_book_message(
412 &mut self,
413 msg: KrakenWsMessage,
414 ts_init: UnixNanos,
415 ) -> Option<NautilusWsMessage> {
416 let mut all_deltas = Vec::new();
417 let mut instrument_id = None;
418
419 for data in msg.data {
420 match serde_json::from_value::<KrakenWsBookData>(data) {
421 Ok(book_data) => {
422 let symbol = &book_data.symbol;
423
424 if !self.is_subscribed(&format!("book:{symbol}")) {
425 continue;
426 }
427
428 let instrument = self.get_instrument(symbol)?;
429 instrument_id = Some(instrument.id());
430
431 match parse_book_deltas(&book_data, &instrument, self.book_sequence, ts_init) {
432 Ok(mut deltas) => {
433 self.book_sequence += deltas.len() as u64;
434 all_deltas.append(&mut deltas);
435 }
436 Err(e) => {
437 log::error!("Failed to parse book deltas: {e}");
438 }
439 }
440 }
441 Err(e) => {
442 log::error!("Failed to deserialize book data: {e}");
443 }
444 }
445 }
446
447 if all_deltas.is_empty() {
448 None
449 } else {
450 let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
451 Some(NautilusWsMessage::Deltas(deltas))
452 }
453 }
454
455 fn handle_ticker_message(
456 &self,
457 msg: KrakenWsMessage,
458 ts_init: UnixNanos,
459 ) -> Option<NautilusWsMessage> {
460 let mut quotes = Vec::new();
461
462 for data in msg.data {
463 match serde_json::from_value::<KrakenWsTickerData>(data) {
464 Ok(ticker_data) => {
465 let symbol = &ticker_data.symbol;
466
467 let quotes_key = format!("quotes:{symbol}");
470 let ticker_key = format!("ticker:{symbol}");
471 if !self.is_subscribed("es_key) && !self.is_subscribed(&ticker_key) {
472 continue;
473 }
474
475 let instrument = self.get_instrument(symbol)?;
476
477 match parse_quote_tick(&ticker_data, &instrument, ts_init) {
478 Ok(quote) => quotes.push(Data::Quote(quote)),
479 Err(e) => {
480 log::error!("Failed to parse quote tick: {e}");
481 }
482 }
483 }
484 Err(e) => {
485 log::error!("Failed to deserialize ticker data: {e}");
486 }
487 }
488 }
489
490 if quotes.is_empty() {
491 None
492 } else {
493 Some(NautilusWsMessage::Data(quotes))
494 }
495 }
496
497 fn handle_trade_message(
498 &self,
499 msg: KrakenWsMessage,
500 ts_init: UnixNanos,
501 ) -> Option<NautilusWsMessage> {
502 let mut trades = Vec::new();
503
504 for data in msg.data {
505 match serde_json::from_value::<KrakenWsTradeData>(data) {
506 Ok(trade_data) => {
507 let instrument = self.get_instrument(&trade_data.symbol)?;
508
509 match parse_trade_tick(&trade_data, &instrument, ts_init) {
510 Ok(trade) => trades.push(Data::Trade(trade)),
511 Err(e) => {
512 log::error!("Failed to parse trade tick: {e}");
513 }
514 }
515 }
516 Err(e) => {
517 log::error!("Failed to deserialize trade data: {e}");
518 }
519 }
520 }
521
522 if trades.is_empty() {
523 None
524 } else {
525 Some(NautilusWsMessage::Data(trades))
526 }
527 }
528
529 fn handle_ohlc_message(
530 &mut self,
531 msg: KrakenWsMessage,
532 ts_init: UnixNanos,
533 ) -> Option<NautilusWsMessage> {
534 let mut closed_bars = Vec::new();
535
536 for data in msg.data {
537 match serde_json::from_value::<KrakenWsOhlcData>(data) {
538 Ok(ohlc_data) => {
539 let instrument = self.get_instrument(&ohlc_data.symbol)?;
540
541 match parse_ws_bar(&ohlc_data, &instrument, ts_init) {
542 Ok(new_bar) => {
543 let key = (ohlc_data.symbol, ohlc_data.interval);
544 let new_interval_begin = UnixNanos::from(
545 ohlc_data.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
546 );
547
548 if let Some((buffered_bar, buffered_interval_begin)) =
550 self.ohlc_buffer.get(&key)
551 {
552 if new_interval_begin != *buffered_interval_begin {
554 closed_bars.push(Data::Bar(*buffered_bar));
555 }
556 }
557
558 self.ohlc_buffer.insert(key, (new_bar, new_interval_begin));
560 }
561 Err(e) => {
562 log::error!("Failed to parse bar: {e}");
563 }
564 }
565 }
566 Err(e) => {
567 log::error!("Failed to deserialize OHLC data: {e}");
568 }
569 }
570 }
571
572 if closed_bars.is_empty() {
573 None
574 } else {
575 Some(NautilusWsMessage::Data(closed_bars))
576 }
577 }
578
579 fn handle_executions_message(
580 &mut self,
581 msg: KrakenWsMessage,
582 ts_init: UnixNanos,
583 ) -> Option<NautilusWsMessage> {
584 let Some(account_id) = self.account_id else {
585 log::warn!("Cannot process execution message: account_id not set");
586 return None;
587 };
588
589 for data in msg.data {
591 match serde_json::from_value::<KrakenWsExecutionData>(data) {
592 Ok(exec_data) => {
593 log::debug!(
594 "Received execution message: exec_type={:?}, order_id={}, \
595 order_status={:?}, order_qty={:?}, cum_qty={:?}, last_qty={:?}",
596 exec_data.exec_type,
597 exec_data.order_id,
598 exec_data.order_status,
599 exec_data.order_qty,
600 exec_data.cum_qty,
601 exec_data.last_qty
602 );
603
604 if let Some(qty) = exec_data.order_qty {
606 self.order_qty_cache
607 .insert(VenueOrderId::new(&exec_data.order_id), qty);
608 }
609
610 let (instrument, cached_info) = if let Some(ref symbol) = exec_data.symbol {
612 let symbol_ustr = Ustr::from(symbol.as_str());
613 let inst = self.instruments_cache.get(&symbol_ustr).cloned();
614 if inst.is_none() {
615 log::warn!(
616 "No instrument found for symbol: symbol={symbol}, order_id={}",
617 exec_data.order_id
618 );
619 }
620 let cached = exec_data
621 .cl_ord_id
622 .as_ref()
623 .filter(|id| !id.is_empty())
624 .and_then(|id| {
625 self.client_order_cache
626 .get(&ClientOrderId::new(id))
627 .cloned()
628 });
629 (inst, cached)
630 } else if let Some(ref cl_ord_id) =
631 exec_data.cl_ord_id.as_ref().filter(|id| !id.is_empty())
632 {
633 let cached = self
634 .client_order_cache
635 .get(&ClientOrderId::new(cl_ord_id))
636 .cloned();
637 let inst = cached.as_ref().and_then(|info| {
638 self.instruments_cache
639 .iter()
640 .find(|(_, inst)| inst.id() == info.instrument_id)
641 .map(|(_, inst)| inst.clone())
642 });
643 (inst, cached)
644 } else {
645 (None, None)
646 };
647
648 let Some(instrument) = instrument else {
649 log::debug!(
650 "Execution missing symbol and order not in cache (external order): \
651 order_id={}, cl_ord_id={:?}, exec_type={:?}",
652 exec_data.order_id,
653 exec_data.cl_ord_id,
654 exec_data.exec_type
655 );
656 continue;
657 };
658
659 let cached_order_qty = self
660 .order_qty_cache
661 .get(&VenueOrderId::new(&exec_data.order_id))
662 .copied();
663 let ts_event = chrono::DateTime::parse_from_rfc3339(&exec_data.timestamp)
664 .map_or(ts_init, |t| {
665 UnixNanos::from(t.timestamp_nanos_opt().unwrap_or(0) as u64)
666 });
667
668 if let Some(ref info) = cached_info {
671 let client_order_id = exec_data
672 .cl_ord_id
673 .as_ref()
674 .map(ClientOrderId::new)
675 .expect("cl_ord_id should exist if cached");
676 let venue_order_id = VenueOrderId::new(&exec_data.order_id);
677
678 match exec_data.exec_type {
679 KrakenExecType::PendingNew => {
680 let accepted = OrderAccepted::new(
682 info.trader_id,
683 info.strategy_id,
684 instrument.id(),
685 client_order_id,
686 venue_order_id,
687 account_id,
688 UUID4::new(),
689 ts_event,
690 ts_init,
691 false,
692 );
693 self.pending_messages
694 .push_back(NautilusWsMessage::OrderAccepted(accepted));
695 }
696 KrakenExecType::New => {
697 }
699 KrakenExecType::Canceled => {
700 let is_post_only_rejection = exec_data
703 .reason
704 .as_ref()
705 .is_some_and(|r| r.eq_ignore_ascii_case("Post only order"));
706
707 if is_post_only_rejection {
708 let reason = exec_data
709 .reason
710 .as_deref()
711 .unwrap_or("Post-only order would have crossed");
712 let rejected = OrderRejected::new(
713 info.trader_id,
714 info.strategy_id,
715 instrument.id(),
716 client_order_id,
717 account_id,
718 Ustr::from(reason),
719 UUID4::new(),
720 ts_event,
721 ts_init,
722 false,
723 true, );
725 self.pending_messages
726 .push_back(NautilusWsMessage::OrderRejected(rejected));
727 } else {
728 let canceled = OrderCanceled::new(
729 info.trader_id,
730 info.strategy_id,
731 instrument.id(),
732 client_order_id,
733 UUID4::new(),
734 ts_event,
735 ts_init,
736 false,
737 Some(venue_order_id),
738 Some(account_id),
739 );
740 self.pending_messages
741 .push_back(NautilusWsMessage::OrderCanceled(canceled));
742 }
743 }
744 KrakenExecType::Expired => {
745 let expired = OrderExpired::new(
746 info.trader_id,
747 info.strategy_id,
748 instrument.id(),
749 client_order_id,
750 UUID4::new(),
751 ts_event,
752 ts_init,
753 false,
754 Some(venue_order_id),
755 Some(account_id),
756 );
757 self.pending_messages
758 .push_back(NautilusWsMessage::OrderExpired(expired));
759 }
760 KrakenExecType::Amended | KrakenExecType::Restated => {
761 if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
763 let updated = OrderUpdated::new(
764 info.trader_id,
765 info.strategy_id,
766 instrument.id(),
767 client_order_id,
768 Quantity::new(order_qty, instrument.size_precision()),
769 UUID4::new(),
770 ts_event,
771 ts_init,
772 false,
773 Some(venue_order_id),
774 Some(account_id),
775 None, None, None, );
779 self.pending_messages
780 .push_back(NautilusWsMessage::OrderUpdated(updated));
781 }
782 }
783 KrakenExecType::Trade | KrakenExecType::Filled => {
784 let has_complete_trade_data =
786 exec_data.last_qty.is_some_and(|q| q > 0.0)
787 && exec_data.last_price.is_some_and(|p| p > 0.0);
788
789 if let Ok(status_report) = parse_ws_order_status_report(
790 &exec_data,
791 &instrument,
792 account_id,
793 cached_order_qty,
794 ts_init,
795 ) {
796 self.pending_messages.push_back(
797 NautilusWsMessage::OrderStatusReport(Box::new(
798 status_report,
799 )),
800 );
801 }
802
803 if has_complete_trade_data
804 && let Ok(fill_report) = parse_ws_fill_report(
805 &exec_data,
806 &instrument,
807 account_id,
808 ts_init,
809 )
810 {
811 self.pending_messages
812 .push_back(NautilusWsMessage::FillReport(Box::new(
813 fill_report,
814 )));
815 }
816 }
817 KrakenExecType::IcebergRefill => {
818 if let Some(order_qty) = exec_data.order_qty.or(cached_order_qty) {
820 let updated = OrderUpdated::new(
821 info.trader_id,
822 info.strategy_id,
823 instrument.id(),
824 client_order_id,
825 Quantity::new(order_qty, instrument.size_precision()),
826 UUID4::new(),
827 ts_event,
828 ts_init,
829 false,
830 Some(venue_order_id),
831 Some(account_id),
832 None,
833 None,
834 None,
835 );
836 self.pending_messages
837 .push_back(NautilusWsMessage::OrderUpdated(updated));
838 }
839 }
840 KrakenExecType::Status => {
841 if let Ok(status_report) = parse_ws_order_status_report(
843 &exec_data,
844 &instrument,
845 account_id,
846 cached_order_qty,
847 ts_init,
848 ) {
849 self.pending_messages.push_back(
850 NautilusWsMessage::OrderStatusReport(Box::new(
851 status_report,
852 )),
853 );
854 }
855 }
856 }
857 } else {
858 if exec_data.exec_type == KrakenExecType::Trade
860 || exec_data.exec_type == KrakenExecType::Filled
861 {
862 let has_order_data = exec_data.order_qty.is_some()
863 || cached_order_qty.is_some()
864 || exec_data.cum_qty.is_some();
865
866 let has_complete_trade_data =
867 exec_data.last_qty.is_some_and(|q| q > 0.0)
868 && exec_data.last_price.is_some_and(|p| p > 0.0);
869
870 if has_order_data
871 && let Ok(status_report) = parse_ws_order_status_report(
872 &exec_data,
873 &instrument,
874 account_id,
875 cached_order_qty,
876 ts_init,
877 )
878 {
879 self.pending_messages.push_back(
880 NautilusWsMessage::OrderStatusReport(Box::new(status_report)),
881 );
882 }
883
884 if has_complete_trade_data
885 && let Ok(fill_report) = parse_ws_fill_report(
886 &exec_data,
887 &instrument,
888 account_id,
889 ts_init,
890 )
891 {
892 self.pending_messages
893 .push_back(NautilusWsMessage::FillReport(Box::new(
894 fill_report,
895 )));
896 }
897 } else if let Ok(report) = parse_ws_order_status_report(
898 &exec_data,
899 &instrument,
900 account_id,
901 cached_order_qty,
902 ts_init,
903 ) {
904 self.pending_messages
905 .push_back(NautilusWsMessage::OrderStatusReport(Box::new(report)));
906 }
907 }
908 }
909 Err(e) => {
910 log::error!("Failed to deserialize execution data: {e}");
911 }
912 }
913 }
914
915 self.pending_messages.pop_front()
917 }
918}
919
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::{InstrumentAny, currency_pair::CurrencyPair},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Symbol used by every fixture in this module.
    const SYMBOL: &str = "BTC/USD";

    /// Ticker channel snapshot for `SYMBOL`.
    const TICKER_JSON: &str = r#"{
        "channel": "ticker",
        "type": "snapshot",
        "data": [{
            "symbol": "BTC/USD",
            "bid": 105944.20,
            "bid_qty": 2.5,
            "ask": 105944.30,
            "ask_qty": 3.2,
            "last": 105899.40,
            "volume": 163.28908096,
            "vwap": 105904.39279,
            "low": 104711.00,
            "high": 106613.10,
            "change": 250.00,
            "change_pct": 0.24
        }]
    }"#;

    /// Book channel snapshot for `SYMBOL`.
    const BOOK_JSON: &str = r#"{
        "channel": "book",
        "type": "snapshot",
        "data": [{
            "symbol": "BTC/USD",
            "bids": [{"price": 105944.20, "qty": 2.5}],
            "asks": [{"price": 105944.30, "qty": 3.2}],
            "checksum": 12345
        }]
    }"#;

    fn create_test_handler() -> SpotFeedHandler {
        let signal = Arc::new(AtomicBool::new(false));
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let subscriptions = SubscriptionState::new(':');

        SpotFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
    }

    fn create_test_instrument(symbol: &str) -> InstrumentAny {
        let instrument_id = InstrumentId::new(Symbol::new(symbol), Venue::new("KRAKEN"));
        InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            Symbol::new(symbol),
            Currency::BTC(),
            Currency::USD(),
            2,
            8,
            Price::from("0.01"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        ))
    }

    /// Builds a handler whose instrument cache already contains `SYMBOL`.
    fn create_handler_with_instrument() -> SpotFeedHandler {
        let mut handler = create_test_handler();
        handler
            .instruments_cache
            .insert(Ustr::from(SYMBOL), create_test_instrument(SYMBOL));
        handler
    }

    /// Marks `topic` as subscribed and confirmed on the handler.
    fn subscribe(handler: &mut SpotFeedHandler, topic: &str) {
        handler.subscriptions.mark_subscribe(topic);
        handler.subscriptions.confirm_subscribe(topic);
    }

    /// Timestamp passed as `ts_init` in every test.
    fn test_ts_init() -> UnixNanos {
        UnixNanos::from(1_000_000_000)
    }

    #[rstest]
    fn test_ticker_message_filtered_without_quotes_subscription() {
        let mut handler = create_handler_with_instrument();

        let result = handler.parse_message(TICKER_JSON, test_ts_init());

        assert!(
            result.is_none(),
            "Ticker message should be filtered when no quotes subscription exists"
        );
    }

    #[rstest]
    fn test_ticker_message_passes_with_quotes_subscription() {
        let mut handler = create_handler_with_instrument();
        subscribe(&mut handler, "quotes:BTC/USD");

        let result = handler.parse_message(TICKER_JSON, test_ts_init());

        assert!(
            result.is_some(),
            "Ticker message should pass with quotes subscription"
        );
        match result.unwrap() {
            NautilusWsMessage::Data(data) => {
                assert!(!data.is_empty(), "Should have quote data");
            }
            _ => panic!("Expected Data message with quote"),
        }
    }

    #[rstest]
    fn test_ticker_message_passes_with_ticker_subscription() {
        let mut handler = create_handler_with_instrument();
        subscribe(&mut handler, "ticker:BTC/USD");

        let result = handler.parse_message(TICKER_JSON, test_ts_init());

        assert!(
            result.is_some(),
            "Ticker message should pass with ticker: subscription"
        );
        match result.unwrap() {
            NautilusWsMessage::Data(data) => {
                assert!(!data.is_empty(), "Should have quote data");
            }
            _ => panic!("Expected Data message with quote"),
        }
    }

    #[rstest]
    fn test_book_message_filtered_without_book_subscription() {
        let mut handler = create_handler_with_instrument();

        let result = handler.parse_message(BOOK_JSON, test_ts_init());

        assert!(
            result.is_none(),
            "Book message should be filtered when no book subscription exists"
        );
    }

    #[rstest]
    fn test_book_message_passes_with_book_subscription() {
        let mut handler = create_handler_with_instrument();
        subscribe(&mut handler, "book:BTC/USD");

        let result = handler.parse_message(BOOK_JSON, test_ts_init());

        assert!(
            result.is_some(),
            "Book message should pass with book subscription"
        );
        match result.unwrap() {
            NautilusWsMessage::Deltas(_) => {}
            _ => panic!("Expected Deltas message"),
        }
    }

    #[rstest]
    fn test_quotes_and_book_subscriptions_independent() {
        let mut handler = create_handler_with_instrument();
        subscribe(&mut handler, "quotes:BTC/USD");

        let ts_init = test_ts_init();
        let book_result = handler.parse_message(BOOK_JSON, ts_init);
        assert!(
            book_result.is_none(),
            "Book message should be filtered without book: subscription"
        );

        let ticker_result = handler.parse_message(TICKER_JSON, ts_init);
        assert!(
            ticker_result.is_some(),
            "Ticker should pass with quotes subscription"
        );
    }
}