okx_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use futures_util::StreamExt;
17use nautilus_model::identifiers::InstrumentId;
18use nautilus_okx::{
19    common::enums::OKXInstrumentType, http::client::OKXHttpClient, websocket::OKXWebSocketClient,
20};
21use tokio::{pin, signal};
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    tracing_subscriber::fmt()
27        .with_max_level(LevelFilter::TRACE)
28        .init();
29
30    let http_client = OKXHttpClient::from_env().unwrap();
31    let instruments = http_client
32        .request_instruments(OKXInstrumentType::Swap, None)
33        .await?;
34
35    let mut ws_client = OKXWebSocketClient::from_env().unwrap();
36    ws_client.initialize_instruments_cache(instruments.clone());
37    ws_client.connect().await?;
38
39    let instrument_id = InstrumentId::from("BTC-USD-SWAP.OKX");
40
41    // let mut client_business = OKXWebSocketClient::new(
42    //     Some(OKX_WS_BUSINESS_URL),
43    //     None,     // No API key for public feeds
44    //     None,     // No API secret
45    //     None,     // No API passphrase
46    //     Some(10), // 10 second heartbeat
47    // )
48    // .unwrap();
49
50    // client_business.connect_data(instruments).await?;
51    // let bar_type = BarType::new(
52    //     instrument_id,
53    //     BAR_SPEC_1_MINUTE,
54    //     AggregationSource::External,
55    // );
56    // client_business.subscribe_bars(bar_type).await?;
57
58    ws_client
59        .subscribe_instruments(OKXInstrumentType::Swap)
60        .await?;
61    // client.subscribe_tickers(instrument_id).await?;
62    // client.subscribe_trades(instrument_id, true).await?;
63    ws_client.subscribe_book(instrument_id).await?;
64    // client.subscribe_quotes(instrument_id).await?;
65
66    // tokio::time::sleep(Duration::from_secs(1)).await;
67
68    // client.subscribe_book(instrument_id).await?;
69    // client.subscribe_book_depth5(instrument_id).await?;
70    // client.subscribe_quotes(instrument_id).await?;
71    // client.subscribe_trades(instrument_id).await?;
72
73    // Create a future that completes on CTRL+C
74    let sigint = signal::ctrl_c();
75    pin!(sigint);
76
77    let stream = ws_client.stream();
78    tokio::pin!(stream); // Pin the stream to allow polling in the loop
79
80    loop {
81        tokio::select! {
82            Some(data) = stream.next() => {
83                tracing::debug!("{data:?}");
84            }
85            _ = &mut sigint => {
86                tracing::info!("Received SIGINT, closing connection...");
87                ws_client.close().await?;
88                break;
89            }
90            else => break,
91        }
92    }
93
94    Ok(())
95}