kraken_ws_data/
ws_data.rs1use futures_util::StreamExt;
20use nautilus_kraken::{
21 config::KrakenDataClientConfig,
22 websocket::{client::KrakenWebSocketClient, enums::KrakenWsChannel},
23};
24use tokio::{pin, signal};
25use tokio_util::sync::CancellationToken;
26use tracing::level_filters::LevelFilter;
27use ustr::Ustr;
28
29#[tokio::main]
30async fn main() -> anyhow::Result<()> {
31 tracing_subscriber::fmt()
32 .with_max_level(LevelFilter::INFO)
33 .with_target(false)
34 .compact()
35 .init();
36
37 let config = KrakenDataClientConfig::default();
38 let token = CancellationToken::new();
39
40 let mut client = KrakenWebSocketClient::new(config, token.clone());
41
42 client.connect().await?;
43
44 client
45 .subscribe(KrakenWsChannel::Ticker, vec![Ustr::from("BTC/USD")], None)
46 .await?;
47
48 client
49 .subscribe(KrakenWsChannel::Trade, vec![Ustr::from("BTC/USD")], None)
50 .await?;
51
52 let stream = client.stream();
53 let shutdown = signal::ctrl_c();
54 pin!(stream);
55 pin!(shutdown);
56
57 tracing::info!("Streaming Kraken market data; press Ctrl+C to exit");
58
59 loop {
60 tokio::select! {
61 Some(msg) = stream.next() => {
62 tracing::info!("Received: {:#?}", msg);
63 }
64 _ = &mut shutdown => {
65 tracing::info!("Received Ctrl+C, closing connection");
66 client.disconnect().await?;
67 break;
68 }
69 else => break,
70 }
71 }
72
73 Ok(())
74}