Skip to main content

nautilus_kraken/data/
futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Futures data client implementation.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30    clients::DataClient,
31    live::{get_data_event_sender, get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
36            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
37            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39        },
40    },
41};
42use nautilus_model::{
43    data::{Data, OrderBookDeltas_API},
44    enums::BookType,
45    identifiers::{ClientId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52    common::consts::KRAKEN_VENUE,
53    config::KrakenDataClientConfig,
54    http::KrakenFuturesHttpClient,
55    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
56};
57
/// Kraken Futures data client.
///
/// Provides real-time market data from Kraken Futures markets.
///
/// Instruments are requested through the HTTP client and cached locally, while
/// streaming messages received from the WebSocket client are forwarded as
/// [`DataEvent`]s on the data event channel.
// NOTE(review): every field appears to be read somewhere in this file, so this allow
// may be redundant — confirm before removing.
#[allow(dead_code)]
#[derive(Debug)]
pub struct KrakenFuturesDataClient {
    /// Identifier returned by `client_id()` and included in log lines.
    client_id: ClientId,
    /// Configuration the HTTP and WebSocket clients were built from.
    config: KrakenDataClientConfig,
    /// HTTP client used to request (and cache) instruments.
    http: KrakenFuturesHttpClient,
    /// WebSocket client used for connecting, subscribing and streaming.
    ws: KrakenFuturesWebSocketClient,
    /// Set once `connect()` completes; cleared by `stop()`, `reset()` and `disconnect()`.
    is_connected: AtomicBool,
    /// Token observed by the message handler task so it can be asked to exit.
    cancellation_token: CancellationToken,
    /// Join handles of the message handler tasks spawned by this client.
    tasks: Vec<JoinHandle<()>>,
    /// Instrument cache keyed by instrument ID.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
74
75impl KrakenFuturesDataClient {
76    /// Creates a new [`KrakenFuturesDataClient`] instance.
77    pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
78        let cancellation_token = CancellationToken::new();
79
80        let http = KrakenFuturesHttpClient::new(
81            config.environment,
82            config.base_url.clone(),
83            config.timeout_secs,
84            None,
85            None,
86            None,
87            config.http_proxy.clone(),
88            config.max_requests_per_second,
89        )?;
90
91        let ws = KrakenFuturesWebSocketClient::new(
92            config.ws_public_url(),
93            config.heartbeat_interval_secs,
94        );
95
96        Ok(Self {
97            client_id,
98            config,
99            http,
100            ws,
101            is_connected: AtomicBool::new(false),
102            cancellation_token,
103            tasks: Vec::new(),
104            instruments: Arc::new(RwLock::new(AHashMap::new())),
105            data_sender: get_data_event_sender(),
106        })
107    }
108
109    /// Returns the cached instruments.
110    #[must_use]
111    pub fn instruments(&self) -> Vec<InstrumentAny> {
112        self.instruments
113            .read()
114            .map(|guard| guard.values().cloned().collect())
115            .unwrap_or_default()
116    }
117
118    /// Returns a cached instrument by ID.
119    #[must_use]
120    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
121        self.instruments
122            .read()
123            .ok()
124            .and_then(|guard| guard.get(instrument_id).cloned())
125    }
126
127    async fn load_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
128        let instruments = self
129            .http
130            .request_instruments()
131            .await
132            .context("Failed to load futures instruments")?;
133
134        if let Ok(mut guard) = self.instruments.write() {
135            for instrument in &instruments {
136                guard.insert(instrument.id(), instrument.clone());
137            }
138        }
139
140        self.http.cache_instruments(instruments.clone());
141
142        log::info!(
143            "Loaded instruments: client_id={}, count={}",
144            self.client_id,
145            instruments.len()
146        );
147
148        Ok(instruments)
149    }
150
151    fn spawn_ws<F>(&self, fut: F, context: &'static str)
152    where
153        F: Future<Output = anyhow::Result<()>> + Send + 'static,
154    {
155        get_runtime().spawn(async move {
156            if let Err(e) = fut.await {
157                log::error!("{context}: {e:?}");
158            }
159        });
160    }
161
162    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
163        let mut rx = self
164            .ws
165            .take_output_rx()
166            .context("Failed to take futures WebSocket output receiver")?;
167        let data_sender = self.data_sender.clone();
168        let cancellation_token = self.cancellation_token.clone();
169
170        let handle = get_runtime().spawn(async move {
171            loop {
172                tokio::select! {
173                    () = cancellation_token.cancelled() => {
174                        log::debug!("Futures message handler cancelled");
175                        break;
176                    }
177                    msg = rx.recv() => {
178                        match msg {
179                            Some(ws_msg) => {
180                                Self::handle_ws_message(ws_msg, &data_sender);
181                            }
182                            None => {
183                                log::debug!("Futures WebSocket stream ended");
184                                break;
185                            }
186                        }
187                    }
188                }
189            }
190        });
191
192        self.tasks.push(handle);
193        Ok(())
194    }
195
196    fn handle_ws_message(
197        msg: KrakenFuturesWsMessage,
198        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
199    ) {
200        match msg {
201            KrakenFuturesWsMessage::BookDeltas(deltas) => {
202                let api_deltas = OrderBookDeltas_API::new(deltas);
203                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
204                    log::error!("Failed to send deltas event: {e}");
205                }
206            }
207            KrakenFuturesWsMessage::Quote(quote) => {
208                if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
209                    log::error!("Failed to send quote event: {e}");
210                }
211            }
212            KrakenFuturesWsMessage::Trade(trade) => {
213                if let Err(e) = sender.send(DataEvent::Data(Data::Trade(trade))) {
214                    log::error!("Failed to send trade event: {e}");
215                }
216            }
217            KrakenFuturesWsMessage::MarkPrice(mark_price) => {
218                if let Err(e) = sender.send(DataEvent::Data(Data::MarkPriceUpdate(mark_price))) {
219                    log::error!("Failed to send mark price event: {e}");
220                }
221            }
222            KrakenFuturesWsMessage::IndexPrice(index_price) => {
223                if let Err(e) = sender.send(DataEvent::Data(Data::IndexPriceUpdate(index_price))) {
224                    log::error!("Failed to send index price event: {e}");
225                }
226            }
227            KrakenFuturesWsMessage::FundingRate(funding_rate) => {
228                if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
229                    log::error!("Failed to send funding rate event: {e}");
230                }
231            }
232            KrakenFuturesWsMessage::Reconnected => {
233                log::info!("Futures WebSocket reconnected");
234            }
235            // Execution messages are handled by the execution client
236            KrakenFuturesWsMessage::OrderAccepted(_)
237            | KrakenFuturesWsMessage::OrderCanceled(_)
238            | KrakenFuturesWsMessage::OrderExpired(_)
239            | KrakenFuturesWsMessage::OrderUpdated(_)
240            | KrakenFuturesWsMessage::OrderStatusReport(_)
241            | KrakenFuturesWsMessage::FillReport(_) => {}
242        }
243    }
244}
245
246#[async_trait(?Send)]
247impl DataClient for KrakenFuturesDataClient {
    /// Returns the identifier this client was created with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
251
252    fn venue(&self) -> Option<Venue> {
253        Some(*KRAKEN_VENUE)
254    }
255
256    fn start(&mut self) -> anyhow::Result<()> {
257        log::info!(
258            "Starting Futures data client: client_id={}, environment={:?}",
259            self.client_id,
260            self.config.environment
261        );
262        Ok(())
263    }
264
265    fn stop(&mut self) -> anyhow::Result<()> {
266        log::info!("Stopping Futures data client: {}", self.client_id);
267        self.cancellation_token.cancel();
268        self.is_connected.store(false, Ordering::Relaxed);
269        Ok(())
270    }
271
272    fn reset(&mut self) -> anyhow::Result<()> {
273        log::info!("Resetting Futures data client: {}", self.client_id);
274        self.cancellation_token.cancel();
275
276        for task in self.tasks.drain(..) {
277            task.abort();
278        }
279
280        let mut ws = self.ws.clone();
281        get_runtime().spawn(async move {
282            let _ = ws.close().await;
283        });
284
285        if let Ok(mut instruments) = self.instruments.write() {
286            instruments.clear();
287        }
288
289        self.is_connected.store(false, Ordering::Relaxed);
290        self.cancellation_token = CancellationToken::new();
291        Ok(())
292    }
293
294    fn dispose(&mut self) -> anyhow::Result<()> {
295        log::info!("Disposing Futures data client: {}", self.client_id);
296        self.stop()
297    }
298
    /// Returns the current value of the connected flag.
    ///
    /// The flag is set at the end of `connect()` and cleared by `stop()`, `reset()`
    /// and `disconnect()`.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }
302
303    fn is_disconnected(&self) -> bool {
304        !self.is_connected()
305    }
306
307    async fn connect(&mut self) -> anyhow::Result<()> {
308        if self.is_connected() {
309            return Ok(());
310        }
311
312        let instruments = self.load_instruments().await?;
313
314        self.ws
315            .connect()
316            .await
317            .context("Failed to connect futures WebSocket")?;
318        self.ws
319            .wait_until_active(10.0)
320            .await
321            .context("Futures WebSocket failed to become active")?;
322
323        self.spawn_message_handler()?;
324        self.ws.cache_instruments(instruments.clone());
325
326        for instrument in instruments {
327            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
328                log::error!("Failed to send instrument: {e}");
329            }
330        }
331
332        self.is_connected.store(true, Ordering::Release);
333        log::info!(
334            "Connected: client_id={}, product_type=Futures",
335            self.client_id
336        );
337        Ok(())
338    }
339
340    async fn disconnect(&mut self) -> anyhow::Result<()> {
341        if self.is_disconnected() {
342            return Ok(());
343        }
344
345        self.cancellation_token.cancel();
346        let _ = self.ws.close().await;
347
348        for handle in self.tasks.drain(..) {
349            if let Err(e) = handle.await {
350                log::error!("Error joining WebSocket task: {e:?}");
351            }
352        }
353
354        self.cancellation_token = CancellationToken::new();
355        self.is_connected.store(false, Ordering::Relaxed);
356
357        log::info!("Disconnected: client_id={}", self.client_id);
358        Ok(())
359    }
360
361    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
362        let instrument_id = cmd.instrument_id;
363        let depth = cmd.depth;
364
365        if cmd.book_type != BookType::L2_MBP {
366            log::warn!(
367                "Book type {:?} not supported by Kraken, skipping subscription",
368                cmd.book_type
369            );
370            return Ok(());
371        }
372
373        let ws = self.ws.clone();
374        self.spawn_ws(
375            async move {
376                ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
377                    .await
378                    .map_err(|e| anyhow::anyhow!("{e}"))
379            },
380            "subscribe book",
381        );
382
383        log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
384        Ok(())
385    }
386
387    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
388        let instrument_id = cmd.instrument_id;
389        let ws = self.ws.clone();
390
391        self.spawn_ws(
392            async move {
393                ws.subscribe_quotes(instrument_id)
394                    .await
395                    .map_err(|e| anyhow::anyhow!("{e}"))
396            },
397            "subscribe quotes",
398        );
399
400        log::info!("Subscribed to quotes: instrument_id={instrument_id}");
401        Ok(())
402    }
403
404    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
405        let instrument_id = cmd.instrument_id;
406        let ws = self.ws.clone();
407
408        self.spawn_ws(
409            async move {
410                ws.subscribe_trades(instrument_id)
411                    .await
412                    .map_err(|e| anyhow::anyhow!("{e}"))
413            },
414            "subscribe trades",
415        );
416
417        log::info!("Subscribed to trades: instrument_id={instrument_id}");
418        Ok(())
419    }
420
421    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
422        let instrument_id = cmd.instrument_id;
423        let ws = self.ws.clone();
424
425        self.spawn_ws(
426            async move {
427                ws.subscribe_mark_price(instrument_id)
428                    .await
429                    .map_err(|e| anyhow::anyhow!("{e}"))
430            },
431            "subscribe mark price",
432        );
433
434        log::info!("Subscribed to mark price: instrument_id={instrument_id}");
435        Ok(())
436    }
437
438    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
439        let instrument_id = cmd.instrument_id;
440        let ws = self.ws.clone();
441
442        self.spawn_ws(
443            async move {
444                ws.subscribe_index_price(instrument_id)
445                    .await
446                    .map_err(|e| anyhow::anyhow!("{e}"))
447            },
448            "subscribe index price",
449        );
450
451        log::info!("Subscribed to index price: instrument_id={instrument_id}");
452        Ok(())
453    }
454
455    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
456        log::warn!(
457            "Cannot subscribe to {} bars: Kraken Futures does not support EXTERNAL bar streaming",
458            cmd.bar_type
459        );
460        Ok(())
461    }
462
463    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
464        let instrument_id = cmd.instrument_id;
465        let ws = self.ws.clone();
466
467        self.spawn_ws(
468            async move {
469                ws.subscribe_funding_rate(instrument_id)
470                    .await
471                    .map_err(|e| anyhow::anyhow!("{e}"))
472            },
473            "subscribe funding rate",
474        );
475
476        log::info!("Subscribed to funding rate: instrument_id={instrument_id}");
477        Ok(())
478    }
479
480    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
481        let instrument_id = cmd.instrument_id;
482        let ws = self.ws.clone();
483
484        self.spawn_ws(
485            async move {
486                ws.unsubscribe_book(instrument_id)
487                    .await
488                    .map_err(|e| anyhow::anyhow!("{e}"))
489            },
490            "unsubscribe book",
491        );
492
493        log::info!("Unsubscribed from book: instrument_id={instrument_id}");
494        Ok(())
495    }
496
497    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
498        let instrument_id = cmd.instrument_id;
499        let ws = self.ws.clone();
500
501        self.spawn_ws(
502            async move {
503                ws.unsubscribe_quotes(instrument_id)
504                    .await
505                    .map_err(|e| anyhow::anyhow!("{e}"))
506            },
507            "unsubscribe quotes",
508        );
509
510        log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
511        Ok(())
512    }
513
514    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
515        let instrument_id = cmd.instrument_id;
516        let ws = self.ws.clone();
517
518        self.spawn_ws(
519            async move {
520                ws.unsubscribe_trades(instrument_id)
521                    .await
522                    .map_err(|e| anyhow::anyhow!("{e}"))
523            },
524            "unsubscribe trades",
525        );
526
527        log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
528        Ok(())
529    }
530
531    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
532        let instrument_id = cmd.instrument_id;
533        let ws = self.ws.clone();
534
535        self.spawn_ws(
536            async move {
537                ws.unsubscribe_mark_price(instrument_id)
538                    .await
539                    .map_err(|e| anyhow::anyhow!("{e}"))
540            },
541            "unsubscribe mark price",
542        );
543
544        log::info!("Unsubscribed from mark price: instrument_id={instrument_id}");
545        Ok(())
546    }
547
548    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
549        let instrument_id = cmd.instrument_id;
550        let ws = self.ws.clone();
551
552        self.spawn_ws(
553            async move {
554                ws.unsubscribe_index_price(instrument_id)
555                    .await
556                    .map_err(|e| anyhow::anyhow!("{e}"))
557            },
558            "unsubscribe index price",
559        );
560
561        log::info!("Unsubscribed from index price: instrument_id={instrument_id}");
562        Ok(())
563    }
564
565    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
566        let instrument_id = cmd.instrument_id;
567        let ws = self.ws.clone();
568
569        self.spawn_ws(
570            async move {
571                ws.unsubscribe_funding_rate(instrument_id)
572                    .await
573                    .map_err(|e| anyhow::anyhow!("{e}"))
574            },
575            "unsubscribe funding rate",
576        );
577
578        log::info!("Unsubscribed from funding rate: instrument_id={instrument_id}");
579        Ok(())
580    }
581
    /// No-op that always returns `Ok`.
    ///
    /// `subscribe_bars` never creates a subscription (it only logs a warning), so
    /// there is nothing to remove here.
    fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
        Ok(())
    }
