nautilus_okx/websocket/
messages.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data structures modelling OKX WebSocket request and response payloads.
17
18use derive_builder::Builder;
19use nautilus_model::{
20    data::{Data, FundingRateUpdate, OrderBookDeltas},
21    events::{
22        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderExpired,
23        OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
24    },
25    instruments::InstrumentAny,
26    reports::{FillReport, OrderStatusReport, PositionStatusReport},
27};
28use serde::{Deserialize, Serialize};
29use ustr::Ustr;
30
31use super::enums::{OKXWsChannel, OKXWsOperation};
32use crate::{
33    common::{
34        enums::{
35            OKXAlgoOrderType, OKXBookAction, OKXCandleConfirm, OKXExecType, OKXInstrumentType,
36            OKXOrderCategory, OKXOrderStatus, OKXOrderType, OKXPositionSide, OKXSide,
37            OKXTargetCurrency, OKXTradeMode, OKXTriggerType,
38        },
39        parse::{
40            deserialize_empty_string_as_none, deserialize_string_to_u64,
41            deserialize_target_currency_as_none,
42        },
43    },
44    websocket::enums::OKXSubscriptionEvent,
45};
46
/// Normalized messages produced from the OKX WebSocket stream.
///
/// Each variant carries either a Nautilus domain object, an OKX error, an
/// unparsed JSON payload, or a payload-free connection signal.
#[derive(Debug, Clone)]
pub enum NautilusWsMessage {
    /// Batch of market data items.
    Data(Vec<Data>),
    /// Order book deltas.
    Deltas(OrderBookDeltas),
    /// Batch of funding rate updates.
    FundingRates(Vec<FundingRateUpdate>),
    /// Instrument definition (boxed).
    Instrument(Box<InstrumentAny>),
    /// Account state snapshot.
    AccountUpdate(AccountState),
    /// Position status report.
    PositionUpdate(PositionStatusReport),
    /// Order accepted event.
    OrderAccepted(OrderAccepted),
    /// Order canceled event.
    OrderCanceled(OrderCanceled),
    /// Order expired event.
    OrderExpired(OrderExpired),
    /// Order rejected event.
    OrderRejected(OrderRejected),
    /// Cancel request rejected event.
    OrderCancelRejected(OrderCancelRejected),
    /// Modify request rejected event.
    OrderModifyRejected(OrderModifyRejected),
    /// Order triggered event.
    OrderTriggered(OrderTriggered),
    /// Order updated event.
    OrderUpdated(OrderUpdated),
    /// Batch of order status and/or fill reports.
    ExecutionReports(Vec<ExecutionReport>),
    /// Error reported over the WebSocket.
    Error(OKXWebSocketError),
    /// Raw JSON payload for unhandled channels.
    Raw(serde_json::Value),
    /// Payload-free signal; presumably emitted after the socket reconnects (confirm against the client).
    Reconnected,
    /// Payload-free signal; presumably emitted after a successful login (confirm against the client).
    Authenticated,
}
69
/// Represents an OKX WebSocket error.
///
/// Serializable in both directions and exposed as a Python class when the
/// `python` feature is enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct OKXWebSocketError {
    /// Error code from OKX (e.g., "50101").
    pub code: String,
    /// Error message from OKX.
    pub message: String,
    /// Connection ID if available.
    pub conn_id: Option<String>,
    /// Timestamp when the error occurred.
    // NOTE(review): the unit (ms vs ns) is not visible in this file — confirm at the construction site.
    pub timestamp: u64,
}
83
/// Execution report payload: either an order status report or a fill report.
#[derive(Debug, Clone)]
// The size lint is silenced rather than boxing a variant.
#[allow(clippy::large_enum_variant)]
pub enum ExecutionReport {
    /// Order status report.
    Order(OrderStatusReport),
    /// Fill report.
    Fill(FillReport),
}
90
/// Generic WebSocket request for OKX trading commands.
///
/// Field names are serialized in camelCase (e.g. `exp_time` -> `expTime`);
/// `None` values for `id` and `exp_time` are omitted from the output.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXWsRequest<T> {
    /// Client request ID (required for order operations).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Operation type (order, cancel-order, amend-order).
    pub op: OKXWsOperation,
    /// Request effective deadline. Unix timestamp format in milliseconds.
    /// This is when the request itself expires, not related to order expiration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exp_time: Option<String>,
    /// Arguments payload for the operation.
    pub args: Vec<T>,
}
107
/// OKX WebSocket authentication message.
#[derive(Debug, Serialize)]
pub struct OKXAuthentication {
    /// Operation name sent in the `op` field (a static string chosen by the caller).
    pub op: &'static str,
    /// Authentication argument entries.
    pub args: Vec<OKXAuthenticationArg>,
}
114
/// OKX WebSocket authentication arguments.
///
/// Serialized with camelCase keys (`api_key` -> `apiKey`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXAuthenticationArg {
    /// API key.
    pub api_key: String,
    /// API passphrase.
    pub passphrase: String,
    /// Timestamp string sent with the login request.
    // NOTE(review): expected format/unit is not visible in this file — confirm against the signer.
    pub timestamp: String,
    /// Signature string; how it is computed is not shown in this file.
    pub sign: String,
}
124
/// Subscription request envelope: an operation plus the channel arguments it applies to.
#[derive(Debug, Serialize)]
pub struct OKXSubscription {
    /// Operation to perform.
    pub op: OKXWsOperation,
    /// Channel arguments the operation applies to.
    pub args: Vec<OKXSubscriptionArg>,
}
130
/// A single channel argument for a subscription request.
///
/// Serialized with camelCase keys. The optional fields carry no
/// `skip_serializing_if`, so a `None` is emitted as JSON `null` rather than omitted.
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXSubscriptionArg {
    /// Channel to address.
    pub channel: OKXWsChannel,
    /// Optional instrument type (`instType`).
    pub inst_type: Option<OKXInstrumentType>,
    /// Optional instrument family (`instFamily`).
    pub inst_family: Option<Ustr>,
    /// Optional instrument ID (`instId`).
    pub inst_id: Option<Ustr>,
}
139
/// OKX WebSocket message variants.
///
/// Uses custom deserialization that checks discriminant fields (event, op, action)
/// to determine the correct variant.
#[derive(Debug)]
pub enum OKXWsMessage {
    /// Login response (`event == "login"`).
    Login {
        /// Event name as received.
        event: String,
        /// Response code.
        code: String,
        /// Response message.
        msg: String,
        /// Connection ID (`connId`).
        conn_id: String,
    },
    /// Subscribe/unsubscribe acknowledgement (`event` is "subscribe" or "unsubscribe").
    Subscription {
        /// Parsed subscription event.
        event: OKXSubscriptionEvent,
        /// Channel argument the acknowledgement refers to.
        arg: OKXWebSocketArg,
        /// Connection ID (`connId`).
        conn_id: String,
        /// Optional response code.
        code: Option<String>,
        /// Optional response message.
        msg: Option<String>,
    },
    /// Event message carrying both `channel` and `connCount` keys.
    ChannelConnCount {
        /// Event name as received.
        event: String,
        /// Channel the count refers to.
        channel: OKXWsChannel,
        /// Connection count, kept as the string received (`connCount`).
        conn_count: String,
        /// Connection ID (`connId`).
        conn_id: String,
    },
    /// Response to a request carrying an `op` key.
    OrderResponse {
        /// Request ID echoed back, if present.
        id: Option<String>,
        /// Operation the response refers to.
        op: OKXWsOperation,
        /// Response code.
        code: String,
        /// Response message.
        msg: String,
        /// Raw per-item results; empty when the `data` key is absent.
        data: Vec<serde_json::Value>,
    },
    /// Order book push (message has both `arg` and `action`).
    BookData {
        /// Channel argument identifying the stream.
        arg: OKXWebSocketArg,
        /// Book action.
        action: OKXBookAction,
        /// Book payload entries.
        data: Vec<OKXBookMsg>,
    },
    /// Any other channel push (message has `arg` and `data` but no `action`).
    Data {
        /// Channel argument identifying the stream.
        arg: OKXWebSocketArg,
        /// Payload left as raw JSON for channel-specific parsing.
        data: serde_json::Value,
    },
    /// Error event, or any otherwise unclassified message with `code` and `msg`.
    Error {
        /// Error code.
        code: String,
        /// Error message.
        msg: String,
    },
    /// Not produced by the JSON deserializer in this file; presumably constructed by the client.
    Ping,
    /// Not produced by the JSON deserializer in this file; presumably constructed by the client.
    Reconnected,
}
188
189impl<'de> Deserialize<'de> for OKXWsMessage {
190    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
191    where
192        D: serde::Deserializer<'de>,
193    {
194        use serde::de::Error;
195
196        // Deserialize to a map to inspect discriminant fields first
197        let value = serde_json::Value::deserialize(deserializer)?;
198        let obj = value
199            .as_object()
200            .ok_or_else(|| D::Error::custom("expected JSON object for OKXWsMessage"))?;
201
202        // Check discriminant fields in priority order
203
204        // 1. Check for "event" field - Login, Subscription, ChannelConnCount, or Error
205        if let Some(event) = obj.get("event").and_then(|v| v.as_str()) {
206            if event == "login" {
207                return parse_login(obj);
208            } else if event == "subscribe" || event == "unsubscribe" {
209                return parse_subscription(obj);
210            } else if event == "error" {
211                // All error events (simple or subscription-related) go to parse_error
212                // Extra fields like "arg" and "connId" are ignored
213                return parse_error(obj);
214            } else if obj.contains_key("channel") && obj.contains_key("connCount") {
215                return parse_channel_conn_count(obj);
216            }
217        }
218
219        // 2. Check for "op" field - OrderResponse
220        if obj.contains_key("op") {
221            return parse_order_response(obj);
222        }
223
224        // 3. Check for "action" field with "arg" - BookData
225        if obj.contains_key("action") && obj.contains_key("arg") {
226            return parse_book_data(obj);
227        }
228
229        // 4. Check for "arg" and "data" without "action" - Data
230        if obj.contains_key("arg") && obj.contains_key("data") {
231            return parse_data(obj);
232        }
233
234        // 5. Fallback to Error if it has "code" and "msg"
235        if obj.contains_key("code") && obj.contains_key("msg") {
236            return parse_error(obj);
237        }
238
239        Err(D::Error::custom(format!(
240            "cannot determine OKXWsMessage variant from: {}",
241            serde_json::to_string(&value).unwrap_or_default()
242        )))
243    }
244}
245
246fn parse_login<E: serde::de::Error>(
247    obj: &serde_json::Map<String, serde_json::Value>,
248) -> Result<OKXWsMessage, E> {
249    Ok(OKXWsMessage::Login {
250        event: obj
251            .get("event")
252            .and_then(|v| v.as_str())
253            .map(String::from)
254            .ok_or_else(|| E::missing_field("event"))?,
255        code: obj
256            .get("code")
257            .and_then(|v| v.as_str())
258            .map(String::from)
259            .ok_or_else(|| E::missing_field("code"))?,
260        msg: obj
261            .get("msg")
262            .and_then(|v| v.as_str())
263            .map(String::from)
264            .ok_or_else(|| E::missing_field("msg"))?,
265        conn_id: obj
266            .get("connId")
267            .and_then(|v| v.as_str())
268            .map(String::from)
269            .ok_or_else(|| E::missing_field("connId"))?,
270    })
271}
272
273fn parse_subscription<E: serde::de::Error>(
274    obj: &serde_json::Map<String, serde_json::Value>,
275) -> Result<OKXWsMessage, E> {
276    let event_str = obj
277        .get("event")
278        .and_then(|v| v.as_str())
279        .ok_or_else(|| E::missing_field("event"))?;
280
281    let event: OKXSubscriptionEvent =
282        serde_json::from_value(serde_json::Value::String(event_str.to_string()))
283            .map_err(|e| E::custom(format!("invalid event: {e}")))?;
284
285    let arg: OKXWebSocketArg = obj
286        .get("arg")
287        .cloned()
288        .map(serde_json::from_value)
289        .transpose()
290        .map_err(|e| E::custom(format!("invalid arg: {e}")))?
291        .ok_or_else(|| E::missing_field("arg"))?;
292
293    Ok(OKXWsMessage::Subscription {
294        event,
295        arg,
296        conn_id: obj
297            .get("connId")
298            .and_then(|v| v.as_str())
299            .map(String::from)
300            .ok_or_else(|| E::missing_field("connId"))?,
301        code: obj.get("code").and_then(|v| v.as_str()).map(String::from),
302        msg: obj.get("msg").and_then(|v| v.as_str()).map(String::from),
303    })
304}
305
306fn parse_channel_conn_count<E: serde::de::Error>(
307    obj: &serde_json::Map<String, serde_json::Value>,
308) -> Result<OKXWsMessage, E> {
309    let channel: OKXWsChannel = obj
310        .get("channel")
311        .cloned()
312        .map(serde_json::from_value)
313        .transpose()
314        .map_err(|e| E::custom(format!("invalid channel: {e}")))?
315        .ok_or_else(|| E::missing_field("channel"))?;
316
317    Ok(OKXWsMessage::ChannelConnCount {
318        event: obj
319            .get("event")
320            .and_then(|v| v.as_str())
321            .map(String::from)
322            .ok_or_else(|| E::missing_field("event"))?,
323        channel,
324        conn_count: obj
325            .get("connCount")
326            .and_then(|v| v.as_str())
327            .map(String::from)
328            .ok_or_else(|| E::missing_field("connCount"))?,
329        conn_id: obj
330            .get("connId")
331            .and_then(|v| v.as_str())
332            .map(String::from)
333            .ok_or_else(|| E::missing_field("connId"))?,
334    })
335}
336
337fn parse_order_response<E: serde::de::Error>(
338    obj: &serde_json::Map<String, serde_json::Value>,
339) -> Result<OKXWsMessage, E> {
340    let op: OKXWsOperation = obj
341        .get("op")
342        .cloned()
343        .map(serde_json::from_value)
344        .transpose()
345        .map_err(|e| E::custom(format!("invalid op: {e}")))?
346        .ok_or_else(|| E::missing_field("op"))?;
347
348    let data: Vec<serde_json::Value> = obj
349        .get("data")
350        .cloned()
351        .map(serde_json::from_value)
352        .transpose()
353        .map_err(|e| E::custom(format!("invalid data: {e}")))?
354        .unwrap_or_default();
355
356    Ok(OKXWsMessage::OrderResponse {
357        id: obj.get("id").and_then(|v| v.as_str()).map(String::from),
358        op,
359        code: obj
360            .get("code")
361            .and_then(|v| v.as_str())
362            .map(String::from)
363            .ok_or_else(|| E::missing_field("code"))?,
364        msg: obj
365            .get("msg")
366            .and_then(|v| v.as_str())
367            .map(String::from)
368            .ok_or_else(|| E::missing_field("msg"))?,
369        data,
370    })
371}
372
373fn parse_book_data<E: serde::de::Error>(
374    obj: &serde_json::Map<String, serde_json::Value>,
375) -> Result<OKXWsMessage, E> {
376    let arg: OKXWebSocketArg = obj
377        .get("arg")
378        .cloned()
379        .map(serde_json::from_value)
380        .transpose()
381        .map_err(|e| E::custom(format!("invalid arg: {e}")))?
382        .ok_or_else(|| E::missing_field("arg"))?;
383
384    let action: OKXBookAction = obj
385        .get("action")
386        .cloned()
387        .map(serde_json::from_value)
388        .transpose()
389        .map_err(|e| E::custom(format!("invalid action: {e}")))?
390        .ok_or_else(|| E::missing_field("action"))?;
391
392    let data: Vec<OKXBookMsg> = obj
393        .get("data")
394        .cloned()
395        .map(serde_json::from_value)
396        .transpose()
397        .map_err(|e| E::custom(format!("invalid data: {e}")))?
398        .ok_or_else(|| E::missing_field("data"))?;
399
400    Ok(OKXWsMessage::BookData { arg, action, data })
401}
402
403fn parse_data<E: serde::de::Error>(
404    obj: &serde_json::Map<String, serde_json::Value>,
405) -> Result<OKXWsMessage, E> {
406    let arg: OKXWebSocketArg = obj
407        .get("arg")
408        .cloned()
409        .map(serde_json::from_value)
410        .transpose()
411        .map_err(|e| E::custom(format!("invalid arg: {e}")))?
412        .ok_or_else(|| E::missing_field("arg"))?;
413
414    let data = obj
415        .get("data")
416        .cloned()
417        .ok_or_else(|| E::missing_field("data"))?;
418
419    Ok(OKXWsMessage::Data { arg, data })
420}
421
422fn parse_error<E: serde::de::Error>(
423    obj: &serde_json::Map<String, serde_json::Value>,
424) -> Result<OKXWsMessage, E> {
425    Ok(OKXWsMessage::Error {
426        code: obj
427            .get("code")
428            .and_then(|v| v.as_str())
429            .map(String::from)
430            .ok_or_else(|| E::missing_field("code"))?,
431        msg: obj
432            .get("msg")
433            .and_then(|v| v.as_str())
434            .map(String::from)
435            .ok_or_else(|| E::missing_field("msg"))?,
436    })
437}
438
/// Channel argument attached to OKX WebSocket acknowledgements and data pushes.
///
/// Keys are camelCase on the wire. Every field except `channel` is optional
/// and defaults to `None` when the key is missing.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXWebSocketArg {
    /// Channel name that pushed the data.
    pub channel: OKXWsChannel,
    /// Instrument ID (`instId`), if present.
    #[serde(default)]
    pub inst_id: Option<Ustr>,
    /// Instrument type (`instType`), if present.
    #[serde(default)]
    pub inst_type: Option<OKXInstrumentType>,
    /// Instrument family (`instFamily`), if present.
    #[serde(default)]
    pub inst_family: Option<Ustr>,
    /// `bar` value, if present.
    // NOTE(review): presumably the candle interval for candlestick channels — confirm.
    #[serde(default)]
    pub bar: Option<Ustr>,
}
453
/// Ticker data for an instrument.
///
/// Keys are camelCase on the wire; price and size values are kept as the
/// strings received.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXTickerMsg {
    /// Instrument type, e.g. "SPOT", "SWAP".
    pub inst_type: OKXInstrumentType,
    /// Instrument ID, e.g. "BTC-USDT".
    pub inst_id: Ustr,
    /// Last traded price (wire key is `last`).
    #[serde(rename = "last")]
    pub last_px: String,
    /// Last traded size.
    pub last_sz: String,
    /// Best ask price.
    pub ask_px: String,
    /// Best ask size.
    pub ask_sz: String,
    /// Best bid price.
    pub bid_px: String,
    /// Best bid size.
    pub bid_sz: String,
    /// 24-hour opening price.
    pub open24h: String,
    /// 24-hour highest price.
    pub high24h: String,
    /// 24-hour lowest price.
    pub low24h: String,
    /// 24-hour trading volume in quote currency.
    pub vol_ccy_24h: String,
    /// 24-hour trading volume.
    pub vol24h: String,
    /// The opening price of the day (UTC 0).
    pub sod_utc0: String,
    /// The opening price of the day (UTC 8).
    pub sod_utc8: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    // Decoded by a project helper; presumably the wire value is a numeric string.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
