nautilus_okx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Functions translating raw OKX WebSocket frames into Nautilus data types.
17
18use std::str::FromStr;
19
20use ahash::AHashMap;
21use nautilus_core::{UUID4, nanos::UnixNanos};
22use nautilus_model::{
23    data::{
24        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
25        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
26        QuoteTick, TradeTick, depth::DEPTH10_LEN,
27    },
28    enums::{
29        AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
30        OrderType, RecordFlag, TimeInForce, TriggerType,
31    },
32    identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
33    instruments::{Instrument, InstrumentAny},
34    reports::{FillReport, OrderStatusReport},
35    types::{Money, Price, Quantity},
36};
37use rust_decimal::Decimal;
38use ustr::Ustr;
39
40use super::{
41    enums::OKXWsChannel,
42    messages::{
43        OKXAlgoOrderMsg, OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg,
44        OKXTickerMsg, OKXTradeMsg, OrderBookEntry,
45    },
46};
47use crate::{
48    common::{
49        consts::{OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE},
50        enums::{
51            OKXBookAction, OKXCandleConfirm, OKXInstrumentType, OKXOrderCategory, OKXOrderStatus,
52            OKXOrderType, OKXSide, OKXTargetCurrency, OKXTriggerType,
53        },
54        models::OKXInstrument,
55        parse::{
56            determine_order_type, is_market_price, okx_channel_to_bar_spec, parse_client_order_id,
57            parse_fee, parse_fee_currency, parse_funding_rate_msg, parse_instrument_any,
58            parse_message_vec, parse_millisecond_timestamp, parse_price, parse_quantity,
59        },
60    },
61    websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
62};
63
64/// Extracts fee rates from a cached instrument.
65///
66/// Returns a tuple of (margin_init, margin_maint, maker_fee, taker_fee).
67/// All values are None if the instrument type doesn't support fees.
68fn extract_fees_from_cached_instrument(
69    instrument: &InstrumentAny,
70) -> (
71    Option<Decimal>,
72    Option<Decimal>,
73    Option<Decimal>,
74    Option<Decimal>,
75) {
76    match instrument {
77        InstrumentAny::CurrencyPair(pair) => (
78            Some(pair.margin_init),
79            Some(pair.margin_maint),
80            Some(pair.maker_fee),
81            Some(pair.taker_fee),
82        ),
83        InstrumentAny::CryptoPerpetual(perp) => (
84            Some(perp.margin_init),
85            Some(perp.margin_maint),
86            Some(perp.maker_fee),
87            Some(perp.taker_fee),
88        ),
89        InstrumentAny::CryptoFuture(future) => (
90            Some(future.margin_init),
91            Some(future.margin_maint),
92            Some(future.maker_fee),
93            Some(future.taker_fee),
94        ),
95        InstrumentAny::CryptoOption(option) => (
96            Some(option.margin_init),
97            Some(option.margin_maint),
98            Some(option.maker_fee),
99            Some(option.taker_fee),
100        ),
101        _ => (None, None, None, None),
102    }
103}
104
105/// Parses vector of OKX book messages into Nautilus order book deltas.
106///
107/// # Errors
108///
109/// Returns an error if any underlying book message cannot be parsed.
110pub fn parse_book_msg_vec(
111    data: Vec<OKXBookMsg>,
112    instrument_id: &InstrumentId,
113    price_precision: u8,
114    size_precision: u8,
115    action: OKXBookAction,
116    ts_init: UnixNanos,
117) -> anyhow::Result<Vec<Data>> {
118    let mut deltas = Vec::with_capacity(data.len());
119
120    for msg in data {
121        let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
122            &msg,
123            *instrument_id,
124            price_precision,
125            size_precision,
126            &action,
127            ts_init,
128        )?);
129        deltas.push(Data::Deltas(deltas_api));
130    }
131
132    Ok(deltas)
133}
134
135/// Parses vector of OKX ticker messages into Nautilus quote ticks.
136///
137/// # Errors
138///
139/// Returns an error if any ticker message fails to parse.
140pub fn parse_ticker_msg_vec(
141    data: serde_json::Value,
142    instrument_id: &InstrumentId,
143    price_precision: u8,
144    size_precision: u8,
145    ts_init: UnixNanos,
146) -> anyhow::Result<Vec<Data>> {
147    parse_message_vec(
148        data,
149        |msg| {
150            parse_ticker_msg(
151                msg,
152                *instrument_id,
153                price_precision,
154                size_precision,
155                ts_init,
156            )
157        },
158        Data::Quote,
159    )
160}
161
162/// Parses vector of OKX book messages into Nautilus quote ticks.
163///
164/// # Errors
165///
166/// Returns an error if any quote message fails to parse.
167pub fn parse_quote_msg_vec(
168    data: serde_json::Value,
169    instrument_id: &InstrumentId,
170    price_precision: u8,
171    size_precision: u8,
172    ts_init: UnixNanos,
173) -> anyhow::Result<Vec<Data>> {
174    parse_message_vec(
175        data,
176        |msg| {
177            parse_quote_msg(
178                msg,
179                *instrument_id,
180                price_precision,
181                size_precision,
182                ts_init,
183            )
184        },
185        Data::Quote,
186    )
187}
188
189/// Parses vector of OKX trade messages into Nautilus trade ticks.
190///
191/// # Errors
192///
193/// Returns an error if any trade message fails to parse.
194pub fn parse_trade_msg_vec(
195    data: serde_json::Value,
196    instrument_id: &InstrumentId,
197    price_precision: u8,
198    size_precision: u8,
199    ts_init: UnixNanos,
200) -> anyhow::Result<Vec<Data>> {
201    parse_message_vec(
202        data,
203        |msg| {
204            parse_trade_msg(
205                msg,
206                *instrument_id,
207                price_precision,
208                size_precision,
209                ts_init,
210            )
211        },
212        Data::Trade,
213    )
214}
215
216/// Parses vector of OKX mark price messages into Nautilus mark price updates.
217///
218/// # Errors
219///
220/// Returns an error if any mark price message fails to parse.
221pub fn parse_mark_price_msg_vec(
222    data: serde_json::Value,
223    instrument_id: &InstrumentId,
224    price_precision: u8,
225    ts_init: UnixNanos,
226) -> anyhow::Result<Vec<Data>> {
227    parse_message_vec(
228        data,
229        |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
230        Data::MarkPriceUpdate,
231    )
232}
233
234/// Parses vector of OKX index price messages into Nautilus index price updates.
235///
236/// # Errors
237///
238/// Returns an error if any index price message fails to parse.
239pub fn parse_index_price_msg_vec(
240    data: serde_json::Value,
241    instrument_id: &InstrumentId,
242    price_precision: u8,
243    ts_init: UnixNanos,
244) -> anyhow::Result<Vec<Data>> {
245    parse_message_vec(
246        data,
247        |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
248        Data::IndexPriceUpdate,
249    )
250}
251
252/// Parses vector of OKX funding rate messages into Nautilus funding rate updates.
253/// Includes caching to filter out duplicate funding rates.
254///
255/// # Errors
256///
257/// Returns an error if any funding rate message fails to parse.
258pub fn parse_funding_rate_msg_vec(
259    data: serde_json::Value,
260    instrument_id: &InstrumentId,
261    ts_init: UnixNanos,
262    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
263) -> anyhow::Result<Vec<FundingRateUpdate>> {
264    let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
265
266    let mut result = Vec::with_capacity(msgs.len());
267    for msg in &msgs {
268        let cache_key = (msg.funding_rate, msg.funding_time);
269
270        if let Some(cached) = funding_cache.get(&msg.inst_id)
271            && *cached == cache_key
272        {
273            continue; // Skip duplicate
274        }
275
276        // New or changed funding rate, update cache and parse
277        funding_cache.insert(msg.inst_id, cache_key);
278        let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
279        result.push(funding_rate);
280    }
281
282    Ok(result)
283}
284
285/// Parses vector of OKX candle messages into Nautilus bars.
286///
287/// # Errors
288///
289/// Returns an error if candle messages cannot be deserialized or parsed.
290pub fn parse_candle_msg_vec(
291    data: serde_json::Value,
292    instrument_id: &InstrumentId,
293    price_precision: u8,
294    size_precision: u8,
295    spec: BarSpecification,
296    ts_init: UnixNanos,
297) -> anyhow::Result<Vec<Data>> {
298    let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
299    let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
300    let mut bars = Vec::with_capacity(msgs.len());
301
302    for msg in msgs {
303        // Only process completed candles to avoid duplicate/partial bars
304        if msg.confirm == OKXCandleConfirm::Closed {
305            let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
306            bars.push(Data::Bar(bar));
307        }
308    }
309
310    Ok(bars)
311}
312
313/// Parses vector of OKX book messages into Nautilus depth10 updates.
314///
315/// # Errors
316///
317/// Returns an error if any book10 message fails to parse.
318pub fn parse_book10_msg_vec(
319    data: Vec<OKXBookMsg>,
320    instrument_id: &InstrumentId,
321    price_precision: u8,
322    size_precision: u8,
323    ts_init: UnixNanos,
324) -> anyhow::Result<Vec<Data>> {
325    let mut depth10_updates = Vec::with_capacity(data.len());
326
327    for msg in data {
328        let depth10 = parse_book10_msg(
329            &msg,
330            *instrument_id,
331            price_precision,
332            size_precision,
333            ts_init,
334        )?;
335        depth10_updates.push(Data::Depth10(Box::new(depth10)));
336    }
337
338    Ok(depth10_updates)
339}
340
341/// Parses an OKX book message into Nautilus order book deltas.
342///
343/// # Errors
344///
345/// Returns an error if bid or ask levels contain values that cannot be parsed.
346pub fn parse_book_msg(
347    msg: &OKXBookMsg,
348    instrument_id: InstrumentId,
349    price_precision: u8,
350    size_precision: u8,
351    action: &OKXBookAction,
352    ts_init: UnixNanos,
353) -> anyhow::Result<OrderBookDeltas> {
354    let flags = if action == &OKXBookAction::Snapshot {
355        RecordFlag::F_SNAPSHOT as u8
356    } else {
357        0
358    };
359    let ts_event = parse_millisecond_timestamp(msg.ts);
360
361    let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
362
363    for bid in &msg.bids {
364        let book_action = match action {
365            OKXBookAction::Snapshot => BookAction::Add,
366            _ => match bid.size.as_str() {
367                "0" => BookAction::Delete,
368                _ => BookAction::Update,
369            },
370        };
371        let price = parse_price(&bid.price, price_precision)?;
372        let size = parse_quantity(&bid.size, size_precision)?;
373        let order_id = 0; // TBD
374        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
375        let delta = OrderBookDelta::new(
376            instrument_id,
377            book_action,
378            order,
379            flags,
380            msg.seq_id,
381            ts_event,
382            ts_init,
383        );
384        deltas.push(delta);
385    }
386
387    for ask in &msg.asks {
388        let book_action = match action {
389            OKXBookAction::Snapshot => BookAction::Add,
390            _ => match ask.size.as_str() {
391                "0" => BookAction::Delete,
392                _ => BookAction::Update,
393            },
394        };
395        let price = parse_price(&ask.price, price_precision)?;
396        let size = parse_quantity(&ask.size, size_precision)?;
397        let order_id = 0; // TBD
398        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
399        let delta = OrderBookDelta::new(
400            instrument_id,
401            book_action,
402            order,
403            flags,
404            msg.seq_id,
405            ts_event,
406            ts_init,
407        );
408        deltas.push(delta);
409    }
410
411    OrderBookDeltas::new_checked(instrument_id, deltas)
412}
413
414/// Parses an OKX book message into a Nautilus quote tick.
415///
416/// # Errors
417///
418/// Returns an error if any quote levels contain values that cannot be parsed.
419pub fn parse_quote_msg(
420    msg: &OKXBookMsg,
421    instrument_id: InstrumentId,
422    price_precision: u8,
423    size_precision: u8,
424    ts_init: UnixNanos,
425) -> anyhow::Result<QuoteTick> {
426    let best_bid: &OrderBookEntry = &msg.bids[0];
427    let best_ask: &OrderBookEntry = &msg.asks[0];
428
429    let bid_price = parse_price(&best_bid.price, price_precision)?;
430    let ask_price = parse_price(&best_ask.price, price_precision)?;
431    let bid_size = parse_quantity(&best_bid.size, size_precision)?;
432    let ask_size = parse_quantity(&best_ask.size, size_precision)?;
433    let ts_event = parse_millisecond_timestamp(msg.ts);
434
435    QuoteTick::new_checked(
436        instrument_id,
437        bid_price,
438        ask_price,
439        bid_size,
440        ask_size,
441        ts_event,
442        ts_init,
443    )
444}
445
446/// Parses an OKX book message into a Nautilus [`OrderBookDepth10`].
447///
448/// Converts order book data into a fixed-depth snapshot with top 10 levels for both sides.
449///
450/// # Errors
451///
452/// Returns an error if price or size fields cannot be parsed for any level.
453pub fn parse_book10_msg(
454    msg: &OKXBookMsg,
455    instrument_id: InstrumentId,
456    price_precision: u8,
457    size_precision: u8,
458    ts_init: UnixNanos,
459) -> anyhow::Result<OrderBookDepth10> {
460    // Initialize arrays - need to fill all 10 levels even if we have fewer
461    let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
462    let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
463    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
464    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
465
466    // Parse available bid levels (up to 10)
467    let bid_len = msg.bids.len().min(DEPTH10_LEN);
468    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
469        let price = parse_price(&level.price, price_precision)?;
470        let size = parse_quantity(&level.size, size_precision)?;
471        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
472
473        let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
474        bids[i] = bid_order;
475        bid_counts[i] = orders_count;
476    }
477
478    // Fill remaining bid slots with empty Buy orders (not NULL orders)
479    for i in bid_len..DEPTH10_LEN {
480        bids[i] = BookOrder::new(
481            OrderSide::Buy,
482            Price::zero(price_precision),
483            Quantity::zero(size_precision),
484            0,
485        );
486        bid_counts[i] = 0;
487    }
488
489    // Parse available ask levels (up to 10)
490    let ask_len = msg.asks.len().min(DEPTH10_LEN);
491    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
492        let price = parse_price(&level.price, price_precision)?;
493        let size = parse_quantity(&level.size, size_precision)?;
494        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
495
496        let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
497        asks[i] = ask_order;
498        ask_counts[i] = orders_count;
499    }
500
501    // Fill remaining ask slots with empty Sell orders (not NULL orders)
502    for i in ask_len..DEPTH10_LEN {
503        asks[i] = BookOrder::new(
504            OrderSide::Sell,
505            Price::zero(price_precision),
506            Quantity::zero(size_precision),
507            0,
508        );
509        ask_counts[i] = 0;
510    }
511
512    let ts_event = parse_millisecond_timestamp(msg.ts);
513
514    Ok(OrderBookDepth10::new(
515        instrument_id,
516        bids,
517        asks,
518        bid_counts,
519        ask_counts,
520        RecordFlag::F_SNAPSHOT as u8,
521        msg.seq_id, // Use sequence ID for OKX L2 books
522        ts_event,
523        ts_init,
524    ))
525}
526
527/// Parses an OKX ticker message into a Nautilus quote tick.
528///
529/// # Errors
530///
531/// Returns an error if bid or ask values cannot be parsed from the message.
532pub fn parse_ticker_msg(
533    msg: &OKXTickerMsg,
534    instrument_id: InstrumentId,
535    price_precision: u8,
536    size_precision: u8,
537    ts_init: UnixNanos,
538) -> anyhow::Result<QuoteTick> {
539    let bid_price = parse_price(&msg.bid_px, price_precision)?;
540    let ask_price = parse_price(&msg.ask_px, price_precision)?;
541    let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
542    let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
543    let ts_event = parse_millisecond_timestamp(msg.ts);
544
545    QuoteTick::new_checked(
546        instrument_id,
547        bid_price,
548        ask_price,
549        bid_size,
550        ask_size,
551        ts_event,
552        ts_init,
553    )
554}
555
556/// Parses an OKX trade message into a Nautilus trade tick.
557///
558/// # Errors
559///
560/// Returns an error if trade prices or sizes cannot be parsed.
561pub fn parse_trade_msg(
562    msg: &OKXTradeMsg,
563    instrument_id: InstrumentId,
564    price_precision: u8,
565    size_precision: u8,
566    ts_init: UnixNanos,
567) -> anyhow::Result<TradeTick> {
568    let price = parse_price(&msg.px, price_precision)?;
569    let size = parse_quantity(&msg.sz, size_precision)?;
570    let aggressor_side: AggressorSide = msg.side.into();
571    let trade_id = TradeId::new(&msg.trade_id);
572    let ts_event = parse_millisecond_timestamp(msg.ts);
573
574    TradeTick::new_checked(
575        instrument_id,
576        price,
577        size,
578        aggressor_side,
579        trade_id,
580        ts_event,
581        ts_init,
582    )
583}
584
585/// Parses an OKX mark price message into a Nautilus mark price update.
586///
587/// # Errors
588///
589/// Returns an error if the mark price fails to parse.
590pub fn parse_mark_price_msg(
591    msg: &OKXMarkPriceMsg,
592    instrument_id: InstrumentId,
593    price_precision: u8,
594    ts_init: UnixNanos,
595) -> anyhow::Result<MarkPriceUpdate> {
596    let price = parse_price(&msg.mark_px, price_precision)?;
597    let ts_event = parse_millisecond_timestamp(msg.ts);
598
599    Ok(MarkPriceUpdate::new(
600        instrument_id,
601        price,
602        ts_event,
603        ts_init,
604    ))
605}
606
607/// Parses an OKX index price message into a Nautilus index price update.
608///
609/// # Errors
610///
611/// Returns an error if the index price fails to parse.
612pub fn parse_index_price_msg(
613    msg: &OKXIndexPriceMsg,
614    instrument_id: InstrumentId,
615    price_precision: u8,
616    ts_init: UnixNanos,
617) -> anyhow::Result<IndexPriceUpdate> {
618    let price = parse_price(&msg.idx_px, price_precision)?;
619    let ts_event = parse_millisecond_timestamp(msg.ts);
620
621    Ok(IndexPriceUpdate::new(
622        instrument_id,
623        price,
624        ts_event,
625        ts_init,
626    ))
627}
628
629/// Parses an OKX candle message into a Nautilus bar.
630///
631/// # Errors
632///
633/// Returns an error if candle price or volume fields cannot be parsed.
634pub fn parse_candle_msg(
635    msg: &OKXCandleMsg,
636    bar_type: BarType,
637    price_precision: u8,
638    size_precision: u8,
639    ts_init: UnixNanos,
640) -> anyhow::Result<Bar> {
641    let open = parse_price(&msg.o, price_precision)?;
642    let high = parse_price(&msg.h, price_precision)?;
643    let low = parse_price(&msg.l, price_precision)?;
644    let close = parse_price(&msg.c, price_precision)?;
645    let volume = parse_quantity(&msg.vol, size_precision)?;
646    let ts_event = parse_millisecond_timestamp(msg.ts);
647
648    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
649}
650
651/// Parses vector of OKX order messages into Nautilus execution reports.
652///
653/// # Errors
654///
655/// Returns an error if any contained order messages cannot be parsed.
656pub fn parse_order_msg_vec(
657    data: Vec<OKXOrderMsg>,
658    account_id: AccountId,
659    instruments: &AHashMap<Ustr, InstrumentAny>,
660    fee_cache: &AHashMap<Ustr, Money>,
661    filled_qty_cache: &AHashMap<Ustr, Quantity>,
662    ts_init: UnixNanos,
663) -> anyhow::Result<Vec<ExecutionReport>> {
664    let mut order_reports = Vec::with_capacity(data.len());
665
666    for msg in data {
667        match parse_order_msg(
668            &msg,
669            account_id,
670            instruments,
671            fee_cache,
672            filled_qty_cache,
673            ts_init,
674        ) {
675            Ok(report) => order_reports.push(report),
676            Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
677        }
678    }
679
680    Ok(order_reports)
681}
682
683/// Checks if acc_fill_sz has increased compared to the previous filled quantity.
684fn has_acc_fill_sz_increased(
685    acc_fill_sz: &Option<String>,
686    previous_filled_qty: Option<Quantity>,
687    size_precision: u8,
688) -> bool {
689    if let Some(acc_str) = acc_fill_sz {
690        if acc_str.is_empty() || acc_str == "0" {
691            return false;
692        }
693        if let Ok(current_filled) = parse_quantity(acc_str, size_precision) {
694            if let Some(prev_qty) = previous_filled_qty {
695                return current_filled > prev_qty;
696            }
697            return !current_filled.is_zero();
698        }
699    }
700    false
701}
702
703/// Parses a single OKX order message into an [`ExecutionReport`].
704///
705/// # Errors
706///
707/// Returns an error if the instrument cannot be found or if parsing the
708/// underlying order payload fails.
709pub fn parse_order_msg(
710    msg: &OKXOrderMsg,
711    account_id: AccountId,
712    instruments: &AHashMap<Ustr, InstrumentAny>,
713    fee_cache: &AHashMap<Ustr, Money>,
714    filled_qty_cache: &AHashMap<Ustr, Quantity>,
715    ts_init: UnixNanos,
716) -> anyhow::Result<ExecutionReport> {
717    let instrument = instruments
718        .get(&msg.inst_id)
719        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
720
721    let previous_fee = fee_cache.get(&msg.ord_id).copied();
722    let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
723
724    let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0")
725        || !msg.trade_id.is_empty()
726        || has_acc_fill_sz_increased(
727            &msg.acc_fill_sz,
728            previous_filled_qty,
729            instrument.size_precision(),
730        );
731
732    match msg.state {
733        OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
734            parse_fill_report(
735                msg,
736                instrument,
737                account_id,
738                previous_fee,
739                previous_filled_qty,
740                ts_init,
741            )
742            .map(ExecutionReport::Fill)
743        }
744        _ => parse_order_status_report(msg, instrument, account_id, ts_init)
745            .map(ExecutionReport::Order),
746    }
747}
748
749/// Parses an OKX algo order message into a Nautilus execution report.
750///
751/// # Errors
752///
753/// Returns an error if the instrument cannot be found or if message fields
754/// fail to parse.
755pub fn parse_algo_order_msg(
756    msg: OKXAlgoOrderMsg,
757    account_id: AccountId,
758    instruments: &AHashMap<Ustr, InstrumentAny>,
759    ts_init: UnixNanos,
760) -> anyhow::Result<ExecutionReport> {
761    let inst = instruments
762        .get(&msg.inst_id)
763        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
764
765    // Algo orders primarily return status reports (not fills since they haven't been triggered yet)
766    parse_algo_order_status_report(&msg, inst, account_id, ts_init).map(ExecutionReport::Order)
767}
768
769/// Parses an OKX algo order message into a Nautilus order status report.
770///
771/// # Errors
772///
773/// Returns an error if any order identifiers or numeric fields cannot be
774/// parsed.
775pub fn parse_algo_order_status_report(
776    msg: &OKXAlgoOrderMsg,
777    instrument: &InstrumentAny,
778    account_id: AccountId,
779    ts_init: UnixNanos,
780) -> anyhow::Result<OrderStatusReport> {
781    // For algo orders, use algo_cl_ord_id if cl_ord_id is empty
782    let client_order_id = if msg.cl_ord_id.is_empty() {
783        parse_client_order_id(&msg.algo_cl_ord_id)
784    } else {
785        parse_client_order_id(&msg.cl_ord_id)
786    };
787
788    // For algo orders that haven't triggered, ord_id will be empty, use algo_id instead
789    let venue_order_id = if msg.ord_id.is_empty() {
790        VenueOrderId::new(msg.algo_id.as_str())
791    } else {
792        VenueOrderId::new(msg.ord_id.as_str())
793    };
794
795    let order_side: OrderSide = msg.side.into();
796
797    // Determine order type based on ord_px for conditional/stop orders
798    let order_type = if is_market_price(&msg.ord_px) {
799        OrderType::StopMarket
800    } else {
801        OrderType::StopLimit
802    };
803
804    let status: OrderStatus = msg.state.into();
805
806    let quantity = parse_quantity(msg.sz.as_str(), instrument.size_precision())?;
807
808    // For algo orders, actual_sz represents filled quantity (if any)
809    let filled_qty = if msg.actual_sz.is_empty() || msg.actual_sz == "0" {
810        Quantity::zero(instrument.size_precision())
811    } else {
812        parse_quantity(msg.actual_sz.as_str(), instrument.size_precision())?
813    };
814
815    let trigger_px = parse_price(msg.trigger_px.as_str(), instrument.price_precision())?;
816
817    // Parse limit price if it exists (not -1)
818    let price = if msg.ord_px != "-1" {
819        Some(parse_price(
820            msg.ord_px.as_str(),
821            instrument.price_precision(),
822        )?)
823    } else {
824        None
825    };
826
827    let trigger_type = match msg.trigger_px_type {
828        OKXTriggerType::Last => TriggerType::LastPrice,
829        OKXTriggerType::Mark => TriggerType::MarkPrice,
830        OKXTriggerType::Index => TriggerType::IndexPrice,
831        OKXTriggerType::None => TriggerType::Default,
832    };
833
834    let mut report = OrderStatusReport::new(
835        account_id,
836        instrument.id(),
837        client_order_id,
838        venue_order_id,
839        order_side,
840        order_type,
841        TimeInForce::Gtc, // Algo orders are typically GTC
842        status,
843        quantity,
844        filled_qty,
845        msg.c_time.into(), // ts_accepted
846        msg.u_time.into(), // ts_last
847        ts_init,
848        None, // report_id - auto-generated
849    );
850
851    report.trigger_price = Some(trigger_px);
852    report.trigger_type = Some(trigger_type);
853
854    if let Some(limit_price) = price {
855        report.price = Some(limit_price);
856    }
857
858    Ok(report)
859}
860
861/// Parses an OKX order message into a Nautilus order status report.
862///
863/// # Errors
864///
865/// Returns an error if order metadata or numeric values cannot be parsed.
866pub fn parse_order_status_report(
867    msg: &OKXOrderMsg,
868    instrument: &InstrumentAny,
869    account_id: AccountId,
870    ts_init: UnixNanos,
871) -> anyhow::Result<OrderStatusReport> {
872    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
873    let venue_order_id = VenueOrderId::new(msg.ord_id);
874    let order_side: OrderSide = msg.side.into();
875
876    let okx_order_type = msg.ord_type;
877
878    // Determine order type based on presence of limit price for certain OKX order types
879    let order_type = match okx_order_type {
880        OKXOrderType::Trigger => {
881            if is_market_price(&msg.px) {
882                OrderType::StopMarket
883            } else {
884                OrderType::StopLimit
885            }
886        }
887        OKXOrderType::Fok | OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
888            determine_order_type(okx_order_type, &msg.px)
889        }
890        _ => msg.ord_type.into(),
891    };
892    let order_status: OrderStatus = msg.state.into();
893
894    let time_in_force = match okx_order_type {
895        OKXOrderType::Fok => TimeInForce::Fok,
896        OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
897        _ => TimeInForce::Gtc,
898    };
899
900    let size_precision = instrument.size_precision();
901
902    // Parse quantities based on target currency
903    // OKX always returns acc_fill_sz in base currency, but sz depends on tgt_ccy
904
905    // Determine if this is a quote-quantity order
906    // Method 1: Explicit tgt_ccy field set to QuoteCcy
907    let is_quote_qty_explicit = msg.tgt_ccy == Some(OKXTargetCurrency::QuoteCcy);
908
909    // Method 2: Use OKX defaults when tgt_ccy is None (old orders or missing field)
910    // OKX API defaults for SPOT market orders: BUY orders use quote_ccy, SELL orders use base_ccy
911    // Note: tgtCcy only applies to SPOT market orders (not limit orders)
912    // For limit orders, sz is always in base currency regardless of side
913    let is_quote_qty_heuristic = msg.tgt_ccy.is_none()
914        && (msg.inst_type == OKXInstrumentType::Spot || msg.inst_type == OKXInstrumentType::Margin)
915        && msg.side == OKXSide::Buy
916        && order_type == OrderType::Market;
917
918    let (quantity, filled_qty) = if is_quote_qty_explicit || is_quote_qty_heuristic {
919        // Quote-quantity order: sz is in quote currency, need to convert to base
920        let sz_quote_dec = Decimal::from_str(&msg.sz).map_err(|e| {
921            anyhow::anyhow!("Failed to parse sz='{}' as quote quantity: {}", msg.sz, e)
922        })?;
923
924        // Determine the price to use for conversion
925        // Priority: 1) limit price (px) for limit orders, 2) avg_px for market orders
926        let conversion_price_dec =
927            if !is_market_price(&msg.px) {
928                // Limit order: use the limit price (msg.px)
929                Some(
930                    Decimal::from_str(&msg.px)
931                        .map_err(|e| anyhow::anyhow!("Failed to parse px='{}': {}", msg.px, e))?,
932                )
933            } else if !msg.avg_px.is_empty() && msg.avg_px != "0" {
934                // Market order with fills: use average fill price
935                Some(Decimal::from_str(&msg.avg_px).map_err(|e| {
936                    anyhow::anyhow!("Failed to parse avg_px='{}': {}", msg.avg_px, e)
937                })?)
938            } else {
939                None
940            };
941
942        // Convert quote quantity to base: quantity_base = sz_quote / price
943        let quantity_base = if let Some(price) = conversion_price_dec {
944            if !price.is_zero() {
945                Quantity::from_decimal_dp(sz_quote_dec / price, size_precision)?
946            } else {
947                parse_quantity(&msg.sz, size_precision)?
948            }
949        } else {
950            // No price available, can't convert - use sz as-is temporarily
951            // This will be corrected once the order gets filled and price is available
952            parse_quantity(&msg.sz, size_precision)?
953        };
954
955        let filled_qty =
956            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
957
958        (quantity_base, filled_qty)
959    } else {
960        // Base-quantity order: both sz and acc_fill_sz are in base currency
961        let quantity = parse_quantity(&msg.sz, size_precision)?;
962        let filled_qty =
963            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
964
965        (quantity, filled_qty)
966    };
967
968    // For quote-quantity orders marked as FILLED, adjust quantity to match filled_qty
969    // to avoid precision mismatches from quote-to-base conversion
970    let (quantity, filled_qty) = if (is_quote_qty_explicit || is_quote_qty_heuristic)
971        && msg.state == OKXOrderStatus::Filled
972        && filled_qty.is_positive()
973    {
974        (filled_qty, filled_qty)
975    } else {
976        (quantity, filled_qty)
977    };
978
979    let ts_accepted = parse_millisecond_timestamp(msg.c_time);
980    let ts_last = parse_millisecond_timestamp(msg.u_time);
981
982    let is_liquidation = matches!(
983        msg.category,
984        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
985    );
986
987    let is_adl = msg.category == OKXOrderCategory::Adl;
988
989    if is_liquidation {
990        tracing::warn!(
991            order_id = msg.ord_id.as_str(),
992            category = ?msg.category,
993            inst_id = msg.inst_id.as_str(),
994            state = ?msg.state,
995            "Liquidation order status update"
996        );
997    }
998
999    if is_adl {
1000        tracing::warn!(
1001            order_id = msg.ord_id.as_str(),
1002            inst_id = msg.inst_id.as_str(),
1003            state = ?msg.state,
1004            "ADL (Auto-Deleveraging) order status update"
1005        );
1006    }
1007
1008    let mut report = OrderStatusReport::new(
1009        account_id,
1010        instrument.id(),
1011        client_order_id,
1012        venue_order_id,
1013        order_side,
1014        order_type,
1015        time_in_force,
1016        order_status,
1017        quantity,
1018        filled_qty,
1019        ts_accepted,
1020        ts_init,
1021        ts_last,
1022        None, // Generate UUID4 automatically
1023    );
1024
1025    let price_precision = instrument.price_precision();
1026
1027    if okx_order_type == OKXOrderType::Trigger {
1028        // For triggered orders coming through regular orders channel,
1029        // set the price if it's a stop-limit order
1030        if !is_market_price(&msg.px)
1031            && let Ok(price) = parse_price(&msg.px, price_precision)
1032        {
1033            report = report.with_price(price);
1034        }
1035    } else {
1036        // For regular orders, use px field
1037        if !is_market_price(&msg.px)
1038            && let Ok(price) = parse_price(&msg.px, price_precision)
1039        {
1040            report = report.with_price(price);
1041        }
1042    }
1043
1044    if !msg.avg_px.is_empty()
1045        && let Ok(avg_px) = msg.avg_px.parse::<f64>()
1046    {
1047        report = report.with_avg_px(avg_px)?;
1048    }
1049
1050    if matches!(
1051        msg.ord_type,
1052        OKXOrderType::PostOnly | OKXOrderType::MmpAndPostOnly
1053    ) || matches!(
1054        msg.cancel_source.as_deref(),
1055        Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
1056    ) || matches!(
1057        msg.cancel_source_reason.as_deref(),
1058        Some(reason) if reason.contains("POST_ONLY")
1059    ) {
1060        report = report.with_post_only(true);
1061    }
1062
1063    if msg.reduce_only == "true" {
1064        report = report.with_reduce_only(true);
1065    }
1066
1067    if let Some(reason) = msg
1068        .cancel_source_reason
1069        .as_ref()
1070        .filter(|reason| !reason.is_empty())
1071    {
1072        report = report.with_cancel_reason(reason.clone());
1073    } else if let Some(source) = msg
1074        .cancel_source
1075        .as_ref()
1076        .filter(|source| !source.is_empty())
1077    {
1078        let reason = if source == OKX_POST_ONLY_CANCEL_SOURCE {
1079            OKX_POST_ONLY_CANCEL_REASON.to_string()
1080        } else {
1081            format!("cancel_source={source}")
1082        };
1083        report = report.with_cancel_reason(reason);
1084    }
1085
1086    Ok(report)
1087}
1088
1089/// Parses an OKX order message into a Nautilus fill report.
1090///
1091/// # Errors
1092///
1093/// Returns an error if order quantities, prices, or fees cannot be parsed.
1094pub fn parse_fill_report(
1095    msg: &OKXOrderMsg,
1096    instrument: &InstrumentAny,
1097    account_id: AccountId,
1098    previous_fee: Option<Money>,
1099    previous_filled_qty: Option<Quantity>,
1100    ts_init: UnixNanos,
1101) -> anyhow::Result<FillReport> {
1102    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
1103    let venue_order_id = VenueOrderId::new(msg.ord_id);
1104
1105    // TODO: Extract to dedicated function:
1106    // OKX may not provide a trade_id, so generate a UUID4 as fallback
1107    let trade_id = if msg.trade_id.is_empty() {
1108        TradeId::from(UUID4::new().to_string().as_str())
1109    } else {
1110        TradeId::from(msg.trade_id.as_str())
1111    };
1112
1113    let order_side: OrderSide = msg.side.into();
1114
1115    let price_precision = instrument.price_precision();
1116    let size_precision = instrument.size_precision();
1117
1118    let price_str = if !msg.fill_px.is_empty() {
1119        &msg.fill_px
1120    } else if !msg.avg_px.is_empty() {
1121        &msg.avg_px
1122    } else {
1123        &msg.px
1124    };
1125    let last_px = parse_price(price_str, price_precision).map_err(|e| {
1126        anyhow::anyhow!(
1127            "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}",
1128            msg.fill_px,
1129            msg.avg_px,
1130            msg.px,
1131            e
1132        )
1133    })?;
1134
1135    // OKX provides fillSz (incremental fill) or accFillSz (cumulative total)
1136    // If fillSz is provided, use it directly as the incremental fill quantity
1137    let last_qty = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" {
1138        parse_quantity(&msg.fill_sz, size_precision)
1139            .map_err(|e| anyhow::anyhow!("Failed to parse fill_sz='{}': {e}", msg.fill_sz,))?
1140    } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz {
1141        // If fillSz is missing but accFillSz is available, calculate incremental fill
1142        if !acc_fill_sz.is_empty() && acc_fill_sz != "0" {
1143            let current_filled = parse_quantity(acc_fill_sz, size_precision).map_err(|e| {
1144                anyhow::anyhow!("Failed to parse acc_fill_sz='{}': {e}", acc_fill_sz,)
1145            })?;
1146
1147            // Calculate incremental fill as: current_total - previous_total
1148            if let Some(prev_qty) = previous_filled_qty {
1149                let incremental = current_filled - prev_qty;
1150                if incremental.is_zero() {
1151                    anyhow::bail!(
1152                        "Incremental fill quantity is zero (acc_fill_sz='{}', previous_filled_qty={})",
1153                        acc_fill_sz,
1154                        prev_qty
1155                    );
1156                }
1157                incremental
1158            } else {
1159                // First fill, use accumulated as incremental
1160                current_filled
1161            }
1162        } else {
1163            anyhow::bail!(
1164                "Cannot determine fill quantity: fill_sz is empty/zero and acc_fill_sz is empty/zero"
1165            );
1166        }
1167    } else {
1168        anyhow::bail!(
1169            "Cannot determine fill quantity: fill_sz='{}' and acc_fill_sz is None",
1170            msg.fill_sz
1171        );
1172    };
1173
1174    let fee_str = msg.fee.as_deref().unwrap_or("0");
1175    let fee_dec = Decimal::from_str(fee_str)
1176        .map_err(|e| anyhow::anyhow!("Failed to parse fee '{}': {}", fee_str, e))?;
1177
1178    let fee_currency = parse_fee_currency(msg.fee_ccy.as_str(), fee_dec, || {
1179        format!("fill report for inst_id={}", msg.inst_id)
1180    });
1181
1182    // OKX sends fees as negative numbers (e.g., "-2.5" for a $2.5 charge), parse_fee negates to positive
1183    let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)
1184        .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?;
1185
1186    // OKX sends cumulative fees, so we subtract the previous total to get this fill's fee
1187    let commission = if let Some(previous_fee) = previous_fee {
1188        let incremental = total_fee - previous_fee;
1189
1190        if incremental < Money::zero(fee_currency) {
1191            tracing::debug!(
1192                order_id = msg.ord_id.as_str(),
1193                total_fee = %total_fee,
1194                previous_fee = %previous_fee,
1195                incremental = %incremental,
1196                "Negative incremental fee detected - likely a maker rebate or fee refund"
1197            );
1198        }
1199
1200        // Skip corruption check when previous is negative (rebate), as transitions from
1201        // rebate to charge legitimately have incremental > total (e.g., -1 → +2 gives +3)
1202        if previous_fee >= Money::zero(fee_currency)
1203            && total_fee > Money::zero(fee_currency)
1204            && incremental > total_fee
1205        {
1206            tracing::error!(
1207                order_id = msg.ord_id.as_str(),
1208                total_fee = %total_fee,
1209                previous_fee = %previous_fee,
1210                incremental = %incremental,
1211                "Incremental fee exceeds total fee - likely fee cache corruption, using total fee as fallback"
1212            );
1213            total_fee
1214        } else {
1215            incremental
1216        }
1217    } else {
1218        total_fee
1219    };
1220
1221    let liquidity_side: LiquiditySide = msg.exec_type.into();
1222    let ts_event = parse_millisecond_timestamp(msg.fill_time);
1223
1224    let is_liquidation = matches!(
1225        msg.category,
1226        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
1227    );
1228
1229    let is_adl = msg.category == OKXOrderCategory::Adl;
1230
1231    if is_liquidation {
1232        tracing::warn!(
1233            order_id = msg.ord_id.as_str(),
1234            category = ?msg.category,
1235            inst_id = msg.inst_id.as_str(),
1236            side = ?msg.side,
1237            fill_sz = %msg.fill_sz,
1238            fill_px = %msg.fill_px,
1239            "Liquidation order detected"
1240        );
1241    }
1242
1243    if is_adl {
1244        tracing::warn!(
1245            order_id = msg.ord_id.as_str(),
1246            inst_id = msg.inst_id.as_str(),
1247            side = ?msg.side,
1248            fill_sz = %msg.fill_sz,
1249            fill_px = %msg.fill_px,
1250            "ADL (Auto-Deleveraging) order detected"
1251        );
1252    }
1253
1254    let report = FillReport::new(
1255        account_id,
1256        instrument.id(),
1257        venue_order_id,
1258        trade_id,
1259        order_side,
1260        last_qty,
1261        last_px,
1262        commission,
1263        liquidity_side,
1264        client_order_id,
1265        None,
1266        ts_event,
1267        ts_init,
1268        None, // Generate UUID4 automatically
1269    );
1270
1271    Ok(report)
1272}
1273
1274/// Parses OKX WebSocket message payloads into Nautilus data structures.
1275///
1276/// # Errors
1277///
1278/// Returns an error if the payload cannot be deserialized or if downstream
1279/// parsing routines fail.
1280///
1281/// # Panics
1282///
1283/// Panics only in the case where `okx_channel_to_bar_spec(channel)` returns
1284/// `None` after a prior `is_some` check – an unreachable scenario indicating a
1285/// logic error.
1286#[allow(clippy::too_many_arguments)]
1287pub fn parse_ws_message_data(
1288    channel: &OKXWsChannel,
1289    data: serde_json::Value,
1290    instrument_id: &InstrumentId,
1291    price_precision: u8,
1292    size_precision: u8,
1293    ts_init: UnixNanos,
1294    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1295    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1296) -> anyhow::Result<Option<NautilusWsMessage>> {
1297    match channel {
1298        OKXWsChannel::Instruments => {
1299            if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
1300                // Look up cached instrument to extract existing fees
1301                let (margin_init, margin_maint, maker_fee, taker_fee) =
1302                    instruments_cache.get(&Ustr::from(&msg.inst_id)).map_or(
1303                        (None, None, None, None),
1304                        extract_fees_from_cached_instrument,
1305                    );
1306
1307                match parse_instrument_any(
1308                    &msg,
1309                    margin_init,
1310                    margin_maint,
1311                    maker_fee,
1312                    taker_fee,
1313                    ts_init,
1314                )? {
1315                    Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
1316                    None => {
1317                        tracing::warn!("Empty instrument payload: {:?}", msg);
1318                        Ok(None)
1319                    }
1320                }
1321            } else {
1322                anyhow::bail!("Failed to deserialize instrument payload")
1323            }
1324        }
1325        OKXWsChannel::BboTbt => {
1326            let data_vec = parse_quote_msg_vec(
1327                data,
1328                instrument_id,
1329                price_precision,
1330                size_precision,
1331                ts_init,
1332            )?;
1333            Ok(Some(NautilusWsMessage::Data(data_vec)))
1334        }
1335        OKXWsChannel::Tickers => {
1336            let data_vec = parse_ticker_msg_vec(
1337                data,
1338                instrument_id,
1339                price_precision,
1340                size_precision,
1341                ts_init,
1342            )?;
1343            Ok(Some(NautilusWsMessage::Data(data_vec)))
1344        }
1345        OKXWsChannel::Trades => {
1346            let data_vec = parse_trade_msg_vec(
1347                data,
1348                instrument_id,
1349                price_precision,
1350                size_precision,
1351                ts_init,
1352            )?;
1353            Ok(Some(NautilusWsMessage::Data(data_vec)))
1354        }
1355        OKXWsChannel::MarkPrice => {
1356            let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1357            Ok(Some(NautilusWsMessage::Data(data_vec)))
1358        }
1359        OKXWsChannel::IndexTickers => {
1360            let data_vec =
1361                parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1362            Ok(Some(NautilusWsMessage::Data(data_vec)))
1363        }
1364        OKXWsChannel::FundingRate => {
1365            let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
1366            Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
1367        }
1368        channel if okx_channel_to_bar_spec(channel).is_some() => {
1369            let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
1370            let data_vec = parse_candle_msg_vec(
1371                data,
1372                instrument_id,
1373                price_precision,
1374                size_precision,
1375                bar_spec,
1376                ts_init,
1377            )?;
1378            Ok(Some(NautilusWsMessage::Data(data_vec)))
1379        }
1380        OKXWsChannel::Books
1381        | OKXWsChannel::BooksTbt
1382        | OKXWsChannel::Books5
1383        | OKXWsChannel::Books50Tbt => {
1384            if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
1385                let data_vec = parse_book10_msg_vec(
1386                    book_msgs,
1387                    instrument_id,
1388                    price_precision,
1389                    size_precision,
1390                    ts_init,
1391                )?;
1392                Ok(Some(NautilusWsMessage::Data(data_vec)))
1393            } else {
1394                anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
1395            }
1396        }
1397        _ => {
1398            tracing::warn!("Unsupported channel for message parsing: {channel:?}");
1399            Ok(None)
1400        }
1401    }
1402}
1403
1404////////////////////////////////////////////////////////////////////////////////
1405// Tests
1406////////////////////////////////////////////////////////////////////////////////
1407#[cfg(test)]
1408mod tests {
1409    use ahash::AHashMap;
1410    use nautilus_core::nanos::UnixNanos;
1411    use nautilus_model::{
1412        data::bar::BAR_SPEC_1_DAY_LAST,
1413        identifiers::{ClientOrderId, Symbol},
1414        instruments::CryptoPerpetual,
1415        types::Currency,
1416    };
1417    use rstest::rstest;
1418    use rust_decimal::Decimal;
1419    use rust_decimal_macros::dec;
1420    use ustr::Ustr;
1421
1422    use super::*;
1423    use crate::{
1424        OKXPositionSide,
1425        common::{
1426            enums::{OKXExecType, OKXInstrumentType, OKXOrderType, OKXSide, OKXTradeMode},
1427            parse::parse_account_state,
1428            testing::load_test_json,
1429        },
1430        http::models::OKXAccount,
1431        websocket::messages::{OKXWebSocketArg, OKXWsMessage},
1432    };
1433
1434    fn create_stub_instrument() -> CryptoPerpetual {
1435        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1436        CryptoPerpetual::new(
1437            instrument_id,
1438            Symbol::from("BTC-USDT-SWAP"),
1439            Currency::BTC(),
1440            Currency::USDT(),
1441            Currency::USDT(),
1442            false,
1443            2,
1444            8,
1445            Price::from("0.01"),
1446            Quantity::from("0.00000001"),
1447            None,
1448            None,
1449            None,
1450            None,
1451            None,
1452            None,
1453            None,
1454            None,
1455            None,
1456            None,
1457            None,
1458            None,
1459            UnixNanos::default(),
1460            UnixNanos::default(),
1461        )
1462    }
1463
1464    fn create_stub_order_msg(
1465        fill_sz: &str,
1466        acc_fill_sz: Option<String>,
1467        order_id: &str,
1468        trade_id: &str,
1469    ) -> OKXOrderMsg {
1470        OKXOrderMsg {
1471            acc_fill_sz,
1472            avg_px: "50000.0".to_string(),
1473            c_time: 1746947317401,
1474            cancel_source: None,
1475            cancel_source_reason: None,
1476            category: OKXOrderCategory::Normal,
1477            ccy: Ustr::from("USDT"),
1478            cl_ord_id: "test_order_1".to_string(),
1479            algo_cl_ord_id: None,
1480            fee: Some("-1.0".to_string()),
1481            fee_ccy: Ustr::from("USDT"),
1482            fill_px: "50000.0".to_string(),
1483            fill_sz: fill_sz.to_string(),
1484            fill_time: 1746947317402,
1485            inst_id: Ustr::from("BTC-USDT-SWAP"),
1486            inst_type: OKXInstrumentType::Swap,
1487            lever: "2.0".to_string(),
1488            ord_id: Ustr::from(order_id),
1489            ord_type: OKXOrderType::Market,
1490            pnl: "0".to_string(),
1491            pos_side: OKXPositionSide::Long,
1492            px: "".to_string(),
1493            reduce_only: "false".to_string(),
1494            side: OKXSide::Buy,
1495            state: OKXOrderStatus::PartiallyFilled,
1496            exec_type: OKXExecType::Taker,
1497            sz: "0.03".to_string(),
1498            td_mode: OKXTradeMode::Isolated,
1499            tgt_ccy: None,
1500            trade_id: trade_id.to_string(),
1501            u_time: 1746947317402,
1502        }
1503    }
1504
1505    #[rstest]
1506    fn test_parse_books_snapshot() {
1507        let json_data = load_test_json("ws_books_snapshot.json");
1508        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1509        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1510            OKXWsMessage::BookData { data, action, .. } => (data, action),
1511            _ => panic!("Expected a `BookData` variant"),
1512        };
1513
1514        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1515        let deltas = parse_book_msg(
1516            &okx_books[0],
1517            instrument_id,
1518            2,
1519            1,
1520            &action,
1521            UnixNanos::default(),
1522        )
1523        .unwrap();
1524
1525        assert_eq!(deltas.instrument_id, instrument_id);
1526        assert_eq!(deltas.deltas.len(), 16);
1527        assert_eq!(deltas.flags, 32);
1528        assert_eq!(deltas.sequence, 123456);
1529        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1530        assert_eq!(deltas.ts_init, UnixNanos::default());
1531
1532        // Verify some individual deltas are parsed correctly
1533        assert!(!deltas.deltas.is_empty());
1534        // Snapshot should have both bid and ask deltas
1535        let bid_deltas: Vec<_> = deltas
1536            .deltas
1537            .iter()
1538            .filter(|d| d.order.side == OrderSide::Buy)
1539            .collect();
1540        let ask_deltas: Vec<_> = deltas
1541            .deltas
1542            .iter()
1543            .filter(|d| d.order.side == OrderSide::Sell)
1544            .collect();
1545        assert!(!bid_deltas.is_empty());
1546        assert!(!ask_deltas.is_empty());
1547    }
1548
1549    #[rstest]
1550    fn test_parse_books_update() {
1551        let json_data = load_test_json("ws_books_update.json");
1552        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1553        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1554        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1555            OKXWsMessage::BookData { data, action, .. } => (data, action),
1556            _ => panic!("Expected a `BookData` variant"),
1557        };
1558
1559        let deltas = parse_book_msg(
1560            &okx_books[0],
1561            instrument_id,
1562            2,
1563            1,
1564            &action,
1565            UnixNanos::default(),
1566        )
1567        .unwrap();
1568
1569        assert_eq!(deltas.instrument_id, instrument_id);
1570        assert_eq!(deltas.deltas.len(), 16);
1571        assert_eq!(deltas.flags, 0);
1572        assert_eq!(deltas.sequence, 123457);
1573        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1574        assert_eq!(deltas.ts_init, UnixNanos::default());
1575
1576        // Verify some individual deltas are parsed correctly
1577        assert!(!deltas.deltas.is_empty());
1578        // Update should also have both bid and ask deltas
1579        let bid_deltas: Vec<_> = deltas
1580            .deltas
1581            .iter()
1582            .filter(|d| d.order.side == OrderSide::Buy)
1583            .collect();
1584        let ask_deltas: Vec<_> = deltas
1585            .deltas
1586            .iter()
1587            .filter(|d| d.order.side == OrderSide::Sell)
1588            .collect();
1589        assert!(!bid_deltas.is_empty());
1590        assert!(!ask_deltas.is_empty());
1591    }
1592
1593    #[rstest]
1594    fn test_parse_tickers() {
1595        let json_data = load_test_json("ws_tickers.json");
1596        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1597        let okx_tickers: Vec<OKXTickerMsg> = match msg {
1598            OKXWsMessage::Data { data, .. } => serde_json::from_value(data).unwrap(),
1599            _ => panic!("Expected a `Data` variant"),
1600        };
1601
1602        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1603        let trade =
1604            parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1605
1606        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1607        assert_eq!(trade.bid_price, Price::from("8888.88"));
1608        assert_eq!(trade.ask_price, Price::from("9999.99"));
1609        assert_eq!(trade.bid_size, Quantity::from(5));
1610        assert_eq!(trade.ask_size, Quantity::from(11));
1611        assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
1612        assert_eq!(trade.ts_init, UnixNanos::default());
1613    }
1614
1615    #[rstest]
1616    fn test_parse_quotes() {
1617        let json_data = load_test_json("ws_bbo_tbt.json");
1618        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1619        let okx_quotes: Vec<OKXBookMsg> = match msg {
1620            OKXWsMessage::Data { data, .. } => serde_json::from_value(data).unwrap(),
1621            _ => panic!("Expected a `Data` variant"),
1622        };
1623        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1624
1625        let quote =
1626            parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1627
1628        assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1629        assert_eq!(quote.bid_price, Price::from("8476.97"));
1630        assert_eq!(quote.ask_price, Price::from("8476.98"));
1631        assert_eq!(quote.bid_size, Quantity::from(256));
1632        assert_eq!(quote.ask_size, Quantity::from(415));
1633        assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
1634        assert_eq!(quote.ts_init, UnixNanos::default());
1635    }
1636
1637    #[rstest]
1638    fn test_parse_trades() {
1639        let json_data = load_test_json("ws_trades.json");
1640        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1641        let okx_trades: Vec<OKXTradeMsg> = match msg {
1642            OKXWsMessage::Data { data, .. } => serde_json::from_value(data).unwrap(),
1643            _ => panic!("Expected a `Data` variant"),
1644        };
1645
1646        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1647        let trade =
1648            parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
1649
1650        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1651        assert_eq!(trade.price, Price::from("42219.9"));
1652        assert_eq!(trade.size, Quantity::from("0.12060306"));
1653        assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
1654        assert_eq!(trade.trade_id, TradeId::from("130639474"));
1655        assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
1656        assert_eq!(trade.ts_init, UnixNanos::default());
1657    }
1658
1659    #[rstest]
1660    fn test_parse_candle() {
1661        let json_data = load_test_json("ws_candle.json");
1662        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1663        let okx_candles: Vec<OKXCandleMsg> = match msg {
1664            OKXWsMessage::Data { data, .. } => serde_json::from_value(data).unwrap(),
1665            _ => panic!("Expected a `Data` variant"),
1666        };
1667
1668        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1669        let bar_type = BarType::new(
1670            instrument_id,
1671            BAR_SPEC_1_DAY_LAST,
1672            AggregationSource::External,
1673        );
1674        let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
1675
1676        assert_eq!(bar.bar_type, bar_type);
1677        assert_eq!(bar.open, Price::from("8533.02"));
1678        assert_eq!(bar.high, Price::from("8553.74"));
1679        assert_eq!(bar.low, Price::from("8527.17"));
1680        assert_eq!(bar.close, Price::from("8548.26"));
1681        assert_eq!(bar.volume, Quantity::from(45247));
1682        assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1683        assert_eq!(bar.ts_init, UnixNanos::default());
1684    }
1685
1686    #[rstest]
1687    fn test_parse_funding_rate() {
1688        let json_data = load_test_json("ws_funding_rate.json");
1689        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1690
1691        let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1692            OKXWsMessage::Data { data, .. } => serde_json::from_value(data).unwrap(),
1693            _ => panic!("Expected a `Data` variant"),
1694        };
1695
1696        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1697        let funding_rate =
1698            parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1699                .unwrap();
1700
1701        assert_eq!(funding_rate.instrument_id, instrument_id);
1702        assert_eq!(funding_rate.rate, dec!(0.0001));
1703        assert_eq!(
1704            funding_rate.next_funding_ns,
1705            Some(UnixNanos::from(1744590349506000000))
1706        );
1707        assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1708        assert_eq!(funding_rate.ts_init, UnixNanos::default());
1709    }
1710
1711    #[rstest]
1712    fn test_parse_book_vec() {
1713        let json_data = load_test_json("ws_books_snapshot.json");
1714        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1715        let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1716            OKXWsMessage::BookData { data, action, .. } => (data, action),
1717            _ => panic!("Expected BookData"),
1718        };
1719
1720        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1721        let deltas_vec =
1722            parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1723
1724        assert_eq!(deltas_vec.len(), 1);
1725
1726        if let Data::Deltas(d) = &deltas_vec[0] {
1727            assert_eq!(d.sequence, 123456);
1728        } else {
1729            panic!("Expected Deltas");
1730        }
1731    }
1732
1733    #[rstest]
1734    fn test_parse_ticker_vec() {
1735        let json_data = load_test_json("ws_tickers.json");
1736        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1737        let data_val: serde_json::Value = match event {
1738            OKXWsMessage::Data { data, .. } => data,
1739            _ => panic!("Expected Data"),
1740        };
1741
1742        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1743        let quotes_vec =
1744            parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1745
1746        assert_eq!(quotes_vec.len(), 1);
1747
1748        if let Data::Quote(q) = &quotes_vec[0] {
1749            assert_eq!(q.bid_price, Price::from("8888.88000000"));
1750            assert_eq!(q.ask_price, Price::from("9999.99"));
1751        } else {
1752            panic!("Expected Quote");
1753        }
1754    }
1755
1756    #[rstest]
1757    fn test_parse_trade_vec() {
1758        let json_data = load_test_json("ws_trades.json");
1759        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1760        let data_val: serde_json::Value = match event {
1761            OKXWsMessage::Data { data, .. } => data,
1762            _ => panic!("Expected Data"),
1763        };
1764
1765        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1766        let trades_vec =
1767            parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1768
1769        assert_eq!(trades_vec.len(), 1);
1770
1771        if let Data::Trade(t) = &trades_vec[0] {
1772            assert_eq!(t.trade_id, TradeId::new("130639474"));
1773        } else {
1774            panic!("Expected Trade");
1775        }
1776    }
1777
1778    #[rstest]
1779    fn test_parse_candle_vec() {
1780        let json_data = load_test_json("ws_candle.json");
1781        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1782        let data_val: serde_json::Value = match event {
1783            OKXWsMessage::Data { data, .. } => data,
1784            _ => panic!("Expected Data"),
1785        };
1786
1787        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1788        let bars_vec = parse_candle_msg_vec(
1789            data_val,
1790            &instrument_id,
1791            2,
1792            1,
1793            BAR_SPEC_1_DAY_LAST,
1794            UnixNanos::default(),
1795        )
1796        .unwrap();
1797
1798        assert_eq!(bars_vec.len(), 1);
1799
1800        if let Data::Bar(b) = &bars_vec[0] {
1801            assert_eq!(b.open, Price::from("8533.02"));
1802        } else {
1803            panic!("Expected Bar");
1804        }
1805    }
1806
1807    #[rstest]
1808    fn test_parse_book_message() {
1809        let json_data = load_test_json("ws_bbo_tbt.json");
1810        let msg: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
1811        let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1812            OKXWsMessage::Data { data, arg, .. } => (serde_json::from_value(data).unwrap(), arg),
1813            _ => panic!("Expected a `Data` variant"),
1814        };
1815
1816        assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1817        assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1818        assert_eq!(arg.inst_type, None);
1819        assert_eq!(okx_books.len(), 1);
1820
1821        let book_msg = &okx_books[0];
1822
1823        // Check asks
1824        assert_eq!(book_msg.asks.len(), 1);
1825        let ask = &book_msg.asks[0];
1826        assert_eq!(ask.price, "8476.98");
1827        assert_eq!(ask.size, "415");
1828        assert_eq!(ask.liquidated_orders_count, "0");
1829        assert_eq!(ask.orders_count, "13");
1830
1831        // Check bids
1832        assert_eq!(book_msg.bids.len(), 1);
1833        let bid = &book_msg.bids[0];
1834        assert_eq!(bid.price, "8476.97");
1835        assert_eq!(bid.size, "256");
1836        assert_eq!(bid.liquidated_orders_count, "0");
1837        assert_eq!(bid.orders_count, "12");
1838        assert_eq!(book_msg.ts, 1597026383085);
1839        assert_eq!(book_msg.seq_id, 123456);
1840        assert_eq!(book_msg.checksum, None);
1841        assert_eq!(book_msg.prev_seq_id, None);
1842    }
1843
1844    #[rstest]
1845    fn test_parse_ws_account_message() {
1846        let json_data = load_test_json("ws_account.json");
1847        let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1848
1849        assert_eq!(accounts.len(), 1);
1850        let account = &accounts[0];
1851
1852        assert_eq!(account.total_eq, "100.56089404807182");
1853        assert_eq!(account.details.len(), 3);
1854
1855        let usdt_detail = &account.details[0];
1856        assert_eq!(usdt_detail.ccy, "USDT");
1857        assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1858        assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1859
1860        let btc_detail = &account.details[1];
1861        assert_eq!(btc_detail.ccy, "BTC");
1862        assert_eq!(btc_detail.avail_bal, "0.0000000051");
1863
1864        let eth_detail = &account.details[2];
1865        assert_eq!(eth_detail.ccy, "ETH");
1866        assert_eq!(eth_detail.avail_bal, "0.000000185");
1867
1868        let account_id = AccountId::new("OKX-001");
1869        let ts_init = nautilus_core::nanos::UnixNanos::default();
1870        let account_state = parse_account_state(account, account_id, ts_init);
1871
1872        assert!(account_state.is_ok());
1873        let state = account_state.unwrap();
1874        assert_eq!(state.account_id, account_id);
1875        assert_eq!(state.balances.len(), 3);
1876    }
1877
1878    #[rstest]
1879    fn test_parse_order_msg() {
1880        let json_data = load_test_json("ws_orders.json");
1881        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1882
1883        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1884
1885        let account_id = AccountId::new("OKX-001");
1886        let mut instruments = AHashMap::new();
1887
1888        // Create a mock instrument for testing
1889        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1890        let instrument = CryptoPerpetual::new(
1891            instrument_id,
1892            Symbol::from("BTC-USDT-SWAP"),
1893            Currency::BTC(),
1894            Currency::USDT(),
1895            Currency::USDT(),
1896            false, // is_inverse
1897            2,     // price_precision
1898            8,     // size_precision
1899            Price::from("0.01"),
1900            Quantity::from("0.00000001"),
1901            None, // multiplier
1902            None, // lot_size
1903            None, // max_quantity
1904            None, // min_quantity
1905            None, // max_notional
1906            None, // min_notional
1907            None, // max_price
1908            None, // min_price
1909            None, // margin_init
1910            None, // margin_maint
1911            None, // maker_fee
1912            None, // taker_fee
1913            UnixNanos::default(),
1914            UnixNanos::default(),
1915        );
1916
1917        instruments.insert(
1918            Ustr::from("BTC-USDT-SWAP"),
1919            InstrumentAny::CryptoPerpetual(instrument),
1920        );
1921
1922        let ts_init = UnixNanos::default();
1923        let fee_cache = AHashMap::new();
1924        let filled_qty_cache = AHashMap::new();
1925
1926        let result = parse_order_msg_vec(
1927            data,
1928            account_id,
1929            &instruments,
1930            &fee_cache,
1931            &filled_qty_cache,
1932            ts_init,
1933        );
1934
1935        assert!(result.is_ok());
1936        let order_reports = result.unwrap();
1937        assert_eq!(order_reports.len(), 1);
1938
1939        // Verify the parsed order report
1940        let report = &order_reports[0];
1941
1942        if let ExecutionReport::Fill(fill_report) = report {
1943            assert_eq!(fill_report.account_id, account_id);
1944            assert_eq!(fill_report.instrument_id, instrument_id);
1945            assert_eq!(
1946                fill_report.client_order_id,
1947                Some(ClientOrderId::new("001BTCUSDT20250106001"))
1948            );
1949            assert_eq!(
1950                fill_report.venue_order_id,
1951                VenueOrderId::new("2497956918703120384")
1952            );
1953            assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1954            assert_eq!(fill_report.order_side, OrderSide::Buy);
1955            assert_eq!(fill_report.last_px, Price::from("103698.90"));
1956            assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1957            assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1958        } else {
1959            panic!("Expected Fill report for filled order");
1960        }
1961    }
1962
1963    #[rstest]
1964    fn test_parse_order_status_report() {
1965        let json_data = load_test_json("ws_orders.json");
1966        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1967        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1968        let order_msg = &data[0];
1969
1970        let account_id = AccountId::new("OKX-001");
1971        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1972        let instrument = CryptoPerpetual::new(
1973            instrument_id,
1974            Symbol::from("BTC-USDT-SWAP"),
1975            Currency::BTC(),
1976            Currency::USDT(),
1977            Currency::USDT(),
1978            false, // is_inverse
1979            2,     // price_precision
1980            8,     // size_precision
1981            Price::from("0.01"),
1982            Quantity::from("0.00000001"),
1983            None,
1984            None,
1985            None,
1986            None,
1987            None,
1988            None,
1989            None,
1990            None,
1991            None,
1992            None,
1993            None,
1994            None,
1995            UnixNanos::default(),
1996            UnixNanos::default(),
1997        );
1998
1999        let ts_init = UnixNanos::default();
2000
2001        let result = parse_order_status_report(
2002            order_msg,
2003            &InstrumentAny::CryptoPerpetual(instrument),
2004            account_id,
2005            ts_init,
2006        );
2007
2008        assert!(result.is_ok());
2009        let order_status_report = result.unwrap();
2010
2011        assert_eq!(order_status_report.account_id, account_id);
2012        assert_eq!(order_status_report.instrument_id, instrument_id);
2013        assert_eq!(
2014            order_status_report.client_order_id,
2015            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2016        );
2017        assert_eq!(
2018            order_status_report.venue_order_id,
2019            VenueOrderId::new("2497956918703120384")
2020        );
2021        assert_eq!(order_status_report.order_side, OrderSide::Buy);
2022        assert_eq!(order_status_report.order_status, OrderStatus::Filled);
2023        assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
2024        assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
2025    }
2026
2027    #[rstest]
2028    fn test_parse_fill_report() {
2029        let json_data = load_test_json("ws_orders.json");
2030        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2031        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2032        let order_msg = &data[0];
2033
2034        let account_id = AccountId::new("OKX-001");
2035        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2036        let instrument = CryptoPerpetual::new(
2037            instrument_id,
2038            Symbol::from("BTC-USDT-SWAP"),
2039            Currency::BTC(),
2040            Currency::USDT(),
2041            Currency::USDT(),
2042            false, // is_inverse
2043            2,     // price_precision
2044            8,     // size_precision
2045            Price::from("0.01"),
2046            Quantity::from("0.00000001"),
2047            None,
2048            None,
2049            None,
2050            None,
2051            None,
2052            None,
2053            None,
2054            None,
2055            None,
2056            None,
2057            None,
2058            None,
2059            UnixNanos::default(),
2060            UnixNanos::default(),
2061        );
2062
2063        let ts_init = UnixNanos::default();
2064
2065        let result = parse_fill_report(
2066            order_msg,
2067            &InstrumentAny::CryptoPerpetual(instrument),
2068            account_id,
2069            None,
2070            None,
2071            ts_init,
2072        );
2073
2074        assert!(result.is_ok());
2075        let fill_report = result.unwrap();
2076
2077        assert_eq!(fill_report.account_id, account_id);
2078        assert_eq!(fill_report.instrument_id, instrument_id);
2079        assert_eq!(
2080            fill_report.client_order_id,
2081            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2082        );
2083        assert_eq!(
2084            fill_report.venue_order_id,
2085            VenueOrderId::new("2497956918703120384")
2086        );
2087        assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
2088        assert_eq!(fill_report.order_side, OrderSide::Buy);
2089        assert_eq!(fill_report.last_px, Price::from("103698.90"));
2090        assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
2091        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
2092    }
2093
2094    #[rstest]
2095    fn test_parse_book10_msg() {
2096        let json_data = load_test_json("ws_books_snapshot.json");
2097        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
2098        let msgs: Vec<OKXBookMsg> = match event {
2099            OKXWsMessage::BookData { data, .. } => data,
2100            _ => panic!("Expected BookData"),
2101        };
2102
2103        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2104        let depth10 =
2105            parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
2106
2107        assert_eq!(depth10.instrument_id, instrument_id);
2108        assert_eq!(depth10.sequence, 123456);
2109        assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
2110        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
2111
2112        // Check bid levels (available in test data: 8 levels)
2113        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2114        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2115        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
2116        assert_eq!(depth10.bid_counts[0], 12);
2117
2118        assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
2119        assert_eq!(depth10.bids[1].size, Quantity::from("101"));
2120        assert_eq!(depth10.bid_counts[1], 1);
2121
2122        // Check that levels beyond available data are padded with empty orders
2123        assert_eq!(depth10.bids[8].price, Price::from("0"));
2124        assert_eq!(depth10.bids[8].size, Quantity::from("0"));
2125        assert_eq!(depth10.bid_counts[8], 0);
2126
2127        // Check ask levels (available in test data: 8 levels)
2128        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2129        assert_eq!(depth10.asks[0].size, Quantity::from("415"));
2130        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
2131        assert_eq!(depth10.ask_counts[0], 13);
2132
2133        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2134        assert_eq!(depth10.asks[1].size, Quantity::from("7"));
2135        assert_eq!(depth10.ask_counts[1], 2);
2136
2137        // Check that levels beyond available data are padded with empty orders
2138        assert_eq!(depth10.asks[8].price, Price::from("0"));
2139        assert_eq!(depth10.asks[8].size, Quantity::from("0"));
2140        assert_eq!(depth10.ask_counts[8], 0);
2141    }
2142
2143    #[rstest]
2144    fn test_parse_book10_msg_vec() {
2145        let json_data = load_test_json("ws_books_snapshot.json");
2146        let event: OKXWsMessage = serde_json::from_str(&json_data).unwrap();
2147        let msgs: Vec<OKXBookMsg> = match event {
2148            OKXWsMessage::BookData { data, .. } => data,
2149            _ => panic!("Expected BookData"),
2150        };
2151
2152        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2153        let depth10_vec =
2154            parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
2155
2156        assert_eq!(depth10_vec.len(), 1);
2157
2158        if let Data::Depth10(d) = &depth10_vec[0] {
2159            assert_eq!(d.instrument_id, instrument_id);
2160            assert_eq!(d.sequence, 123456);
2161            assert_eq!(d.bids[0].price, Price::from("8476.97"));
2162            assert_eq!(d.asks[0].price, Price::from("8476.98"));
2163        } else {
2164            panic!("Expected Depth10");
2165        }
2166    }
2167
2168    #[rstest]
2169    fn test_parse_fill_report_with_fee_cache() {
2170        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2171        let instrument = CryptoPerpetual::new(
2172            instrument_id,
2173            Symbol::from("BTC-USDT-SWAP"),
2174            Currency::BTC(),
2175            Currency::USDT(),
2176            Currency::USDT(),
2177            false, // is_inverse
2178            2,     // price_precision
2179            8,     // size_precision
2180            Price::from("0.01"),
2181            Quantity::from("0.00000001"),
2182            None, // multiplier
2183            None, // lot_size
2184            None, // max_quantity
2185            None, // min_quantity
2186            None, // max_notional
2187            None, // min_notional
2188            None, // max_price
2189            None, // min_price
2190            None, // margin_init
2191            None, // margin_maint
2192            None, // maker_fee
2193            None, // taker_fee
2194            UnixNanos::default(),
2195            UnixNanos::default(),
2196        );
2197
2198        let account_id = AccountId::new("OKX-001");
2199        let ts_init = UnixNanos::default();
2200
2201        // First fill: 0.01 BTC out of 0.03 BTC total (1/3)
2202        let order_msg_1 = OKXOrderMsg {
2203            acc_fill_sz: Some("0.01".to_string()),
2204            avg_px: "50000.0".to_string(),
2205            c_time: 1746947317401,
2206            cancel_source: None,
2207            cancel_source_reason: None,
2208            category: OKXOrderCategory::Normal,
2209            ccy: Ustr::from("USDT"),
2210            cl_ord_id: "test_order_1".to_string(),
2211            algo_cl_ord_id: None,
2212            fee: Some("-1.0".to_string()), // Total fee so far
2213            fee_ccy: Ustr::from("USDT"),
2214            fill_px: "50000.0".to_string(),
2215            fill_sz: "0.01".to_string(),
2216            fill_time: 1746947317402,
2217            inst_id: Ustr::from("BTC-USDT-SWAP"),
2218            inst_type: OKXInstrumentType::Swap,
2219            lever: "2.0".to_string(),
2220            ord_id: Ustr::from("1234567890"),
2221            ord_type: OKXOrderType::Market,
2222            pnl: "0".to_string(),
2223            pos_side: OKXPositionSide::Long,
2224            px: "".to_string(),
2225            reduce_only: "false".to_string(),
2226            side: OKXSide::Buy,
2227            state: OKXOrderStatus::PartiallyFilled,
2228            exec_type: OKXExecType::Maker,
2229            sz: "0.03".to_string(), // Total order size
2230            td_mode: OKXTradeMode::Isolated,
2231            tgt_ccy: None,
2232            trade_id: "trade_1".to_string(),
2233            u_time: 1746947317402,
2234        };
2235
2236        let fill_report_1 = parse_fill_report(
2237            &order_msg_1,
2238            &InstrumentAny::CryptoPerpetual(instrument),
2239            account_id,
2240            None,
2241            None,
2242            ts_init,
2243        )
2244        .unwrap();
2245
2246        // First fill should get the full fee since there's no previous fee
2247        assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
2248
2249        // Second fill: 0.02 BTC more, now 0.03 BTC total (completely filled)
2250        let order_msg_2 = OKXOrderMsg {
2251            acc_fill_sz: Some("0.03".to_string()),
2252            avg_px: "50000.0".to_string(),
2253            c_time: 1746947317401,
2254            cancel_source: None,
2255            cancel_source_reason: None,
2256            category: OKXOrderCategory::Normal,
2257            ccy: Ustr::from("USDT"),
2258            cl_ord_id: "test_order_1".to_string(),
2259            algo_cl_ord_id: None,
2260            fee: Some("-3.0".to_string()), // Same total fee
2261            fee_ccy: Ustr::from("USDT"),
2262            fill_px: "50000.0".to_string(),
2263            fill_sz: "0.02".to_string(),
2264            fill_time: 1746947317403,
2265            inst_id: Ustr::from("BTC-USDT-SWAP"),
2266            inst_type: OKXInstrumentType::Swap,
2267            lever: "2.0".to_string(),
2268            ord_id: Ustr::from("1234567890"),
2269            ord_type: OKXOrderType::Market,
2270            pnl: "0".to_string(),
2271            pos_side: OKXPositionSide::Long,
2272            px: "".to_string(),
2273            reduce_only: "false".to_string(),
2274            side: OKXSide::Buy,
2275            state: OKXOrderStatus::Filled,
2276            exec_type: OKXExecType::Maker,
2277            sz: "0.03".to_string(), // Same total order size
2278            td_mode: OKXTradeMode::Isolated,
2279            tgt_ccy: None,
2280            trade_id: "trade_2".to_string(),
2281            u_time: 1746947317403,
2282        };
2283
2284        let fill_report_2 = parse_fill_report(
2285            &order_msg_2,
2286            &InstrumentAny::CryptoPerpetual(instrument),
2287            account_id,
2288            Some(fill_report_1.commission),
2289            Some(fill_report_1.last_qty),
2290            ts_init,
2291        )
2292        .unwrap();
2293
2294        // Second fill should get total_fee - previous_fee = 3.0 - 1.0 = 2.0
2295        assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
2296
2297        // Test passed - fee was correctly split proportionally
2298    }
2299
2300    #[rstest]
2301    fn test_parse_fill_report_with_maker_rebates() {
2302        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2303        let instrument = CryptoPerpetual::new(
2304            instrument_id,
2305            Symbol::from("BTC-USDT-SWAP"),
2306            Currency::BTC(),
2307            Currency::USDT(),
2308            Currency::USDT(),
2309            false,
2310            2,
2311            8,
2312            Price::from("0.01"),
2313            Quantity::from("0.00000001"),
2314            None,
2315            None,
2316            None,
2317            None,
2318            None,
2319            None,
2320            None,
2321            None,
2322            None,
2323            None,
2324            None,
2325            None,
2326            UnixNanos::default(),
2327            UnixNanos::default(),
2328        );
2329
2330        let account_id = AccountId::new("OKX-001");
2331        let ts_init = UnixNanos::default();
2332
2333        // First fill: maker rebate of $0.5 (OKX sends as "0.5", parse_fee makes it -0.5)
2334        let order_msg_1 = OKXOrderMsg {
2335            acc_fill_sz: Some("0.01".to_string()),
2336            avg_px: "50000.0".to_string(),
2337            c_time: 1746947317401,
2338            cancel_source: None,
2339            cancel_source_reason: None,
2340            category: OKXOrderCategory::Normal,
2341            ccy: Ustr::from("USDT"),
2342            cl_ord_id: "test_order_rebate".to_string(),
2343            algo_cl_ord_id: None,
2344            fee: Some("0.5".to_string()), // Rebate: positive value from OKX
2345            fee_ccy: Ustr::from("USDT"),
2346            fill_px: "50000.0".to_string(),
2347            fill_sz: "0.01".to_string(),
2348            fill_time: 1746947317402,
2349            inst_id: Ustr::from("BTC-USDT-SWAP"),
2350            inst_type: OKXInstrumentType::Swap,
2351            lever: "2.0".to_string(),
2352            ord_id: Ustr::from("rebate_order_123"),
2353            ord_type: OKXOrderType::Market,
2354            pnl: "0".to_string(),
2355            pos_side: OKXPositionSide::Long,
2356            px: "".to_string(),
2357            reduce_only: "false".to_string(),
2358            side: OKXSide::Buy,
2359            state: OKXOrderStatus::PartiallyFilled,
2360            exec_type: OKXExecType::Maker,
2361            sz: "0.02".to_string(),
2362            td_mode: OKXTradeMode::Isolated,
2363            tgt_ccy: None,
2364            trade_id: "trade_rebate_1".to_string(),
2365            u_time: 1746947317402,
2366        };
2367
2368        let fill_report_1 = parse_fill_report(
2369            &order_msg_1,
2370            &InstrumentAny::CryptoPerpetual(instrument),
2371            account_id,
2372            None,
2373            None,
2374            ts_init,
2375        )
2376        .unwrap();
2377
2378        // First fill gets the full rebate (negative commission)
2379        assert_eq!(fill_report_1.commission, Money::new(-0.5, Currency::USDT()));
2380
2381        // Second fill: another maker rebate of $0.3, cumulative now $0.8
2382        let order_msg_2 = OKXOrderMsg {
2383            acc_fill_sz: Some("0.02".to_string()),
2384            avg_px: "50000.0".to_string(),
2385            c_time: 1746947317401,
2386            cancel_source: None,
2387            cancel_source_reason: None,
2388            category: OKXOrderCategory::Normal,
2389            ccy: Ustr::from("USDT"),
2390            cl_ord_id: "test_order_rebate".to_string(),
2391            algo_cl_ord_id: None,
2392            fee: Some("0.8".to_string()), // Cumulative rebate
2393            fee_ccy: Ustr::from("USDT"),
2394            fill_px: "50000.0".to_string(),
2395            fill_sz: "0.01".to_string(),
2396            fill_time: 1746947317403,
2397            inst_id: Ustr::from("BTC-USDT-SWAP"),
2398            inst_type: OKXInstrumentType::Swap,
2399            lever: "2.0".to_string(),
2400            ord_id: Ustr::from("rebate_order_123"),
2401            ord_type: OKXOrderType::Market,
2402            pnl: "0".to_string(),
2403            pos_side: OKXPositionSide::Long,
2404            px: "".to_string(),
2405            reduce_only: "false".to_string(),
2406            side: OKXSide::Buy,
2407            state: OKXOrderStatus::Filled,
2408            exec_type: OKXExecType::Maker,
2409            sz: "0.02".to_string(),
2410            td_mode: OKXTradeMode::Isolated,
2411            tgt_ccy: None,
2412            trade_id: "trade_rebate_2".to_string(),
2413            u_time: 1746947317403,
2414        };
2415
2416        let fill_report_2 = parse_fill_report(
2417            &order_msg_2,
2418            &InstrumentAny::CryptoPerpetual(instrument),
2419            account_id,
2420            Some(fill_report_1.commission),
2421            Some(fill_report_1.last_qty),
2422            ts_init,
2423        )
2424        .unwrap();
2425
2426        // Second fill: incremental = -0.8 - (-0.5) = -0.3
2427        assert_eq!(fill_report_2.commission, Money::new(-0.3, Currency::USDT()));
2428    }
2429
2430    #[rstest]
2431    fn test_parse_fill_report_rebate_to_charge_transition() {
2432        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2433        let instrument = CryptoPerpetual::new(
2434            instrument_id,
2435            Symbol::from("BTC-USDT-SWAP"),
2436            Currency::BTC(),
2437            Currency::USDT(),
2438            Currency::USDT(),
2439            false,
2440            2,
2441            8,
2442            Price::from("0.01"),
2443            Quantity::from("0.00000001"),
2444            None,
2445            None,
2446            None,
2447            None,
2448            None,
2449            None,
2450            None,
2451            None,
2452            None,
2453            None,
2454            None,
2455            None,
2456            UnixNanos::default(),
2457            UnixNanos::default(),
2458        );
2459
2460        let account_id = AccountId::new("OKX-001");
2461        let ts_init = UnixNanos::default();
2462
2463        // First fill: maker rebate of $1.0
2464        let order_msg_1 = OKXOrderMsg {
2465            acc_fill_sz: Some("0.01".to_string()),
2466            avg_px: "50000.0".to_string(),
2467            c_time: 1746947317401,
2468            cancel_source: None,
2469            cancel_source_reason: None,
2470            category: OKXOrderCategory::Normal,
2471            ccy: Ustr::from("USDT"),
2472            cl_ord_id: "test_order_transition".to_string(),
2473            algo_cl_ord_id: None,
2474            fee: Some("1.0".to_string()), // Rebate from OKX
2475            fee_ccy: Ustr::from("USDT"),
2476            fill_px: "50000.0".to_string(),
2477            fill_sz: "0.01".to_string(),
2478            fill_time: 1746947317402,
2479            inst_id: Ustr::from("BTC-USDT-SWAP"),
2480            inst_type: OKXInstrumentType::Swap,
2481            lever: "2.0".to_string(),
2482            ord_id: Ustr::from("transition_order_456"),
2483            ord_type: OKXOrderType::Market,
2484            pnl: "0".to_string(),
2485            pos_side: OKXPositionSide::Long,
2486            px: "".to_string(),
2487            reduce_only: "false".to_string(),
2488            side: OKXSide::Buy,
2489            state: OKXOrderStatus::PartiallyFilled,
2490            exec_type: OKXExecType::Maker,
2491            sz: "0.02".to_string(),
2492            td_mode: OKXTradeMode::Isolated,
2493            tgt_ccy: None,
2494            trade_id: "trade_transition_1".to_string(),
2495            u_time: 1746947317402,
2496        };
2497
2498        let fill_report_1 = parse_fill_report(
2499            &order_msg_1,
2500            &InstrumentAny::CryptoPerpetual(instrument),
2501            account_id,
2502            None,
2503            None,
2504            ts_init,
2505        )
2506        .unwrap();
2507
2508        // First fill gets rebate (negative)
2509        assert_eq!(fill_report_1.commission, Money::new(-1.0, Currency::USDT()));
2510
2511        // Second fill: taker charge of $5.0, net cumulative is now $2.0 charge
2512        // This is the edge case: incremental = 2.0 - (-1.0) = 3.0, which exceeds total (2.0)
2513        // But it's legitimate, not corruption
2514        let order_msg_2 = OKXOrderMsg {
2515            acc_fill_sz: Some("0.02".to_string()),
2516            avg_px: "50000.0".to_string(),
2517            c_time: 1746947317401,
2518            cancel_source: None,
2519            cancel_source_reason: None,
2520            category: OKXOrderCategory::Normal,
2521            ccy: Ustr::from("USDT"),
2522            cl_ord_id: "test_order_transition".to_string(),
2523            algo_cl_ord_id: None,
2524            fee: Some("-2.0".to_string()), // Now a charge (negative from OKX)
2525            fee_ccy: Ustr::from("USDT"),
2526            fill_px: "50000.0".to_string(),
2527            fill_sz: "0.01".to_string(),
2528            fill_time: 1746947317403,
2529            inst_id: Ustr::from("BTC-USDT-SWAP"),
2530            inst_type: OKXInstrumentType::Swap,
2531            lever: "2.0".to_string(),
2532            ord_id: Ustr::from("transition_order_456"),
2533            ord_type: OKXOrderType::Market,
2534            pnl: "0".to_string(),
2535            pos_side: OKXPositionSide::Long,
2536            px: "".to_string(),
2537            reduce_only: "false".to_string(),
2538            side: OKXSide::Buy,
2539            state: OKXOrderStatus::Filled,
2540            exec_type: OKXExecType::Taker,
2541            sz: "0.02".to_string(),
2542            td_mode: OKXTradeMode::Isolated,
2543            tgt_ccy: None,
2544            trade_id: "trade_transition_2".to_string(),
2545            u_time: 1746947317403,
2546        };
2547
2548        let fill_report_2 = parse_fill_report(
2549            &order_msg_2,
2550            &InstrumentAny::CryptoPerpetual(instrument),
2551            account_id,
2552            Some(fill_report_1.commission),
2553            Some(fill_report_1.last_qty),
2554            ts_init,
2555        )
2556        .unwrap();
2557
2558        // Second fill: incremental = 2.0 - (-1.0) = 3.0
2559        // This should NOT trigger corruption detection because previous was negative
2560        assert_eq!(fill_report_2.commission, Money::new(3.0, Currency::USDT()));
2561    }
2562
2563    #[rstest]
2564    fn test_parse_fill_report_negative_incremental() {
2565        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2566        let instrument = CryptoPerpetual::new(
2567            instrument_id,
2568            Symbol::from("BTC-USDT-SWAP"),
2569            Currency::BTC(),
2570            Currency::USDT(),
2571            Currency::USDT(),
2572            false,
2573            2,
2574            8,
2575            Price::from("0.01"),
2576            Quantity::from("0.00000001"),
2577            None,
2578            None,
2579            None,
2580            None,
2581            None,
2582            None,
2583            None,
2584            None,
2585            None,
2586            None,
2587            None,
2588            None,
2589            UnixNanos::default(),
2590            UnixNanos::default(),
2591        );
2592
2593        let account_id = AccountId::new("OKX-001");
2594        let ts_init = UnixNanos::default();
2595
2596        // First fill: charge of $2.0
2597        let order_msg_1 = OKXOrderMsg {
2598            acc_fill_sz: Some("0.01".to_string()),
2599            avg_px: "50000.0".to_string(),
2600            c_time: 1746947317401,
2601            cancel_source: None,
2602            cancel_source_reason: None,
2603            category: OKXOrderCategory::Normal,
2604            ccy: Ustr::from("USDT"),
2605            cl_ord_id: "test_order_neg_inc".to_string(),
2606            algo_cl_ord_id: None,
2607            fee: Some("-2.0".to_string()),
2608            fee_ccy: Ustr::from("USDT"),
2609            fill_px: "50000.0".to_string(),
2610            fill_sz: "0.01".to_string(),
2611            fill_time: 1746947317402,
2612            inst_id: Ustr::from("BTC-USDT-SWAP"),
2613            inst_type: OKXInstrumentType::Swap,
2614            lever: "2.0".to_string(),
2615            ord_id: Ustr::from("neg_inc_order_789"),
2616            ord_type: OKXOrderType::Market,
2617            pnl: "0".to_string(),
2618            pos_side: OKXPositionSide::Long,
2619            px: "".to_string(),
2620            reduce_only: "false".to_string(),
2621            side: OKXSide::Buy,
2622            state: OKXOrderStatus::PartiallyFilled,
2623            exec_type: OKXExecType::Taker,
2624            sz: "0.02".to_string(),
2625            td_mode: OKXTradeMode::Isolated,
2626            tgt_ccy: None,
2627            trade_id: "trade_neg_inc_1".to_string(),
2628            u_time: 1746947317402,
2629        };
2630
2631        let fill_report_1 = parse_fill_report(
2632            &order_msg_1,
2633            &InstrumentAny::CryptoPerpetual(instrument),
2634            account_id,
2635            None,
2636            None,
2637            ts_init,
2638        )
2639        .unwrap();
2640
2641        assert_eq!(fill_report_1.commission, Money::new(2.0, Currency::USDT()));
2642
2643        // Second fill: charge reduced to $1.5 total (refund or maker rebate on this fill)
2644        // Incremental = 1.5 - 2.0 = -0.5 (negative incremental triggers debug log)
2645        let order_msg_2 = OKXOrderMsg {
2646            acc_fill_sz: Some("0.02".to_string()),
2647            avg_px: "50000.0".to_string(),
2648            c_time: 1746947317401,
2649            cancel_source: None,
2650            cancel_source_reason: None,
2651            category: OKXOrderCategory::Normal,
2652            ccy: Ustr::from("USDT"),
2653            cl_ord_id: "test_order_neg_inc".to_string(),
2654            algo_cl_ord_id: None,
2655            fee: Some("-1.5".to_string()), // Total reduced
2656            fee_ccy: Ustr::from("USDT"),
2657            fill_px: "50000.0".to_string(),
2658            fill_sz: "0.01".to_string(),
2659            fill_time: 1746947317403,
2660            inst_id: Ustr::from("BTC-USDT-SWAP"),
2661            inst_type: OKXInstrumentType::Swap,
2662            lever: "2.0".to_string(),
2663            ord_id: Ustr::from("neg_inc_order_789"),
2664            ord_type: OKXOrderType::Market,
2665            pnl: "0".to_string(),
2666            pos_side: OKXPositionSide::Long,
2667            px: "".to_string(),
2668            reduce_only: "false".to_string(),
2669            side: OKXSide::Buy,
2670            state: OKXOrderStatus::Filled,
2671            exec_type: OKXExecType::Maker,
2672            sz: "0.02".to_string(),
2673            td_mode: OKXTradeMode::Isolated,
2674            tgt_ccy: None,
2675            trade_id: "trade_neg_inc_2".to_string(),
2676            u_time: 1746947317403,
2677        };
2678
2679        let fill_report_2 = parse_fill_report(
2680            &order_msg_2,
2681            &InstrumentAny::CryptoPerpetual(instrument),
2682            account_id,
2683            Some(fill_report_1.commission),
2684            Some(fill_report_1.last_qty),
2685            ts_init,
2686        )
2687        .unwrap();
2688
2689        // Incremental is negative: 1.5 - 2.0 = -0.5
2690        assert_eq!(fill_report_2.commission, Money::new(-0.5, Currency::USDT()));
2691    }
2692
2693    #[rstest]
2694    fn test_parse_fill_report_empty_fill_sz_first_fill() {
2695        let instrument = create_stub_instrument();
2696        let account_id = AccountId::new("OKX-001");
2697        let ts_init = UnixNanos::default();
2698
2699        let order_msg =
2700            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2701
2702        let fill_report = parse_fill_report(
2703            &order_msg,
2704            &InstrumentAny::CryptoPerpetual(instrument),
2705            account_id,
2706            None,
2707            None,
2708            ts_init,
2709        )
2710        .unwrap();
2711
2712        assert_eq!(fill_report.last_qty, Quantity::from("0.01"));
2713    }
2714
2715    #[rstest]
2716    fn test_parse_fill_report_empty_fill_sz_subsequent_fills() {
2717        let instrument = create_stub_instrument();
2718        let account_id = AccountId::new("OKX-001");
2719        let ts_init = UnixNanos::default();
2720
2721        let order_msg_1 =
2722            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2723
2724        let fill_report_1 = parse_fill_report(
2725            &order_msg_1,
2726            &InstrumentAny::CryptoPerpetual(instrument),
2727            account_id,
2728            None,
2729            None,
2730            ts_init,
2731        )
2732        .unwrap();
2733
2734        assert_eq!(fill_report_1.last_qty, Quantity::from("0.01"));
2735
2736        let order_msg_2 =
2737            create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "trade_2");
2738
2739        let fill_report_2 = parse_fill_report(
2740            &order_msg_2,
2741            &InstrumentAny::CryptoPerpetual(instrument),
2742            account_id,
2743            Some(fill_report_1.commission),
2744            Some(fill_report_1.last_qty),
2745            ts_init,
2746        )
2747        .unwrap();
2748
2749        assert_eq!(fill_report_2.last_qty, Quantity::from("0.02"));
2750    }
2751
2752    #[rstest]
2753    fn test_parse_fill_report_error_both_empty() {
2754        let instrument = create_stub_instrument();
2755        let account_id = AccountId::new("OKX-001");
2756        let ts_init = UnixNanos::default();
2757
2758        let order_msg = create_stub_order_msg("", Some("".to_string()), "1234567890", "trade_1");
2759
2760        let result = parse_fill_report(
2761            &order_msg,
2762            &InstrumentAny::CryptoPerpetual(instrument),
2763            account_id,
2764            None,
2765            None,
2766            ts_init,
2767        );
2768
2769        assert!(result.is_err());
2770        let err_msg = result.unwrap_err().to_string();
2771        assert!(err_msg.contains("Cannot determine fill quantity"));
2772        assert!(err_msg.contains("empty/zero"));
2773    }
2774
2775    #[rstest]
2776    fn test_parse_fill_report_error_acc_fill_sz_none() {
2777        let instrument = create_stub_instrument();
2778        let account_id = AccountId::new("OKX-001");
2779        let ts_init = UnixNanos::default();
2780
2781        let order_msg = create_stub_order_msg("", None, "1234567890", "trade_1");
2782
2783        let result = parse_fill_report(
2784            &order_msg,
2785            &InstrumentAny::CryptoPerpetual(instrument),
2786            account_id,
2787            None,
2788            None,
2789            ts_init,
2790        );
2791
2792        assert!(result.is_err());
2793        let err_msg = result.unwrap_err().to_string();
2794        assert!(err_msg.contains("Cannot determine fill quantity"));
2795        assert!(err_msg.contains("acc_fill_sz is None"));
2796    }
2797
2798    #[rstest]
2799    fn test_parse_order_msg_acc_fill_sz_only_update() {
2800        // Test that we emit fill reports when OKX only updates acc_fill_sz without fill_sz or trade_id
2801        let instrument = create_stub_instrument();
2802        let account_id = AccountId::new("OKX-001");
2803        let ts_init = UnixNanos::default();
2804
2805        let mut instruments = AHashMap::new();
2806        instruments.insert(
2807            Ustr::from("BTC-USDT-SWAP"),
2808            InstrumentAny::CryptoPerpetual(instrument),
2809        );
2810
2811        let fee_cache = AHashMap::new();
2812        let mut filled_qty_cache = AHashMap::new();
2813
2814        // First update: acc_fill_sz = 0.01, no fill_sz, no trade_id
2815        let msg_1 = create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "");
2816
2817        let report_1 = parse_order_msg(
2818            &msg_1,
2819            account_id,
2820            &instruments,
2821            &fee_cache,
2822            &filled_qty_cache,
2823            ts_init,
2824        )
2825        .unwrap();
2826
2827        // Should generate a fill report (not a status report)
2828        assert!(matches!(report_1, ExecutionReport::Fill(_)));
2829        if let ExecutionReport::Fill(fill) = &report_1 {
2830            assert_eq!(fill.last_qty, Quantity::from("0.01"));
2831        }
2832
2833        // Update cache
2834        filled_qty_cache.insert(Ustr::from("1234567890"), Quantity::from("0.01"));
2835
2836        // Second update: acc_fill_sz increased to 0.03, still no fill_sz or trade_id
2837        let msg_2 = create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "");
2838
2839        let report_2 = parse_order_msg(
2840            &msg_2,
2841            account_id,
2842            &instruments,
2843            &fee_cache,
2844            &filled_qty_cache,
2845            ts_init,
2846        )
2847        .unwrap();
2848
2849        // Should still generate a fill report for the incremental 0.02
2850        assert!(matches!(report_2, ExecutionReport::Fill(_)));
2851        if let ExecutionReport::Fill(fill) = &report_2 {
2852            assert_eq!(fill.last_qty, Quantity::from("0.02"));
2853        }
2854    }
2855
2856    #[rstest]
2857    fn test_parse_book10_msg_partial_levels() {
2858        // Test with fewer than 10 levels - should pad with empty orders
2859        let book_msg = OKXBookMsg {
2860            asks: vec![
2861                OrderBookEntry {
2862                    price: "8476.98".to_string(),
2863                    size: "415".to_string(),
2864                    liquidated_orders_count: "0".to_string(),
2865                    orders_count: "13".to_string(),
2866                },
2867                OrderBookEntry {
2868                    price: "8477.00".to_string(),
2869                    size: "7".to_string(),
2870                    liquidated_orders_count: "0".to_string(),
2871                    orders_count: "2".to_string(),
2872                },
2873            ],
2874            bids: vec![OrderBookEntry {
2875                price: "8476.97".to_string(),
2876                size: "256".to_string(),
2877                liquidated_orders_count: "0".to_string(),
2878                orders_count: "12".to_string(),
2879            }],
2880            ts: 1597026383085,
2881            checksum: None,
2882            prev_seq_id: None,
2883            seq_id: 123456,
2884        };
2885
2886        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2887        let depth10 =
2888            parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
2889
2890        // Check that first levels have data
2891        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2892        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2893        assert_eq!(depth10.bid_counts[0], 12);
2894
2895        // Check that remaining levels are padded with default (empty) orders
2896        assert_eq!(depth10.bids[1].price, Price::from("0"));
2897        assert_eq!(depth10.bids[1].size, Quantity::from("0"));
2898        assert_eq!(depth10.bid_counts[1], 0);
2899
2900        // Check asks
2901        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2902        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2903        assert_eq!(depth10.asks[2].price, Price::from("0")); // padded with empty
2904    }
2905
2906    #[rstest]
2907    fn test_parse_algo_order_msg_stop_market() {
2908        let json_data = load_test_json("ws_orders_algo.json");
2909        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2910        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2911
2912        // Test first algo order (stop market sell)
2913        let msg = &data[0];
2914        assert_eq!(msg.algo_id, "706620792746729472");
2915        assert_eq!(msg.algo_cl_ord_id, "STOP001BTCUSDT20250120");
2916        assert_eq!(msg.state, OKXOrderStatus::Live);
2917        assert_eq!(msg.ord_px, "-1"); // Market order indicator
2918
2919        let account_id = AccountId::new("OKX-001");
2920        let mut instruments = AHashMap::new();
2921
2922        // Create mock instrument
2923        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2924        let instrument = CryptoPerpetual::new(
2925            instrument_id,
2926            Symbol::from("BTC-USDT-SWAP"),
2927            Currency::BTC(),
2928            Currency::USDT(),
2929            Currency::USDT(),
2930            false, // is_inverse
2931            2,     // price_precision
2932            8,     // size_precision
2933            Price::from("0.01"),
2934            Quantity::from("0.00000001"),
2935            None,
2936            None,
2937            None,
2938            None,
2939            None,
2940            None,
2941            None,
2942            None,
2943            None,
2944            None,
2945            None,
2946            None,
2947            0.into(), // ts_event
2948            0.into(), // ts_init
2949        );
2950        instruments.insert(
2951            Ustr::from("BTC-USDT-SWAP"),
2952            InstrumentAny::CryptoPerpetual(instrument),
2953        );
2954
2955        let result =
2956            parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
2957
2958        assert!(result.is_ok());
2959        let report = result.unwrap();
2960
2961        if let ExecutionReport::Order(status_report) = report {
2962            assert_eq!(status_report.order_type, OrderType::StopMarket);
2963            assert_eq!(status_report.order_side, OrderSide::Sell);
2964            assert_eq!(status_report.quantity, Quantity::from("0.01000000"));
2965            assert_eq!(status_report.trigger_price, Some(Price::from("95000.00")));
2966            assert_eq!(status_report.trigger_type, Some(TriggerType::LastPrice));
2967            assert_eq!(status_report.price, None); // No limit price for market orders
2968        } else {
2969            panic!("Expected Order report");
2970        }
2971    }
2972
2973    #[rstest]
2974    fn test_parse_algo_order_msg_stop_limit() {
2975        let json_data = load_test_json("ws_orders_algo.json");
2976        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2977        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2978
2979        // Test second algo order (stop limit buy)
2980        let msg = &data[1];
2981        assert_eq!(msg.algo_id, "706620792746729473");
2982        assert_eq!(msg.state, OKXOrderStatus::Live);
2983        assert_eq!(msg.ord_px, "106000"); // Limit price
2984
2985        let account_id = AccountId::new("OKX-001");
2986        let mut instruments = AHashMap::new();
2987
2988        // Create mock instrument
2989        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2990        let instrument = CryptoPerpetual::new(
2991            instrument_id,
2992            Symbol::from("BTC-USDT-SWAP"),
2993            Currency::BTC(),
2994            Currency::USDT(),
2995            Currency::USDT(),
2996            false, // is_inverse
2997            2,     // price_precision
2998            8,     // size_precision
2999            Price::from("0.01"),
3000            Quantity::from("0.00000001"),
3001            None,
3002            None,
3003            None,
3004            None,
3005            None,
3006            None,
3007            None,
3008            None,
3009            None,
3010            None,
3011            None,
3012            None,
3013            0.into(), // ts_event
3014            0.into(), // ts_init
3015        );
3016        instruments.insert(
3017            Ustr::from("BTC-USDT-SWAP"),
3018            InstrumentAny::CryptoPerpetual(instrument),
3019        );
3020
3021        let result =
3022            parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
3023
3024        assert!(result.is_ok());
3025        let report = result.unwrap();
3026
3027        if let ExecutionReport::Order(status_report) = report {
3028            assert_eq!(status_report.order_type, OrderType::StopLimit);
3029            assert_eq!(status_report.order_side, OrderSide::Buy);
3030            assert_eq!(status_report.quantity, Quantity::from("0.02000000"));
3031            assert_eq!(status_report.trigger_price, Some(Price::from("105000.00")));
3032            assert_eq!(status_report.trigger_type, Some(TriggerType::MarkPrice));
3033            assert_eq!(status_report.price, Some(Price::from("106000.00"))); // Has limit price
3034        } else {
3035            panic!("Expected Order report");
3036        }
3037    }
3038
3039    #[rstest]
3040    fn test_parse_trigger_order_from_regular_channel() {
3041        let json_data = load_test_json("ws_orders_trigger.json");
3042        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3043        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3044
3045        // Test triggered order that came through regular orders channel
3046        let msg = &data[0];
3047        assert_eq!(msg.ord_type, OKXOrderType::Trigger);
3048        assert_eq!(msg.state, OKXOrderStatus::Filled);
3049
3050        let account_id = AccountId::new("OKX-001");
3051        let mut instruments = AHashMap::new();
3052
3053        // Create mock instrument
3054        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3055        let instrument = CryptoPerpetual::new(
3056            instrument_id,
3057            Symbol::from("BTC-USDT-SWAP"),
3058            Currency::BTC(),
3059            Currency::USDT(),
3060            Currency::USDT(),
3061            false, // is_inverse
3062            2,     // price_precision
3063            8,     // size_precision
3064            Price::from("0.01"),
3065            Quantity::from("0.00000001"),
3066            None,
3067            None,
3068            None,
3069            None,
3070            None,
3071            None,
3072            None,
3073            None,
3074            None,
3075            None,
3076            None,
3077            None,
3078            0.into(), // ts_event
3079            0.into(), // ts_init
3080        );
3081        instruments.insert(
3082            Ustr::from("BTC-USDT-SWAP"),
3083            InstrumentAny::CryptoPerpetual(instrument),
3084        );
3085        let fee_cache = AHashMap::new();
3086        let filled_qty_cache = AHashMap::new();
3087
3088        let result = parse_order_msg_vec(
3089            vec![msg.clone()],
3090            account_id,
3091            &instruments,
3092            &fee_cache,
3093            &filled_qty_cache,
3094            UnixNanos::default(),
3095        );
3096
3097        assert!(result.is_ok());
3098        let reports = result.unwrap();
3099        assert_eq!(reports.len(), 1);
3100
3101        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3102            assert_eq!(fill_report.order_side, OrderSide::Sell);
3103            assert_eq!(fill_report.last_qty, Quantity::from("0.01000000"));
3104            assert_eq!(fill_report.last_px, Price::from("101950.00"));
3105        } else {
3106            panic!("Expected Fill report for filled trigger order");
3107        }
3108    }
3109
3110    #[rstest]
3111    fn test_parse_liquidation_order() {
3112        let json_data = load_test_json("ws_orders_liquidation.json");
3113        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3114        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3115
3116        // Test liquidation order
3117        let msg = &data[0];
3118        assert_eq!(msg.category, OKXOrderCategory::FullLiquidation);
3119        assert_eq!(msg.state, OKXOrderStatus::Filled);
3120        assert_eq!(msg.inst_id.as_str(), "BTC-USDT-SWAP");
3121
3122        let account_id = AccountId::new("OKX-001");
3123        let mut instruments = AHashMap::new();
3124
3125        // Create mock instrument
3126        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3127        let instrument = CryptoPerpetual::new(
3128            instrument_id,
3129            Symbol::from("BTC-USDT-SWAP"),
3130            Currency::BTC(),
3131            Currency::USDT(),
3132            Currency::USDT(),
3133            false, // is_inverse
3134            2,     // price_precision
3135            8,     // size_precision
3136            Price::from("0.01"),
3137            Quantity::from("0.00000001"),
3138            None,
3139            None,
3140            None,
3141            None,
3142            None,
3143            None,
3144            None,
3145            None,
3146            None,
3147            None,
3148            None,
3149            None,
3150            0.into(), // ts_event
3151            0.into(), // ts_init
3152        );
3153        instruments.insert(
3154            Ustr::from("BTC-USDT-SWAP"),
3155            InstrumentAny::CryptoPerpetual(instrument),
3156        );
3157        let fee_cache = AHashMap::new();
3158        let filled_qty_cache = AHashMap::new();
3159
3160        let result = parse_order_msg_vec(
3161            vec![msg.clone()],
3162            account_id,
3163            &instruments,
3164            &fee_cache,
3165            &filled_qty_cache,
3166            UnixNanos::default(),
3167        );
3168
3169        assert!(result.is_ok());
3170        let reports = result.unwrap();
3171        assert_eq!(reports.len(), 1);
3172
3173        // Verify it's a fill report for a liquidation
3174        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3175            assert_eq!(fill_report.order_side, OrderSide::Sell);
3176            assert_eq!(fill_report.last_qty, Quantity::from("0.50000000"));
3177            assert_eq!(fill_report.last_px, Price::from("40000.00"));
3178            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3179        } else {
3180            panic!("Expected Fill report for liquidation order");
3181        }
3182    }
3183
3184    #[rstest]
3185    fn test_parse_adl_order() {
3186        let json_data = load_test_json("ws_orders_adl.json");
3187        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3188        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3189
3190        // Test ADL order
3191        let msg = &data[0];
3192        assert_eq!(msg.category, OKXOrderCategory::Adl);
3193        assert_eq!(msg.state, OKXOrderStatus::Filled);
3194        assert_eq!(msg.inst_id.as_str(), "ETH-USDT-SWAP");
3195
3196        let account_id = AccountId::new("OKX-001");
3197        let mut instruments = AHashMap::new();
3198
3199        // Create mock instrument
3200        let instrument_id = InstrumentId::from("ETH-USDT-SWAP.OKX");
3201        let instrument = CryptoPerpetual::new(
3202            instrument_id,
3203            Symbol::from("ETH-USDT-SWAP"),
3204            Currency::ETH(),
3205            Currency::USDT(),
3206            Currency::USDT(),
3207            false, // is_inverse
3208            2,     // price_precision
3209            8,     // size_precision
3210            Price::from("0.01"),
3211            Quantity::from("0.00000001"),
3212            None,
3213            None,
3214            None,
3215            None,
3216            None,
3217            None,
3218            None,
3219            None,
3220            None,
3221            None,
3222            None,
3223            None,
3224            0.into(), // ts_event
3225            0.into(), // ts_init
3226        );
3227        instruments.insert(
3228            Ustr::from("ETH-USDT-SWAP"),
3229            InstrumentAny::CryptoPerpetual(instrument),
3230        );
3231        let fee_cache = AHashMap::new();
3232        let filled_qty_cache = AHashMap::new();
3233
3234        let result = parse_order_msg_vec(
3235            vec![msg.clone()],
3236            account_id,
3237            &instruments,
3238            &fee_cache,
3239            &filled_qty_cache,
3240            UnixNanos::default(),
3241        );
3242
3243        assert!(result.is_ok());
3244        let reports = result.unwrap();
3245        assert_eq!(reports.len(), 1);
3246
3247        // Verify it's a fill report for ADL
3248        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3249            assert_eq!(fill_report.order_side, OrderSide::Buy);
3250            assert_eq!(fill_report.last_qty, Quantity::from("0.30000000"));
3251            assert_eq!(fill_report.last_px, Price::from("41000.00"));
3252            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3253        } else {
3254            panic!("Expected Fill report for ADL order");
3255        }
3256    }
3257
3258    #[rstest]
3259    fn test_parse_unknown_category_graceful_fallback() {
3260        // Test that unknown/future category values deserialize as Other instead of failing
3261        let json_with_unknown_category = r#"{
3262            "category": "some_future_category_we_dont_know"
3263        }"#;
3264
3265        let result: Result<serde_json::Value, _> = serde_json::from_str(json_with_unknown_category);
3266        assert!(result.is_ok());
3267
3268        // Test deserialization of the category field directly
3269        let category_result: Result<OKXOrderCategory, _> =
3270            serde_json::from_str(r#""some_future_category""#);
3271        assert!(category_result.is_ok());
3272        assert_eq!(category_result.unwrap(), OKXOrderCategory::Other);
3273
3274        // Verify known categories still work
3275        let normal: OKXOrderCategory = serde_json::from_str(r#""normal""#).unwrap();
3276        assert_eq!(normal, OKXOrderCategory::Normal);
3277
3278        let twap: OKXOrderCategory = serde_json::from_str(r#""twap""#).unwrap();
3279        assert_eq!(twap, OKXOrderCategory::Twap);
3280    }
3281
3282    #[rstest]
3283    fn test_parse_partial_liquidation_order() {
3284        // Create a test message with partial liquidation category
3285        let account_id = AccountId::new("OKX-001");
3286        let mut instruments = AHashMap::new();
3287
3288        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3289        let instrument = CryptoPerpetual::new(
3290            instrument_id,
3291            Symbol::from("BTC-USDT-SWAP"),
3292            Currency::BTC(),
3293            Currency::USDT(),
3294            Currency::USDT(),
3295            false,
3296            2,
3297            8,
3298            Price::from("0.01"),
3299            Quantity::from("0.00000001"),
3300            None,
3301            None,
3302            None,
3303            None,
3304            None,
3305            None,
3306            None,
3307            None,
3308            None,
3309            None,
3310            None,
3311            None,
3312            0.into(),
3313            0.into(),
3314        );
3315        instruments.insert(
3316            Ustr::from("BTC-USDT-SWAP"),
3317            InstrumentAny::CryptoPerpetual(instrument),
3318        );
3319
3320        let partial_liq_msg = OKXOrderMsg {
3321            acc_fill_sz: Some("0.25".to_string()),
3322            avg_px: "39000.0".to_string(),
3323            c_time: 1746947317401,
3324            cancel_source: None,
3325            cancel_source_reason: None,
3326            category: OKXOrderCategory::PartialLiquidation,
3327            ccy: Ustr::from("USDT"),
3328            cl_ord_id: "".to_string(),
3329            algo_cl_ord_id: None,
3330            fee: Some("-9.75".to_string()),
3331            fee_ccy: Ustr::from("USDT"),
3332            fill_px: "39000.0".to_string(),
3333            fill_sz: "0.25".to_string(),
3334            fill_time: 1746947317402,
3335            inst_id: Ustr::from("BTC-USDT-SWAP"),
3336            inst_type: OKXInstrumentType::Swap,
3337            lever: "10.0".to_string(),
3338            ord_id: Ustr::from("2497956918703120888"),
3339            ord_type: OKXOrderType::Market,
3340            pnl: "-2500".to_string(),
3341            pos_side: OKXPositionSide::Long,
3342            px: "".to_string(),
3343            reduce_only: "false".to_string(),
3344            side: OKXSide::Sell,
3345            state: OKXOrderStatus::Filled,
3346            exec_type: OKXExecType::Taker,
3347            sz: "0.25".to_string(),
3348            td_mode: OKXTradeMode::Isolated,
3349            tgt_ccy: None,
3350            trade_id: "1518905888".to_string(),
3351            u_time: 1746947317402,
3352        };
3353
3354        let fee_cache = AHashMap::new();
3355        let filled_qty_cache = AHashMap::new();
3356        let result = parse_order_msg(
3357            &partial_liq_msg,
3358            account_id,
3359            &instruments,
3360            &fee_cache,
3361            &filled_qty_cache,
3362            UnixNanos::default(),
3363        );
3364
3365        assert!(result.is_ok());
3366        let report = result.unwrap();
3367
3368        // Verify it's a fill report for partial liquidation
3369        if let ExecutionReport::Fill(fill_report) = report {
3370            assert_eq!(fill_report.order_side, OrderSide::Sell);
3371            assert_eq!(fill_report.last_qty, Quantity::from("0.25000000"));
3372            assert_eq!(fill_report.last_px, Price::from("39000.00"));
3373        } else {
3374            panic!("Expected Fill report for partial liquidation order");
3375        }
3376    }
3377
3378    #[rstest]
3379    fn test_websocket_instrument_update_preserves_cached_fees() {
3380        use nautilus_model::{identifiers::InstrumentId, instruments::InstrumentAny};
3381
3382        use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
3383
3384        let ts_init = UnixNanos::default();
3385
3386        // Create initial instrument with fees (simulating HTTP load)
3387        let initial_fees = (
3388            Some(Decimal::new(8, 4)),  // maker_fee = 0.0008
3389            Some(Decimal::new(10, 4)), // taker_fee = 0.0010
3390        );
3391
3392        // Deserialize initial instrument from JSON
3393        let initial_inst_json = serde_json::json!({
3394            "instType": "SPOT",
3395            "instId": "BTC-USD",
3396            "baseCcy": "BTC",
3397            "quoteCcy": "USD",
3398            "settleCcy": "",
3399            "ctVal": "",
3400            "ctMult": "",
3401            "ctValCcy": "",
3402            "optType": "",
3403            "stk": "",
3404            "listTime": "1733454000000",
3405            "expTime": "",
3406            "lever": "",
3407            "tickSz": "0.1",
3408            "lotSz": "0.00000001",
3409            "minSz": "0.00001",
3410            "ctType": "linear",
3411            "alias": "",
3412            "state": "live",
3413            "maxLmtSz": "9999999999",
3414            "maxMktSz": "1000000",
3415            "maxTwapSz": "9999999999.0000000000000000",
3416            "maxIcebergSz": "9999999999.0000000000000000",
3417            "maxTriggerSz": "9999999999.0000000000000000",
3418            "maxStopSz": "1000000",
3419            "uly": "",
3420            "instFamily": "",
3421            "ruleType": "normal",
3422            "maxLmtAmt": "20000000",
3423            "maxMktAmt": "1000000"
3424        });
3425
3426        let initial_inst: OKXInstrument = serde_json::from_value(initial_inst_json)
3427            .expect("Failed to deserialize initial instrument");
3428
3429        // Parse initial instrument with fees
3430        let parsed_initial = parse_instrument_any(
3431            &initial_inst,
3432            None,
3433            None,
3434            initial_fees.0,
3435            initial_fees.1,
3436            ts_init,
3437        )
3438        .expect("Failed to parse initial instrument")
3439        .expect("Initial instrument should not be None");
3440
3441        // Verify fees were applied
3442        if let InstrumentAny::CurrencyPair(ref pair) = parsed_initial {
3443            assert_eq!(pair.maker_fee, dec!(0.0008));
3444            assert_eq!(pair.taker_fee, dec!(0.0010));
3445        } else {
3446            panic!("Expected CurrencyPair instrument");
3447        }
3448
3449        // Build instrument cache with the initial instrument
3450        let mut instruments_cache = AHashMap::new();
3451        instruments_cache.insert(Ustr::from("BTC-USD"), parsed_initial);
3452
3453        // Create WebSocket update message (same structure as initial, simulating a WebSocket update)
3454        let ws_update = serde_json::json!({
3455            "instType": "SPOT",
3456            "instId": "BTC-USD",
3457            "baseCcy": "BTC",
3458            "quoteCcy": "USD",
3459            "settleCcy": "",
3460            "ctVal": "",
3461            "ctMult": "",
3462            "ctValCcy": "",
3463            "optType": "",
3464            "stk": "",
3465            "listTime": "1733454000000",
3466            "expTime": "",
3467            "lever": "",
3468            "tickSz": "0.1",
3469            "lotSz": "0.00000001",
3470            "minSz": "0.00001",
3471            "ctType": "linear",
3472            "alias": "",
3473            "state": "live",
3474            "maxLmtSz": "9999999999",
3475            "maxMktSz": "1000000",
3476            "maxTwapSz": "9999999999.0000000000000000",
3477            "maxIcebergSz": "9999999999.0000000000000000",
3478            "maxTriggerSz": "9999999999.0000000000000000",
3479            "maxStopSz": "1000000",
3480            "uly": "",
3481            "instFamily": "",
3482            "ruleType": "normal",
3483            "maxLmtAmt": "20000000",
3484            "maxMktAmt": "1000000"
3485        });
3486
3487        let instrument_id = InstrumentId::from("BTC-USD.OKX");
3488        let mut funding_cache = AHashMap::new();
3489
3490        // Parse WebSocket update with cache
3491        let result = parse_ws_message_data(
3492            &OKXWsChannel::Instruments,
3493            ws_update,
3494            &instrument_id,
3495            2,
3496            8,
3497            ts_init,
3498            &mut funding_cache,
3499            &instruments_cache,
3500        )
3501        .expect("Failed to parse WebSocket instrument update");
3502
3503        // Verify the update preserves the cached fees
3504        if let Some(NautilusWsMessage::Instrument(boxed_inst)) = result {
3505            if let InstrumentAny::CurrencyPair(pair) = *boxed_inst {
3506                assert_eq!(
3507                    pair.maker_fee,
3508                    Decimal::new(8, 4),
3509                    "Maker fee should be preserved from cache"
3510                );
3511                assert_eq!(
3512                    pair.taker_fee,
3513                    Decimal::new(10, 4),
3514                    "Taker fee should be preserved from cache"
3515                );
3516            } else {
3517                panic!("Expected CurrencyPair instrument from WebSocket update");
3518            }
3519        } else {
3520            panic!("Expected Instrument message from WebSocket update");
3521        }
3522    }
3523
3524    #[rstest]
3525    #[case::fok_order(OKXOrderType::Fok, TimeInForce::Fok)]
3526    #[case::ioc_order(OKXOrderType::Ioc, TimeInForce::Ioc)]
3527    #[case::optimal_limit_ioc_order(OKXOrderType::OptimalLimitIoc, TimeInForce::Ioc)]
3528    #[case::market_order(OKXOrderType::Market, TimeInForce::Gtc)]
3529    #[case::limit_order(OKXOrderType::Limit, TimeInForce::Gtc)]
3530    fn test_parse_time_in_force_from_ord_type(
3531        #[case] okx_ord_type: OKXOrderType,
3532        #[case] expected_tif: TimeInForce,
3533    ) {
3534        let time_in_force = match okx_ord_type {
3535            OKXOrderType::Fok => TimeInForce::Fok,
3536            OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
3537            _ => TimeInForce::Gtc,
3538        };
3539
3540        assert_eq!(
3541            time_in_force, expected_tif,
3542            "OKXOrderType::{:?} should parse to TimeInForce::{:?}",
3543            okx_ord_type, expected_tif
3544        );
3545    }
3546
3547    #[rstest]
3548    fn test_deserialize_fok_order_message() {
3549        let json_data = load_test_json("ws_orders_fok.json");
3550        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3551        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3552
3553        assert!(!data.is_empty());
3554        assert_eq!(data[0].ord_type, OKXOrderType::Fok);
3555        assert_eq!(data[0].cl_ord_id, "FOK-TEST-001");
3556        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
3557    }
3558
3559    #[rstest]
3560    fn test_deserialize_ioc_order_message() {
3561        let json_data = load_test_json("ws_orders_ioc.json");
3562        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3563        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3564
3565        assert!(!data.is_empty());
3566        assert_eq!(data[0].ord_type, OKXOrderType::Ioc);
3567        assert_eq!(data[0].cl_ord_id, "IOC-TEST-001");
3568        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
3569    }
3570
3571    #[rstest]
3572    fn test_deserialize_optimal_limit_ioc_order_message() {
3573        let json_data = load_test_json("ws_orders_optimal_limit_ioc.json");
3574        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3575        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3576
3577        assert!(!data.is_empty());
3578        assert_eq!(data[0].ord_type, OKXOrderType::OptimalLimitIoc);
3579        assert_eq!(data[0].cl_ord_id, "OPTIMAL-IOC-TEST-001");
3580        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
3581    }
3582
3583    #[rstest]
3584    fn test_deserialize_regular_order_message() {
3585        let json_data = load_test_json("ws_orders.json");
3586        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3587        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3588
3589        assert!(!data.is_empty());
3590        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
3591        assert_eq!(data[0].state, OKXOrderStatus::Filled);
3592        assert_eq!(data[0].category, OKXOrderCategory::Normal);
3593    }
3594
3595    #[rstest]
3596    fn test_deserialize_algo_order_message() {
3597        let json_data = load_test_json("ws_orders_algo.json");
3598        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3599        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3600
3601        assert!(!data.is_empty());
3602        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
3603    }
3604
3605    #[rstest]
3606    fn test_deserialize_liquidation_order_message() {
3607        let json_data = load_test_json("ws_orders_liquidation.json");
3608        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3609        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3610
3611        assert!(!data.is_empty());
3612        assert_eq!(data[0].category, OKXOrderCategory::FullLiquidation);
3613    }
3614
3615    #[rstest]
3616    fn test_deserialize_adl_order_message() {
3617        let json_data = load_test_json("ws_orders_adl.json");
3618        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3619        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3620
3621        assert!(!data.is_empty());
3622        assert_eq!(data[0].category, OKXOrderCategory::Adl);
3623    }
3624
3625    #[rstest]
3626    fn test_deserialize_trigger_order_message() {
3627        let json_data = load_test_json("ws_orders_trigger.json");
3628        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3629        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3630
3631        assert!(!data.is_empty());
3632        assert_eq!(data[0].ord_type, OKXOrderType::Trigger);
3633        assert_eq!(data[0].category, OKXOrderCategory::Normal);
3634    }
3635
3636    #[rstest]
3637    fn test_deserialize_book_snapshot_message() {
3638        let json_data = load_test_json("ws_books_snapshot.json");
3639        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3640        let action: Option<OKXBookAction> =
3641            serde_json::from_value(payload["action"].clone()).unwrap();
3642        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3643
3644        assert!(!data.is_empty());
3645        assert_eq!(action, Some(OKXBookAction::Snapshot));
3646        assert!(!data[0].asks.is_empty());
3647        assert!(!data[0].bids.is_empty());
3648    }
3649
3650    #[rstest]
3651    fn test_deserialize_book_update_message() {
3652        let json_data = load_test_json("ws_books_update.json");
3653        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3654        let action: Option<OKXBookAction> =
3655            serde_json::from_value(payload["action"].clone()).unwrap();
3656        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3657
3658        assert!(!data.is_empty());
3659        assert_eq!(action, Some(OKXBookAction::Update));
3660        assert!(!data[0].asks.is_empty());
3661        assert!(!data[0].bids.is_empty());
3662    }
3663
3664    #[rstest]
3665    fn test_deserialize_ticker_message() {
3666        let json_data = load_test_json("ws_tickers.json");
3667        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3668        let data: Vec<OKXTickerMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3669
3670        assert!(!data.is_empty());
3671        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
3672        assert_eq!(data[0].last_px, "9999.99");
3673    }
3674
3675    #[rstest]
3676    fn test_deserialize_candle_message() {
3677        let json_data = load_test_json("ws_candle.json");
3678        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3679        let data: Vec<OKXCandleMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3680
3681        assert!(!data.is_empty());
3682        assert!(!data[0].o.is_empty());
3683        assert!(!data[0].h.is_empty());
3684        assert!(!data[0].l.is_empty());
3685        assert!(!data[0].c.is_empty());
3686    }
3687
3688    #[rstest]
3689    fn test_deserialize_funding_rate_message() {
3690        let json_data = load_test_json("ws_funding_rate.json");
3691        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3692        let data: Vec<OKXFundingRateMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3693
3694        assert!(!data.is_empty());
3695        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
3696    }
3697
3698    #[rstest]
3699    fn test_deserialize_bbo_tbt_message() {
3700        let json_data = load_test_json("ws_bbo_tbt.json");
3701        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3702        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3703
3704        assert!(!data.is_empty());
3705        assert!(!data[0].asks.is_empty());
3706        assert!(!data[0].bids.is_empty());
3707    }
3708
3709    #[rstest]
3710    fn test_deserialize_trade_message() {
3711        let json_data = load_test_json("ws_trades.json");
3712        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3713        let data: Vec<OKXTradeMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
3714
3715        assert!(!data.is_empty());
3716        assert_eq!(data[0].inst_id, Ustr::from("BTC-USD"));
3717    }
3718}