585}
586
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::KrakenProductType, config::KrakenDataClientConfig};

    /// Registers a data event sender so that `KrakenFuturesDataClient::new` can obtain one.
    fn setup_test_env() {
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(sender);
    }

    /// Default configuration with the product type set to futures.
    fn futures_config() -> KrakenDataClientConfig {
        KrakenDataClientConfig {
            product_type: KrakenProductType::Futures,
            ..Default::default()
        }
    }

    /// A freshly constructed client reports its identity and starts disconnected and empty.
    #[rstest]
    fn test_futures_data_client_new() {
        setup_test_env();
        let config = futures_config();

        let result = KrakenFuturesDataClient::new(ClientId::from("KRAKEN"), config);
        assert!(result.is_ok());

        let client = result.unwrap();
        assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
        assert!(!client.is_connected());
        assert!(client.is_disconnected());
        assert!(client.instruments().is_empty());
    }

    /// `start` and `stop` both succeed and leave the client disconnected.
    #[rstest]
    fn test_futures_data_client_start_stop() {
        setup_test_env();
        let config = futures_config();
        let mut client = KrakenFuturesDataClient::new(ClientId::from("KRAKEN"), config).unwrap();

        assert!(client.start().is_ok());
        assert!(client.stop().is_ok());
        assert!(client.is_disconnected());
    }
}