okx_ws_exec/
ws-exec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::time::Duration;
21
22use futures_util::StreamExt;
23use nautilus_model::{
24    enums::{OrderSide, OrderType},
25    identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
26    types::Quantity,
27};
28use nautilus_okx::{
29    common::enums::{OKXInstrumentType, OKXTradeMode},
30    http::OKXHttpClient,
31    websocket::client::OKXWebSocketClient,
32};
33use tokio::{pin, signal};
34use tracing::level_filters::LevelFilter;
35
36#[tokio::main]
37async fn main() -> Result<(), Box<dyn std::error::Error>> {
38    tracing_subscriber::fmt()
39        .with_max_level(LevelFilter::TRACE)
40        .init();
41
42    let rest_client = OKXHttpClient::from_env().unwrap();
43
44    let inst_type = OKXInstrumentType::Swap;
45    let instruments = rest_client.request_instruments(inst_type).await?;
46
47    let mut ws_client = OKXWebSocketClient::from_env().unwrap();
48    ws_client.initialize_instruments_cache(instruments.clone());
49    ws_client.connect().await?;
50
51    // Subscribe to execution channels: orders and account updates
52    ws_client.subscribe_orders(inst_type).await?;
53    // ws_client.subscribe_account().await?;
54
55    // Wait briefly to ensure subscriptions are active
56    tokio::time::sleep(Duration::from_secs(1)).await;
57
58    let trader_id = TraderId::from("TRADER-001");
59    let strategy_id = StrategyId::from("SCALPER-001");
60    let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
61    let client_order_id = ClientOrderId::from("O20250711001");
62    let order_side = OrderSide::Buy;
63    let order_type = OrderType::Market;
64    let quantity = Quantity::from("0.01");
65
66    let resp = ws_client
67        .submit_order(
68            trader_id,
69            strategy_id,
70            instrument_id,
71            OKXTradeMode::Isolated,
72            client_order_id,
73            order_side,
74            order_type,
75            quantity,
76            None, // time_in_force
77            None, // price
78            None, // trigger_price
79            None, // post_only
80            None, // reduce_only
81            None, // quote_quantity
82            None, // position_side
83        )
84        .await;
85
86    match resp {
87        Ok(resp) => tracing::debug!("{resp:?}"),
88        Err(e) => tracing::error!("{e:?}"),
89    }
90
91    // Create a future that completes on CTRL+C
92    let sigint = signal::ctrl_c();
93    pin!(sigint);
94
95    let stream = ws_client.stream();
96    tokio::pin!(stream); // Pin the stream to allow polling in the loop
97
98    loop {
99        tokio::select! {
100            Some(data) = stream.next() => {
101                tracing::debug!("{data:?}");
102            }
103            _ = &mut sigint => {
104                tracing::info!("Received SIGINT, closing connection...");
105                ws_client.close().await?;
106                break;
107            }
108            else => break,
109        }
110    }
111
112    Ok(())
113}