1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use futures_util::StreamExt;
30use nautilus_common::{
31 clients::DataClient,
32 live::{get_data_event_sender, get_runtime},
33 messages::{
34 DataEvent,
35 data::{
36 SubscribeBars, SubscribeBookDeltas, SubscribeIndexPrices, SubscribeMarkPrices,
37 SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
38 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39 },
40 },
41};
42use nautilus_model::{
43 data::{Data, OrderBookDeltas_API},
44 enums::{AggregationSource, BookType},
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52 common::consts::KRAKEN_VENUE,
53 config::KrakenDataClientConfig,
54 http::KrakenSpotHttpClient,
55 websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
56};
57
// NOTE(review): every field below is read somewhere in this file, so this
// `allow(dead_code)` may no longer be required — confirm before removing it.
/// Market-data client for Kraken Spot.
///
/// Combines an HTTP client (used in this file to load instruments) with a
/// WebSocket client whose messages are forwarded to the data event channel.
#[allow(dead_code)]
#[derive(Debug)]
pub struct KrakenSpotDataClient {
    /// Identifier returned by `client_id()` and included in log lines.
    client_id: ClientId,
    /// Configuration the client was built from; `start` logs its `environment`.
    config: KrakenDataClientConfig,
    /// HTTP client used to request and cache instrument definitions.
    http: KrakenSpotHttpClient,
    /// WebSocket client used for connecting, streaming and (un)subscribing.
    ws: KrakenSpotWebSocketClient,
    /// Set to `true` at the end of `connect`; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token observed by the spawned message handler. A clone is handed to the
    /// WebSocket client in `new`; `reset` and `disconnect` install a fresh token.
    cancellation_token: CancellationToken,
    /// Join handles of the message-handler tasks spawned by this client.
    tasks: Vec<JoinHandle<()>>,
    /// Instruments loaded over HTTP, keyed by instrument ID.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Channel on which data and instrument events are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
74
75impl KrakenSpotDataClient {
76 pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
78 let cancellation_token = CancellationToken::new();
79
80 let http = KrakenSpotHttpClient::new(
81 config.environment,
82 config.base_url.clone(),
83 config.timeout_secs,
84 None,
85 None,
86 None,
87 config.http_proxy.clone(),
88 config.max_requests_per_second,
89 )?;
90
91 let ws = KrakenSpotWebSocketClient::new(config.clone(), cancellation_token.clone());
92
93 Ok(Self {
94 client_id,
95 config,
96 http,
97 ws,
98 is_connected: AtomicBool::new(false),
99 cancellation_token,
100 tasks: Vec::new(),
101 instruments: Arc::new(RwLock::new(AHashMap::new())),
102 data_sender: get_data_event_sender(),
103 })
104 }
105
106 #[must_use]
108 pub fn instruments(&self) -> Vec<InstrumentAny> {
109 self.instruments
110 .read()
111 .map(|guard| guard.values().cloned().collect())
112 .unwrap_or_default()
113 }
114
115 #[must_use]
117 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
118 self.instruments
119 .read()
120 .ok()
121 .and_then(|guard| guard.get(instrument_id).cloned())
122 }
123
124 async fn load_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
125 let instruments = self
126 .http
127 .request_instruments(None)
128 .await
129 .context("Failed to load spot instruments")?;
130
131 if let Ok(mut guard) = self.instruments.write() {
132 for instrument in &instruments {
133 guard.insert(instrument.id(), instrument.clone());
134 }
135 }
136
137 self.http.cache_instruments(instruments.clone());
138
139 log::info!(
140 "Loaded instruments: client_id={}, count={}",
141 self.client_id,
142 instruments.len()
143 );
144
145 Ok(instruments)
146 }
147
148 fn spawn_ws<F>(&self, fut: F, context: &'static str)
149 where
150 F: Future<Output = anyhow::Result<()>> + Send + 'static,
151 {
152 get_runtime().spawn(async move {
153 if let Err(e) = fut.await {
154 log::error!("{context}: {e:?}");
155 }
156 });
157 }
158
159 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
160 let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
161 let data_sender = self.data_sender.clone();
162 let cancellation_token = self.cancellation_token.clone();
163
164 let handle = get_runtime().spawn(async move {
165 tokio::pin!(stream);
166
167 loop {
168 tokio::select! {
169 () = cancellation_token.cancelled() => {
170 log::debug!("Spot message handler cancelled");
171 break;
172 }
173 msg = stream.next() => {
174 match msg {
175 Some(ws_msg) => {
176 Self::handle_ws_message(ws_msg, &data_sender);
177 }
178 None => {
179 log::debug!("Spot WebSocket stream ended");
180 break;
181 }
182 }
183 }
184 }
185 }
186 });
187
188 self.tasks.push(handle);
189 Ok(())
190 }
191
192 fn handle_ws_message(
193 msg: NautilusWsMessage,
194 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
195 ) {
196 match msg {
197 NautilusWsMessage::Data(data_vec) => {
198 for data in data_vec {
199 if let Err(e) = sender.send(DataEvent::Data(data)) {
200 log::error!("Failed to send data event: {e}");
201 }
202 }
203 }
204 NautilusWsMessage::Deltas(deltas) => {
205 let api_deltas = OrderBookDeltas_API::new(deltas);
206 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
207 log::error!("Failed to send deltas event: {e}");
208 }
209 }
210 NautilusWsMessage::Reconnected => {
211 log::info!("Spot WebSocket reconnected");
212 }
213 NautilusWsMessage::OrderRejected(_)
214 | NautilusWsMessage::OrderAccepted(_)
215 | NautilusWsMessage::OrderCanceled(_)
216 | NautilusWsMessage::OrderExpired(_)
217 | NautilusWsMessage::OrderUpdated(_)
218 | NautilusWsMessage::OrderStatusReport(_)
219 | NautilusWsMessage::FillReport(_) => {}
220 }
221 }
222}
223
224#[async_trait(?Send)]
225impl DataClient for KrakenSpotDataClient {
/// Returns the identifier this client was constructed with.
fn client_id(&self) -> ClientId {
    self.client_id
}
229
/// Returns the venue served by this client, which is always `KRAKEN_VENUE`.
fn venue(&self) -> Option<Venue> {
    Some(*KRAKEN_VENUE)
}
233
/// Logs the client ID and configured environment.
///
/// Nothing else happens here; connections are opened by `connect`.
fn start(&mut self) -> anyhow::Result<()> {
    log::info!(
        "Starting Spot data client: client_id={}, environment={:?}",
        self.client_id,
        self.config.environment
    );
    Ok(())
}
242
/// Cancels the current cancellation token and marks the client as disconnected.
///
/// Unlike `reset` and `disconnect`, this does not install a fresh token,
/// close the WebSocket, or wait for / abort the tracked tasks.
// NOTE(review): the token stays cancelled after this call, so a message
// handler spawned by a later `connect` would see cancellation right away —
// confirm that stop -> connect (without `reset`) is not a supported sequence.
fn stop(&mut self) -> anyhow::Result<()> {
    log::info!("Stopping Spot data client: {}", self.client_id);
    self.cancellation_token.cancel();
    self.is_connected.store(false, Ordering::Relaxed);
    Ok(())
}
249
/// Returns the client to a disconnected state with an empty instrument cache
/// and a fresh cancellation token.
///
/// Steps, in order: cancel the current token, abort every tracked task,
/// request a WebSocket close on a detached task, clear the instrument map,
/// clear the connected flag, and replace the token.
fn reset(&mut self) -> anyhow::Result<()> {
    log::info!("Resetting Spot data client: {}", self.client_id);
    self.cancellation_token.cancel();

    // This method is synchronous, so tasks are aborted instead of awaited.
    for task in self.tasks.drain(..) {
        task.abort();
    }

    // `close` is async: it runs on the runtime against a clone of the
    // WebSocket client. The task is not tracked and its result is discarded.
    let mut ws = self.ws.clone();
    get_runtime().spawn(async move {
        let _ = ws.close().await;
    });

    // If the write lock cannot be taken the cache is left untouched.
    if let Ok(mut instruments) = self.instruments.write() {
        instruments.clear();
    }

    self.is_connected.store(false, Ordering::Relaxed);
    // The old token is cancelled for good; handlers spawned from now on must
    // observe a new one.
    self.cancellation_token = CancellationToken::new();
    Ok(())
}
271
/// Logs the disposal and then delegates to `stop`, returning its result.
fn dispose(&mut self) -> anyhow::Result<()> {
    log::info!("Disposing Spot data client: {}", self.client_id);
    self.stop()
}
276
/// Returns the current value of the connected flag.
// NOTE(review): this load uses `SeqCst`, while the stores in this file use
// `Release` (`connect`) and `Relaxed` (`stop`, `reset`, `disconnect`) —
// consider settling on one scheme.
fn is_connected(&self) -> bool {
    self.is_connected.load(Ordering::SeqCst)
}
280
/// Returns the negation of `is_connected()`.
fn is_disconnected(&self) -> bool {
    !self.is_connected()
}
284
285 async fn connect(&mut self) -> anyhow::Result<()> {
286 if self.is_connected() {
287 return Ok(());
288 }
289
290 let instruments = self.load_instruments().await?;
291
292 self.ws
293 .connect()
294 .await
295 .context("Failed to connect spot WebSocket")?;
296 self.ws
297 .wait_until_active(10.0)
298 .await
299 .context("Spot WebSocket failed to become active")?;
300
301 self.spawn_message_handler()?;
302 self.ws.cache_instruments(instruments.clone());
303
304 for instrument in instruments {
305 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
306 log::error!("Failed to send instrument: {e}");
307 }
308 }
309
310 self.is_connected.store(true, Ordering::Release);
311 log::info!("Connected: client_id={}, product_type=Spot", self.client_id);
312 Ok(())
313 }
314
315 async fn disconnect(&mut self) -> anyhow::Result<()> {
316 if self.is_disconnected() {
317 return Ok(());
318 }
319
320 self.cancellation_token.cancel();
321 let _ = self.ws.close().await;
322
323 for handle in self.tasks.drain(..) {
324 if let Err(e) = handle.await {
325 log::error!("Error joining WebSocket task: {e:?}");
326 }
327 }
328
329 self.cancellation_token = CancellationToken::new();
330 self.is_connected.store(false, Ordering::Relaxed);
331
332 log::info!("Disconnected: client_id={}", self.client_id);
333 Ok(())
334 }
335
336 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
337 let instrument_id = cmd.instrument_id;
338 let depth = cmd.depth;
339
340 if cmd.book_type != BookType::L2_MBP {
341 log::warn!(
342 "Book type {:?} not supported by Kraken, skipping subscription",
343 cmd.book_type
344 );
345 return Ok(());
346 }
347
348 if let Some(d) = depth {
349 let d_val = d.get();
350 if !matches!(d_val, 10 | 25 | 100 | 500 | 1000) {
351 log::warn!("Invalid depth {d_val} for Kraken Spot, valid: 10, 25, 100, 500, 1000");
352 return Ok(());
353 }
354 }
355
356 let ws = self.ws.clone();
357 self.spawn_ws(
358 async move {
359 ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
360 .await
361 .map_err(|e| anyhow::anyhow!("{e}"))
362 },
363 "subscribe book",
364 );
365
366 log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
367 Ok(())
368 }
369
370 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
371 let instrument_id = cmd.instrument_id;
372 let ws = self.ws.clone();
373
374 self.spawn_ws(
375 async move {
376 ws.subscribe_quotes(instrument_id)
377 .await
378 .map_err(|e| anyhow::anyhow!("{e}"))
379 },
380 "subscribe quotes",
381 );
382
383 log::info!("Subscribed to quotes: instrument_id={instrument_id}");
384 Ok(())
385 }
386
387 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
388 let instrument_id = cmd.instrument_id;
389 let ws = self.ws.clone();
390
391 self.spawn_ws(
392 async move {
393 ws.subscribe_trades(instrument_id)
394 .await
395 .map_err(|e| anyhow::anyhow!("{e}"))
396 },
397 "subscribe trades",
398 );
399
400 log::info!("Subscribed to trades: instrument_id={instrument_id}");
401 Ok(())
402 }
403
/// Logs a warning that mark prices are not supported for Spot instruments.
///
/// No subscription is created; the method always returns `Ok(())`.
fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
    log::warn!(
        "Mark price subscription not supported for Spot instrument {}",
        cmd.instrument_id
    );
    Ok(())
}
411
/// Logs a warning that index prices are not supported for Spot instruments.
///
/// No subscription is created; the method always returns `Ok(())`.
fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
    log::warn!(
        "Index price subscription not supported for Spot instrument {}",
        cmd.instrument_id
    );
    Ok(())
}
419
420 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
421 let bar_type = cmd.bar_type;
422
423 if bar_type.aggregation_source() != AggregationSource::External {
424 log::warn!("Cannot subscribe to {bar_type} bars: only EXTERNAL bars supported");
425 return Ok(());
426 }
427
428 if !bar_type.spec().is_time_aggregated() {
429 log::warn!("Cannot subscribe to {bar_type} bars: only time-based bars supported");
430 return Ok(());
431 }
432
433 let ws = self.ws.clone();
434 self.spawn_ws(
435 async move {
436 ws.subscribe_bars(bar_type)
437 .await
438 .map_err(|e| anyhow::anyhow!("{e}"))
439 },
440 "subscribe bars",
441 );
442
443 log::info!("Subscribed to bars: bar_type={bar_type}");
444 Ok(())
445 }
446
447 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
448 let instrument_id = cmd.instrument_id;
449 let ws = self.ws.clone();
450
451 self.spawn_ws(
452 async move {
453 ws.unsubscribe_book(instrument_id)
454 .await
455 .map_err(|e| anyhow::anyhow!("{e}"))
456 },
457 "unsubscribe book",
458 );
459
460 log::info!("Unsubscribed from book: instrument_id={instrument_id}");
461 Ok(())
462 }
463
464 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
465 let instrument_id = cmd.instrument_id;
466 let ws = self.ws.clone();
467
468 self.spawn_ws(
469 async move {
470 ws.unsubscribe_quotes(instrument_id)
471 .await
472 .map_err(|e| anyhow::anyhow!("{e}"))
473 },
474 "unsubscribe quotes",
475 );
476
477 log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
478 Ok(())
479 }
480
481 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
482 let instrument_id = cmd.instrument_id;
483 let ws = self.ws.clone();
484
485 self.spawn_ws(
486 async move {
487 ws.unsubscribe_trades(instrument_id)
488 .await
489 .map_err(|e| anyhow::anyhow!("{e}"))
490 },
491 "unsubscribe trades",
492 );
493
494 log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
495 Ok(())
496 }
497
/// No-op that always returns `Ok(())`.
///
/// `subscribe_mark_prices` never creates a subscription, so there is nothing
/// to remove.
fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
    Ok(())
}
501
/// No-op that always returns `Ok(())`.
///
/// `subscribe_index_prices` never creates a subscription, so there is nothing
/// to remove.
fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
    Ok(())
}
505
506 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
507 let bar_type = cmd.bar_type;
508 let ws = self.ws.clone();
509
510 self.spawn_ws(
511 async move {
512 ws.unsubscribe_bars(bar_type)
513 .await
514 .map_err(|e| anyhow::anyhow!("{e}"))
515 },
516 "unsubscribe bars",
517 );
518
519 log::info!("Unsubscribed from bars: bar_type={bar_type}");
520 Ok(())
521 }
522}
523
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::config::KrakenDataClientConfig;

    /// Registers a data event sender before a client is constructed.
    ///
    /// The receiving half is dropped immediately; these tests never read events.
    fn setup_test_env() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(tx);
    }

    /// A freshly constructed client reports its ID and venue, is disconnected,
    /// and has an empty instrument cache.
    #[rstest]
    fn test_spot_data_client_new() {
        setup_test_env();
        let expected_id = ClientId::from("KRAKEN");

        let result = KrakenSpotDataClient::new(expected_id, KrakenDataClientConfig::default());
        assert!(result.is_ok());

        let client = result.unwrap();
        assert_eq!(client.client_id(), expected_id);
        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
        assert!(!client.is_connected());
        assert!(client.is_disconnected());
        assert!(client.instruments().is_empty());
    }

    /// `start` followed by `stop` succeeds and leaves the client disconnected.
    #[rstest]
    fn test_spot_data_client_start_stop() {
        setup_test_env();
        let mut client =
            KrakenSpotDataClient::new(ClientId::from("KRAKEN"), KrakenDataClientConfig::default())
                .unwrap();

        let started = client.start();
        assert!(started.is_ok());

        let stopped = client.stop();
        assert!(stopped.is_ok());

        assert!(client.is_disconnected());
    }
}