493
/// Represents a single order in the order book.
///
/// NOTE(review): `OKXBookMsg` documents entries as positional arrays
/// `[price, size, liquidated orders count, orders count]`; if entries are
/// decoded from arrays, the field order below is significant — do not reorder.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderBookEntry {
    /// Price of the order.
    pub price: String,
    /// Size of the order.
    pub size: String,
    /// Number of liquidated orders.
    pub liquidated_orders_count: String,
    /// Total number of orders at this price.
    pub orders_count: String,
}
506
/// Order book data for an instrument.
///
/// Keys are camelCase on the wire (`prev_seq_id` -> `prevSeqId`, `seq_id` -> `seqId`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXBookMsg {
    /// Order book asks [price, size, liquidated orders count, orders count].
    pub asks: Vec<OrderBookEntry>,
    /// Order book bids [price, size, liquidated orders count, orders count].
    pub bids: Vec<OrderBookEntry>,
    /// Checksum value.
    pub checksum: Option<i64>,
    /// Sequence ID of the last sent message. Only applicable to books, books-l2-tbt, books50-l2-tbt.
    pub prev_seq_id: Option<i64>,
    /// Sequence ID of the current message.
    pub seq_id: u64,
    /// Order book generation time, Unix timestamp format in milliseconds, e.g. 1597026383085.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
