nautilus_okx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use nautilus_core::nanos::UnixNanos;
18use nautilus_model::{
19    data::{
20        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
21        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
22        QuoteTick, TradeTick, depth::DEPTH10_LEN,
23    },
24    enums::{
25        AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
26        OrderType, RecordFlag, TimeInForce,
27    },
28    identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
29    instruments::{Instrument, InstrumentAny},
30    reports::{FillReport, OrderStatusReport},
31    types::{Currency, Money, Price, Quantity},
32};
33use ustr::Ustr;
34
35use super::{
36    enums::OKXWsChannel,
37    messages::{
38        OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg, OKXTickerMsg,
39        OKXTradeMsg, OrderBookEntry,
40    },
41};
42use crate::{
43    common::{
44        enums::{OKXBookAction, OKXCandleConfirm, OKXOrderStatus, OKXOrderType},
45        models::OKXInstrument,
46        parse::{
47            okx_channel_to_bar_spec, parse_client_order_id, parse_fee, parse_funding_rate_msg,
48            parse_instrument_any, parse_message_vec, parse_millisecond_timestamp, parse_price,
49            parse_quantity,
50        },
51    },
52    websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
53};
54
55/// Parses vector of OKX book messages into Nautilus order book deltas.
56pub fn parse_book_msg_vec(
57    data: Vec<OKXBookMsg>,
58    instrument_id: &InstrumentId,
59    price_precision: u8,
60    size_precision: u8,
61    action: OKXBookAction,
62    ts_init: UnixNanos,
63) -> anyhow::Result<Vec<Data>> {
64    let mut deltas = Vec::with_capacity(data.len());
65
66    for msg in data {
67        let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
68            &msg,
69            *instrument_id,
70            price_precision,
71            size_precision,
72            &action,
73            ts_init,
74        )?);
75        deltas.push(Data::Deltas(deltas_api));
76    }
77
78    Ok(deltas)
79}
80
81/// Parses vector of OKX ticker messages into Nautilus quote ticks.
82pub fn parse_ticker_msg_vec(
83    data: serde_json::Value,
84    instrument_id: &InstrumentId,
85    price_precision: u8,
86    size_precision: u8,
87    ts_init: UnixNanos,
88) -> anyhow::Result<Vec<Data>> {
89    parse_message_vec(
90        data,
91        |msg| {
92            parse_ticker_msg(
93                msg,
94                *instrument_id,
95                price_precision,
96                size_precision,
97                ts_init,
98            )
99        },
100        Data::Quote,
101    )
102}
103
104/// Parses vector of OKX book messages into Nautilus quote ticks.
105pub fn parse_quote_msg_vec(
106    data: serde_json::Value,
107    instrument_id: &InstrumentId,
108    price_precision: u8,
109    size_precision: u8,
110    ts_init: UnixNanos,
111) -> anyhow::Result<Vec<Data>> {
112    parse_message_vec(
113        data,
114        |msg| {
115            parse_quote_msg(
116                msg,
117                *instrument_id,
118                price_precision,
119                size_precision,
120                ts_init,
121            )
122        },
123        Data::Quote,
124    )
125}
126
127/// Parses vector of OKX trade messages into Nautilus trade ticks.
128pub fn parse_trade_msg_vec(
129    data: serde_json::Value,
130    instrument_id: &InstrumentId,
131    price_precision: u8,
132    size_precision: u8,
133    ts_init: UnixNanos,
134) -> anyhow::Result<Vec<Data>> {
135    parse_message_vec(
136        data,
137        |msg| {
138            parse_trade_msg(
139                msg,
140                *instrument_id,
141                price_precision,
142                size_precision,
143                ts_init,
144            )
145        },
146        Data::Trade,
147    )
148}
149
150/// Parses vector of OKX mark price messages into Nautilus mark price updates.
151pub fn parse_mark_price_msg_vec(
152    data: serde_json::Value,
153    instrument_id: &InstrumentId,
154    price_precision: u8,
155    ts_init: UnixNanos,
156) -> anyhow::Result<Vec<Data>> {
157    parse_message_vec(
158        data,
159        |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
160        Data::MarkPriceUpdate,
161    )
162}
163
164/// Parses vector of OKX index price messages into Nautilus index price updates.
165pub fn parse_index_price_msg_vec(
166    data: serde_json::Value,
167    instrument_id: &InstrumentId,
168    price_precision: u8,
169    ts_init: UnixNanos,
170) -> anyhow::Result<Vec<Data>> {
171    parse_message_vec(
172        data,
173        |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
174        Data::IndexPriceUpdate,
175    )
176}
177
178/// Parses vector of OKX funding rate messages into Nautilus funding rate updates.
179/// Includes caching to filter out duplicate funding rates.
180pub fn parse_funding_rate_msg_vec(
181    data: serde_json::Value,
182    instrument_id: &InstrumentId,
183    ts_init: UnixNanos,
184    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
185) -> anyhow::Result<Vec<FundingRateUpdate>> {
186    let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
187
188    let mut result = Vec::with_capacity(msgs.len());
189    for msg in &msgs {
190        let cache_key = (msg.funding_rate, msg.funding_time);
191
192        if let Some(cached) = funding_cache.get(&msg.inst_id)
193            && *cached == cache_key
194        {
195            continue; // Skip duplicate
196        }
197
198        // New or changed funding rate, update cache and parse
199        funding_cache.insert(msg.inst_id, cache_key);
200        let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
201        result.push(funding_rate);
202    }
203
204    Ok(result)
205}
206
207/// Parses vector of OKX candle messages into Nautilus bars.
208pub fn parse_candle_msg_vec(
209    data: serde_json::Value,
210    instrument_id: &InstrumentId,
211    price_precision: u8,
212    size_precision: u8,
213    spec: BarSpecification,
214    ts_init: UnixNanos,
215) -> anyhow::Result<Vec<Data>> {
216    let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
217    let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
218    let mut bars = Vec::with_capacity(msgs.len());
219
220    for msg in msgs {
221        // Only process completed candles to avoid duplicate/partial bars
222        if msg.confirm == OKXCandleConfirm::Closed {
223            let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
224            bars.push(Data::Bar(bar));
225        }
226    }
227
228    Ok(bars)
229}
230
231/// Parses vector of OKX book messages into Nautilus depth10 updates.
232pub fn parse_book10_msg_vec(
233    data: Vec<OKXBookMsg>,
234    instrument_id: &InstrumentId,
235    price_precision: u8,
236    size_precision: u8,
237    ts_init: UnixNanos,
238) -> anyhow::Result<Vec<Data>> {
239    let mut depth10_updates = Vec::with_capacity(data.len());
240
241    for msg in data {
242        let depth10 = parse_book10_msg(
243            &msg,
244            *instrument_id,
245            price_precision,
246            size_precision,
247            ts_init,
248        )?;
249        depth10_updates.push(Data::Depth10(Box::new(depth10)));
250    }
251
252    Ok(depth10_updates)
253}
254
255/// Parses an OKX book message into Nautilus order book deltas.
256pub fn parse_book_msg(
257    msg: &OKXBookMsg,
258    instrument_id: InstrumentId,
259    price_precision: u8,
260    size_precision: u8,
261    action: &OKXBookAction,
262    ts_init: UnixNanos,
263) -> anyhow::Result<OrderBookDeltas> {
264    let flags = if action == &OKXBookAction::Snapshot {
265        RecordFlag::F_SNAPSHOT as u8
266    } else {
267        0
268    };
269    let ts_event = parse_millisecond_timestamp(msg.ts);
270
271    let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
272
273    for bid in &msg.bids {
274        let book_action = match action {
275            OKXBookAction::Snapshot => BookAction::Add,
276            _ => match bid.size.as_str() {
277                "0" => BookAction::Delete,
278                _ => BookAction::Update,
279            },
280        };
281        let price = parse_price(&bid.price, price_precision)?;
282        let size = parse_quantity(&bid.size, size_precision)?;
283        let order_id = 0; // TBD
284        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
285        let delta = OrderBookDelta::new(
286            instrument_id,
287            book_action,
288            order,
289            flags,
290            msg.seq_id,
291            ts_event,
292            ts_init,
293        );
294        deltas.push(delta)
295    }
296
297    for ask in &msg.asks {
298        let book_action = match action {
299            OKXBookAction::Snapshot => BookAction::Add,
300            _ => match ask.size.as_str() {
301                "0" => BookAction::Delete,
302                _ => BookAction::Update,
303            },
304        };
305        let price = parse_price(&ask.price, price_precision)?;
306        let size = parse_quantity(&ask.size, size_precision)?;
307        let order_id = 0; // TBD
308        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
309        let delta = OrderBookDelta::new(
310            instrument_id,
311            book_action,
312            order,
313            flags,
314            msg.seq_id,
315            ts_event,
316            ts_init,
317        );
318        deltas.push(delta)
319    }
320
321    OrderBookDeltas::new_checked(instrument_id, deltas)
322}
323
324/// Parses an OKX book message into a Nautilus quote tick.
325pub fn parse_quote_msg(
326    msg: &OKXBookMsg,
327    instrument_id: InstrumentId,
328    price_precision: u8,
329    size_precision: u8,
330    ts_init: UnixNanos,
331) -> anyhow::Result<QuoteTick> {
332    let best_bid: &OrderBookEntry = &msg.bids[0];
333    let best_ask: &OrderBookEntry = &msg.asks[0];
334
335    let bid_price = parse_price(&best_bid.price, price_precision)?;
336    let ask_price = parse_price(&best_ask.price, price_precision)?;
337    let bid_size = parse_quantity(&best_bid.size, size_precision)?;
338    let ask_size = parse_quantity(&best_ask.size, size_precision)?;
339    let ts_event = parse_millisecond_timestamp(msg.ts);
340
341    QuoteTick::new_checked(
342        instrument_id,
343        bid_price,
344        ask_price,
345        bid_size,
346        ask_size,
347        ts_event,
348        ts_init,
349    )
350}
351
352/// Parses an OKX book message into a Nautilus [`OrderBookDepth10`].
353///
354/// Converts order book data into a fixed-depth snapshot with top 10 levels for both sides.
355pub fn parse_book10_msg(
356    msg: &OKXBookMsg,
357    instrument_id: InstrumentId,
358    price_precision: u8,
359    size_precision: u8,
360    ts_init: UnixNanos,
361) -> anyhow::Result<OrderBookDepth10> {
362    // Initialize arrays - need to fill all 10 levels even if we have fewer
363    let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
364    let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
365    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
366    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
367
368    // Parse available bid levels (up to 10)
369    let bid_len = msg.bids.len().min(DEPTH10_LEN);
370    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
371        let price = parse_price(&level.price, price_precision)?;
372        let size = parse_quantity(&level.size, size_precision)?;
373        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
374
375        let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
376        bids[i] = bid_order;
377        bid_counts[i] = orders_count;
378    }
379
380    // Fill remaining bid slots with empty Buy orders (not NULL orders)
381    for i in bid_len..DEPTH10_LEN {
382        bids[i] = BookOrder::new(
383            OrderSide::Buy,
384            Price::zero(price_precision),
385            Quantity::zero(size_precision),
386            0,
387        );
388        bid_counts[i] = 0;
389    }
390
391    // Parse available ask levels (up to 10)
392    let ask_len = msg.asks.len().min(DEPTH10_LEN);
393    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
394        let price = parse_price(&level.price, price_precision)?;
395        let size = parse_quantity(&level.size, size_precision)?;
396        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
397
398        let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
399        asks[i] = ask_order;
400        ask_counts[i] = orders_count;
401    }
402
403    // Fill remaining ask slots with empty Sell orders (not NULL orders)
404    for i in ask_len..DEPTH10_LEN {
405        asks[i] = BookOrder::new(
406            OrderSide::Sell,
407            Price::zero(price_precision),
408            Quantity::zero(size_precision),
409            0,
410        );
411        ask_counts[i] = 0;
412    }
413
414    let ts_event = parse_millisecond_timestamp(msg.ts);
415
416    Ok(OrderBookDepth10::new(
417        instrument_id,
418        bids,
419        asks,
420        bid_counts,
421        ask_counts,
422        RecordFlag::F_SNAPSHOT as u8,
423        msg.seq_id, // Use sequence ID for OKX L2 books
424        ts_event,
425        ts_init,
426    ))
427}
428
429/// Parses an OKX ticker message into a Nautilus quote tick.
430pub fn parse_ticker_msg(
431    msg: &OKXTickerMsg,
432    instrument_id: InstrumentId,
433    price_precision: u8,
434    size_precision: u8,
435    ts_init: UnixNanos,
436) -> anyhow::Result<QuoteTick> {
437    let bid_price = parse_price(&msg.bid_px, price_precision)?;
438    let ask_price = parse_price(&msg.ask_px, price_precision)?;
439    let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
440    let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
441    let ts_event = parse_millisecond_timestamp(msg.ts);
442
443    QuoteTick::new_checked(
444        instrument_id,
445        bid_price,
446        ask_price,
447        bid_size,
448        ask_size,
449        ts_event,
450        ts_init,
451    )
452}
453
454/// Parses an OKX trade message into a Nautilus trade tick.
455pub fn parse_trade_msg(
456    msg: &OKXTradeMsg,
457    instrument_id: InstrumentId,
458    price_precision: u8,
459    size_precision: u8,
460    ts_init: UnixNanos,
461) -> anyhow::Result<TradeTick> {
462    let price = parse_price(&msg.px, price_precision)?;
463    let size = parse_quantity(&msg.sz, size_precision)?;
464    let aggressor_side: AggressorSide = msg.side.into();
465    let trade_id = TradeId::new(&msg.trade_id);
466    let ts_event = parse_millisecond_timestamp(msg.ts);
467
468    TradeTick::new_checked(
469        instrument_id,
470        price,
471        size,
472        aggressor_side,
473        trade_id,
474        ts_event,
475        ts_init,
476    )
477}
478
479/// Parses an OKX mark price message into a Nautilus mark price update.
480pub fn parse_mark_price_msg(
481    msg: &OKXMarkPriceMsg,
482    instrument_id: InstrumentId,
483    price_precision: u8,
484    ts_init: UnixNanos,
485) -> anyhow::Result<MarkPriceUpdate> {
486    let price = parse_price(&msg.mark_px, price_precision)?;
487    let ts_event = parse_millisecond_timestamp(msg.ts);
488
489    Ok(MarkPriceUpdate::new(
490        instrument_id,
491        price,
492        ts_event,
493        ts_init,
494    ))
495}
496
497/// Parses an OKX index price message into a Nautilus index price update.
498pub fn parse_index_price_msg(
499    msg: &OKXIndexPriceMsg,
500    instrument_id: InstrumentId,
501    price_precision: u8,
502    ts_init: UnixNanos,
503) -> anyhow::Result<IndexPriceUpdate> {
504    let price = parse_price(&msg.idx_px, price_precision)?;
505    let ts_event = parse_millisecond_timestamp(msg.ts);
506
507    Ok(IndexPriceUpdate::new(
508        instrument_id,
509        price,
510        ts_event,
511        ts_init,
512    ))
513}
514
515/// Parses an OKX candle message into a Nautilus bar.
516pub fn parse_candle_msg(
517    msg: &OKXCandleMsg,
518    bar_type: BarType,
519    price_precision: u8,
520    size_precision: u8,
521    ts_init: UnixNanos,
522) -> anyhow::Result<Bar> {
523    let open = parse_price(&msg.o, price_precision)?;
524    let high = parse_price(&msg.h, price_precision)?;
525    let low = parse_price(&msg.l, price_precision)?;
526    let close = parse_price(&msg.c, price_precision)?;
527    let volume = parse_quantity(&msg.vol, size_precision)?;
528    let ts_event = parse_millisecond_timestamp(msg.ts);
529
530    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
531}
532
533/// Parses vector of OKX order messages into Nautilus execution reports.
534pub fn parse_order_msg_vec(
535    data: Vec<OKXOrderMsg>,
536    account_id: AccountId,
537    instruments: &AHashMap<Ustr, InstrumentAny>,
538    fee_cache: &AHashMap<Ustr, Money>,
539    ts_init: UnixNanos,
540) -> anyhow::Result<Vec<ExecutionReport>> {
541    let mut order_reports = Vec::with_capacity(data.len());
542
543    for msg in data {
544        let inst = instruments
545            .get(&msg.inst_id)
546            .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
547
548        let previous_fee = fee_cache.get(&msg.ord_id).copied();
549
550        let result = match &msg.state {
551            OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled => {
552                parse_fill_report(&msg, inst, account_id, previous_fee, ts_init)
553                    .map(ExecutionReport::Fill)
554            }
555            _ => parse_order_status_report(&msg, inst, account_id, ts_init)
556                .map(ExecutionReport::Order),
557        };
558
559        match result {
560            Ok(report) => order_reports.push(report),
561            Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
562        }
563    }
564
565    Ok(order_reports)
566}
567
568/// Parses an OKX order message into a Nautilus order status report.
569pub fn parse_order_status_report(
570    msg: &OKXOrderMsg,
571    instrument: &InstrumentAny,
572    account_id: AccountId,
573    ts_init: UnixNanos,
574) -> anyhow::Result<OrderStatusReport> {
575    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
576    let venue_order_id = VenueOrderId::new(msg.ord_id);
577    let order_side: OrderSide = msg.side.into();
578
579    let okx_order_type = msg.ord_type;
580    let order_type: OrderType = msg.ord_type.into();
581    let order_status: OrderStatus = msg.state.into();
582
583    let time_in_force = match okx_order_type {
584        OKXOrderType::Fok => TimeInForce::Fok,
585        OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
586        _ => TimeInForce::Gtc,
587    };
588
589    let size_precision = instrument.size_precision();
590    let quantity = parse_quantity(&msg.sz, size_precision)?;
591    let filled_qty = parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
592
593    let ts_accepted = parse_millisecond_timestamp(msg.c_time);
594    let ts_last = parse_millisecond_timestamp(msg.u_time);
595
596    let mut report = OrderStatusReport::new(
597        account_id,
598        instrument.id(),
599        client_order_id,
600        venue_order_id,
601        order_side,
602        order_type,
603        time_in_force,
604        order_status,
605        quantity,
606        filled_qty,
607        ts_accepted,
608        ts_init,
609        ts_last,
610        None, // Generate UUID4 automatically
611    );
612
613    if !msg.px.is_empty() {
614        let price_precision = instrument.price_precision();
615        if let Ok(price) = parse_price(&msg.px, price_precision) {
616            report = report.with_price(price);
617        }
618    }
619
620    if !msg.avg_px.is_empty() {
621        report = report.with_avg_px(msg.avg_px.parse::<f64>()?);
622    }
623
624    if msg.ord_type == OKXOrderType::PostOnly {
625        report = report.with_post_only(true);
626    }
627
628    if msg.reduce_only == "true" {
629        report = report.with_reduce_only(true);
630    }
631
632    if let Some(reason) = &msg.cancel_source_reason
633        && !reason.is_empty()
634    {
635        report = report.with_cancel_reason(reason.clone());
636    }
637
638    Ok(report)
639}
640
641/// Parses an OKX order message into a Nautilus fill report.
642pub fn parse_fill_report(
643    msg: &OKXOrderMsg,
644    instrument: &InstrumentAny,
645    account_id: AccountId,
646    previous_fee: Option<Money>,
647    ts_init: UnixNanos,
648) -> anyhow::Result<FillReport> {
649    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
650    let venue_order_id = VenueOrderId::new(msg.ord_id);
651    let trade_id = TradeId::from(msg.trade_id.as_str());
652    let order_side: OrderSide = msg.side.into();
653
654    let price_precision = instrument.price_precision();
655    let size_precision = instrument.size_precision();
656    let last_px = parse_price(&msg.fill_px, price_precision)?;
657    let last_qty = parse_quantity(&msg.fill_sz, size_precision)?;
658
659    let fee_currency = Currency::from(&msg.fee_ccy);
660    let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)?;
661    let commission = if let Some(previous_fee) = previous_fee {
662        total_fee - previous_fee
663    } else {
664        total_fee
665    };
666
667    let liquidity_side: LiquiditySide = msg.exec_type.into();
668    let ts_event = parse_millisecond_timestamp(msg.fill_time);
669
670    let report = FillReport::new(
671        account_id,
672        instrument.id(),
673        venue_order_id,
674        trade_id,
675        order_side,
676        last_qty,
677        last_px,
678        commission,
679        liquidity_side,
680        client_order_id,
681        None,
682        ts_event,
683        ts_init,
684        None, // Generate UUID4 automatically
685    );
686
687    Ok(report)
688}
689
690/// Parses OKX WebSocket message payloads into Nautilus data structures.
691///
692/// # Panics
693///
694/// Panics only in the case where `okx_channel_to_bar_spec(channel)` returns
695/// `None` after a prior `is_some` check – an unreachable scenario indicating a
696/// logic error.
697pub fn parse_ws_message_data(
698    channel: &OKXWsChannel,
699    data: serde_json::Value,
700    instrument_id: &InstrumentId,
701    price_precision: u8,
702    size_precision: u8,
703    ts_init: UnixNanos,
704    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
705) -> anyhow::Result<Option<NautilusWsMessage>> {
706    match channel {
707        OKXWsChannel::Instruments => {
708            if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
709                match parse_instrument_any(&msg, ts_init)? {
710                    Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
711                    None => {
712                        tracing::warn!("Empty instrument payload: {:?}", msg);
713                        Ok(None)
714                    }
715                }
716            } else {
717                anyhow::bail!("Failed to deserialize instrument payload")
718            }
719        }
720        OKXWsChannel::BboTbt => {
721            let data_vec = parse_quote_msg_vec(
722                data,
723                instrument_id,
724                price_precision,
725                size_precision,
726                ts_init,
727            )?;
728            Ok(Some(NautilusWsMessage::Data(data_vec)))
729        }
730        OKXWsChannel::Tickers => {
731            let data_vec = parse_ticker_msg_vec(
732                data,
733                instrument_id,
734                price_precision,
735                size_precision,
736                ts_init,
737            )?;
738            Ok(Some(NautilusWsMessage::Data(data_vec)))
739        }
740        OKXWsChannel::Trades => {
741            let data_vec = parse_trade_msg_vec(
742                data,
743                instrument_id,
744                price_precision,
745                size_precision,
746                ts_init,
747            )?;
748            Ok(Some(NautilusWsMessage::Data(data_vec)))
749        }
750        OKXWsChannel::MarkPrice => {
751            let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
752            Ok(Some(NautilusWsMessage::Data(data_vec)))
753        }
754        OKXWsChannel::IndexTickers => {
755            let data_vec =
756                parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
757            Ok(Some(NautilusWsMessage::Data(data_vec)))
758        }
759        OKXWsChannel::FundingRate => {
760            let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
761            Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
762        }
763        channel if okx_channel_to_bar_spec(channel).is_some() => {
764            let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
765            let data_vec = parse_candle_msg_vec(
766                data,
767                instrument_id,
768                price_precision,
769                size_precision,
770                bar_spec,
771                ts_init,
772            )?;
773            Ok(Some(NautilusWsMessage::Data(data_vec)))
774        }
775        OKXWsChannel::Books
776        | OKXWsChannel::BooksTbt
777        | OKXWsChannel::Books5
778        | OKXWsChannel::Books50Tbt => {
779            if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
780                let data_vec = parse_book10_msg_vec(
781                    book_msgs,
782                    instrument_id,
783                    price_precision,
784                    size_precision,
785                    ts_init,
786                )?;
787                Ok(Some(NautilusWsMessage::Data(data_vec)))
788            } else {
789                anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
790            }
791        }
792        _ => {
793            tracing::warn!("Unsupported channel for message parsing: {channel:?}");
794            Ok(None)
795        }
796    }
797}
798
799////////////////////////////////////////////////////////////////////////////////
800// Tests
801////////////////////////////////////////////////////////////////////////////////
802#[cfg(test)]
803mod tests {
804    use ahash::AHashMap;
805    use nautilus_core::nanos::UnixNanos;
806    use nautilus_model::{
807        data::bar::BAR_SPEC_1_DAY_LAST,
808        enums::AggressorSide,
809        identifiers::{ClientOrderId, InstrumentId, Symbol},
810        instruments::CryptoPerpetual,
811        types::{Currency, Price, Quantity},
812    };
813    use rstest::rstest;
814    use rust_decimal::Decimal;
815    use ustr::Ustr;
816
817    use super::*;
818    use crate::{
819        common::{enums::OKXTradeMode, parse::parse_account_state, testing::load_test_json},
820        http::models::OKXAccount,
821        websocket::messages::{OKXWebSocketArg, OKXWebSocketEvent},
822    };
823
824    #[rstest]
825    fn test_parse_books_snapshot() {
826        let json_data = load_test_json("ws_books_snapshot.json");
827        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
828        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
829            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
830            _ => panic!("Expected a `BookData` variant"),
831        };
832
833        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
834        let deltas = parse_book_msg(
835            &okx_books[0],
836            instrument_id,
837            2,
838            1,
839            &action,
840            UnixNanos::default(),
841        )
842        .unwrap();
843
844        assert_eq!(deltas.instrument_id, instrument_id);
845        assert_eq!(deltas.deltas.len(), 16);
846        assert_eq!(deltas.flags, 32);
847        assert_eq!(deltas.sequence, 123456);
848        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
849        assert_eq!(deltas.ts_init, UnixNanos::default());
850
851        // Verify some individual deltas are parsed correctly
852        assert!(!deltas.deltas.is_empty());
853        // Snapshot should have both bid and ask deltas
854        let bid_deltas: Vec<_> = deltas
855            .deltas
856            .iter()
857            .filter(|d| d.order.side == OrderSide::Buy)
858            .collect();
859        let ask_deltas: Vec<_> = deltas
860            .deltas
861            .iter()
862            .filter(|d| d.order.side == OrderSide::Sell)
863            .collect();
864        assert!(!bid_deltas.is_empty());
865        assert!(!ask_deltas.is_empty());
866    }
867
868    #[rstest]
869    fn test_parse_books_update() {
870        let json_data = load_test_json("ws_books_update.json");
871        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
872        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
873        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
874            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
875            _ => panic!("Expected a `BookData` variant"),
876        };
877
878        let deltas = parse_book_msg(
879            &okx_books[0],
880            instrument_id,
881            2,
882            1,
883            &action,
884            UnixNanos::default(),
885        )
886        .unwrap();
887
888        assert_eq!(deltas.instrument_id, instrument_id);
889        assert_eq!(deltas.deltas.len(), 16);
890        assert_eq!(deltas.flags, 0);
891        assert_eq!(deltas.sequence, 123457);
892        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
893        assert_eq!(deltas.ts_init, UnixNanos::default());
894
895        // Verify some individual deltas are parsed correctly
896        assert!(!deltas.deltas.is_empty());
897        // Update should also have both bid and ask deltas
898        let bid_deltas: Vec<_> = deltas
899            .deltas
900            .iter()
901            .filter(|d| d.order.side == OrderSide::Buy)
902            .collect();
903        let ask_deltas: Vec<_> = deltas
904            .deltas
905            .iter()
906            .filter(|d| d.order.side == OrderSide::Sell)
907            .collect();
908        assert!(!bid_deltas.is_empty());
909        assert!(!ask_deltas.is_empty());
910    }
911
912    #[rstest]
913    fn test_parse_tickers() {
914        let json_data = load_test_json("ws_tickers.json");
915        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
916        let okx_tickers: Vec<OKXTickerMsg> = match msg {
917            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
918            _ => panic!("Expected a `Data` variant"),
919        };
920
921        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
922        let trade =
923            parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
924
925        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
926        assert_eq!(trade.bid_price, Price::from("8888.88"));
927        assert_eq!(trade.ask_price, Price::from("9999.99"));
928        assert_eq!(trade.bid_size, Quantity::from(5));
929        assert_eq!(trade.ask_size, Quantity::from(11));
930        assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
931        assert_eq!(trade.ts_init, UnixNanos::default());
932    }
933
934    #[rstest]
935    fn test_parse_quotes() {
936        let json_data = load_test_json("ws_bbo_tbt.json");
937        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
938        let okx_quotes: Vec<OKXBookMsg> = match msg {
939            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
940            _ => panic!("Expected a `Data` variant"),
941        };
942        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
943
944        let quote =
945            parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
946
947        assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
948        assert_eq!(quote.bid_price, Price::from("8476.97"));
949        assert_eq!(quote.ask_price, Price::from("8476.98"));
950        assert_eq!(quote.bid_size, Quantity::from(256));
951        assert_eq!(quote.ask_size, Quantity::from(415));
952        assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
953        assert_eq!(quote.ts_init, UnixNanos::default());
954    }
955
956    #[rstest]
957    fn test_parse_trades() {
958        let json_data = load_test_json("ws_trades.json");
959        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
960        let okx_trades: Vec<OKXTradeMsg> = match msg {
961            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
962            _ => panic!("Expected a `Data` variant"),
963        };
964
965        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
966        let trade =
967            parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
968
969        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
970        assert_eq!(trade.price, Price::from("42219.9"));
971        assert_eq!(trade.size, Quantity::from("0.12060306"));
972        assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
973        assert_eq!(trade.trade_id, TradeId::from("130639474"));
974        assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
975        assert_eq!(trade.ts_init, UnixNanos::default());
976    }
977
978    #[rstest]
979    fn test_parse_candle() {
980        let json_data = load_test_json("ws_candle.json");
981        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
982        let okx_candles: Vec<OKXCandleMsg> = match msg {
983            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
984            _ => panic!("Expected a `Data` variant"),
985        };
986
987        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
988        let bar_type = BarType::new(
989            instrument_id,
990            BAR_SPEC_1_DAY_LAST,
991            AggregationSource::External,
992        );
993        let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
994
995        assert_eq!(bar.bar_type, bar_type);
996        assert_eq!(bar.open, Price::from("8533.02"));
997        assert_eq!(bar.high, Price::from("8553.74"));
998        assert_eq!(bar.low, Price::from("8527.17"));
999        assert_eq!(bar.close, Price::from("8548.26"));
1000        assert_eq!(bar.volume, Quantity::from(45247));
1001        assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1002        assert_eq!(bar.ts_init, UnixNanos::default());
1003    }
1004
1005    #[rstest]
1006    fn test_parse_funding_rate() {
1007        let json_data = load_test_json("ws_funding_rate.json");
1008        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1009
1010        let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1011            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1012            _ => panic!("Expected a `Data` variant"),
1013        };
1014
1015        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1016        let funding_rate =
1017            parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1018                .unwrap();
1019
1020        assert_eq!(funding_rate.instrument_id, instrument_id);
1021        assert_eq!(funding_rate.rate, Decimal::new(1, 4));
1022        assert_eq!(
1023            funding_rate.next_funding_ns,
1024            Some(UnixNanos::from(1744590349506000000))
1025        );
1026        assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1027        assert_eq!(funding_rate.ts_init, UnixNanos::default());
1028    }
1029
1030    #[rstest]
1031    fn test_parse_book_vec() {
1032        let json_data = load_test_json("ws_books_snapshot.json");
1033        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1034        let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1035            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1036            _ => panic!("Expected BookData"),
1037        };
1038
1039        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1040        let deltas_vec =
1041            parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1042
1043        assert_eq!(deltas_vec.len(), 1);
1044
1045        if let Data::Deltas(d) = &deltas_vec[0] {
1046            assert_eq!(d.sequence, 123456);
1047        } else {
1048            panic!("Expected Deltas");
1049        }
1050    }
1051
1052    #[rstest]
1053    fn test_parse_ticker_vec() {
1054        let json_data = load_test_json("ws_tickers.json");
1055        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1056        let data_val: serde_json::Value = match event {
1057            OKXWebSocketEvent::Data { data, .. } => data,
1058            _ => panic!("Expected Data"),
1059        };
1060
1061        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1062        let quotes_vec =
1063            parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1064
1065        assert_eq!(quotes_vec.len(), 1);
1066
1067        if let Data::Quote(q) = &quotes_vec[0] {
1068            assert_eq!(q.bid_price, Price::from("8888.88000000"));
1069            assert_eq!(q.ask_price, Price::from("9999.99"));
1070        } else {
1071            panic!("Expected Quote");
1072        }
1073    }
1074
1075    #[rstest]
1076    fn test_parse_trade_vec() {
1077        let json_data = load_test_json("ws_trades.json");
1078        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1079        let data_val: serde_json::Value = match event {
1080            OKXWebSocketEvent::Data { data, .. } => data,
1081            _ => panic!("Expected Data"),
1082        };
1083
1084        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1085        let trades_vec =
1086            parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1087
1088        assert_eq!(trades_vec.len(), 1);
1089
1090        if let Data::Trade(t) = &trades_vec[0] {
1091            assert_eq!(t.trade_id, TradeId::new("130639474"));
1092        } else {
1093            panic!("Expected Trade");
1094        }
1095    }
1096
1097    #[rstest]
1098    fn test_parse_candle_vec() {
1099        let json_data = load_test_json("ws_candle.json");
1100        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1101        let data_val: serde_json::Value = match event {
1102            OKXWebSocketEvent::Data { data, .. } => data,
1103            _ => panic!("Expected Data"),
1104        };
1105
1106        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1107        let bars_vec = parse_candle_msg_vec(
1108            data_val,
1109            &instrument_id,
1110            2,
1111            1,
1112            BAR_SPEC_1_DAY_LAST,
1113            UnixNanos::default(),
1114        )
1115        .unwrap();
1116
1117        assert_eq!(bars_vec.len(), 1);
1118
1119        if let Data::Bar(b) = &bars_vec[0] {
1120            assert_eq!(b.open, Price::from("8533.02"));
1121        } else {
1122            panic!("Expected Bar");
1123        }
1124    }
1125
1126    #[rstest]
1127    fn test_parse_book_message() {
1128        let json_data = load_test_json("ws_bbo_tbt.json");
1129        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1130        let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1131            OKXWebSocketEvent::Data { data, arg, .. } => {
1132                (serde_json::from_value(data).unwrap(), arg)
1133            }
1134            _ => panic!("Expected a `Data` variant"),
1135        };
1136
1137        assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1138        assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1139        assert_eq!(arg.inst_type, None);
1140        assert_eq!(okx_books.len(), 1);
1141
1142        let book_msg = &okx_books[0];
1143
1144        // Check asks
1145        assert_eq!(book_msg.asks.len(), 1);
1146        let ask = &book_msg.asks[0];
1147        assert_eq!(ask.price, "8476.98");
1148        assert_eq!(ask.size, "415");
1149        assert_eq!(ask.liquidated_orders_count, "0");
1150        assert_eq!(ask.orders_count, "13");
1151
1152        // Check bids
1153        assert_eq!(book_msg.bids.len(), 1);
1154        let bid = &book_msg.bids[0];
1155        assert_eq!(bid.price, "8476.97");
1156        assert_eq!(bid.size, "256");
1157        assert_eq!(bid.liquidated_orders_count, "0");
1158        assert_eq!(bid.orders_count, "12");
1159        assert_eq!(book_msg.ts, 1597026383085);
1160        assert_eq!(book_msg.seq_id, 123456);
1161        assert_eq!(book_msg.checksum, None);
1162        assert_eq!(book_msg.prev_seq_id, None);
1163    }
1164
1165    #[rstest]
1166    fn test_parse_ws_account_message() {
1167        let json_data = load_test_json("ws_account.json");
1168        let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1169
1170        assert_eq!(accounts.len(), 1);
1171        let account = &accounts[0];
1172
1173        assert_eq!(account.total_eq, "100.56089404807182");
1174        assert_eq!(account.details.len(), 3);
1175
1176        let usdt_detail = &account.details[0];
1177        assert_eq!(usdt_detail.ccy, "USDT");
1178        assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1179        assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1180
1181        let btc_detail = &account.details[1];
1182        assert_eq!(btc_detail.ccy, "BTC");
1183        assert_eq!(btc_detail.avail_bal, "0.0000000051");
1184
1185        let eth_detail = &account.details[2];
1186        assert_eq!(eth_detail.ccy, "ETH");
1187        assert_eq!(eth_detail.avail_bal, "0.000000185");
1188
1189        let account_id = AccountId::new("OKX-001");
1190        let ts_init = nautilus_core::nanos::UnixNanos::default();
1191        let account_state = parse_account_state(account, account_id, ts_init);
1192
1193        assert!(account_state.is_ok());
1194        let state = account_state.unwrap();
1195        assert_eq!(state.account_id, account_id);
1196        assert_eq!(state.balances.len(), 3);
1197    }
1198
1199    #[rstest]
1200    fn test_parse_order_msg() {
1201        let json_data = load_test_json("ws_orders.json");
1202        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1203
1204        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1205
1206        let account_id = AccountId::new("OKX-001");
1207        let mut instruments = AHashMap::new();
1208
1209        // Create a mock instrument for testing
1210        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1211        let instrument = CryptoPerpetual::new(
1212            instrument_id,
1213            Symbol::from("BTC-USDT-SWAP"),
1214            Currency::BTC(),
1215            Currency::USDT(),
1216            Currency::USDT(),
1217            false, // is_inverse
1218            2,     // price_precision
1219            8,     // size_precision
1220            Price::from("0.01"),
1221            Quantity::from("0.00000001"),
1222            None, // multiplier
1223            None, // lot_size
1224            None, // max_quantity
1225            None, // min_quantity
1226            None, // max_notional
1227            None, // min_notional
1228            None, // max_price
1229            None, // min_price
1230            None, // margin_init
1231            None, // margin_maint
1232            None, // maker_fee
1233            None, // taker_fee
1234            UnixNanos::default(),
1235            UnixNanos::default(),
1236        );
1237
1238        instruments.insert(
1239            Ustr::from("BTC-USDT-SWAP"),
1240            InstrumentAny::CryptoPerpetual(instrument),
1241        );
1242
1243        let ts_init = UnixNanos::default();
1244        let fee_cache = AHashMap::new();
1245
1246        let result = parse_order_msg_vec(data, account_id, &instruments, &fee_cache, ts_init);
1247
1248        assert!(result.is_ok());
1249        let order_reports = result.unwrap();
1250        assert_eq!(order_reports.len(), 1);
1251
1252        // Verify the parsed order report
1253        let report = &order_reports[0];
1254
1255        if let ExecutionReport::Fill(fill_report) = report {
1256            assert_eq!(fill_report.account_id, account_id);
1257            assert_eq!(fill_report.instrument_id, instrument_id);
1258            assert_eq!(
1259                fill_report.client_order_id,
1260                Some(ClientOrderId::new("001BTCUSDT20250106001"))
1261            );
1262            assert_eq!(
1263                fill_report.venue_order_id,
1264                VenueOrderId::new("2497956918703120384")
1265            );
1266            assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1267            assert_eq!(fill_report.order_side, OrderSide::Buy);
1268            assert_eq!(fill_report.last_px, Price::from("103698.90"));
1269            assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1270            assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1271        } else {
1272            panic!("Expected Fill report for filled order");
1273        }
1274    }
1275
1276    #[rstest]
1277    fn test_parse_order_status_report() {
1278        let json_data = load_test_json("ws_orders.json");
1279        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1280        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1281        let order_msg = &data[0];
1282
1283        let account_id = AccountId::new("OKX-001");
1284        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1285        let instrument = CryptoPerpetual::new(
1286            instrument_id,
1287            Symbol::from("BTC-USDT-SWAP"),
1288            Currency::BTC(),
1289            Currency::USDT(),
1290            Currency::USDT(),
1291            false, // is_inverse
1292            2,     // price_precision
1293            8,     // size_precision
1294            Price::from("0.01"),
1295            Quantity::from("0.00000001"),
1296            None,
1297            None,
1298            None,
1299            None,
1300            None,
1301            None,
1302            None,
1303            None,
1304            None,
1305            None,
1306            None,
1307            None,
1308            UnixNanos::default(),
1309            UnixNanos::default(),
1310        );
1311
1312        let ts_init = UnixNanos::default();
1313
1314        let result = parse_order_status_report(
1315            order_msg,
1316            &InstrumentAny::CryptoPerpetual(instrument),
1317            account_id,
1318            ts_init,
1319        );
1320
1321        assert!(result.is_ok());
1322        let order_status_report = result.unwrap();
1323
1324        assert_eq!(order_status_report.account_id, account_id);
1325        assert_eq!(order_status_report.instrument_id, instrument_id);
1326        assert_eq!(
1327            order_status_report.client_order_id,
1328            Some(ClientOrderId::new("001BTCUSDT20250106001"))
1329        );
1330        assert_eq!(
1331            order_status_report.venue_order_id,
1332            VenueOrderId::new("2497956918703120384")
1333        );
1334        assert_eq!(order_status_report.order_side, OrderSide::Buy);
1335        assert_eq!(order_status_report.order_status, OrderStatus::Filled);
1336        assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
1337        assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
1338    }
1339
1340    #[rstest]
1341    fn test_parse_fill_report() {
1342        let json_data = load_test_json("ws_orders.json");
1343        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1344        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1345        let order_msg = &data[0];
1346
1347        let account_id = AccountId::new("OKX-001");
1348        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1349        let instrument = CryptoPerpetual::new(
1350            instrument_id,
1351            Symbol::from("BTC-USDT-SWAP"),
1352            Currency::BTC(),
1353            Currency::USDT(),
1354            Currency::USDT(),
1355            false, // is_inverse
1356            2,     // price_precision
1357            8,     // size_precision
1358            Price::from("0.01"),
1359            Quantity::from("0.00000001"),
1360            None,
1361            None,
1362            None,
1363            None,
1364            None,
1365            None,
1366            None,
1367            None,
1368            None,
1369            None,
1370            None,
1371            None,
1372            UnixNanos::default(),
1373            UnixNanos::default(),
1374        );
1375
1376        let ts_init = UnixNanos::default();
1377
1378        let result = parse_fill_report(
1379            order_msg,
1380            &InstrumentAny::CryptoPerpetual(instrument),
1381            account_id,
1382            None,
1383            ts_init,
1384        );
1385
1386        assert!(result.is_ok());
1387        let fill_report = result.unwrap();
1388
1389        assert_eq!(fill_report.account_id, account_id);
1390        assert_eq!(fill_report.instrument_id, instrument_id);
1391        assert_eq!(
1392            fill_report.client_order_id,
1393            Some(ClientOrderId::new("001BTCUSDT20250106001"))
1394        );
1395        assert_eq!(
1396            fill_report.venue_order_id,
1397            VenueOrderId::new("2497956918703120384")
1398        );
1399        assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1400        assert_eq!(fill_report.order_side, OrderSide::Buy);
1401        assert_eq!(fill_report.last_px, Price::from("103698.90"));
1402        assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1403        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1404    }
1405
1406    #[rstest]
1407    fn test_parse_book10_msg() {
1408        let json_data = load_test_json("ws_books_snapshot.json");
1409        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1410        let msgs: Vec<OKXBookMsg> = match event {
1411            OKXWebSocketEvent::BookData { data, .. } => data,
1412            _ => panic!("Expected BookData"),
1413        };
1414
1415        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1416        let depth10 =
1417            parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
1418
1419        assert_eq!(depth10.instrument_id, instrument_id);
1420        assert_eq!(depth10.sequence, 123456);
1421        assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
1422        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1423
1424        // Check bid levels (available in test data: 8 levels)
1425        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
1426        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
1427        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1428        assert_eq!(depth10.bid_counts[0], 12);
1429
1430        assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
1431        assert_eq!(depth10.bids[1].size, Quantity::from("101"));
1432        assert_eq!(depth10.bid_counts[1], 1);
1433
1434        // Check that levels beyond available data are padded with empty orders
1435        assert_eq!(depth10.bids[8].price, Price::from("0"));
1436        assert_eq!(depth10.bids[8].size, Quantity::from("0"));
1437        assert_eq!(depth10.bid_counts[8], 0);
1438
1439        // Check ask levels (available in test data: 8 levels)
1440        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
1441        assert_eq!(depth10.asks[0].size, Quantity::from("415"));
1442        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1443        assert_eq!(depth10.ask_counts[0], 13);
1444
1445        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
1446        assert_eq!(depth10.asks[1].size, Quantity::from("7"));
1447        assert_eq!(depth10.ask_counts[1], 2);
1448
1449        // Check that levels beyond available data are padded with empty orders
1450        assert_eq!(depth10.asks[8].price, Price::from("0"));
1451        assert_eq!(depth10.asks[8].size, Quantity::from("0"));
1452        assert_eq!(depth10.ask_counts[8], 0);
1453    }
1454
1455    #[rstest]
1456    fn test_parse_book10_msg_vec() {
1457        let json_data = load_test_json("ws_books_snapshot.json");
1458        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1459        let msgs: Vec<OKXBookMsg> = match event {
1460            OKXWebSocketEvent::BookData { data, .. } => data,
1461            _ => panic!("Expected BookData"),
1462        };
1463
1464        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1465        let depth10_vec =
1466            parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
1467
1468        assert_eq!(depth10_vec.len(), 1);
1469
1470        if let Data::Depth10(d) = &depth10_vec[0] {
1471            assert_eq!(d.instrument_id, instrument_id);
1472            assert_eq!(d.sequence, 123456);
1473            assert_eq!(d.bids[0].price, Price::from("8476.97"));
1474            assert_eq!(d.asks[0].price, Price::from("8476.98"));
1475        } else {
1476            panic!("Expected Depth10");
1477        }
1478    }
1479
1480    #[rstest]
1481    fn test_parse_fill_report_with_fee_cache() {
1482        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1483        let instrument = CryptoPerpetual::new(
1484            instrument_id,
1485            Symbol::from("BTC-USDT-SWAP"),
1486            Currency::BTC(),
1487            Currency::USDT(),
1488            Currency::USDT(),
1489            false, // is_inverse
1490            2,     // price_precision
1491            8,     // size_precision
1492            Price::from("0.01"),
1493            Quantity::from("0.00000001"),
1494            None, // multiplier
1495            None, // lot_size
1496            None, // max_quantity
1497            None, // min_quantity
1498            None, // max_notional
1499            None, // min_notional
1500            None, // max_price
1501            None, // min_price
1502            None, // margin_init
1503            None, // margin_maint
1504            None, // maker_fee
1505            None, // taker_fee
1506            UnixNanos::default(),
1507            UnixNanos::default(),
1508        );
1509
1510        let account_id = AccountId::new("OKX-001");
1511        let ts_init = UnixNanos::default();
1512
1513        // First fill: 0.01 BTC out of 0.03 BTC total (1/3)
1514        let order_msg_1 = OKXOrderMsg {
1515            acc_fill_sz: Some("0.01".to_string()),
1516            algo_cl_ord_id: None,
1517            algo_id: None,
1518            avg_px: "50000.0".to_string(),
1519            c_time: 1746947317401,
1520            cancel_source: None,
1521            cancel_source_reason: None,
1522            category: Ustr::from("normal"),
1523            ccy: Ustr::from("USDT"),
1524            cl_ord_id: "test_order_1".to_string(),
1525            fee: Some("-1.0".to_string()), // Total fee so far
1526            fee_ccy: Ustr::from("USDT"),
1527            fill_px: "50000.0".to_string(),
1528            fill_sz: "0.01".to_string(),
1529            fill_time: 1746947317402,
1530            inst_id: Ustr::from("BTC-USDT-SWAP"),
1531            inst_type: crate::common::enums::OKXInstrumentType::Swap,
1532            lever: "2.0".to_string(),
1533            ord_id: Ustr::from("1234567890"),
1534            ord_type: OKXOrderType::Market,
1535            pnl: "0".to_string(),
1536            pos_side: Ustr::from("long"),
1537            px: "".to_string(),
1538            reduce_only: "false".to_string(),
1539            side: crate::common::enums::OKXSide::Buy,
1540            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
1541            exec_type: crate::common::enums::OKXExecType::Maker,
1542            sz: "0.03".to_string(), // Total order size
1543            td_mode: OKXTradeMode::Isolated,
1544            trade_id: "trade_1".to_string(),
1545            u_time: 1746947317402,
1546        };
1547
1548        let fill_report_1 = parse_fill_report(
1549            &order_msg_1,
1550            &InstrumentAny::CryptoPerpetual(instrument),
1551            account_id,
1552            None,
1553            ts_init,
1554        )
1555        .unwrap();
1556
1557        // First fill should get the full fee since there's no previous fee
1558        assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
1559
1560        // Second fill: 0.02 BTC more, now 0.03 BTC total (completely filled)
1561        let order_msg_2 = OKXOrderMsg {
1562            acc_fill_sz: Some("0.03".to_string()),
1563            algo_cl_ord_id: None,
1564            algo_id: None,
1565            avg_px: "50000.0".to_string(),
1566            c_time: 1746947317401,
1567            cancel_source: None,
1568            cancel_source_reason: None,
1569            category: Ustr::from("normal"),
1570            ccy: Ustr::from("USDT"),
1571            cl_ord_id: "test_order_1".to_string(),
1572            fee: Some("-3.0".to_string()), // Same total fee
1573            fee_ccy: Ustr::from("USDT"),
1574            fill_px: "50000.0".to_string(),
1575            fill_sz: "0.02".to_string(),
1576            fill_time: 1746947317403,
1577            inst_id: Ustr::from("BTC-USDT-SWAP"),
1578            inst_type: crate::common::enums::OKXInstrumentType::Swap,
1579            lever: "2.0".to_string(),
1580            ord_id: Ustr::from("1234567890"),
1581            ord_type: OKXOrderType::Market,
1582            pnl: "0".to_string(),
1583            pos_side: Ustr::from("long"),
1584            px: "".to_string(),
1585            reduce_only: "false".to_string(),
1586            side: crate::common::enums::OKXSide::Buy,
1587            state: crate::common::enums::OKXOrderStatus::Filled,
1588            exec_type: crate::common::enums::OKXExecType::Maker,
1589            sz: "0.03".to_string(), // Same total order size
1590            td_mode: OKXTradeMode::Isolated,
1591            trade_id: "trade_2".to_string(),
1592            u_time: 1746947317403,
1593        };
1594
1595        let fill_report_2 = parse_fill_report(
1596            &order_msg_2,
1597            &InstrumentAny::CryptoPerpetual(instrument),
1598            account_id,
1599            Some(fill_report_1.commission),
1600            ts_init,
1601        )
1602        .unwrap();
1603
1604        // Second fill should get total_fee - previous_fee = 3.0 - 1.0 = 2.0
1605        assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
1606
1607        // Test passed - fee was correctly split proportionally
1608    }
1609
1610    #[rstest]
1611    fn test_parse_book10_msg_partial_levels() {
1612        // Test with fewer than 10 levels - should pad with empty orders
1613        let book_msg = OKXBookMsg {
1614            asks: vec![
1615                OrderBookEntry {
1616                    price: "8476.98".to_string(),
1617                    size: "415".to_string(),
1618                    liquidated_orders_count: "0".to_string(),
1619                    orders_count: "13".to_string(),
1620                },
1621                OrderBookEntry {
1622                    price: "8477.00".to_string(),
1623                    size: "7".to_string(),
1624                    liquidated_orders_count: "0".to_string(),
1625                    orders_count: "2".to_string(),
1626                },
1627            ],
1628            bids: vec![OrderBookEntry {
1629                price: "8476.97".to_string(),
1630                size: "256".to_string(),
1631                liquidated_orders_count: "0".to_string(),
1632                orders_count: "12".to_string(),
1633            }],
1634            ts: 1597026383085,
1635            checksum: None,
1636            prev_seq_id: None,
1637            seq_id: 123456,
1638        };
1639
1640        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1641        let depth10 =
1642            parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
1643
1644        // Check that first levels have data
1645        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
1646        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
1647        assert_eq!(depth10.bid_counts[0], 12);
1648
1649        // Check that remaining levels are padded with default (empty) orders
1650        assert_eq!(depth10.bids[1].price, Price::from("0"));
1651        assert_eq!(depth10.bids[1].size, Quantity::from("0"));
1652        assert_eq!(depth10.bid_counts[1], 0);
1653
1654        // Check asks
1655        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
1656        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
1657        assert_eq!(depth10.asks[2].price, Price::from("0")); // padded with empty
1658    }
1659}