1use std::{
19 future::Future,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30 clients::DataClient,
31 live::{get_data_event_sender, get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
36 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars,
37 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
39 },
40 },
41};
42use nautilus_model::{
43 data::{Data, OrderBookDeltas_API},
44 enums::BookType,
45 identifiers::{ClientId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47};
48use tokio::task::JoinHandle;
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52 common::consts::KRAKEN_VENUE,
53 config::KrakenDataClientConfig,
54 http::KrakenFuturesHttpClient,
55 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
56};
57
58#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenFuturesDataClient {
64 client_id: ClientId,
65 config: KrakenDataClientConfig,
66 http: KrakenFuturesHttpClient,
67 ws: KrakenFuturesWebSocketClient,
68 is_connected: AtomicBool,
69 cancellation_token: CancellationToken,
70 tasks: Vec<JoinHandle<()>>,
71 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
72 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
73}
74
75impl KrakenFuturesDataClient {
76 pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
78 let cancellation_token = CancellationToken::new();
79
80 let http = KrakenFuturesHttpClient::new(
81 config.environment,
82 config.base_url.clone(),
83 config.timeout_secs,
84 None,
85 None,
86 None,
87 config.http_proxy.clone(),
88 config.max_requests_per_second,
89 )?;
90
91 let ws = KrakenFuturesWebSocketClient::new(
92 config.ws_public_url(),
93 config.heartbeat_interval_secs,
94 );
95
96 Ok(Self {
97 client_id,
98 config,
99 http,
100 ws,
101 is_connected: AtomicBool::new(false),
102 cancellation_token,
103 tasks: Vec::new(),
104 instruments: Arc::new(RwLock::new(AHashMap::new())),
105 data_sender: get_data_event_sender(),
106 })
107 }
108
109 #[must_use]
111 pub fn instruments(&self) -> Vec<InstrumentAny> {
112 self.instruments
113 .read()
114 .map(|guard| guard.values().cloned().collect())
115 .unwrap_or_default()
116 }
117
118 #[must_use]
120 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
121 self.instruments
122 .read()
123 .ok()
124 .and_then(|guard| guard.get(instrument_id).cloned())
125 }
126
127 async fn load_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
128 let instruments = self
129 .http
130 .request_instruments()
131 .await
132 .context("Failed to load futures instruments")?;
133
134 if let Ok(mut guard) = self.instruments.write() {
135 for instrument in &instruments {
136 guard.insert(instrument.id(), instrument.clone());
137 }
138 }
139
140 self.http.cache_instruments(instruments.clone());
141
142 log::info!(
143 "Loaded instruments: client_id={}, count={}",
144 self.client_id,
145 instruments.len()
146 );
147
148 Ok(instruments)
149 }
150
151 fn spawn_ws<F>(&self, fut: F, context: &'static str)
152 where
153 F: Future<Output = anyhow::Result<()>> + Send + 'static,
154 {
155 get_runtime().spawn(async move {
156 if let Err(e) = fut.await {
157 log::error!("{context}: {e:?}");
158 }
159 });
160 }
161
162 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
163 let mut rx = self
164 .ws
165 .take_output_rx()
166 .context("Failed to take futures WebSocket output receiver")?;
167 let data_sender = self.data_sender.clone();
168 let cancellation_token = self.cancellation_token.clone();
169
170 let handle = get_runtime().spawn(async move {
171 loop {
172 tokio::select! {
173 () = cancellation_token.cancelled() => {
174 log::debug!("Futures message handler cancelled");
175 break;
176 }
177 msg = rx.recv() => {
178 match msg {
179 Some(ws_msg) => {
180 Self::handle_ws_message(ws_msg, &data_sender);
181 }
182 None => {
183 log::debug!("Futures WebSocket stream ended");
184 break;
185 }
186 }
187 }
188 }
189 }
190 });
191
192 self.tasks.push(handle);
193 Ok(())
194 }
195
196 fn handle_ws_message(
197 msg: KrakenFuturesWsMessage,
198 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
199 ) {
200 match msg {
201 KrakenFuturesWsMessage::BookDeltas(deltas) => {
202 let api_deltas = OrderBookDeltas_API::new(deltas);
203 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
204 log::error!("Failed to send deltas event: {e}");
205 }
206 }
207 KrakenFuturesWsMessage::Quote(quote) => {
208 if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
209 log::error!("Failed to send quote event: {e}");
210 }
211 }
212 KrakenFuturesWsMessage::Trade(trade) => {
213 if let Err(e) = sender.send(DataEvent::Data(Data::Trade(trade))) {
214 log::error!("Failed to send trade event: {e}");
215 }
216 }
217 KrakenFuturesWsMessage::MarkPrice(mark_price) => {
218 if let Err(e) = sender.send(DataEvent::Data(Data::MarkPriceUpdate(mark_price))) {
219 log::error!("Failed to send mark price event: {e}");
220 }
221 }
222 KrakenFuturesWsMessage::IndexPrice(index_price) => {
223 if let Err(e) = sender.send(DataEvent::Data(Data::IndexPriceUpdate(index_price))) {
224 log::error!("Failed to send index price event: {e}");
225 }
226 }
227 KrakenFuturesWsMessage::FundingRate(funding_rate) => {
228 if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
229 log::error!("Failed to send funding rate event: {e}");
230 }
231 }
232 KrakenFuturesWsMessage::Reconnected => {
233 log::info!("Futures WebSocket reconnected");
234 }
235 KrakenFuturesWsMessage::OrderAccepted(_)
237 | KrakenFuturesWsMessage::OrderCanceled(_)
238 | KrakenFuturesWsMessage::OrderExpired(_)
239 | KrakenFuturesWsMessage::OrderUpdated(_)
240 | KrakenFuturesWsMessage::OrderStatusReport(_)
241 | KrakenFuturesWsMessage::FillReport(_) => {}
242 }
243 }
244}
245
246#[async_trait(?Send)]
247impl DataClient for KrakenFuturesDataClient {
248 fn client_id(&self) -> ClientId {
249 self.client_id
250 }
251
252 fn venue(&self) -> Option<Venue> {
253 Some(*KRAKEN_VENUE)
254 }
255
256 fn start(&mut self) -> anyhow::Result<()> {
257 log::info!(
258 "Starting Futures data client: client_id={}, environment={:?}",
259 self.client_id,
260 self.config.environment
261 );
262 Ok(())
263 }
264
265 fn stop(&mut self) -> anyhow::Result<()> {
266 log::info!("Stopping Futures data client: {}", self.client_id);
267 self.cancellation_token.cancel();
268 self.is_connected.store(false, Ordering::Relaxed);
269 Ok(())
270 }
271
272 fn reset(&mut self) -> anyhow::Result<()> {
273 log::info!("Resetting Futures data client: {}", self.client_id);
274 self.cancellation_token.cancel();
275
276 for task in self.tasks.drain(..) {
277 task.abort();
278 }
279
280 let mut ws = self.ws.clone();
281 get_runtime().spawn(async move {
282 let _ = ws.close().await;
283 });
284
285 if let Ok(mut instruments) = self.instruments.write() {
286 instruments.clear();
287 }
288
289 self.is_connected.store(false, Ordering::Relaxed);
290 self.cancellation_token = CancellationToken::new();
291 Ok(())
292 }
293
294 fn dispose(&mut self) -> anyhow::Result<()> {
295 log::info!("Disposing Futures data client: {}", self.client_id);
296 self.stop()
297 }
298
299 fn is_connected(&self) -> bool {
300 self.is_connected.load(Ordering::SeqCst)
301 }
302
303 fn is_disconnected(&self) -> bool {
304 !self.is_connected()
305 }
306
307 async fn connect(&mut self) -> anyhow::Result<()> {
308 if self.is_connected() {
309 return Ok(());
310 }
311
312 let instruments = self.load_instruments().await?;
313
314 self.ws
315 .connect()
316 .await
317 .context("Failed to connect futures WebSocket")?;
318 self.ws
319 .wait_until_active(10.0)
320 .await
321 .context("Futures WebSocket failed to become active")?;
322
323 self.spawn_message_handler()?;
324 self.ws.cache_instruments(instruments.clone());
325
326 for instrument in instruments {
327 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
328 log::error!("Failed to send instrument: {e}");
329 }
330 }
331
332 self.is_connected.store(true, Ordering::Release);
333 log::info!(
334 "Connected: client_id={}, product_type=Futures",
335 self.client_id
336 );
337 Ok(())
338 }
339
340 async fn disconnect(&mut self) -> anyhow::Result<()> {
341 if self.is_disconnected() {
342 return Ok(());
343 }
344
345 self.cancellation_token.cancel();
346 let _ = self.ws.close().await;
347
348 for handle in self.tasks.drain(..) {
349 if let Err(e) = handle.await {
350 log::error!("Error joining WebSocket task: {e:?}");
351 }
352 }
353
354 self.cancellation_token = CancellationToken::new();
355 self.is_connected.store(false, Ordering::Relaxed);
356
357 log::info!("Disconnected: client_id={}", self.client_id);
358 Ok(())
359 }
360
361 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
362 let instrument_id = cmd.instrument_id;
363 let depth = cmd.depth;
364
365 if cmd.book_type != BookType::L2_MBP {
366 log::warn!(
367 "Book type {:?} not supported by Kraken, skipping subscription",
368 cmd.book_type
369 );
370 return Ok(());
371 }
372
373 let ws = self.ws.clone();
374 self.spawn_ws(
375 async move {
376 ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
377 .await
378 .map_err(|e| anyhow::anyhow!("{e}"))
379 },
380 "subscribe book",
381 );
382
383 log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
384 Ok(())
385 }
386
387 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
388 let instrument_id = cmd.instrument_id;
389 let ws = self.ws.clone();
390
391 self.spawn_ws(
392 async move {
393 ws.subscribe_quotes(instrument_id)
394 .await
395 .map_err(|e| anyhow::anyhow!("{e}"))
396 },
397 "subscribe quotes",
398 );
399
400 log::info!("Subscribed to quotes: instrument_id={instrument_id}");
401 Ok(())
402 }
403
404 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
405 let instrument_id = cmd.instrument_id;
406 let ws = self.ws.clone();
407
408 self.spawn_ws(
409 async move {
410 ws.subscribe_trades(instrument_id)
411 .await
412 .map_err(|e| anyhow::anyhow!("{e}"))
413 },
414 "subscribe trades",
415 );
416
417 log::info!("Subscribed to trades: instrument_id={instrument_id}");
418 Ok(())
419 }
420
421 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
422 let instrument_id = cmd.instrument_id;
423 let ws = self.ws.clone();
424
425 self.spawn_ws(
426 async move {
427 ws.subscribe_mark_price(instrument_id)
428 .await
429 .map_err(|e| anyhow::anyhow!("{e}"))
430 },
431 "subscribe mark price",
432 );
433
434 log::info!("Subscribed to mark price: instrument_id={instrument_id}");
435 Ok(())
436 }
437
438 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
439 let instrument_id = cmd.instrument_id;
440 let ws = self.ws.clone();
441
442 self.spawn_ws(
443 async move {
444 ws.subscribe_index_price(instrument_id)
445 .await
446 .map_err(|e| anyhow::anyhow!("{e}"))
447 },
448 "subscribe index price",
449 );
450
451 log::info!("Subscribed to index price: instrument_id={instrument_id}");
452 Ok(())
453 }
454
455 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
456 log::warn!(
457 "Cannot subscribe to {} bars: Kraken Futures does not support EXTERNAL bar streaming",
458 cmd.bar_type
459 );
460 Ok(())
461 }
462
463 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
464 let instrument_id = cmd.instrument_id;
465 let ws = self.ws.clone();
466
467 self.spawn_ws(
468 async move {
469 ws.subscribe_funding_rate(instrument_id)
470 .await
471 .map_err(|e| anyhow::anyhow!("{e}"))
472 },
473 "subscribe funding rate",
474 );
475
476 log::info!("Subscribed to funding rate: instrument_id={instrument_id}");
477 Ok(())
478 }
479
480 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
481 let instrument_id = cmd.instrument_id;
482 let ws = self.ws.clone();
483
484 self.spawn_ws(
485 async move {
486 ws.unsubscribe_book(instrument_id)
487 .await
488 .map_err(|e| anyhow::anyhow!("{e}"))
489 },
490 "unsubscribe book",
491 );
492
493 log::info!("Unsubscribed from book: instrument_id={instrument_id}");
494 Ok(())
495 }
496
497 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
498 let instrument_id = cmd.instrument_id;
499 let ws = self.ws.clone();
500
501 self.spawn_ws(
502 async move {
503 ws.unsubscribe_quotes(instrument_id)
504 .await
505 .map_err(|e| anyhow::anyhow!("{e}"))
506 },
507 "unsubscribe quotes",
508 );
509
510 log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
511 Ok(())
512 }
513
514 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
515 let instrument_id = cmd.instrument_id;
516 let ws = self.ws.clone();
517
518 self.spawn_ws(
519 async move {
520 ws.unsubscribe_trades(instrument_id)
521 .await
522 .map_err(|e| anyhow::anyhow!("{e}"))
523 },
524 "unsubscribe trades",
525 );
526
527 log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
528 Ok(())
529 }
530
531 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
532 let instrument_id = cmd.instrument_id;
533 let ws = self.ws.clone();
534
535 self.spawn_ws(
536 async move {
537 ws.unsubscribe_mark_price(instrument_id)
538 .await
539 .map_err(|e| anyhow::anyhow!("{e}"))
540 },
541 "unsubscribe mark price",
542 );
543
544 log::info!("Unsubscribed from mark price: instrument_id={instrument_id}");
545 Ok(())
546 }
547
548 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
549 let instrument_id = cmd.instrument_id;
550 let ws = self.ws.clone();
551
552 self.spawn_ws(
553 async move {
554 ws.unsubscribe_index_price(instrument_id)
555 .await
556 .map_err(|e| anyhow::anyhow!("{e}"))
557 },
558 "unsubscribe index price",
559 );
560
561 log::info!("Unsubscribed from index price: instrument_id={instrument_id}");
562 Ok(())
563 }
564
565 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
566 let instrument_id = cmd.instrument_id;
567 let ws = self.ws.clone();
568
569 self.spawn_ws(
570 async move {
571 ws.unsubscribe_funding_rate(instrument_id)
572 .await
573 .map_err(|e| anyhow::anyhow!("{e}"))
574 },
575 "unsubscribe funding rate",
576 );
577
578 log::info!("Unsubscribed from funding rate: instrument_id={instrument_id}");
579 Ok(())
580 }
581
582 fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
583 Ok(())
584 }
585}
586
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::KrakenProductType, config::KrakenDataClientConfig};

    /// Registers a data event sender so that client construction can obtain one.
    fn setup_test_env() {
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(sender);
    }

    /// Builds a default configuration with the product type set to Futures.
    fn futures_config() -> KrakenDataClientConfig {
        KrakenDataClientConfig {
            product_type: KrakenProductType::Futures,
            ..Default::default()
        }
    }

    /// A new client reports its ID and venue, is disconnected and has no instruments.
    #[rstest]
    fn test_futures_data_client_new() {
        setup_test_env();
        let expected_id = ClientId::from("KRAKEN");

        let result = KrakenFuturesDataClient::new(ClientId::from("KRAKEN"), futures_config());
        assert!(result.is_ok());

        let client = result.unwrap();
        assert_eq!(client.client_id(), expected_id);
        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
        assert!(!client.is_connected());
        assert!(client.is_disconnected());
        assert!(client.instruments().is_empty());
    }

    /// `start` and `stop` both succeed and leave the client disconnected.
    #[rstest]
    fn test_futures_data_client_start_stop() {
        setup_test_env();
        let mut client =
            KrakenFuturesDataClient::new(ClientId::from("KRAKEN"), futures_config()).unwrap();

        assert!(client.start().is_ok());
        assert!(client.stop().is_ok());
        assert!(client.is_disconnected());
    }
}