okx_ws_exec/
ws_exec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::time::Duration;
17
18use futures_util::StreamExt;
19use nautilus_model::{
20    enums::{OrderSide, OrderType},
21    identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
22    types::Quantity,
23};
24use nautilus_okx::{
25    common::enums::{OKXInstrumentType, OKXTradeMode},
26    http::OKXHttpClient,
27    websocket::client::OKXWebSocketClient,
28};
29use tokio::{pin, signal};
30use tracing::level_filters::LevelFilter;
31
32#[tokio::main]
33async fn main() -> Result<(), Box<dyn std::error::Error>> {
34    tracing_subscriber::fmt()
35        .with_max_level(LevelFilter::TRACE)
36        .init();
37
38    let rest_client = OKXHttpClient::from_env().unwrap();
39
40    let inst_type = OKXInstrumentType::Swap;
41    let instruments = rest_client.request_instruments(inst_type, None).await?;
42
43    let mut ws_client = OKXWebSocketClient::from_env().unwrap();
44    ws_client.initialize_instruments_cache(instruments.clone());
45    ws_client.connect().await?;
46
47    // Subscribe to execution channels: orders and account updates
48    ws_client.subscribe_orders(inst_type).await?;
49    // ws_client.subscribe_account().await?;
50
51    // Wait briefly to ensure subscriptions are active
52    tokio::time::sleep(Duration::from_secs(1)).await;
53
54    let trader_id = TraderId::from("TRADER-001");
55    let strategy_id = StrategyId::from("SCALPER-001");
56    let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
57    let client_order_id = ClientOrderId::from("O20250711001");
58    let order_side = OrderSide::Buy;
59    let order_type = OrderType::Market;
60    let quantity = Quantity::from("0.01");
61
62    let resp = ws_client
63        .submit_order(
64            trader_id,
65            strategy_id,
66            instrument_id,
67            OKXTradeMode::Isolated,
68            client_order_id,
69            order_side,
70            order_type,
71            quantity,
72            None, // time_in_force
73            None, // price
74            None, // trigger_price
75            None, // post_only
76            None, // reduce_only
77            None, // quote_quantity
78            None, // position_side
79        )
80        .await;
81
82    match resp {
83        Ok(resp) => tracing::debug!("{resp:?}"),
84        Err(e) => tracing::error!("{e:?}"),
85    }
86
87    // Create a future that completes on CTRL+C
88    let sigint = signal::ctrl_c();
89    pin!(sigint);
90
91    let stream = ws_client.stream();
92    tokio::pin!(stream); // Pin the stream to allow polling in the loop
93
94    loop {
95        tokio::select! {
96            Some(data) = stream.next() => {
97                tracing::debug!("{data:?}");
98            }
99            _ = &mut sigint => {
100                tracing::info!("Received SIGINT, closing connection...");
101                ws_client.close().await?;
102                break;
103            }
104            else => break,
105        }
106    }
107
108    Ok(())
109}