525
/// Trade data for an instrument.
///
/// Keys are camelCase on the wire; price and size are kept as strings.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXTradeMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Trade ID.
    pub trade_id: String,
    /// Trade price.
    pub px: String,
    /// Trade size.
    pub sz: String,
    /// Trade direction (buy or sell).
    pub side: OKXSide,
    /// Count.
    pub count: String,
    /// Trade timestamp, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
546
/// Funding rate data for perpetual swaps.
///
/// Keys are camelCase on the wire; rates are kept as interned strings.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXFundingRateMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Current funding rate.
    pub funding_rate: Ustr,
    /// Predicted next funding rate.
    pub next_funding_rate: Ustr,
    /// Next funding time, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub funding_time: u64,
    /// Message timestamp, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
564
/// Mark price data for perpetual swaps.
///
/// Keys are camelCase on the wire (`mark_px` -> `markPx`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXMarkPriceMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Current mark price.
    pub mark_px: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
577
/// Index price data.
///
/// Keys are camelCase on the wire; prices are kept as strings.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXIndexPriceMsg {
    /// Index name, e.g. "BTC-USD".
    pub inst_id: Ustr,
    /// Latest index price.
    pub idx_px: String,
    /// 24-hour highest price.
    pub high24h: String,
    /// 24-hour lowest price.
    pub low24h: String,
    /// 24-hour opening price.
    pub open24h: String,
    /// The opening price of the day (UTC 0).
    pub sod_utc0: String,
    /// The opening price of the day (UTC 8).
    pub sod_utc8: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
