okx_ws_data/ws_data.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use futures_util::StreamExt;
17use nautilus_model::identifiers::InstrumentId;
18use nautilus_okx::{
19 common::enums::OKXInstrumentType, http::client::OKXHttpClient, websocket::OKXWebSocketClient,
20};
21use tokio::{pin, signal};
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 tracing_subscriber::fmt()
27 .with_max_level(LevelFilter::TRACE)
28 .init();
29
30 let http_client = OKXHttpClient::from_env().unwrap();
31 let instruments = http_client
32 .request_instruments(OKXInstrumentType::Swap, None)
33 .await?;
34
35 let mut ws_client = OKXWebSocketClient::from_env().unwrap();
36 ws_client.initialize_instruments_cache(instruments.clone());
37 ws_client.connect().await?;
38
39 let instrument_id = InstrumentId::from("BTC-USD-SWAP.OKX");
40
41 // let mut client_business = OKXWebSocketClient::new(
42 // Some(OKX_WS_BUSINESS_URL),
43 // None, // No API key for public feeds
44 // None, // No API secret
45 // None, // No API passphrase
46 // Some(10), // 10 second heartbeat
47 // )
48 // .unwrap();
49
50 // client_business.connect_data(instruments).await?;
51 // let bar_type = BarType::new(
52 // instrument_id,
53 // BAR_SPEC_1_MINUTE,
54 // AggregationSource::External,
55 // );
56 // client_business.subscribe_bars(bar_type).await?;
57
58 ws_client
59 .subscribe_instruments(OKXInstrumentType::Swap)
60 .await?;
61 // client.subscribe_tickers(instrument_id).await?;
62 // client.subscribe_trades(instrument_id, true).await?;
63 ws_client.subscribe_book(instrument_id).await?;
64 // client.subscribe_quotes(instrument_id).await?;
65
66 // tokio::time::sleep(Duration::from_secs(1)).await;
67
68 // client.subscribe_book(instrument_id).await?;
69 // client.subscribe_book_depth5(instrument_id).await?;
70 // client.subscribe_quotes(instrument_id).await?;
71 // client.subscribe_trades(instrument_id).await?;
72
73 // Create a future that completes on CTRL+C
74 let sigint = signal::ctrl_c();
75 pin!(sigint);
76
77 let stream = ws_client.stream();
78 tokio::pin!(stream); // Pin the stream to allow polling in the loop
79
80 loop {
81 tokio::select! {
82 Some(data) = stream.next() => {
83 tracing::debug!("{data:?}");
84 }
85 _ = &mut sigint => {
86 tracing::info!("Received SIGINT, closing connection...");
87 ws_client.close().await?;
88 break;
89 }
90 else => break,
91 }
92 }
93
94 Ok(())
95}