nautilus_okx/websocket/
messages.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use derive_builder::Builder;
17use nautilus_model::{
18    data::{Data, FundingRateUpdate, OrderBookDeltas},
19    events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
20    instruments::InstrumentAny,
21    reports::{FillReport, OrderStatusReport},
22};
23use serde::{Deserialize, Serialize};
24use ustr::Ustr;
25
26use super::enums::{OKXWsChannel, OKXWsOperation};
27use crate::{
28    common::{
29        enums::{
30            OKXBookAction, OKXCandleConfirm, OKXExecType, OKXInstrumentType, OKXOrderStatus,
31            OKXOrderType, OKXPositionSide, OKXSide, OKXTradeMode,
32        },
33        parse::{deserialize_empty_string_as_none, deserialize_string_to_u64},
34    },
35    websocket::enums::OKXSubscriptionEvent,
36};
37
38#[derive(Debug, Clone)]
39pub enum NautilusWsMessage {
40    Data(Vec<Data>),
41    Deltas(OrderBookDeltas),
42    FundingRates(Vec<FundingRateUpdate>),
43    Instrument(Box<InstrumentAny>),
44    AccountUpdate(AccountState),
45    OrderRejected(OrderRejected),
46    OrderCancelRejected(OrderCancelRejected),
47    OrderModifyRejected(OrderModifyRejected),
48    ExecutionReports(Vec<ExecutionReport>),
49    Error(OKXWebSocketError),
50    Raw(serde_json::Value), // Unhandled channels
51    Reconnected,
52}
53
54/// Represents an OKX WebSocket error.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56#[cfg_attr(feature = "python", pyo3::pyclass)]
57pub struct OKXWebSocketError {
58    /// Error code from OKX (e.g., "50101").
59    pub code: String,
60    /// Error message from OKX.
61    pub message: String,
62    /// Connection ID if available.
63    pub conn_id: Option<String>,
64    /// Timestamp when the error occurred.
65    pub timestamp: u64,
66}
67
68#[derive(Debug, Clone)]
69#[allow(clippy::large_enum_variant)]
70pub enum ExecutionReport {
71    Order(OrderStatusReport),
72    Fill(FillReport),
73}
74
75/// Generic WebSocket request for OKX trading commands.
76#[derive(Debug, Serialize)]
77#[serde(rename_all = "camelCase")]
78pub struct OKXWsRequest<T> {
79    /// Client request ID (required for order operations).
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub id: Option<String>,
82    /// Operation type (order, cancel-order, amend-order).
83    pub op: OKXWsOperation,
84    /// Request effective deadline. Unix timestamp format in milliseconds.
85    /// This is when the request itself expires, not related to order expiration.
86    #[serde(skip_serializing_if = "Option::is_none")]
87    #[serde(rename = "expTime")]
88    pub exp_time: Option<String>,
89    /// Arguments payload for the operation.
90    pub args: Vec<T>,
91}
92
93/// OKX WebSocket authentication message.
94#[derive(Debug, Serialize)]
95pub struct OKXAuthentication {
96    pub op: &'static str,
97    pub args: Vec<OKXAuthenticationArg>,
98}
99
100/// OKX WebSocket authentication arguments.
101#[derive(Debug, Serialize)]
102#[serde(rename_all = "camelCase")]
103pub struct OKXAuthenticationArg {
104    pub api_key: String,
105    pub passphrase: String,
106    pub timestamp: String,
107    pub sign: String,
108}
109
110#[derive(Debug, Serialize)]
111pub struct OKXSubscription {
112    pub op: OKXWsOperation,
113    pub args: Vec<OKXSubscriptionArg>,
114}
115
116#[derive(Debug, Serialize)]
117#[serde(rename_all = "camelCase")]
118pub struct OKXSubscriptionArg {
119    pub channel: OKXWsChannel,
120    pub inst_type: Option<OKXInstrumentType>,
121    pub inst_family: Option<Ustr>,
122    pub inst_id: Option<Ustr>,
123}
124
125#[derive(Debug, Deserialize)]
126#[serde(untagged)]
127pub enum OKXWebSocketEvent {
128    Login {
129        event: String,
130        code: String,
131        msg: String,
132        #[serde(rename = "connId")]
133        conn_id: String,
134    },
135    Subscription {
136        event: OKXSubscriptionEvent,
137        arg: OKXWebSocketArg,
138        #[serde(rename = "connId")]
139        conn_id: String,
140    },
141    ChannelConnCount {
142        event: String,
143        channel: OKXWsChannel,
144        #[serde(rename = "connCount")]
145        conn_count: String,
146        #[serde(rename = "connId")]
147        conn_id: String,
148    },
149    OrderResponse {
150        id: Option<String>,
151        op: OKXWsOperation,
152        code: String,
153        msg: String,
154        data: Vec<serde_json::Value>,
155    },
156    BookData {
157        arg: OKXWebSocketArg,
158        action: OKXBookAction,
159        data: Vec<OKXBookMsg>,
160    },
161    Data {
162        arg: OKXWebSocketArg,
163        data: serde_json::Value,
164    },
165    Error {
166        code: String,
167        msg: String,
168    },
169    #[serde(skip)]
170    Reconnected,
171}
172
173#[derive(Debug, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase")]
175pub struct OKXWebSocketArg {
176    /// Channel name that pushed the data.
177    pub channel: OKXWsChannel,
178    #[serde(default)]
179    pub inst_id: Option<Ustr>,
180    #[serde(default)]
181    pub inst_type: Option<OKXInstrumentType>,
182    #[serde(default)]
183    pub bar: Option<Ustr>,
184}
185
186/// Ticker data for an instrument.
187#[derive(Debug, Serialize, Deserialize)]
188#[serde(rename_all = "camelCase")]
189pub struct OKXTickerMsg {
190    /// Instrument type, e.g. "SPOT", "SWAP".
191    pub inst_type: OKXInstrumentType,
192    /// Instrument ID, e.g. "BTC-USDT".
193    pub inst_id: Ustr,
194    /// Last traded price.
195    #[serde(rename = "last")]
196    pub last_px: String,
197    /// Last traded size.
198    pub last_sz: String,
199    /// Best ask price.
200    pub ask_px: String,
201    /// Best ask size.
202    pub ask_sz: String,
203    /// Best bid price.
204    pub bid_px: String,
205    /// Best bid size.
206    pub bid_sz: String,
207    /// 24-hour opening price.
208    pub open24h: String,
209    /// 24-hour highest price.
210    pub high24h: String,
211    /// 24-hour lowest price.
212    pub low24h: String,
213    /// 24-hour trading volume in quote currency.
214    pub vol_ccy_24h: String,
215    /// 24-hour trading volume.
216    pub vol24h: String,
217    /// The opening price of the day (UTC 0).
218    pub sod_utc0: String,
219    /// The opening price of the day (UTC 8).
220    pub sod_utc8: String,
221    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
222    #[serde(deserialize_with = "deserialize_string_to_u64")]
223    pub ts: u64,
224}
225
226/// Represents a single order in the order book.
227#[derive(Debug, Serialize, Deserialize)]
228pub struct OrderBookEntry {
229    /// Price of the order.
230    pub price: String,
231    /// Size of the order.
232    pub size: String,
233    /// Number of liquidated orders.
234    pub liquidated_orders_count: String,
235    /// Total number of orders at this price.
236    pub orders_count: String,
237}
238
239/// Order book data for an instrument.
240#[derive(Debug, Serialize, Deserialize)]
241#[serde(rename_all = "camelCase")]
242pub struct OKXBookMsg {
243    /// Order book asks [price, size, liquidated orders count, orders count].
244    pub asks: Vec<OrderBookEntry>,
245    /// Order book bids [price, size, liquidated orders count, orders count].
246    pub bids: Vec<OrderBookEntry>,
247    /// Checksum value.
248    pub checksum: Option<i64>,
249    /// Sequence ID of the last sent message. Only applicable to books, books-l2-tbt, books50-l2-tbt.
250    pub prev_seq_id: Option<i64>,
251    /// Sequence ID of the current message, implementation details below.
252    pub seq_id: u64,
253    /// Order book generation time, Unix timestamp format in milliseconds, e.g. 1597026383085.
254    #[serde(deserialize_with = "deserialize_string_to_u64")]
255    pub ts: u64,
256}
257
258/// Trade data for an instrument.
259#[derive(Debug, Serialize, Deserialize)]
260#[serde(rename_all = "camelCase")]
261pub struct OKXTradeMsg {
262    /// Instrument ID.
263    pub inst_id: Ustr,
264    /// Trade ID.
265    pub trade_id: String,
266    /// Trade price.
267    pub px: String,
268    /// Trade size.
269    pub sz: String,
270    /// Trade direction (buy or sell).
271    pub side: OKXSide,
272    /// Count.
273    pub count: String,
274    /// Trade timestamp, Unix timestamp format in milliseconds.
275    #[serde(deserialize_with = "deserialize_string_to_u64")]
276    pub ts: u64,
277}
278
279/// Funding rate data for perpetual swaps.
280#[derive(Debug, Serialize, Deserialize)]
281#[serde(rename_all = "camelCase")]
282pub struct OKXFundingRateMsg {
283    /// Instrument ID.
284    pub inst_id: Ustr,
285    /// Current funding rate.
286    pub funding_rate: Ustr,
287    /// Predicted next funding rate.
288    pub next_funding_rate: Ustr,
289    /// Next funding time, Unix timestamp format in milliseconds.
290    #[serde(deserialize_with = "deserialize_string_to_u64")]
291    pub funding_time: u64,
292    /// Message timestamp, Unix timestamp format in milliseconds.
293    #[serde(deserialize_with = "deserialize_string_to_u64")]
294    pub ts: u64,
295}
296
297/// Mark price data for perpetual swaps.
298#[derive(Debug, Serialize, Deserialize)]
299#[serde(rename_all = "camelCase")]
300pub struct OKXMarkPriceMsg {
301    /// Instrument ID.
302    pub inst_id: Ustr,
303    /// Current mark price.
304    pub mark_px: String,
305    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
306    #[serde(deserialize_with = "deserialize_string_to_u64")]
307    pub ts: u64,
308}
309
310/// Index price data.
311#[derive(Debug, Serialize, Deserialize)]
312#[serde(rename_all = "camelCase")]
313pub struct OKXIndexPriceMsg {
314    /// Index name, e.g. "BTC-USD".
315    pub inst_id: Ustr,
316    /// Latest index price.
317    pub idx_px: String,
318    /// 24-hour highest price.
319    pub high24h: String,
320    /// 24-hour lowest price.
321    pub low24h: String,
322    /// 24-hour opening price.
323    pub open24h: String,
324    /// The opening price of the day (UTC 0).
325    pub sod_utc0: String,
326    /// The opening price of the day (UTC 8).
327    pub sod_utc8: String,
328    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
329    #[serde(deserialize_with = "deserialize_string_to_u64")]
330    pub ts: u64,
331}
332
333/// Price limit data (upper and lower limits).
334#[derive(Debug, Serialize, Deserialize)]
335#[serde(rename_all = "camelCase")]
336pub struct OKXPriceLimitMsg {
337    /// Instrument ID.
338    pub inst_id: Ustr,
339    /// Buy limit price.
340    pub buy_lmt: String,
341    /// Sell limit price.
342    pub sell_lmt: String,
343    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
344    #[serde(deserialize_with = "deserialize_string_to_u64")]
345    pub ts: u64,
346}
347
348/// Candlestick data for an instrument.
349#[derive(Debug, Serialize, Deserialize)]
350#[serde(rename_all = "camelCase")]
351pub struct OKXCandleMsg {
352    /// Candlestick timestamp, Unix timestamp format in milliseconds.
353    #[serde(deserialize_with = "deserialize_string_to_u64")]
354    pub ts: u64,
355    /// Opening price.
356    pub o: String,
357    /// Highest price.
358    pub h: String,
359    /// Lowest price.
360    pub l: String,
361    /// Closing price.
362    pub c: String,
363    /// Trading volume in contracts.
364    pub vol: String,
365    /// Trading volume in quote currency.
366    pub vol_ccy: String,
367    pub vol_ccy_quote: String,
368    /// Whether this is a completed candle.
369    pub confirm: OKXCandleConfirm,
370}
371
372/// Open interest data.
373#[derive(Debug, Serialize, Deserialize)]
374#[serde(rename_all = "camelCase")]
375pub struct OKXOpenInterestMsg {
376    /// Instrument ID.
377    pub inst_id: Ustr,
378    /// Open interest in contracts.
379    pub oi: String,
380    /// Open interest in quote currency.
381    pub oi_ccy: String,
382    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
383    #[serde(deserialize_with = "deserialize_string_to_u64")]
384    pub ts: u64,
385}
386
387/// Option market data summary.
388#[derive(Debug, Serialize, Deserialize)]
389#[serde(rename_all = "camelCase")]
390pub struct OKXOptionSummaryMsg {
391    /// Instrument ID.
392    pub inst_id: Ustr,
393    /// Underlying.
394    pub uly: String,
395    /// Delta.
396    pub delta: String,
397    /// Gamma.
398    pub gamma: String,
399    /// Theta.
400    pub theta: String,
401    /// Vega.
402    pub vega: String,
403    /// Black-Scholes implied volatility delta.
404    pub delta_bs: String,
405    /// Black-Scholes implied volatility gamma.
406    pub gamma_bs: String,
407    /// Black-Scholes implied volatility theta.
408    pub theta_bs: String,
409    /// Black-Scholes implied volatility vega.
410    pub vega_bs: String,
411    /// Realized volatility.
412    pub real_vol: String,
413    /// Bid volatility.
414    pub bid_vol: String,
415    /// Ask volatility.
416    pub ask_vol: String,
417    /// Mark volatility.
418    pub mark_vol: String,
419    /// Leverage.
420    pub lever: String,
421    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
422    #[serde(deserialize_with = "deserialize_string_to_u64")]
423    pub ts: u64,
424}
425
426/// Estimated delivery/exercise price data.
427#[derive(Debug, Serialize, Deserialize)]
428#[serde(rename_all = "camelCase")]
429pub struct OKXEstimatedPriceMsg {
430    /// Instrument ID.
431    pub inst_id: Ustr,
432    /// Estimated settlement price.
433    pub settle_px: String,
434    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
435    #[serde(deserialize_with = "deserialize_string_to_u64")]
436    pub ts: u64,
437}
438
439/// Platform status updates.
440#[derive(Debug, Serialize, Deserialize)]
441#[serde(rename_all = "camelCase")]
442pub struct OKXStatusMsg {
443    /// System maintenance status.
444    pub title: Ustr,
445    /// Status type: planned or scheduled.
446    #[serde(rename = "type")]
447    pub status_type: Ustr,
448    /// System maintenance state: canceled, completed, pending, ongoing.
449    pub state: Ustr,
450    /// Expected completion timestamp.
451    pub end_time: Option<String>,
452    /// Planned start timestamp.
453    pub begin_time: Option<String>,
454    /// Service involved.
455    pub service_type: Option<Ustr>,
456    /// Reason for status change.
457    pub reason: Option<String>,
458    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
459    #[serde(deserialize_with = "deserialize_string_to_u64")]
460    pub ts: u64,
461}
462
463/// Order update message from WebSocket orders channel.
464#[derive(Debug, Serialize, Deserialize)]
465#[serde(rename_all = "camelCase")]
466pub struct OKXOrderMsg {
467    /// Accumulated filled size.
468    #[serde(default, deserialize_with = "deserialize_empty_string_as_none")]
469    pub acc_fill_sz: Option<String>,
470    /// Algorithm client order ID.
471    #[serde(default)]
472    pub algo_cl_ord_id: Option<String>,
473    /// Algorithm ID.
474    #[serde(default)]
475    pub algo_id: Option<String>,
476    /// Average price.
477    pub avg_px: String,
478    /// Creation time, Unix timestamp in milliseconds.
479    #[serde(deserialize_with = "deserialize_string_to_u64")]
480    pub c_time: u64,
481    /// Cancel source.
482    #[serde(default)]
483    pub cancel_source: Option<String>,
484    /// Cancel source reason.
485    #[serde(default)]
486    pub cancel_source_reason: Option<String>,
487    /// Category.
488    pub category: Ustr,
489    /// Currency.
490    pub ccy: Ustr,
491    /// Client order ID.
492    pub cl_ord_id: String,
493    /// Fee.
494    #[serde(default, deserialize_with = "deserialize_empty_string_as_none")]
495    pub fee: Option<String>,
496    /// Fee currency.
497    pub fee_ccy: Ustr,
498    /// Fill price.
499    pub fill_px: String,
500    /// Fill size.
501    pub fill_sz: String,
502    /// Fill time, Unix timestamp in milliseconds.
503    #[serde(deserialize_with = "deserialize_string_to_u64")]
504    pub fill_time: u64,
505    /// Instrument ID.
506    pub inst_id: Ustr,
507    /// Instrument type.
508    pub inst_type: OKXInstrumentType,
509    /// Leverage.
510    pub lever: String,
511    /// Order ID.
512    pub ord_id: Ustr,
513    /// Order type.
514    pub ord_type: OKXOrderType,
515    /// Profit and loss.
516    pub pnl: String,
517    /// Position side.
518    pub pos_side: Ustr,
519    /// Price.
520    pub px: String,
521    /// Reduce only flag.
522    pub reduce_only: String,
523    /// Side.
524    pub side: OKXSide,
525    /// Order state.
526    pub state: OKXOrderStatus,
527    /// Execution type.
528    pub exec_type: OKXExecType,
529    /// Size.
530    pub sz: String,
531    /// Trade mode.
532    pub td_mode: OKXTradeMode,
533    /// Trade ID.
534    pub trade_id: String,
535    /// Last update time, Unix timestamp in milliseconds.
536    #[serde(deserialize_with = "deserialize_string_to_u64")]
537    pub u_time: u64,
538}
539
540/// Parameters for WebSocket place order operation.
541#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
542#[builder(setter(into, strip_option))]
543#[serde(rename_all = "camelCase")]
544pub struct WsPostOrderParams {
545    /// Instrument type: SPOT, MARGIN, SWAP, FUTURES, OPTION (optional for WebSocket).
546    #[builder(default)]
547    #[serde(skip_serializing_if = "Option::is_none")]
548    pub inst_type: Option<OKXInstrumentType>,
549    /// Instrument ID, e.g. "BTC-USDT".
550    pub inst_id: Ustr,
551    /// Trading mode: cash, isolated, cross.
552    pub td_mode: OKXTradeMode,
553    /// Margin currency (only for isolated margin).
554    #[builder(default)]
555    #[serde(skip_serializing_if = "Option::is_none")]
556    pub ccy: Option<Ustr>,
557    /// Unique client order ID.
558    #[serde(skip_serializing_if = "Option::is_none")]
559    pub cl_ord_id: Option<String>,
560    /// Order side: buy or sell.
561    pub side: OKXSide,
562    /// Position side: long, short, net (optional).
563    #[builder(default)]
564    #[serde(skip_serializing_if = "Option::is_none")]
565    pub pos_side: Option<OKXPositionSide>,
566    /// Order type: limit, market, post_only, fok, ioc, etc.
567    pub ord_type: OKXOrderType,
568    /// Order size.
569    pub sz: String,
570    /// Order price (required for limit orders).
571    #[builder(default)]
572    #[serde(skip_serializing_if = "Option::is_none")]
573    pub px: Option<String>,
574    /// Reduce-only flag.
575    #[builder(default)]
576    #[serde(skip_serializing_if = "Option::is_none")]
577    pub reduce_only: Option<bool>,
578    /// Target currency for net orders.
579    #[builder(default)]
580    #[serde(skip_serializing_if = "Option::is_none")]
581    pub tgt_ccy: Option<String>,
582    /// Order tag for categorization.
583    #[builder(default)]
584    #[serde(skip_serializing_if = "Option::is_none")]
585    pub tag: Option<String>,
586}
587
588/// Parameters for WebSocket cancel order operation (instType not included).
589#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
590#[builder(default)]
591#[builder(setter(into, strip_option))]
592#[serde(rename_all = "camelCase")]
593pub struct WsCancelOrderParams {
594    /// Instrument ID, e.g. "BTC-USDT".
595    pub inst_id: Ustr,
596    /// Exchange-assigned order ID.
597    #[serde(skip_serializing_if = "Option::is_none")]
598    pub ord_id: Option<String>,
599    /// User-assigned client order ID.
600    #[serde(skip_serializing_if = "Option::is_none")]
601    pub cl_ord_id: Option<String>,
602}
603
604/// Parameters for WebSocket mass cancel operation.
605#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
606#[builder(default)]
607#[builder(setter(into, strip_option))]
608#[serde(rename_all = "camelCase")]
609pub struct WsMassCancelParams {
610    /// Instrument type.
611    pub inst_type: OKXInstrumentType,
612    /// Instrument family, e.g. "BTC-USD", "BTC-USDT".
613    pub inst_family: Ustr,
614}
615
616/// Parameters for WebSocket amend order operation (instType not included).
617#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
618#[builder(default)]
619#[builder(setter(into, strip_option))]
620#[serde(rename_all = "camelCase")]
621pub struct WsAmendOrderParams {
622    /// Instrument ID, e.g. "BTC-USDT".
623    pub inst_id: Ustr,
624    /// Exchange-assigned order ID (optional if using clOrdId).
625    #[serde(skip_serializing_if = "Option::is_none")]
626    pub ord_id: Option<String>,
627    /// User-assigned client order ID (optional if using ordId).
628    #[serde(skip_serializing_if = "Option::is_none")]
629    pub cl_ord_id: Option<String>,
630    /// New client order ID for the amended order.
631    #[serde(skip_serializing_if = "Option::is_none")]
632    pub new_cl_ord_id: Option<String>,
633    /// New order price (optional).
634    #[serde(skip_serializing_if = "Option::is_none")]
635    pub new_px: Option<String>,
636    /// New order size (optional).
637    #[serde(skip_serializing_if = "Option::is_none")]
638    pub new_sz: Option<String>,
639}
640
641////////////////////////////////////////////////////////////////////////////////
642// Tests
643////////////////////////////////////////////////////////////////////////////////
644#[cfg(test)]
645mod tests {
646    use nautilus_core::time::get_atomic_clock_realtime;
647    use rstest::rstest;
648
649    use super::*;
650
651    #[rstest]
652    fn test_deserialize_websocket_arg() {
653        let json_str = r#"{"channel":"instruments","instType":"SPOT"}"#;
654
655        let result: Result<OKXWebSocketArg, _> = serde_json::from_str(json_str);
656        match result {
657            Ok(arg) => {
658                assert_eq!(arg.channel, OKXWsChannel::Instruments);
659                assert_eq!(arg.inst_type, Some(OKXInstrumentType::Spot));
660                assert_eq!(arg.inst_id, None);
661            }
662            Err(e) => {
663                panic!("Failed to deserialize WebSocket arg: {e}");
664            }
665        }
666    }
667
668    #[rstest]
669    fn test_deserialize_subscribe_variant_direct() {
670        #[derive(Debug, Deserialize)]
671        #[serde(rename_all = "camelCase")]
672        struct SubscribeMsg {
673            event: String,
674            arg: OKXWebSocketArg,
675            conn_id: String,
676        }
677
678        let json_str = r#"{"event":"subscribe","arg":{"channel":"instruments","instType":"SPOT"},"connId":"380cfa6a"}"#;
679
680        let result: Result<SubscribeMsg, _> = serde_json::from_str(json_str);
681        match result {
682            Ok(msg) => {
683                assert_eq!(msg.event, "subscribe");
684                assert_eq!(msg.arg.channel, OKXWsChannel::Instruments);
685                assert_eq!(msg.conn_id, "380cfa6a");
686            }
687            Err(e) => {
688                panic!("Failed to deserialize subscribe message directly: {e}");
689            }
690        }
691    }
692
693    #[rstest]
694    fn test_deserialize_subscribe_confirmation() {
695        let json_str = r#"{"event":"subscribe","arg":{"channel":"instruments","instType":"SPOT"},"connId":"380cfa6a"}"#;
696
697        let result: Result<OKXWebSocketEvent, _> = serde_json::from_str(json_str);
698        match result {
699            Ok(msg) => {
700                if let OKXWebSocketEvent::Subscription {
701                    event,
702                    arg,
703                    conn_id,
704                } = msg
705                {
706                    assert_eq!(event, OKXSubscriptionEvent::Subscribe);
707                    assert_eq!(arg.channel, OKXWsChannel::Instruments);
708                    assert_eq!(conn_id, "380cfa6a");
709                } else {
710                    panic!("Expected Subscribe variant, got: {msg:?}");
711                }
712            }
713            Err(e) => {
714                panic!("Failed to deserialize subscription confirmation: {e}");
715            }
716        }
717    }
718
719    #[rstest]
720    fn test_deserialize_subscribe_with_inst_id() {
721        let json_str = r#"{"event":"subscribe","arg":{"channel":"candle1m","instId":"ETH-USDT"},"connId":"358602f5"}"#;
722
723        let result: Result<OKXWebSocketEvent, _> = serde_json::from_str(json_str);
724        match result {
725            Ok(msg) => {
726                if let OKXWebSocketEvent::Subscription {
727                    event,
728                    arg,
729                    conn_id,
730                } = msg
731                {
732                    assert_eq!(event, OKXSubscriptionEvent::Subscribe);
733                    assert_eq!(arg.channel, OKXWsChannel::Candle1Minute);
734                    assert_eq!(conn_id, "358602f5");
735                } else {
736                    panic!("Expected Subscribe variant, got: {msg:?}");
737                }
738            }
739            Err(e) => {
740                panic!("Failed to deserialize subscription confirmation: {e}");
741            }
742        }
743    }
744
745    #[rstest]
746    fn test_channel_serialization_for_logging() {
747        let channel = OKXWsChannel::Candle1Minute;
748        let serialized = serde_json::to_string(&channel).unwrap();
749        let cleaned = serialized.trim_matches('"').to_string();
750        assert_eq!(cleaned, "candle1m");
751
752        let channel = OKXWsChannel::BboTbt;
753        let serialized = serde_json::to_string(&channel).unwrap();
754        let cleaned = serialized.trim_matches('"').to_string();
755        assert_eq!(cleaned, "bbo-tbt");
756
757        let channel = OKXWsChannel::Trades;
758        let serialized = serde_json::to_string(&channel).unwrap();
759        let cleaned = serialized.trim_matches('"').to_string();
760        assert_eq!(cleaned, "trades");
761    }
762
763    #[rstest]
764    fn test_order_response_with_enum_operation() {
765        let json_str = r#"{"id":"req-123","op":"order","code":"0","msg":"","data":[]}"#;
766        let result: Result<OKXWebSocketEvent, _> = serde_json::from_str(json_str);
767        match result {
768            Ok(OKXWebSocketEvent::OrderResponse {
769                id,
770                op,
771                code,
772                msg,
773                data,
774            }) => {
775                assert_eq!(id, Some("req-123".to_string()));
776                assert_eq!(op, OKXWsOperation::Order);
777                assert_eq!(code, "0");
778                assert_eq!(msg, "");
779                assert!(data.is_empty());
780            }
781            Ok(other) => panic!("Expected OrderResponse, got: {other:?}"),
782            Err(e) => panic!("Failed to deserialize: {e}"),
783        }
784
785        let json_str = r#"{"id":"cancel-456","op":"cancel-order","code":"50001","msg":"Order not found","data":[]}"#;
786        let result: Result<OKXWebSocketEvent, _> = serde_json::from_str(json_str);
787        match result {
788            Ok(OKXWebSocketEvent::OrderResponse {
789                id,
790                op,
791                code,
792                msg,
793                data,
794            }) => {
795                assert_eq!(id, Some("cancel-456".to_string()));
796                assert_eq!(op, OKXWsOperation::CancelOrder);
797                assert_eq!(code, "50001");
798                assert_eq!(msg, "Order not found");
799                assert!(data.is_empty());
800            }
801            Ok(other) => panic!("Expected OrderResponse, got: {other:?}"),
802            Err(e) => panic!("Failed to deserialize: {e}"),
803        }
804
805        let json_str = r#"{"id":"amend-789","op":"amend-order","code":"50002","msg":"Invalid price","data":[]}"#;
806        let result: Result<OKXWebSocketEvent, _> = serde_json::from_str(json_str);
807        match result {
808            Ok(OKXWebSocketEvent::OrderResponse {
809                id,
810                op,
811                code,
812                msg,
813                data,
814            }) => {
815                assert_eq!(id, Some("amend-789".to_string()));
816                assert_eq!(op, OKXWsOperation::AmendOrder);
817                assert_eq!(code, "50002");
818                assert_eq!(msg, "Invalid price");
819                assert!(data.is_empty());
820            }
821            Ok(other) => panic!("Expected OrderResponse, got: {other:?}"),
822            Err(e) => panic!("Failed to deserialize: {e}"),
823        }
824    }
825
826    #[rstest]
827    fn test_operation_enum_serialization() {
828        let op = OKXWsOperation::Order;
829        let serialized = serde_json::to_string(&op).unwrap();
830        assert_eq!(serialized, "\"order\"");
831
832        let op = OKXWsOperation::CancelOrder;
833        let serialized = serde_json::to_string(&op).unwrap();
834        assert_eq!(serialized, "\"cancel-order\"");
835
836        let op = OKXWsOperation::AmendOrder;
837        let serialized = serde_json::to_string(&op).unwrap();
838        assert_eq!(serialized, "\"amend-order\"");
839
840        let op = OKXWsOperation::Subscribe;
841        let serialized = serde_json::to_string(&op).unwrap();
842        assert_eq!(serialized, "\"subscribe\"");
843    }
844
845    #[rstest]
846    fn test_order_response_parsing() {
847        let success_response = r#"{
848            "id": "req-123",
849            "op": "order",
850            "code": "0",
851            "msg": "",
852            "data": [{"sMsg": "Order placed successfully"}]
853        }"#;
854
855        let parsed: OKXWebSocketEvent = serde_json::from_str(success_response).unwrap();
856
857        match parsed {
858            OKXWebSocketEvent::OrderResponse {
859                id,
860                op,
861                code,
862                msg,
863                data,
864            } => {
865                assert_eq!(id, Some("req-123".to_string()));
866                assert_eq!(op, OKXWsOperation::Order);
867                assert_eq!(code, "0");
868                assert_eq!(msg, "");
869                assert_eq!(data.len(), 1);
870            }
871            _ => panic!("Expected OrderResponse variant"),
872        }
873
874        let failure_response = r#"{
875            "id": "req-456",
876            "op": "cancel-order",
877            "code": "50001",
878            "msg": "Order not found",
879            "data": [{"sMsg": "Order with client order ID not found"}]
880        }"#;
881
882        let parsed: OKXWebSocketEvent = serde_json::from_str(failure_response).unwrap();
883
884        match parsed {
885            OKXWebSocketEvent::OrderResponse {
886                id,
887                op,
888                code,
889                msg,
890                data,
891            } => {
892                assert_eq!(id, Some("req-456".to_string()));
893                assert_eq!(op, OKXWsOperation::CancelOrder);
894                assert_eq!(code, "50001");
895                assert_eq!(msg, "Order not found");
896                assert_eq!(data.len(), 1);
897            }
898            _ => panic!("Expected OrderResponse variant"),
899        }
900    }
901
902    #[rstest]
903    fn test_subscription_event_parsing() {
904        let subscription_json = r#"{
905            "event": "subscribe",
906            "arg": {
907                "channel": "tickers",
908                "instId": "BTC-USDT"
909            },
910            "connId": "a4d3ae55"
911        }"#;
912
913        let parsed: OKXWebSocketEvent = serde_json::from_str(subscription_json).unwrap();
914
915        match parsed {
916            OKXWebSocketEvent::Subscription {
917                event,
918                arg,
919                conn_id,
920            } => {
921                assert_eq!(
922                    event,
923                    crate::websocket::enums::OKXSubscriptionEvent::Subscribe
924                );
925                assert_eq!(arg.channel, OKXWsChannel::Tickers);
926                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
927                assert_eq!(conn_id, "a4d3ae55");
928            }
929            _ => panic!("Expected Subscription variant"),
930        }
931    }
932
933    #[rstest]
934    fn test_login_event_parsing() {
935        let login_success = r#"{
936            "event": "login",
937            "code": "0",
938            "msg": "Login successful",
939            "connId": "a4d3ae55"
940        }"#;
941
942        let parsed: OKXWebSocketEvent = serde_json::from_str(login_success).unwrap();
943
944        match parsed {
945            OKXWebSocketEvent::Login {
946                event,
947                code,
948                msg,
949                conn_id,
950            } => {
951                assert_eq!(event, "login");
952                assert_eq!(code, "0");
953                assert_eq!(msg, "Login successful");
954                assert_eq!(conn_id, "a4d3ae55");
955            }
956            _ => panic!("Expected Login variant, got: {:?}", parsed),
957        }
958    }
959
960    #[rstest]
961    fn test_error_event_parsing() {
962        let error_json = r#"{
963            "code": "60012",
964            "msg": "Invalid request"
965        }"#;
966
967        let parsed: OKXWebSocketEvent = serde_json::from_str(error_json).unwrap();
968
969        match parsed {
970            OKXWebSocketEvent::Error { code, msg } => {
971                assert_eq!(code, "60012");
972                assert_eq!(msg, "Invalid request");
973            }
974            _ => panic!("Expected Error variant"),
975        }
976    }
977
978    #[rstest]
979    fn test_websocket_request_serialization() {
980        let request = OKXWsRequest {
981            id: Some("req-123".to_string()),
982            op: OKXWsOperation::Order,
983            args: vec![serde_json::json!({
984                "instId": "BTC-USDT",
985                "tdMode": "cash",
986                "side": "buy",
987                "ordType": "market",
988                "sz": "0.1"
989            })],
990            exp_time: None,
991        };
992
993        let serialized = serde_json::to_string(&request).unwrap();
994        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
995
996        assert_eq!(parsed["id"], "req-123");
997        assert_eq!(parsed["op"], "order");
998        assert!(parsed["args"].is_array());
999        assert_eq!(parsed["args"].as_array().unwrap().len(), 1);
1000    }
1001
1002    #[rstest]
1003    fn test_subscription_request_serialization() {
1004        let subscription = OKXSubscription {
1005            op: OKXWsOperation::Subscribe,
1006            args: vec![OKXSubscriptionArg {
1007                channel: OKXWsChannel::Tickers,
1008                inst_type: Some(crate::common::enums::OKXInstrumentType::Spot),
1009                inst_family: None,
1010                inst_id: Some(Ustr::from("BTC-USDT")),
1011            }],
1012        };
1013
1014        let serialized = serde_json::to_string(&subscription).unwrap();
1015        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1016
1017        assert_eq!(parsed["op"], "subscribe");
1018        assert!(parsed["args"].is_array());
1019        assert_eq!(parsed["args"][0]["channel"], "tickers");
1020        assert_eq!(parsed["args"][0]["instType"], "SPOT");
1021        assert_eq!(parsed["args"][0]["instId"], "BTC-USDT");
1022    }
1023
1024    #[rstest]
1025    fn test_error_message_extraction() {
1026        let responses = vec![
1027            (
1028                r#"{
1029                "id": "req-123",
1030                "op": "order",
1031                "code": "50001",
1032                "msg": "Order failed",
1033                "data": [{"sMsg": "Insufficient balance"}]
1034            }"#,
1035                "Insufficient balance",
1036            ),
1037            (
1038                r#"{
1039                "id": "req-456",
1040                "op": "cancel-order",
1041                "code": "50002",
1042                "msg": "Cancel failed",
1043                "data": [{}]
1044            }"#,
1045                "Cancel failed",
1046            ),
1047        ];
1048
1049        for (response_json, expected_msg) in responses {
1050            let parsed: OKXWebSocketEvent = serde_json::from_str(response_json).unwrap();
1051
1052            match parsed {
1053                OKXWebSocketEvent::OrderResponse {
1054                    id: _,
1055                    op: _,
1056                    code,
1057                    msg,
1058                    data,
1059                } => {
1060                    assert_ne!(code, "0"); // Error response
1061
1062                    // Extract error message with fallback logic
1063                    let error_msg = data
1064                        .first()
1065                        .and_then(|d| d.get("sMsg"))
1066                        .and_then(|s| s.as_str())
1067                        .filter(|s| !s.is_empty())
1068                        .unwrap_or(&msg);
1069
1070                    assert_eq!(error_msg, expected_msg);
1071                }
1072                _ => panic!("Expected OrderResponse variant"),
1073            }
1074        }
1075    }
1076
1077    #[rstest]
1078    fn test_book_data_parsing() {
1079        let book_data_json = r#"{
1080            "arg": {
1081                "channel": "books",
1082                "instId": "BTC-USDT"
1083            },
1084            "action": "snapshot",
1085            "data": [{
1086                "asks": [["50000.0", "0.1", "0", "1"]],
1087                "bids": [["49999.0", "0.2", "0", "1"]],
1088                "ts": "1640995200000",
1089                "checksum": 123456789,
1090                "seqId": 1000
1091            }]
1092        }"#;
1093
1094        let parsed: OKXWebSocketEvent = serde_json::from_str(book_data_json).unwrap();
1095
1096        match parsed {
1097            OKXWebSocketEvent::BookData { arg, action, data } => {
1098                assert_eq!(arg.channel, OKXWsChannel::Books);
1099                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
1100                assert_eq!(
1101                    action,
1102                    super::super::super::common::enums::OKXBookAction::Snapshot
1103                );
1104                assert_eq!(data.len(), 1);
1105            }
1106            _ => panic!("Expected BookData variant"),
1107        }
1108    }
1109
1110    #[rstest]
1111    fn test_data_event_parsing() {
1112        let data_json = r#"{
1113            "arg": {
1114                "channel": "trades",
1115                "instId": "BTC-USDT"
1116            },
1117            "data": [{
1118                "instId": "BTC-USDT",
1119                "tradeId": "12345",
1120                "px": "50000.0",
1121                "sz": "0.1",
1122                "side": "buy",
1123                "ts": "1640995200000"
1124            }]
1125        }"#;
1126
1127        let parsed: OKXWebSocketEvent = serde_json::from_str(data_json).unwrap();
1128
1129        match parsed {
1130            OKXWebSocketEvent::Data { arg, data } => {
1131                assert_eq!(arg.channel, OKXWsChannel::Trades);
1132                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
1133                assert!(data.is_array());
1134            }
1135            _ => panic!("Expected Data variant"),
1136        }
1137    }
1138
1139    #[rstest]
1140    fn test_nautilus_message_variants() {
1141        let clock = get_atomic_clock_realtime();
1142        let ts_init = clock.get_time_ns();
1143
1144        let error = OKXWebSocketError {
1145            code: "60012".to_string(),
1146            message: "Invalid request".to_string(),
1147            conn_id: None,
1148            timestamp: ts_init.as_u64(),
1149        };
1150        let error_msg = NautilusWsMessage::Error(error);
1151
1152        match error_msg {
1153            NautilusWsMessage::Error(err) => {
1154                assert_eq!(err.code, "60012");
1155                assert_eq!(err.message, "Invalid request");
1156            }
1157            _ => panic!("Expected Error variant"),
1158        }
1159
1160        let raw_scenarios = vec![
1161            ::serde_json::json!({"unknown": "data"}),
1162            ::serde_json::json!({"channel": "unsupported", "data": [1, 2, 3]}),
1163            ::serde_json::json!({"complex": {"nested": {"structure": true}}}),
1164        ];
1165
1166        for raw_data in raw_scenarios {
1167            let raw_msg = NautilusWsMessage::Raw(raw_data.clone());
1168
1169            match raw_msg {
1170                NautilusWsMessage::Raw(data) => {
1171                    assert_eq!(data, raw_data);
1172                }
1173                _ => panic!("Expected Raw variant"),
1174            }
1175        }
1176    }
1177
1178    #[rstest]
1179    fn test_order_response_parsing_success() {
1180        let order_response_json = r#"{
1181            "id": "req-123",
1182            "op": "order",
1183            "code": "0",
1184            "msg": "",
1185            "data": [{"sMsg": "Order placed successfully"}]
1186        }"#;
1187
1188        let parsed: OKXWebSocketEvent = serde_json::from_str(order_response_json).unwrap();
1189
1190        match parsed {
1191            OKXWebSocketEvent::OrderResponse {
1192                id,
1193                op,
1194                code,
1195                msg,
1196                data,
1197            } => {
1198                assert_eq!(id, Some("req-123".to_string()));
1199                assert_eq!(op, OKXWsOperation::Order);
1200                assert_eq!(code, "0");
1201                assert_eq!(msg, "");
1202                assert_eq!(data.len(), 1);
1203            }
1204            _ => panic!("Expected OrderResponse variant"),
1205        }
1206    }
1207
1208    #[rstest]
1209    fn test_order_response_parsing_failure() {
1210        let order_response_json = r#"{
1211            "id": "req-456",
1212            "op": "cancel-order",
1213            "code": "50001",
1214            "msg": "Order not found",
1215            "data": [{"sMsg": "Order with client order ID not found"}]
1216        }"#;
1217
1218        let parsed: OKXWebSocketEvent = serde_json::from_str(order_response_json).unwrap();
1219
1220        match parsed {
1221            OKXWebSocketEvent::OrderResponse {
1222                id,
1223                op,
1224                code,
1225                msg,
1226                data,
1227            } => {
1228                assert_eq!(id, Some("req-456".to_string()));
1229                assert_eq!(op, OKXWsOperation::CancelOrder);
1230                assert_eq!(code, "50001");
1231                assert_eq!(msg, "Order not found");
1232                assert_eq!(data.len(), 1);
1233            }
1234            _ => panic!("Expected OrderResponse variant"),
1235        }
1236    }
1237
1238    #[rstest]
1239    fn test_message_request_serialization() {
1240        let request = OKXWsRequest {
1241            id: Some("req-123".to_string()),
1242            op: OKXWsOperation::Order,
1243            args: vec![::serde_json::json!({
1244                "instId": "BTC-USDT",
1245                "tdMode": "cash",
1246                "side": "buy",
1247                "ordType": "market",
1248                "sz": "0.1"
1249            })],
1250            exp_time: None,
1251        };
1252
1253        let serialized = serde_json::to_string(&request).unwrap();
1254        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1255
1256        assert_eq!(parsed["id"], "req-123");
1257        assert_eq!(parsed["op"], "order");
1258        assert!(parsed["args"].is_array());
1259        assert_eq!(parsed["args"].as_array().unwrap().len(), 1);
1260    }
1261}