600
/// Price limit data (upper and lower limits).
///
/// Keys are camelCase on the wire (`buy_lmt` -> `buyLmt`, `sell_lmt` -> `sellLmt`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXPriceLimitMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Buy limit price.
    pub buy_lmt: String,
    /// Sell limit price.
    pub sell_lmt: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
615
/// Candlestick data for an instrument.
///
/// Keys are camelCase on the wire; OHLC and volume values are kept as strings.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXCandleMsg {
    /// Candlestick timestamp, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
    /// Opening price.
    pub o: String,
    /// Highest price.
    pub h: String,
    /// Lowest price.
    pub l: String,
    /// Closing price.
    pub c: String,
    /// Trading volume in contracts.
    pub vol: String,
    /// Trading volume in quote currency.
    pub vol_ccy: String,
    /// Additional volume figure (wire key `volCcyQuote`).
    // NOTE(review): presumably the quote-currency volume, which would make the
    // `vol_ccy` description above suspect — confirm against the OKX channel docs.
    pub vol_ccy_quote: String,
    /// Whether this is a completed candle.
    pub confirm: OKXCandleConfirm,
}
639
/// Open interest data.
///
/// Keys are camelCase on the wire (`oi_ccy` -> `oiCcy`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXOpenInterestMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Open interest in contracts.
    pub oi: String,
    /// Open interest in quote currency.
    pub oi_ccy: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
654
/// Option market data summary.
///
/// Keys are camelCase on the wire (`delta_bs` -> `deltaBs`); all greeks and
/// volatilities are kept as the strings received.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXOptionSummaryMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Underlying.
    pub uly: String,
    /// Delta.
    pub delta: String,
    /// Gamma.
    pub gamma: String,
    /// Theta.
    pub theta: String,
    /// Vega.
    pub vega: String,
    /// Black-Scholes implied volatility delta.
    pub delta_bs: String,
    /// Black-Scholes implied volatility gamma.
    pub gamma_bs: String,
    /// Black-Scholes implied volatility theta.
    pub theta_bs: String,
    /// Black-Scholes implied volatility vega.
    pub vega_bs: String,
    /// Realized volatility.
    pub real_vol: String,
    /// Bid volatility.
    pub bid_vol: String,
    /// Ask volatility.
    pub ask_vol: String,
    /// Mark volatility.
    pub mark_vol: String,
    /// Leverage.
    pub lever: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
693
/// Estimated delivery/exercise price data.
///
/// Keys are camelCase on the wire (`settle_px` -> `settlePx`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXEstimatedPriceMsg {
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Estimated settlement price.
    pub settle_px: String,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
