okx_ws_data/ws-data.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use futures_util::StreamExt;
21use nautilus_model::identifiers::InstrumentId;
22use nautilus_okx::{
23 common::enums::OKXInstrumentType, http::client::OKXHttpClient, websocket::OKXWebSocketClient,
24};
25use tokio::{pin, signal};
26use tracing::level_filters::LevelFilter;
27
28#[tokio::main]
29async fn main() -> Result<(), Box<dyn std::error::Error>> {
30 tracing_subscriber::fmt()
31 .with_max_level(LevelFilter::TRACE)
32 .init();
33
34 let http_client = OKXHttpClient::from_env().unwrap();
35 let instruments = http_client
36 .request_instruments(OKXInstrumentType::Swap)
37 .await?;
38
39 let mut ws_client = OKXWebSocketClient::from_env().unwrap();
40 ws_client.initialize_instruments_cache(instruments.clone());
41 ws_client.connect().await?;
42
43 let instrument_id = InstrumentId::from("BTC-USD-SWAP.OKX");
44
45 // let mut client_business = OKXWebSocketClient::new(
46 // Some(OKX_WS_BUSINESS_URL),
47 // None, // No API key for public feeds
48 // None, // No API secret
49 // None, // No API passphrase
50 // Some(10), // 10 second heartbeat
51 // )
52 // .unwrap();
53
54 // client_business.connect_data(instruments).await?;
55 // let bar_type = BarType::new(
56 // instrument_id,
57 // BAR_SPEC_1_MINUTE,
58 // AggregationSource::External,
59 // );
60 // client_business.subscribe_bars(bar_type).await?;
61
62 ws_client
63 .subscribe_instruments(OKXInstrumentType::Swap)
64 .await?;
65 // client.subscribe_tickers(instrument_id).await?;
66 // client.subscribe_trades(instrument_id, true).await?;
67 ws_client.subscribe_book(instrument_id).await?;
68 // client.subscribe_quotes(instrument_id).await?;
69
70 // tokio::time::sleep(Duration::from_secs(1)).await;
71
72 // client.subscribe_book(instrument_id).await?;
73 // client.subscribe_book_depth5(instrument_id).await?;
74 // client.subscribe_quotes(instrument_id).await?;
75 // client.subscribe_trades(instrument_id).await?;
76
77 // Create a future that completes on CTRL+C
78 let sigint = signal::ctrl_c();
79 pin!(sigint);
80
81 let stream = ws_client.stream();
82 tokio::pin!(stream); // Pin the stream to allow polling in the loop
83
84 loop {
85 tokio::select! {
86 Some(data) = stream.next() => {
87 tracing::debug!("{data:?}");
88 }
89 _ = &mut sigint => {
90 tracing::info!("Received SIGINT, closing connection...");
91 ws_client.close().await?;
92 break;
93 }
94 else => break,
95 }
96 }
97
98 Ok(())
99}