Skip to main content

nautilus_kraken/data/
spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Spot data client implementation.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use futures_util::StreamExt;
30use nautilus_common::{
31    clients::DataClient,
32    live::{get_data_event_sender, get_runtime},
33    messages::{
34        DataEvent,
35        data::{
36            SubscribeBars, SubscribeBookDeltas, SubscribeIndexPrices, SubscribeMarkPrices,
37            SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
38            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39        },
40    },
41};
42use nautilus_model::{
43    data::{Data, OrderBookDeltas_API},
44    enums::{AggregationSource, BookType},
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52    common::consts::KRAKEN_VENUE,
53    config::KrakenDataClientConfig,
54    http::KrakenSpotHttpClient,
55    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
56};
57
58/// Kraken Spot data client.
59///
60/// Provides real-time market data from Kraken Spot markets through WebSocket v2.
61#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenSpotDataClient {
64    client_id: ClientId,
65    config: KrakenDataClientConfig,
66    http: KrakenSpotHttpClient,
67    ws: KrakenSpotWebSocketClient,
68    is_connected: AtomicBool,
69    cancellation_token: CancellationToken,
70    tasks: Vec<JoinHandle<()>>,
71    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
72    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
73}
74
75impl KrakenSpotDataClient {
76    /// Creates a new [`KrakenSpotDataClient`] instance.
77    pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
78        let cancellation_token = CancellationToken::new();
79
80        let http = KrakenSpotHttpClient::new(
81            config.environment,
82            config.base_url.clone(),
83            config.timeout_secs,
84            None,
85            None,
86            None,
87            config.http_proxy.clone(),
88            config.max_requests_per_second,
89        )?;
90
91        let ws = KrakenSpotWebSocketClient::new(config.clone(), cancellation_token.clone());
92
93        Ok(Self {
94            client_id,
95            config,
96            http,
97            ws,
98            is_connected: AtomicBool::new(false),
99            cancellation_token,
100            tasks: Vec::new(),
101            instruments: Arc::new(RwLock::new(AHashMap::new())),
102            data_sender: get_data_event_sender(),
103        })
104    }
105
106    /// Returns the cached instruments.
107    #[must_use]
108    pub fn instruments(&self) -> Vec<InstrumentAny> {
109        self.instruments
110            .read()
111            .map(|guard| guard.values().cloned().collect())
112            .unwrap_or_default()
113    }
114
115    /// Returns a cached instrument by ID.
116    #[must_use]
117    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
118        self.instruments
119            .read()
120            .ok()
121            .and_then(|guard| guard.get(instrument_id).cloned())
122    }
123
124    async fn load_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
125        let instruments = self
126            .http
127            .request_instruments(None)
128            .await
129            .context("Failed to load spot instruments")?;
130
131        if let Ok(mut guard) = self.instruments.write() {
132            for instrument in &instruments {
133                guard.insert(instrument.id(), instrument.clone());
134            }
135        }
136
137        self.http.cache_instruments(instruments.clone());
138
139        log::info!(
140            "Loaded instruments: client_id={}, count={}",
141            self.client_id,
142            instruments.len()
143        );
144
145        Ok(instruments)
146    }
147
148    fn spawn_ws<F>(&self, fut: F, context: &'static str)
149    where
150        F: Future<Output = anyhow::Result<()>> + Send + 'static,
151    {
152        get_runtime().spawn(async move {
153            if let Err(e) = fut.await {
154                log::error!("{context}: {e:?}");
155            }
156        });
157    }
158
159    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
160        let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
161        let data_sender = self.data_sender.clone();
162        let cancellation_token = self.cancellation_token.clone();
163
164        let handle = get_runtime().spawn(async move {
165            tokio::pin!(stream);
166
167            loop {
168                tokio::select! {
169                    () = cancellation_token.cancelled() => {
170                        log::debug!("Spot message handler cancelled");
171                        break;
172                    }
173                    msg = stream.next() => {
174                        match msg {
175                            Some(ws_msg) => {
176                                Self::handle_ws_message(ws_msg, &data_sender);
177                            }
178                            None => {
179                                log::debug!("Spot WebSocket stream ended");
180                                break;
181                            }
182                        }
183                    }
184                }
185            }
186        });
187
188        self.tasks.push(handle);
189        Ok(())
190    }
191
192    fn handle_ws_message(
193        msg: NautilusWsMessage,
194        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
195    ) {
196        match msg {
197            NautilusWsMessage::Data(data_vec) => {
198                for data in data_vec {
199                    if let Err(e) = sender.send(DataEvent::Data(data)) {
200                        log::error!("Failed to send data event: {e}");
201                    }
202                }
203            }
204            NautilusWsMessage::Deltas(deltas) => {
205                let api_deltas = OrderBookDeltas_API::new(deltas);
206                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
207                    log::error!("Failed to send deltas event: {e}");
208                }
209            }
210            NautilusWsMessage::Reconnected => {
211                log::info!("Spot WebSocket reconnected");
212            }
213            NautilusWsMessage::OrderRejected(_)
214            | NautilusWsMessage::OrderAccepted(_)
215            | NautilusWsMessage::OrderCanceled(_)
216            | NautilusWsMessage::OrderExpired(_)
217            | NautilusWsMessage::OrderUpdated(_)
218            | NautilusWsMessage::OrderStatusReport(_)
219            | NautilusWsMessage::FillReport(_) => {}
220        }
221    }
222}
223
224#[async_trait(?Send)]
225impl DataClient for KrakenSpotDataClient {
226    fn client_id(&self) -> ClientId {
227        self.client_id
228    }
229
230    fn venue(&self) -> Option<Venue> {
231        Some(*KRAKEN_VENUE)
232    }
233
234    fn start(&mut self) -> anyhow::Result<()> {
235        log::info!(
236            "Starting Spot data client: client_id={}, environment={:?}",
237            self.client_id,
238            self.config.environment
239        );
240        Ok(())
241    }
242
243    fn stop(&mut self) -> anyhow::Result<()> {
244        log::info!("Stopping Spot data client: {}", self.client_id);
245        self.cancellation_token.cancel();
246        self.is_connected.store(false, Ordering::Relaxed);
247        Ok(())
248    }
249
250    fn reset(&mut self) -> anyhow::Result<()> {
251        log::info!("Resetting Spot data client: {}", self.client_id);
252        self.cancellation_token.cancel();
253
254        for task in self.tasks.drain(..) {
255            task.abort();
256        }
257
258        let mut ws = self.ws.clone();
259        get_runtime().spawn(async move {
260            let _ = ws.close().await;
261        });
262
263        if let Ok(mut instruments) = self.instruments.write() {
264            instruments.clear();
265        }
266
267        self.is_connected.store(false, Ordering::Relaxed);
268        self.cancellation_token = CancellationToken::new();
269        Ok(())
270    }
271
272    fn dispose(&mut self) -> anyhow::Result<()> {
273        log::info!("Disposing Spot data client: {}", self.client_id);
274        self.stop()
275    }
276
277    fn is_connected(&self) -> bool {
278        self.is_connected.load(Ordering::SeqCst)
279    }
280
281    fn is_disconnected(&self) -> bool {
282        !self.is_connected()
283    }
284
285    async fn connect(&mut self) -> anyhow::Result<()> {
286        if self.is_connected() {
287            return Ok(());
288        }
289
290        let instruments = self.load_instruments().await?;
291
292        self.ws
293            .connect()
294            .await
295            .context("Failed to connect spot WebSocket")?;
296        self.ws
297            .wait_until_active(10.0)
298            .await
299            .context("Spot WebSocket failed to become active")?;
300
301        self.spawn_message_handler()?;
302        self.ws.cache_instruments(instruments.clone());
303
304        for instrument in instruments {
305            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
306                log::error!("Failed to send instrument: {e}");
307            }
308        }
309
310        self.is_connected.store(true, Ordering::Release);
311        log::info!("Connected: client_id={}, product_type=Spot", self.client_id);
312        Ok(())
313    }
314
315    async fn disconnect(&mut self) -> anyhow::Result<()> {
316        if self.is_disconnected() {
317            return Ok(());
318        }
319
320        self.cancellation_token.cancel();
321        let _ = self.ws.close().await;
322
323        for handle in self.tasks.drain(..) {
324            if let Err(e) = handle.await {
325                log::error!("Error joining WebSocket task: {e:?}");
326            }
327        }
328
329        self.cancellation_token = CancellationToken::new();
330        self.is_connected.store(false, Ordering::Relaxed);
331
332        log::info!("Disconnected: client_id={}", self.client_id);
333        Ok(())
334    }
335
336    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
337        let instrument_id = cmd.instrument_id;
338        let depth = cmd.depth;
339
340        if cmd.book_type != BookType::L2_MBP {
341            log::warn!(
342                "Book type {:?} not supported by Kraken, skipping subscription",
343                cmd.book_type
344            );
345            return Ok(());
346        }
347
348        if let Some(d) = depth {
349            let d_val = d.get();
350            if !matches!(d_val, 10 | 25 | 100 | 500 | 1000) {
351                log::warn!("Invalid depth {d_val} for Kraken Spot, valid: 10, 25, 100, 500, 1000");
352                return Ok(());
353            }
354        }
355
356        let ws = self.ws.clone();
357        self.spawn_ws(
358            async move {
359                ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
360                    .await
361                    .map_err(|e| anyhow::anyhow!("{e}"))
362            },
363            "subscribe book",
364        );
365
366        log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
367        Ok(())
368    }
369
370    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
371        let instrument_id = cmd.instrument_id;
372        let ws = self.ws.clone();
373
374        self.spawn_ws(
375            async move {
376                ws.subscribe_quotes(instrument_id)
377                    .await
378                    .map_err(|e| anyhow::anyhow!("{e}"))
379            },
380            "subscribe quotes",
381        );
382
383        log::info!("Subscribed to quotes: instrument_id={instrument_id}");
384        Ok(())
385    }
386
387    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
388        let instrument_id = cmd.instrument_id;
389        let ws = self.ws.clone();
390
391        self.spawn_ws(
392            async move {
393                ws.subscribe_trades(instrument_id)
394                    .await
395                    .map_err(|e| anyhow::anyhow!("{e}"))
396            },
397            "subscribe trades",
398        );
399
400        log::info!("Subscribed to trades: instrument_id={instrument_id}");
401        Ok(())
402    }
403
404    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
405        log::warn!(
406            "Mark price subscription not supported for Spot instrument {}",
407            cmd.instrument_id
408        );
409        Ok(())
410    }
411
412    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
413        log::warn!(
414            "Index price subscription not supported for Spot instrument {}",
415            cmd.instrument_id
416        );
417        Ok(())
418    }
419
420    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
421        let bar_type = cmd.bar_type;
422
423        if bar_type.aggregation_source() != AggregationSource::External {
424            log::warn!("Cannot subscribe to {bar_type} bars: only EXTERNAL bars supported");
425            return Ok(());
426        }
427
428        if !bar_type.spec().is_time_aggregated() {
429            log::warn!("Cannot subscribe to {bar_type} bars: only time-based bars supported");
430            return Ok(());
431        }
432
433        let ws = self.ws.clone();
434        self.spawn_ws(
435            async move {
436                ws.subscribe_bars(bar_type)
437                    .await
438                    .map_err(|e| anyhow::anyhow!("{e}"))
439            },
440            "subscribe bars",
441        );
442
443        log::info!("Subscribed to bars: bar_type={bar_type}");
444        Ok(())
445    }
446
447    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
448        let instrument_id = cmd.instrument_id;
449        let ws = self.ws.clone();
450
451        self.spawn_ws(
452            async move {
453                ws.unsubscribe_book(instrument_id)
454                    .await
455                    .map_err(|e| anyhow::anyhow!("{e}"))
456            },
457            "unsubscribe book",
458        );
459
460        log::info!("Unsubscribed from book: instrument_id={instrument_id}");
461        Ok(())
462    }
463
464    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
465        let instrument_id = cmd.instrument_id;
466        let ws = self.ws.clone();
467
468        self.spawn_ws(
469            async move {
470                ws.unsubscribe_quotes(instrument_id)
471                    .await
472                    .map_err(|e| anyhow::anyhow!("{e}"))
473            },
474            "unsubscribe quotes",
475        );
476
477        log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
478        Ok(())
479    }
480
481    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
482        let instrument_id = cmd.instrument_id;
483        let ws = self.ws.clone();
484
485        self.spawn_ws(
486            async move {
487                ws.unsubscribe_trades(instrument_id)
488                    .await
489                    .map_err(|e| anyhow::anyhow!("{e}"))
490            },
491            "unsubscribe trades",
492        );
493
494        log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
495        Ok(())
496    }
497
498    fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
499        Ok(())
500    }
501
502    fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
503        Ok(())
504    }
505
506    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
507        let bar_type = cmd.bar_type;
508        let ws = self.ws.clone();
509
510        self.spawn_ws(
511            async move {
512                ws.unsubscribe_bars(bar_type)
513                    .await
514                    .map_err(|e| anyhow::anyhow!("{e}"))
515            },
516            "unsubscribe bars",
517        );
518
519        log::info!("Unsubscribed from bars: bar_type={bar_type}");
520        Ok(())
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
527    use nautilus_model::identifiers::ClientId;
528    use rstest::rstest;
529
530    use super::*;
531    use crate::config::KrakenDataClientConfig;
532
533    fn setup_test_env() {
534        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
535        set_data_event_sender(sender);
536    }
537
538    #[rstest]
539    fn test_spot_data_client_new() {
540        setup_test_env();
541        let config = KrakenDataClientConfig::default();
542        let client = KrakenSpotDataClient::new(ClientId::from("KRAKEN"), config);
543        assert!(client.is_ok());
544
545        let client = client.unwrap();
546        assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
547        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
548        assert!(!client.is_connected());
549        assert!(client.is_disconnected());
550        assert!(client.instruments().is_empty());
551    }
552
553    #[rstest]
554    fn test_spot_data_client_start_stop() {
555        setup_test_env();
556        let config = KrakenDataClientConfig::default();
557        let mut client = KrakenSpotDataClient::new(ClientId::from("KRAKEN"), config).unwrap();
558
559        assert!(client.start().is_ok());
560        assert!(client.stop().is_ok());
561        assert!(client.is_disconnected());
562    }
563}