706
/// Platform status updates.
///
/// Keys are camelCase on the wire; the `status_type` field maps to the `type` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXStatusMsg {
    /// System maintenance status.
    pub title: Ustr,
    /// Status type: planned or scheduled.
    // Renamed because `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub status_type: Ustr,
    /// System maintenance state: canceled, completed, pending, ongoing.
    pub state: Ustr,
    /// Expected completion timestamp.
    pub end_time: Option<String>,
    /// Planned start timestamp.
    pub begin_time: Option<String>,
    /// Service involved.
    pub service_type: Option<Ustr>,
    /// Reason for status change.
    pub reason: Option<String>,
    /// Timestamp of the data generation, Unix timestamp format in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub ts: u64,
}
730
/// Order update message from WebSocket orders channel.
///
/// Keys are camelCase on the wire. Fields marked `#[serde(default)]` tolerate
/// a missing key; those that also use `deserialize_empty_string_as_none` are
/// decoded by a project helper (presumably mapping `""` to `None`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXOrderMsg {
    /// Accumulated filled size.
    #[serde(default, deserialize_with = "deserialize_empty_string_as_none")]
    pub acc_fill_sz: Option<String>,
    /// Average price.
    pub avg_px: String,
    /// Creation time, Unix timestamp in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub c_time: u64,
    /// Cancel source.
    #[serde(default)]
    pub cancel_source: Option<String>,
    /// Cancel source reason.
    #[serde(default)]
    pub cancel_source_reason: Option<String>,
    /// Order category (normal, liquidation, ADL, etc.).
    pub category: OKXOrderCategory,
    /// Currency.
    pub ccy: Ustr,
    /// Client order ID.
    pub cl_ord_id: String,
    /// Parent algo client order ID if present.
    #[serde(default, deserialize_with = "deserialize_empty_string_as_none")]
    pub algo_cl_ord_id: Option<String>,
    /// Fee.
    #[serde(default, deserialize_with = "deserialize_empty_string_as_none")]
    pub fee: Option<String>,
    /// Fee currency.
    pub fee_ccy: Ustr,
    /// Fill price.
    pub fill_px: String,
    /// Fill size.
    pub fill_sz: String,
    /// Fill time, Unix timestamp in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub fill_time: u64,
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Instrument type.
    pub inst_type: OKXInstrumentType,
    /// Leverage.
    pub lever: String,
    /// Order ID.
    pub ord_id: Ustr,
    /// Order type.
    pub ord_type: OKXOrderType,
    /// Profit and loss.
    pub pnl: String,
    /// Position side.
    pub pos_side: OKXPositionSide,
    /// Price (algo orders use ordPx instead).
    // Defaults to an empty string when the key is missing.
    #[serde(default)]
    pub px: String,
    /// Reduce only flag.
    pub reduce_only: String,
    /// Side.
    pub side: OKXSide,
    /// Order state.
    pub state: OKXOrderStatus,
    /// Execution type.
    pub exec_type: OKXExecType,
    /// Size.
    pub sz: String,
    /// Trade mode.
    pub td_mode: OKXTradeMode,
    /// Target currency (base_ccy or quote_ccy). Empty for margin modes.
    #[serde(default, deserialize_with = "deserialize_target_currency_as_none")]
    pub tgt_ccy: Option<OKXTargetCurrency>,
    /// Trade ID.
    pub trade_id: String,
    /// Last update time, Unix timestamp in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub u_time: u64,
}
808
/// Represents an algo order message from WebSocket updates.
///
/// Keys are camelCase on the wire. `algo_cl_ord_id` and `tag` default to an
/// empty string when the key is missing; all other fields are required.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OKXAlgoOrderMsg {
    /// Algorithm ID.
    pub algo_id: String,
    /// Algorithm client order ID.
    #[serde(default)]
    pub algo_cl_ord_id: String,
    /// Client order ID (empty for algo orders until triggered).
    pub cl_ord_id: String,
    /// Order ID (empty until algo order is triggered).
    pub ord_id: String,
    /// Instrument ID.
    pub inst_id: Ustr,
    /// Instrument type.
    pub inst_type: OKXInstrumentType,
    /// Order type (always "trigger" for conditional orders).
    pub ord_type: OKXOrderType,
    /// Order state.
    pub state: OKXOrderStatus,
    /// Side.
    pub side: OKXSide,
    /// Position side.
    pub pos_side: OKXPositionSide,
    /// Size.
    pub sz: String,
    /// Trigger price.
    pub trigger_px: String,
    /// Trigger price type (last, mark, index).
    pub trigger_px_type: OKXTriggerType,
    /// Order price (-1 for market orders).
    pub ord_px: String,
    /// Trade mode.
    pub td_mode: OKXTradeMode,
    /// Leverage.
    pub lever: String,
    /// Reduce only flag.
    pub reduce_only: String,
    /// Actual filled price.
    pub actual_px: String,
    /// Actual filled size.
    pub actual_sz: String,
    /// Notional USD value.
    pub notional_usd: String,
    /// Creation time, Unix timestamp in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub c_time: u64,
    /// Update time, Unix timestamp in milliseconds.
    #[serde(deserialize_with = "deserialize_string_to_u64")]
    pub u_time: u64,
    /// Trigger time (empty until triggered).
    pub trigger_time: String,
    /// Tag.
    #[serde(default)]
    pub tag: String,
}
866
/// Parameters for WebSocket place order operation.
///
/// Keys are camelCase on the wire and `None` options are omitted when
/// serializing. The derived builder accepts `Into` conversions and unwrapped
/// values for `Option` fields; fields marked `#[builder(default)]` may be left
/// unset, the rest (`inst_id`, `td_mode`, `side`, `ord_type`, `sz`) must be provided.
#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
#[builder(setter(into, strip_option))]
#[serde(rename_all = "camelCase")]
pub struct WsPostOrderParams {
    /// Instrument type: SPOT, MARGIN, SWAP, FUTURES, OPTION (optional for WebSocket).
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub inst_type: Option<OKXInstrumentType>,
    /// Instrument ID, e.g. "BTC-USDT".
    pub inst_id: Ustr,
    /// Trading mode: cash, isolated, cross.
    pub td_mode: OKXTradeMode,
    /// Margin currency (only for isolated margin).
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ccy: Option<Ustr>,
    /// Unique client order ID.
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cl_ord_id: Option<String>,
    /// Order side: buy or sell.
    pub side: OKXSide,
    /// Position side: long, short, net (optional).
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pos_side: Option<OKXPositionSide>,
    /// Order type: limit, market, post_only, fok, ioc, etc.
    pub ord_type: OKXOrderType,
    /// Order size.
    pub sz: String,
    /// Order price (required for limit orders).
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub px: Option<String>,
    /// Reduce-only flag.
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reduce_only: Option<bool>,
    /// Whether to close the entire position.
    // The explicit rename yields the same key that `rename_all = "camelCase"` would produce.
    #[builder(default)]
    #[serde(rename = "closePosition", skip_serializing_if = "Option::is_none")]
    pub close_position: Option<bool>,
    /// Target currency for net orders.
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tgt_ccy: Option<OKXTargetCurrency>,
    /// Order tag for categorization.
    #[builder(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tag: Option<String>,
}
919
/// Parameters for WebSocket cancel order operation (instType not included).
///
/// Keys are camelCase on the wire and `None` options are omitted when
/// serializing. With the struct-level `#[builder(default)]`, any field left
/// unset on the builder takes its value from `Default`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
#[builder(default)]
#[builder(setter(into, strip_option))]
#[serde(rename_all = "camelCase")]
pub struct WsCancelOrderParams {
    /// Instrument ID, e.g. "BTC-USDT".
    pub inst_id: Ustr,
    /// Exchange-assigned order ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ord_id: Option<String>,
    /// User-assigned client order ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cl_ord_id: Option<String>,
}
935
936/// Parameters for WebSocket mass cancel operation.
937#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
938#[builder(default)]
939#[builder(setter(into, strip_option))]
940#[serde(rename_all = "camelCase")]
941pub struct WsMassCancelParams {
942    /// Instrument type.
943    pub inst_type: OKXInstrumentType,
944    /// Instrument family, e.g. "BTC-USD", "BTC-USDT".
945    pub inst_family: Ustr,
946}
947
948/// Parameters for WebSocket amend order operation (instType not included).
949#[derive(Clone, Debug, Default, Deserialize, Serialize, Builder)]
950#[builder(default)]
951#[builder(setter(into, strip_option))]
952#[serde(rename_all = "camelCase")]
953pub struct WsAmendOrderParams {
954    /// Instrument ID, e.g. "BTC-USDT".
955    pub inst_id: Ustr,
956    /// Exchange-assigned order ID (optional if using clOrdId).
957    #[serde(skip_serializing_if = "Option::is_none")]
958    pub ord_id: Option<String>,
959    /// User-assigned client order ID (optional if using ordId).
960    #[serde(skip_serializing_if = "Option::is_none")]
961    pub cl_ord_id: Option<String>,
962    /// New client order ID for the amended order.
963    #[serde(skip_serializing_if = "Option::is_none")]
964    pub new_cl_ord_id: Option<String>,
965    /// New order price (optional).
966    #[serde(skip_serializing_if = "Option::is_none")]
967    pub new_px: Option<String>,
968    /// New order size (optional).
969    #[serde(skip_serializing_if = "Option::is_none")]
970    pub new_sz: Option<String>,
971}
972
973/// Parameters for WebSocket algo order placement.
974#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
975#[builder(setter(into, strip_option))]
976#[serde(rename_all = "camelCase")]
977pub struct WsPostAlgoOrderParams {
978    /// Instrument ID, e.g. "BTC-USDT".
979    pub inst_id: Ustr,
980    /// Trading mode: cash, isolated, cross.
981    pub td_mode: OKXTradeMode,
982    /// Order side: buy or sell.
983    pub side: OKXSide,
984    /// Order type: trigger (for stop orders).
985    pub ord_type: OKXAlgoOrderType,
986    /// Order size.
987    pub sz: String,
988    /// Client order ID (optional).
989    #[builder(default)]
990    #[serde(skip_serializing_if = "Option::is_none")]
991    pub cl_ord_id: Option<String>,
992    /// Position side: long, short, net (optional).
993    #[builder(default)]
994    #[serde(skip_serializing_if = "Option::is_none")]
995    pub pos_side: Option<OKXPositionSide>,
996    /// Trigger price for stop/conditional orders.
997    #[serde(skip_serializing_if = "Option::is_none")]
998    pub trigger_px: Option<String>,
999    /// Trigger price type: last, index, mark.
1000    #[builder(default)]
1001    #[serde(skip_serializing_if = "Option::is_none")]
1002    pub trigger_px_type: Option<OKXTriggerType>,
1003    /// Order price (for limit orders after trigger).
1004    #[builder(default)]
1005    #[serde(skip_serializing_if = "Option::is_none")]
1006    pub order_px: Option<String>,
1007    /// Reduce-only flag.
1008    #[builder(default)]
1009    #[serde(skip_serializing_if = "Option::is_none")]
1010    pub reduce_only: Option<bool>,
1011    /// Order tag for categorization.
1012    #[builder(default)]
1013    #[serde(skip_serializing_if = "Option::is_none")]
1014    pub tag: Option<String>,
1015}
1016
1017/// Parameters for WebSocket cancel algo order operation.
1018#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
1019#[builder(setter(into, strip_option))]
1020#[serde(rename_all = "camelCase")]
1021pub struct WsCancelAlgoOrderParams {
1022    /// Instrument ID, e.g. "BTC-USDT".
1023    pub inst_id: Ustr,
1024    /// Algo order ID.
1025    #[serde(skip_serializing_if = "Option::is_none")]
1026    pub algo_id: Option<String>,
1027    /// Client algo order ID.
1028    #[serde(skip_serializing_if = "Option::is_none")]
1029    pub algo_cl_ord_id: Option<String>,
1030}
1031
#[cfg(test)]
mod tests {
    use nautilus_core::time::get_atomic_clock_realtime;
    use rstest::rstest;

