nautilus_kraken/websocket/
handler.rs1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use nautilus_core::{AtomicTime, UnixNanos, time::get_atomic_clock_realtime};
25use nautilus_model::{
26 data::Data,
27 instruments::{Instrument, InstrumentAny},
28};
29use nautilus_network::websocket::WebSocketClient;
30use serde_json::Value;
31use tokio_tungstenite::tungstenite::Message;
32use ustr::Ustr;
33
34use super::{
35 enums::KrakenWsChannel,
36 messages::{
37 KrakenWsBookData, KrakenWsMessage, KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData,
38 NautilusWsMessage,
39 },
40 parse::{parse_book_deltas, parse_quote_tick, parse_trade_tick},
41};
42
43#[derive(Debug)]
45#[allow(
46 clippy::large_enum_variant,
47 reason = "Commands are ephemeral and immediately consumed"
48)]
49pub enum HandlerCommand {
50 SetClient(WebSocketClient),
52 Disconnect,
54 SendText { payload: String },
56 InitializeInstruments(Vec<InstrumentAny>),
58 UpdateInstrument(InstrumentAny),
60}
61
62pub(super) struct FeedHandler {
64 clock: &'static AtomicTime,
65 signal: Arc<AtomicBool>,
66 client: Option<WebSocketClient>,
67 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
68 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
69 instruments_cache: AHashMap<Ustr, InstrumentAny>,
70 book_sequence: u64,
71}
72
73impl FeedHandler {
74 pub(super) fn new(
76 signal: Arc<AtomicBool>,
77 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
78 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
79 ) -> Self {
80 Self {
81 clock: get_atomic_clock_realtime(),
82 signal,
83 client: None,
84 cmd_rx,
85 raw_rx,
86 instruments_cache: AHashMap::new(),
87 book_sequence: 0,
88 }
89 }
90
91 pub(super) fn is_stopped(&self) -> bool {
92 self.signal.load(Ordering::Relaxed)
93 }
94
95 fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
96 self.instruments_cache.get(symbol).cloned()
97 }
98
99 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
101 loop {
102 tokio::select! {
103 Some(cmd) = self.cmd_rx.recv() => {
104 match cmd {
105 HandlerCommand::SetClient(client) => {
106 tracing::debug!("WebSocketClient received by handler");
107 self.client = Some(client);
108 }
109 HandlerCommand::Disconnect => {
110 tracing::debug!("Disconnect command received");
111 if let Some(client) = self.client.take() {
112 client.disconnect().await;
113 }
114 }
115 HandlerCommand::SendText { payload } => {
116 if let Some(client) = &self.client
117 && let Err(e) = client.send_text(payload.clone(), None).await
118 {
119 tracing::error!(error = %e, "Failed to send text");
120 }
121 }
122 HandlerCommand::InitializeInstruments(instruments) => {
123 for inst in instruments {
124 self.instruments_cache.insert(inst.symbol().inner(), inst);
125 }
126 }
127 HandlerCommand::UpdateInstrument(inst) => {
128 self.instruments_cache.insert(inst.symbol().inner(), inst);
129 }
130 }
131 continue;
132 }
133
134 msg = self.raw_rx.recv() => {
135 let msg = match msg {
136 Some(msg) => msg,
137 None => {
138 tracing::debug!("WebSocket stream closed");
139 return None;
140 }
141 };
142
143 if let Message::Ping(data) = &msg {
144 tracing::trace!("Received ping frame with {} bytes", data.len());
145 if let Some(client) = &self.client
146 && let Err(e) = client.send_pong(data.to_vec()).await
147 {
148 tracing::warn!(error = %e, "Failed to send pong frame");
149 }
150 continue;
151 }
152
153 if self.signal.load(Ordering::Relaxed) {
154 tracing::debug!("Stop signal received");
155 return None;
156 }
157
158 let text = match msg {
159 Message::Text(text) => text.to_string(),
160 Message::Binary(data) => {
161 match String::from_utf8(data.to_vec()) {
162 Ok(text) => text,
163 Err(e) => {
164 tracing::warn!("Failed to decode binary message: {e}");
165 continue;
166 }
167 }
168 }
169 Message::Pong(_) => {
170 tracing::trace!("Received pong");
171 continue;
172 }
173 Message::Close(_) => {
174 tracing::info!("WebSocket connection closed");
175 return None;
176 }
177 Message::Frame(_) => {
178 tracing::trace!("Received raw frame");
179 continue;
180 }
181 _ => continue,
182 };
183
184 let ts_init = self.clock.get_time_ns();
185
186 if let Some(nautilus_msg) = self.parse_message(&text, ts_init) {
187 return Some(nautilus_msg);
188 }
189
190 continue;
191 }
192 }
193 }
194 }
195
196 fn parse_message(&mut self, text: &str, ts_init: UnixNanos) -> Option<NautilusWsMessage> {
197 if let Ok(msg) = serde_json::from_str::<KrakenWsMessage>(text) {
199 return self.handle_data_message(msg, ts_init);
200 }
201
202 if let Ok(value) = serde_json::from_str::<Value>(text) {
204 if value.get("channel").and_then(|v| v.as_str()) == Some("heartbeat") {
205 tracing::trace!("Received heartbeat");
206 return None;
207 }
208
209 if value.get("channel").and_then(|v| v.as_str()) == Some("status") {
210 tracing::debug!("Received status message");
211 return None;
212 }
213
214 if value.get("method").is_some() {
215 if let Ok(response) = serde_json::from_value::<KrakenWsResponse>(value) {
216 match response {
217 KrakenWsResponse::Subscribe(sub) => {
218 if sub.success {
219 if let Some(result) = &sub.result {
220 tracing::debug!(
221 channel = ?result.channel,
222 req_id = ?sub.req_id,
223 "Subscription confirmed"
224 );
225 } else {
226 tracing::debug!(req_id = ?sub.req_id, "Subscription confirmed");
227 }
228 } else {
229 tracing::warn!(
230 error = ?sub.error,
231 req_id = ?sub.req_id,
232 "Subscription failed"
233 );
234 }
235 }
236 KrakenWsResponse::Unsubscribe(unsub) => {
237 if unsub.success {
238 tracing::debug!(req_id = ?unsub.req_id, "Unsubscription confirmed");
239 } else {
240 tracing::warn!(
241 error = ?unsub.error,
242 req_id = ?unsub.req_id,
243 "Unsubscription failed"
244 );
245 }
246 }
247 KrakenWsResponse::Pong(pong) => {
248 tracing::trace!(req_id = ?pong.req_id, "Received pong");
249 }
250 KrakenWsResponse::Other => {
251 tracing::debug!("Received unknown subscription response");
252 }
253 }
254 } else {
255 tracing::debug!("Received subscription response (failed to parse details)");
256 }
257 return None;
258 }
259 }
260
261 tracing::warn!("Failed to parse message: {text}");
262 None
263 }
264
265 fn handle_data_message(
266 &mut self,
267 msg: KrakenWsMessage,
268 ts_init: UnixNanos,
269 ) -> Option<NautilusWsMessage> {
270 match msg.channel {
271 KrakenWsChannel::Book => self.handle_book_message(msg, ts_init),
272 KrakenWsChannel::Ticker => self.handle_ticker_message(msg, ts_init),
273 KrakenWsChannel::Trade => self.handle_trade_message(msg, ts_init),
274 KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg, ts_init),
275 _ => {
276 tracing::warn!("Unhandled channel: {:?}", msg.channel);
277 None
278 }
279 }
280 }
281
282 fn handle_book_message(
283 &mut self,
284 msg: KrakenWsMessage,
285 ts_init: UnixNanos,
286 ) -> Option<NautilusWsMessage> {
287 let mut all_deltas = Vec::new();
288 let mut instrument_id = None;
289
290 for data in msg.data {
291 match serde_json::from_value::<KrakenWsBookData>(data) {
292 Ok(book_data) => {
293 let instrument = self.get_instrument(&book_data.symbol)?;
294 instrument_id = Some(instrument.id());
295
296 match parse_book_deltas(&book_data, &instrument, self.book_sequence, ts_init) {
297 Ok(mut deltas) => {
298 self.book_sequence += deltas.len() as u64;
299 all_deltas.append(&mut deltas);
300 }
301 Err(e) => {
302 tracing::error!("Failed to parse book deltas: {e}");
303 }
304 }
305 }
306 Err(e) => {
307 tracing::error!("Failed to deserialize book data: {e}");
308 }
309 }
310 }
311
312 if all_deltas.is_empty() {
313 None
314 } else {
315 use nautilus_model::data::OrderBookDeltas;
316 let deltas = OrderBookDeltas::new(instrument_id?, all_deltas);
317 Some(NautilusWsMessage::Deltas(deltas))
318 }
319 }
320
321 fn handle_ticker_message(
322 &self,
323 msg: KrakenWsMessage,
324 ts_init: UnixNanos,
325 ) -> Option<NautilusWsMessage> {
326 let mut quotes = Vec::new();
327
328 for data in msg.data {
329 match serde_json::from_value::<KrakenWsTickerData>(data) {
330 Ok(ticker_data) => {
331 let instrument = self.get_instrument(&ticker_data.symbol)?;
332
333 match parse_quote_tick(&ticker_data, &instrument, ts_init) {
334 Ok(quote) => quotes.push(Data::Quote(quote)),
335 Err(e) => {
336 tracing::error!("Failed to parse quote tick: {e}");
337 }
338 }
339 }
340 Err(e) => {
341 tracing::error!("Failed to deserialize ticker data: {e}");
342 }
343 }
344 }
345
346 if quotes.is_empty() {
347 None
348 } else {
349 Some(NautilusWsMessage::Data(quotes))
350 }
351 }
352
353 fn handle_trade_message(
354 &self,
355 msg: KrakenWsMessage,
356 ts_init: UnixNanos,
357 ) -> Option<NautilusWsMessage> {
358 let mut trades = Vec::new();
359
360 for data in msg.data {
361 match serde_json::from_value::<KrakenWsTradeData>(data) {
362 Ok(trade_data) => {
363 let instrument = self.get_instrument(&trade_data.symbol)?;
364
365 match parse_trade_tick(&trade_data, &instrument, ts_init) {
366 Ok(trade) => trades.push(Data::Trade(trade)),
367 Err(e) => {
368 tracing::error!("Failed to parse trade tick: {e}");
369 }
370 }
371 }
372 Err(e) => {
373 tracing::error!("Failed to deserialize trade data: {e}");
374 }
375 }
376 }
377
378 if trades.is_empty() {
379 None
380 } else {
381 Some(NautilusWsMessage::Data(trades))
382 }
383 }
384
385 fn handle_ohlc_message(
386 &self,
387 _msg: KrakenWsMessage,
388 _ts_init: UnixNanos,
389 ) -> Option<NautilusWsMessage> {
390 tracing::debug!("OHLC message received but parsing not yet implemented");
392 None
393 }
394}