okx_ws_data/
ws-data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use futures_util::StreamExt;
21use nautilus_model::identifiers::InstrumentId;
22use nautilus_okx::{
23    common::enums::OKXInstrumentType, http::client::OKXHttpClient, websocket::OKXWebSocketClient,
24};
25use tokio::{pin, signal};
26use tracing::level_filters::LevelFilter;
27
28#[tokio::main]
29async fn main() -> Result<(), Box<dyn std::error::Error>> {
30    tracing_subscriber::fmt()
31        .with_max_level(LevelFilter::TRACE)
32        .init();
33
34    let http_client = OKXHttpClient::from_env().unwrap();
35    let instruments = http_client
36        .request_instruments(OKXInstrumentType::Swap)
37        .await?;
38
39    let mut ws_client = OKXWebSocketClient::from_env().unwrap();
40    ws_client.initialize_instruments_cache(instruments.clone());
41    ws_client.connect().await?;
42
43    let instrument_id = InstrumentId::from("BTC-USD-SWAP.OKX");
44
45    // let mut client_business = OKXWebSocketClient::new(
46    //     Some(OKX_WS_BUSINESS_URL),
47    //     None,     // No API key for public feeds
48    //     None,     // No API secret
49    //     None,     // No API passphrase
50    //     Some(10), // 10 second heartbeat
51    // )
52    // .unwrap();
53
54    // client_business.connect_data(instruments).await?;
55    // let bar_type = BarType::new(
56    //     instrument_id,
57    //     BAR_SPEC_1_MINUTE,
58    //     AggregationSource::External,
59    // );
60    // client_business.subscribe_bars(bar_type).await?;
61
62    ws_client
63        .subscribe_instruments(OKXInstrumentType::Swap)
64        .await?;
65    // client.subscribe_tickers(instrument_id).await?;
66    // client.subscribe_trades(instrument_id, true).await?;
67    ws_client.subscribe_book(instrument_id).await?;
68    // client.subscribe_quotes(instrument_id).await?;
69
70    // tokio::time::sleep(Duration::from_secs(1)).await;
71
72    // client.subscribe_book(instrument_id).await?;
73    // client.subscribe_book_depth5(instrument_id).await?;
74    // client.subscribe_quotes(instrument_id).await?;
75    // client.subscribe_trades(instrument_id).await?;
76
77    // Create a future that completes on CTRL+C
78    let sigint = signal::ctrl_c();
79    pin!(sigint);
80
81    let stream = ws_client.stream();
82    tokio::pin!(stream); // Pin the stream to allow polling in the loop
83
84    loop {
85        tokio::select! {
86            Some(data) = stream.next() => {
87                tracing::debug!("{data:?}");
88            }
89            _ = &mut sigint => {
90                tracing::info!("Received SIGINT, closing connection...");
91                ws_client.close().await?;
92                break;
93            }
94            else => break,
95        }
96    }
97
98    Ok(())
99}