    use super::*;

    /// Subscription confirmation for the `instruments` channel, shared by several tests.
    const INSTRUMENTS_SUBSCRIBE_JSON: &str = r#"{"event":"subscribe","arg":{"channel":"instruments","instType":"SPOT"},"connId":"380cfa6a"}"#;

    /// Successful `order` response carrying a single data entry.
    const ORDER_SUCCESS_JSON: &str = r#"{
            "id": "req-123",
            "op": "order",
            "code": "0",
            "msg": "",
            "data": [{"sMsg": "Order placed successfully"}]
        }"#;

    /// Failed `cancel-order` response carrying a single data entry.
    const CANCEL_FAILURE_JSON: &str = r#"{
            "id": "req-456",
            "op": "cancel-order",
            "code": "50001",
            "msg": "Order not found",
            "data": [{"sMsg": "Order with client order ID not found"}]
        }"#;

    /// Deserializes `json` into an [`OKXWsMessage`], panicking with the serde error on failure.
    fn parse_ws_message(json: &str) -> OKXWsMessage {
        serde_json::from_str(json)
            .unwrap_or_else(|e| panic!("Failed to deserialize WebSocket message: {e}"))
    }

    /// Asserts that `json` parses as an `OrderResponse` carrying exactly the expected fields.
    fn assert_order_response(
        json: &str,
        expected_id: &str,
        expected_op: OKXWsOperation,
        expected_code: &str,
        expected_msg: &str,
        expected_data_len: usize,
    ) {
        match parse_ws_message(json) {
            OKXWsMessage::OrderResponse {
                id,
                op,
                code,
                msg,
                data,
            } => {
                assert_eq!(id.as_deref(), Some(expected_id));
                assert_eq!(op, expected_op);
                assert_eq!(code, expected_code);
                assert_eq!(msg, expected_msg);
                assert_eq!(data.len(), expected_data_len);
            }
            other => panic!("Expected OrderResponse, was: {other:?}"),
        }
    }

    /// Asserts that `json` parses as a `subscribe` confirmation for the expected channel and
    /// connection ID.
    fn assert_subscribe_confirmation(
        json: &str,
        expected_channel: OKXWsChannel,
        expected_conn_id: &str,
    ) {
        match parse_ws_message(json) {
            OKXWsMessage::Subscription {
                event,
                arg,
                conn_id,
                ..
            } => {
                assert_eq!(event, OKXSubscriptionEvent::Subscribe);
                assert_eq!(arg.channel, expected_channel);
                assert_eq!(conn_id, expected_conn_id);
            }
            other => panic!("Expected Subscribe variant, was: {other:?}"),
        }
    }

    /// Asserts that `json` parses as an `Error` message with the expected code and text.
    fn assert_error_message(json: &str, expected_code: &str, expected_msg: &str) {
        match parse_ws_message(json) {
            OKXWsMessage::Error { code, msg } => {
                assert_eq!(code, expected_code);
                assert_eq!(msg, expected_msg);
            }
            other => panic!("Expected Error variant, was: {other:?}"),
        }
    }

    /// Builds a market-order request, serializes it and checks the resulting JSON envelope.
    fn assert_market_order_request_serializes() {
        let request = OKXWsRequest {
            id: Some("req-123".to_string()),
            op: OKXWsOperation::Order,
            args: vec![serde_json::json!({
                "instId": "BTC-USDT",
                "tdMode": "cash",
                "side": "buy",
                "ordType": "market",
                "sz": "0.1"
            })],
            exp_time: None,
        };

        let serialized = serde_json::to_string(&request).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();

        assert_eq!(parsed["id"], "req-123");
        assert_eq!(parsed["op"], "order");
        assert!(parsed["args"].is_array());
        assert_eq!(parsed["args"].as_array().unwrap().len(), 1);
    }

    #[rstest]
    fn test_deserialize_websocket_arg() {
        let json_str = r#"{"channel":"instruments","instType":"SPOT"}"#;

        let arg: OKXWebSocketArg = serde_json::from_str(json_str)
            .unwrap_or_else(|e| panic!("Failed to deserialize WebSocket arg: {e}"));

        assert_eq!(arg.channel, OKXWsChannel::Instruments);
        assert_eq!(arg.inst_type, Some(OKXInstrumentType::Spot));
        assert_eq!(arg.inst_id, None);
    }

    #[rstest]
    fn test_deserialize_subscribe_variant_direct() {
        // Local mirror of the subscription payload, used to check the wire format
        // independently of the `OKXWsMessage` enum dispatch.
        #[derive(Debug, Deserialize)]
        #[serde(rename_all = "camelCase")]
        struct SubscribeMsg {
            event: String,
            arg: OKXWebSocketArg,
            conn_id: String,
        }

        let msg: SubscribeMsg = serde_json::from_str(INSTRUMENTS_SUBSCRIBE_JSON)
            .unwrap_or_else(|e| panic!("Failed to deserialize subscribe message directly: {e}"));

        assert_eq!(msg.event, "subscribe");
        assert_eq!(msg.arg.channel, OKXWsChannel::Instruments);
        assert_eq!(msg.conn_id, "380cfa6a");
    }

    #[rstest]
    fn test_deserialize_subscribe_confirmation() {
        assert_subscribe_confirmation(
            INSTRUMENTS_SUBSCRIBE_JSON,
            OKXWsChannel::Instruments,
            "380cfa6a",
        );
    }

    #[rstest]
    fn test_deserialize_subscribe_with_inst_id() {
        let json_str = r#"{"event":"subscribe","arg":{"channel":"candle1m","instId":"ETH-USDT"},"connId":"358602f5"}"#;

        assert_subscribe_confirmation(json_str, OKXWsChannel::Candle1Minute, "358602f5");
    }

    #[rstest]
    fn test_channel_serialization_for_logging() {
        let cases = [
            (OKXWsChannel::Candle1Minute, "candle1m"),
            (OKXWsChannel::BboTbt, "bbo-tbt"),
            (OKXWsChannel::Trades, "trades"),
        ];

        for (channel, expected) in cases {
            let serialized = serde_json::to_string(&channel).unwrap();
            // Strip the surrounding JSON quotes, as done when logging channel names
            assert_eq!(serialized.trim_matches('"'), expected);
        }
    }

    #[rstest]
    fn test_order_response_with_enum_operation() {
        assert_order_response(
            r#"{"id":"req-123","op":"order","code":"0","msg":"","data":[]}"#,
            "req-123",
            OKXWsOperation::Order,
            "0",
            "",
            0,
        );
        assert_order_response(
            r#"{"id":"cancel-456","op":"cancel-order","code":"50001","msg":"Order not found","data":[]}"#,
            "cancel-456",
            OKXWsOperation::CancelOrder,
            "50001",
            "Order not found",
            0,
        );
        assert_order_response(
            r#"{"id":"amend-789","op":"amend-order","code":"50002","msg":"Invalid price","data":[]}"#,
            "amend-789",
            OKXWsOperation::AmendOrder,
            "50002",
            "Invalid price",
            0,
        );
    }

    #[rstest]
    fn test_operation_enum_serialization() {
        let cases = [
            (OKXWsOperation::Order, "\"order\""),
            (OKXWsOperation::CancelOrder, "\"cancel-order\""),
            (OKXWsOperation::AmendOrder, "\"amend-order\""),
            (OKXWsOperation::Subscribe, "\"subscribe\""),
        ];

        for (op, expected) in cases {
            let serialized = serde_json::to_string(&op).unwrap();
            assert_eq!(serialized, expected);
        }
    }

    #[rstest]
    fn test_order_response_parsing() {
        assert_order_response(
            ORDER_SUCCESS_JSON,
            "req-123",
            OKXWsOperation::Order,
            "0",
            "",
            1,
        );
        assert_order_response(
            CANCEL_FAILURE_JSON,
            "req-456",
            OKXWsOperation::CancelOrder,
            "50001",
            "Order not found",
            1,
        );
    }

    #[rstest]
    fn test_subscription_event_parsing() {
        let subscription_json = r#"{
            "event": "subscribe",
            "arg": {
                "channel": "tickers",
                "instId": "BTC-USDT"
            },
            "connId": "a4d3ae55"
        }"#;

        match parse_ws_message(subscription_json) {
            OKXWsMessage::Subscription {
                event,
                arg,
                conn_id,
                ..
            } => {
                assert_eq!(event, OKXSubscriptionEvent::Subscribe);
                assert_eq!(arg.channel, OKXWsChannel::Tickers);
                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
                assert_eq!(conn_id, "a4d3ae55");
            }
            other => panic!("Expected Subscription variant, was: {other:?}"),
        }
    }

    #[rstest]
    fn test_login_event_parsing() {
        let login_success = r#"{
            "event": "login",
            "code": "0",
            "msg": "Login successful",
            "connId": "a4d3ae55"
        }"#;

        match parse_ws_message(login_success) {
            OKXWsMessage::Login {
                event,
                code,
                msg,
                conn_id,
            } => {
                assert_eq!(event, "login");
                assert_eq!(code, "0");
                assert_eq!(msg, "Login successful");
                assert_eq!(conn_id, "a4d3ae55");
            }
            other => panic!("Expected Login variant, was: {other:?}"),
        }
    }

    #[rstest]
    fn test_error_event_parsing() {
        let error_json = r#"{
            "code": "60012",
            "msg": "Invalid request"
        }"#;

        assert_error_message(error_json, "60012", "Invalid request");
    }

    #[rstest]
    fn test_error_event_with_event_field_parsing() {
        // OKX sends error events with "event":"error" field (e.g., login failures)
        let error_json = r#"{
            "event": "error",
            "code": "60018",
            "msg": "Invalid sign"
        }"#;

        assert_error_message(error_json, "60018", "Invalid sign");
    }

    #[rstest]
    fn test_subscription_error_with_arg_field_parsing() {
        // OKX sends subscription errors with arg field (channel subscription failures)
        let error_json = r#"{
            "event": "error",
            "arg": {"channel": "tickers", "instId": "INVALID-INST"},
            "code": "60012",
            "msg": "Invalid request: channel not found",
            "connId": "a4d3ae55"
        }"#;

        assert_error_message(error_json, "60012", "Invalid request: channel not found");
    }

    #[rstest]
    fn test_websocket_request_serialization() {
        assert_market_order_request_serializes();
    }

    #[rstest]
    fn test_subscription_request_serialization() {
        let subscription = OKXSubscription {
            op: OKXWsOperation::Subscribe,
            args: vec![OKXSubscriptionArg {
                channel: OKXWsChannel::Tickers,
                inst_type: Some(OKXInstrumentType::Spot),
                inst_family: None,
                inst_id: Some(Ustr::from("BTC-USDT")),
            }],
        };

        let serialized = serde_json::to_string(&subscription).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        let first_arg = &parsed["args"][0];

        assert_eq!(parsed["op"], "subscribe");
        assert!(parsed["args"].is_array());
        assert_eq!(first_arg["channel"], "tickers");
        assert_eq!(first_arg["instType"], "SPOT");
        assert_eq!(first_arg["instId"], "BTC-USDT");
    }

    #[rstest]
    fn test_error_message_extraction() {
        // Each case pairs an error response with the message expected after fallback
        let responses = [
            (
                r#"{
                "id": "req-123",
                "op": "order",
                "code": "50001",
                "msg": "Order failed",
                "data": [{"sMsg": "Insufficient balance"}]
            }"#,
                "Insufficient balance",
            ),
            (
                r#"{
                "id": "req-456",
                "op": "cancel-order",
                "code": "50002",
                "msg": "Cancel failed",
                "data": [{}]
            }"#,
                "Cancel failed",
            ),
        ];

        for (response_json, expected_msg) in responses {
            match parse_ws_message(response_json) {
                OKXWsMessage::OrderResponse {
                    code, msg, data, ..
                } => {
                    assert_ne!(code, "0"); // Error response

                    // Prefer the non-empty `sMsg` of the first data entry, otherwise fall
                    // back to the top-level `msg`
                    let error_msg = data
                        .first()
                        .and_then(|d| d.get("sMsg"))
                        .and_then(|s| s.as_str())
                        .filter(|s| !s.is_empty())
                        .unwrap_or(&msg);

                    assert_eq!(error_msg, expected_msg);
                }
                other => panic!("Expected OrderResponse variant, was: {other:?}"),
            }
        }
    }

    #[rstest]
    fn test_book_data_parsing() {
        let book_data_json = r#"{
            "arg": {
                "channel": "books",
                "instId": "BTC-USDT"
            },
            "action": "snapshot",
            "data": [{
                "asks": [["50000.0", "0.1", "0", "1"]],
                "bids": [["49999.0", "0.2", "0", "1"]],
                "ts": "1640995200000",
                "checksum": 123456789,
                "seqId": 1000
            }]
        }"#;

        match parse_ws_message(book_data_json) {
            OKXWsMessage::BookData { arg, action, data } => {
                assert_eq!(arg.channel, OKXWsChannel::Books);
                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
                assert_eq!(action, OKXBookAction::Snapshot);
                assert_eq!(data.len(), 1);
            }
            other => panic!("Expected BookData variant, was: {other:?}"),
        }
    }

    #[rstest]
    fn test_data_event_parsing() {
        let data_json = r#"{
            "arg": {
                "channel": "trades",
                "instId": "BTC-USDT"
            },
            "data": [{
                "instId": "BTC-USDT",
                "tradeId": "12345",
                "px": "50000.0",
                "sz": "0.1",
                "side": "buy",
                "ts": "1640995200000"
            }]
        }"#;

        match parse_ws_message(data_json) {
            OKXWsMessage::Data { arg, data } => {
                assert_eq!(arg.channel, OKXWsChannel::Trades);
                assert_eq!(arg.inst_id, Some(Ustr::from("BTC-USDT")));
                assert!(data.is_array());
            }
            other => panic!("Expected Data variant, was: {other:?}"),
        }
    }

    #[rstest]
    fn test_nautilus_message_variants() {
        let ts_init = get_atomic_clock_realtime().get_time_ns();

        let error = OKXWebSocketError {
            code: "60012".to_string(),
            message: "Invalid request".to_string(),
            conn_id: None,
            timestamp: ts_init.as_u64(),
        };

        match NautilusWsMessage::Error(error) {
            NautilusWsMessage::Error(e) => {
                assert_eq!(e.code, "60012");
                assert_eq!(e.message, "Invalid request");
            }
            _ => panic!("Expected Error variant"),
        }

        let raw_scenarios = [
            serde_json::json!({"unknown": "data"}),
            serde_json::json!({"channel": "unsupported", "data": [1, 2, 3]}),
            serde_json::json!({"complex": {"nested": {"structure": true}}}),
        ];

        for raw_data in raw_scenarios {
            match NautilusWsMessage::Raw(raw_data.clone()) {
                NautilusWsMessage::Raw(data) => assert_eq!(data, raw_data),
                _ => panic!("Expected Raw variant"),
            }
        }
    }

    #[rstest]
    fn test_order_response_parsing_success() {
        assert_order_response(
            ORDER_SUCCESS_JSON,
            "req-123",
            OKXWsOperation::Order,
            "0",
            "",
            1,
        );
    }

    #[rstest]
    fn test_order_response_parsing_failure() {
        assert_order_response(
            CANCEL_FAILURE_JSON,
            "req-456",
            OKXWsOperation::CancelOrder,
            "50001",
            "Order not found",
            1,
        );
    }

    #[rstest]
    fn test_message_request_serialization() {
        assert_market_order_request_serializes();
    }
}