kraken_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Connects to the Kraken public WebSocket feed and streams market data.
17//! Useful when manually validating the Rust WebSocket client implementation.
18
19use futures_util::StreamExt;
20use nautilus_kraken::{
21    config::KrakenDataClientConfig,
22    websocket::{client::KrakenWebSocketClient, enums::KrakenWsChannel},
23};
24use tokio::{pin, signal};
25use tokio_util::sync::CancellationToken;
26use tracing::level_filters::LevelFilter;
27use ustr::Ustr;
28
29#[tokio::main]
30async fn main() -> anyhow::Result<()> {
31    tracing_subscriber::fmt()
32        .with_max_level(LevelFilter::INFO)
33        .with_target(false)
34        .compact()
35        .init();
36
37    let config = KrakenDataClientConfig::default();
38    let token = CancellationToken::new();
39
40    let mut client = KrakenWebSocketClient::new(config, token.clone());
41
42    client.connect().await?;
43
44    client
45        .subscribe(KrakenWsChannel::Ticker, vec![Ustr::from("BTC/USD")], None)
46        .await?;
47
48    client
49        .subscribe(KrakenWsChannel::Trade, vec![Ustr::from("BTC/USD")], None)
50        .await?;
51
52    let stream = client.stream();
53    let shutdown = signal::ctrl_c();
54    pin!(stream);
55    pin!(shutdown);
56
57    tracing::info!("Streaming Kraken market data; press Ctrl+C to exit");
58
59    loop {
60        tokio::select! {
61            Some(msg) = stream.next() => {
62                tracing::info!("Received: {:#?}", msg);
63            }
64            _ = &mut shutdown => {
65                tracing::info!("Received Ctrl+C, closing connection");
66                client.disconnect().await?;
67                break;
68            }
69            else => break,
70        }
71    }
72
73    Ok(())
74}