nautilus_okx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Functions translating raw OKX WebSocket frames into Nautilus data types.
17
18use ahash::AHashMap;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
23        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
24        QuoteTick, TradeTick, depth::DEPTH10_LEN,
25    },
26    enums::{
27        AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
28        OrderType, RecordFlag, TimeInForce, TriggerType,
29    },
30    identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
31    instruments::{Instrument, InstrumentAny},
32    reports::{FillReport, OrderStatusReport},
33    types::{Money, Price, Quantity},
34};
35use rust_decimal::Decimal;
36use ustr::Ustr;
37
38use super::{
39    enums::OKXWsChannel,
40    messages::{
41        OKXAlgoOrderMsg, OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg,
42        OKXTickerMsg, OKXTradeMsg, OrderBookEntry,
43    },
44};
45use crate::{
46    common::{
47        consts::{OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE},
48        enums::{
49            OKXBookAction, OKXCandleConfirm, OKXInstrumentType, OKXOrderCategory, OKXOrderStatus,
50            OKXOrderType, OKXSide, OKXTargetCurrency, OKXTriggerType,
51        },
52        models::OKXInstrument,
53        parse::{
54            okx_channel_to_bar_spec, parse_client_order_id, parse_fee, parse_fee_currency,
55            parse_funding_rate_msg, parse_instrument_any, parse_message_vec,
56            parse_millisecond_timestamp, parse_price, parse_quantity,
57        },
58    },
59    websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
60};
61
/// Returns `true` when an OKX price string denotes market execution instead of a limit price.
///
/// The following values are treated as "no limit price":
/// - `""` (empty string)
/// - `"0"`
/// - `"-1"` (market price)
/// - `"-2"` (market price with protection)
///
/// The check is an exact string comparison, so numerically equivalent spellings
/// such as `"0.0"` are not recognized.
fn is_market_price(px: &str) -> bool {
    matches!(px, "" | "0" | "-1" | "-2")
}
72
73/// Extracts fee rates from a cached instrument.
74///
75/// Returns a tuple of (margin_init, margin_maint, maker_fee, taker_fee).
76/// All values are None if the instrument type doesn't support fees.
77fn extract_fees_from_cached_instrument(
78    instrument: &InstrumentAny,
79) -> (
80    Option<Decimal>,
81    Option<Decimal>,
82    Option<Decimal>,
83    Option<Decimal>,
84) {
85    match instrument {
86        InstrumentAny::CurrencyPair(pair) => (
87            Some(pair.margin_init),
88            Some(pair.margin_maint),
89            Some(pair.maker_fee),
90            Some(pair.taker_fee),
91        ),
92        InstrumentAny::CryptoPerpetual(perp) => (
93            Some(perp.margin_init),
94            Some(perp.margin_maint),
95            Some(perp.maker_fee),
96            Some(perp.taker_fee),
97        ),
98        InstrumentAny::CryptoFuture(future) => (
99            Some(future.margin_init),
100            Some(future.margin_maint),
101            Some(future.maker_fee),
102            Some(future.taker_fee),
103        ),
104        InstrumentAny::CryptoOption(option) => (
105            Some(option.margin_init),
106            Some(option.margin_maint),
107            Some(option.maker_fee),
108            Some(option.taker_fee),
109        ),
110        _ => (None, None, None, None),
111    }
112}
113
114/// Parses vector of OKX book messages into Nautilus order book deltas.
115///
116/// # Errors
117///
118/// Returns an error if any underlying book message cannot be parsed.
119pub fn parse_book_msg_vec(
120    data: Vec<OKXBookMsg>,
121    instrument_id: &InstrumentId,
122    price_precision: u8,
123    size_precision: u8,
124    action: OKXBookAction,
125    ts_init: UnixNanos,
126) -> anyhow::Result<Vec<Data>> {
127    let mut deltas = Vec::with_capacity(data.len());
128
129    for msg in data {
130        let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
131            &msg,
132            *instrument_id,
133            price_precision,
134            size_precision,
135            &action,
136            ts_init,
137        )?);
138        deltas.push(Data::Deltas(deltas_api));
139    }
140
141    Ok(deltas)
142}
143
144/// Parses vector of OKX ticker messages into Nautilus quote ticks.
145///
146/// # Errors
147///
148/// Returns an error if any ticker message fails to parse.
149pub fn parse_ticker_msg_vec(
150    data: serde_json::Value,
151    instrument_id: &InstrumentId,
152    price_precision: u8,
153    size_precision: u8,
154    ts_init: UnixNanos,
155) -> anyhow::Result<Vec<Data>> {
156    parse_message_vec(
157        data,
158        |msg| {
159            parse_ticker_msg(
160                msg,
161                *instrument_id,
162                price_precision,
163                size_precision,
164                ts_init,
165            )
166        },
167        Data::Quote,
168    )
169}
170
171/// Parses vector of OKX book messages into Nautilus quote ticks.
172///
173/// # Errors
174///
175/// Returns an error if any quote message fails to parse.
176pub fn parse_quote_msg_vec(
177    data: serde_json::Value,
178    instrument_id: &InstrumentId,
179    price_precision: u8,
180    size_precision: u8,
181    ts_init: UnixNanos,
182) -> anyhow::Result<Vec<Data>> {
183    parse_message_vec(
184        data,
185        |msg| {
186            parse_quote_msg(
187                msg,
188                *instrument_id,
189                price_precision,
190                size_precision,
191                ts_init,
192            )
193        },
194        Data::Quote,
195    )
196}
197
198/// Parses vector of OKX trade messages into Nautilus trade ticks.
199///
200/// # Errors
201///
202/// Returns an error if any trade message fails to parse.
203pub fn parse_trade_msg_vec(
204    data: serde_json::Value,
205    instrument_id: &InstrumentId,
206    price_precision: u8,
207    size_precision: u8,
208    ts_init: UnixNanos,
209) -> anyhow::Result<Vec<Data>> {
210    parse_message_vec(
211        data,
212        |msg| {
213            parse_trade_msg(
214                msg,
215                *instrument_id,
216                price_precision,
217                size_precision,
218                ts_init,
219            )
220        },
221        Data::Trade,
222    )
223}
224
225/// Parses vector of OKX mark price messages into Nautilus mark price updates.
226///
227/// # Errors
228///
229/// Returns an error if any mark price message fails to parse.
230pub fn parse_mark_price_msg_vec(
231    data: serde_json::Value,
232    instrument_id: &InstrumentId,
233    price_precision: u8,
234    ts_init: UnixNanos,
235) -> anyhow::Result<Vec<Data>> {
236    parse_message_vec(
237        data,
238        |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
239        Data::MarkPriceUpdate,
240    )
241}
242
243/// Parses vector of OKX index price messages into Nautilus index price updates.
244///
245/// # Errors
246///
247/// Returns an error if any index price message fails to parse.
248pub fn parse_index_price_msg_vec(
249    data: serde_json::Value,
250    instrument_id: &InstrumentId,
251    price_precision: u8,
252    ts_init: UnixNanos,
253) -> anyhow::Result<Vec<Data>> {
254    parse_message_vec(
255        data,
256        |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
257        Data::IndexPriceUpdate,
258    )
259}
260
261/// Parses vector of OKX funding rate messages into Nautilus funding rate updates.
262/// Includes caching to filter out duplicate funding rates.
263///
264/// # Errors
265///
266/// Returns an error if any funding rate message fails to parse.
267pub fn parse_funding_rate_msg_vec(
268    data: serde_json::Value,
269    instrument_id: &InstrumentId,
270    ts_init: UnixNanos,
271    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
272) -> anyhow::Result<Vec<FundingRateUpdate>> {
273    let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
274
275    let mut result = Vec::with_capacity(msgs.len());
276    for msg in &msgs {
277        let cache_key = (msg.funding_rate, msg.funding_time);
278
279        if let Some(cached) = funding_cache.get(&msg.inst_id)
280            && *cached == cache_key
281        {
282            continue; // Skip duplicate
283        }
284
285        // New or changed funding rate, update cache and parse
286        funding_cache.insert(msg.inst_id, cache_key);
287        let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
288        result.push(funding_rate);
289    }
290
291    Ok(result)
292}
293
294/// Parses vector of OKX candle messages into Nautilus bars.
295///
296/// # Errors
297///
298/// Returns an error if candle messages cannot be deserialized or parsed.
299pub fn parse_candle_msg_vec(
300    data: serde_json::Value,
301    instrument_id: &InstrumentId,
302    price_precision: u8,
303    size_precision: u8,
304    spec: BarSpecification,
305    ts_init: UnixNanos,
306) -> anyhow::Result<Vec<Data>> {
307    let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
308    let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
309    let mut bars = Vec::with_capacity(msgs.len());
310
311    for msg in msgs {
312        // Only process completed candles to avoid duplicate/partial bars
313        if msg.confirm == OKXCandleConfirm::Closed {
314            let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
315            bars.push(Data::Bar(bar));
316        }
317    }
318
319    Ok(bars)
320}
321
322/// Parses vector of OKX book messages into Nautilus depth10 updates.
323///
324/// # Errors
325///
326/// Returns an error if any book10 message fails to parse.
327pub fn parse_book10_msg_vec(
328    data: Vec<OKXBookMsg>,
329    instrument_id: &InstrumentId,
330    price_precision: u8,
331    size_precision: u8,
332    ts_init: UnixNanos,
333) -> anyhow::Result<Vec<Data>> {
334    let mut depth10_updates = Vec::with_capacity(data.len());
335
336    for msg in data {
337        let depth10 = parse_book10_msg(
338            &msg,
339            *instrument_id,
340            price_precision,
341            size_precision,
342            ts_init,
343        )?;
344        depth10_updates.push(Data::Depth10(Box::new(depth10)));
345    }
346
347    Ok(depth10_updates)
348}
349
350/// Parses an OKX book message into Nautilus order book deltas.
351///
352/// # Errors
353///
354/// Returns an error if bid or ask levels contain values that cannot be parsed.
355pub fn parse_book_msg(
356    msg: &OKXBookMsg,
357    instrument_id: InstrumentId,
358    price_precision: u8,
359    size_precision: u8,
360    action: &OKXBookAction,
361    ts_init: UnixNanos,
362) -> anyhow::Result<OrderBookDeltas> {
363    let flags = if action == &OKXBookAction::Snapshot {
364        RecordFlag::F_SNAPSHOT as u8
365    } else {
366        0
367    };
368    let ts_event = parse_millisecond_timestamp(msg.ts);
369
370    let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
371
372    for bid in &msg.bids {
373        let book_action = match action {
374            OKXBookAction::Snapshot => BookAction::Add,
375            _ => match bid.size.as_str() {
376                "0" => BookAction::Delete,
377                _ => BookAction::Update,
378            },
379        };
380        let price = parse_price(&bid.price, price_precision)?;
381        let size = parse_quantity(&bid.size, size_precision)?;
382        let order_id = 0; // TBD
383        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
384        let delta = OrderBookDelta::new(
385            instrument_id,
386            book_action,
387            order,
388            flags,
389            msg.seq_id,
390            ts_event,
391            ts_init,
392        );
393        deltas.push(delta);
394    }
395
396    for ask in &msg.asks {
397        let book_action = match action {
398            OKXBookAction::Snapshot => BookAction::Add,
399            _ => match ask.size.as_str() {
400                "0" => BookAction::Delete,
401                _ => BookAction::Update,
402            },
403        };
404        let price = parse_price(&ask.price, price_precision)?;
405        let size = parse_quantity(&ask.size, size_precision)?;
406        let order_id = 0; // TBD
407        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
408        let delta = OrderBookDelta::new(
409            instrument_id,
410            book_action,
411            order,
412            flags,
413            msg.seq_id,
414            ts_event,
415            ts_init,
416        );
417        deltas.push(delta);
418    }
419
420    OrderBookDeltas::new_checked(instrument_id, deltas)
421}
422
423/// Parses an OKX book message into a Nautilus quote tick.
424///
425/// # Errors
426///
427/// Returns an error if any quote levels contain values that cannot be parsed.
428pub fn parse_quote_msg(
429    msg: &OKXBookMsg,
430    instrument_id: InstrumentId,
431    price_precision: u8,
432    size_precision: u8,
433    ts_init: UnixNanos,
434) -> anyhow::Result<QuoteTick> {
435    let best_bid: &OrderBookEntry = &msg.bids[0];
436    let best_ask: &OrderBookEntry = &msg.asks[0];
437
438    let bid_price = parse_price(&best_bid.price, price_precision)?;
439    let ask_price = parse_price(&best_ask.price, price_precision)?;
440    let bid_size = parse_quantity(&best_bid.size, size_precision)?;
441    let ask_size = parse_quantity(&best_ask.size, size_precision)?;
442    let ts_event = parse_millisecond_timestamp(msg.ts);
443
444    QuoteTick::new_checked(
445        instrument_id,
446        bid_price,
447        ask_price,
448        bid_size,
449        ask_size,
450        ts_event,
451        ts_init,
452    )
453}
454
455/// Parses an OKX book message into a Nautilus [`OrderBookDepth10`].
456///
457/// Converts order book data into a fixed-depth snapshot with top 10 levels for both sides.
458///
459/// # Errors
460///
461/// Returns an error if price or size fields cannot be parsed for any level.
462pub fn parse_book10_msg(
463    msg: &OKXBookMsg,
464    instrument_id: InstrumentId,
465    price_precision: u8,
466    size_precision: u8,
467    ts_init: UnixNanos,
468) -> anyhow::Result<OrderBookDepth10> {
469    // Initialize arrays - need to fill all 10 levels even if we have fewer
470    let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
471    let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
472    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
473    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
474
475    // Parse available bid levels (up to 10)
476    let bid_len = msg.bids.len().min(DEPTH10_LEN);
477    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
478        let price = parse_price(&level.price, price_precision)?;
479        let size = parse_quantity(&level.size, size_precision)?;
480        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
481
482        let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
483        bids[i] = bid_order;
484        bid_counts[i] = orders_count;
485    }
486
487    // Fill remaining bid slots with empty Buy orders (not NULL orders)
488    for i in bid_len..DEPTH10_LEN {
489        bids[i] = BookOrder::new(
490            OrderSide::Buy,
491            Price::zero(price_precision),
492            Quantity::zero(size_precision),
493            0,
494        );
495        bid_counts[i] = 0;
496    }
497
498    // Parse available ask levels (up to 10)
499    let ask_len = msg.asks.len().min(DEPTH10_LEN);
500    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
501        let price = parse_price(&level.price, price_precision)?;
502        let size = parse_quantity(&level.size, size_precision)?;
503        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
504
505        let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
506        asks[i] = ask_order;
507        ask_counts[i] = orders_count;
508    }
509
510    // Fill remaining ask slots with empty Sell orders (not NULL orders)
511    for i in ask_len..DEPTH10_LEN {
512        asks[i] = BookOrder::new(
513            OrderSide::Sell,
514            Price::zero(price_precision),
515            Quantity::zero(size_precision),
516            0,
517        );
518        ask_counts[i] = 0;
519    }
520
521    let ts_event = parse_millisecond_timestamp(msg.ts);
522
523    Ok(OrderBookDepth10::new(
524        instrument_id,
525        bids,
526        asks,
527        bid_counts,
528        ask_counts,
529        RecordFlag::F_SNAPSHOT as u8,
530        msg.seq_id, // Use sequence ID for OKX L2 books
531        ts_event,
532        ts_init,
533    ))
534}
535
536/// Parses an OKX ticker message into a Nautilus quote tick.
537///
538/// # Errors
539///
540/// Returns an error if bid or ask values cannot be parsed from the message.
541pub fn parse_ticker_msg(
542    msg: &OKXTickerMsg,
543    instrument_id: InstrumentId,
544    price_precision: u8,
545    size_precision: u8,
546    ts_init: UnixNanos,
547) -> anyhow::Result<QuoteTick> {
548    let bid_price = parse_price(&msg.bid_px, price_precision)?;
549    let ask_price = parse_price(&msg.ask_px, price_precision)?;
550    let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
551    let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
552    let ts_event = parse_millisecond_timestamp(msg.ts);
553
554    QuoteTick::new_checked(
555        instrument_id,
556        bid_price,
557        ask_price,
558        bid_size,
559        ask_size,
560        ts_event,
561        ts_init,
562    )
563}
564
565/// Parses an OKX trade message into a Nautilus trade tick.
566///
567/// # Errors
568///
569/// Returns an error if trade prices or sizes cannot be parsed.
570pub fn parse_trade_msg(
571    msg: &OKXTradeMsg,
572    instrument_id: InstrumentId,
573    price_precision: u8,
574    size_precision: u8,
575    ts_init: UnixNanos,
576) -> anyhow::Result<TradeTick> {
577    let price = parse_price(&msg.px, price_precision)?;
578    let size = parse_quantity(&msg.sz, size_precision)?;
579    let aggressor_side: AggressorSide = msg.side.into();
580    let trade_id = TradeId::new(&msg.trade_id);
581    let ts_event = parse_millisecond_timestamp(msg.ts);
582
583    TradeTick::new_checked(
584        instrument_id,
585        price,
586        size,
587        aggressor_side,
588        trade_id,
589        ts_event,
590        ts_init,
591    )
592}
593
594/// Parses an OKX mark price message into a Nautilus mark price update.
595///
596/// # Errors
597///
598/// Returns an error if the mark price fails to parse.
599pub fn parse_mark_price_msg(
600    msg: &OKXMarkPriceMsg,
601    instrument_id: InstrumentId,
602    price_precision: u8,
603    ts_init: UnixNanos,
604) -> anyhow::Result<MarkPriceUpdate> {
605    let price = parse_price(&msg.mark_px, price_precision)?;
606    let ts_event = parse_millisecond_timestamp(msg.ts);
607
608    Ok(MarkPriceUpdate::new(
609        instrument_id,
610        price,
611        ts_event,
612        ts_init,
613    ))
614}
615
616/// Parses an OKX index price message into a Nautilus index price update.
617///
618/// # Errors
619///
620/// Returns an error if the index price fails to parse.
621pub fn parse_index_price_msg(
622    msg: &OKXIndexPriceMsg,
623    instrument_id: InstrumentId,
624    price_precision: u8,
625    ts_init: UnixNanos,
626) -> anyhow::Result<IndexPriceUpdate> {
627    let price = parse_price(&msg.idx_px, price_precision)?;
628    let ts_event = parse_millisecond_timestamp(msg.ts);
629
630    Ok(IndexPriceUpdate::new(
631        instrument_id,
632        price,
633        ts_event,
634        ts_init,
635    ))
636}
637
638/// Parses an OKX candle message into a Nautilus bar.
639///
640/// # Errors
641///
642/// Returns an error if candle price or volume fields cannot be parsed.
643pub fn parse_candle_msg(
644    msg: &OKXCandleMsg,
645    bar_type: BarType,
646    price_precision: u8,
647    size_precision: u8,
648    ts_init: UnixNanos,
649) -> anyhow::Result<Bar> {
650    let open = parse_price(&msg.o, price_precision)?;
651    let high = parse_price(&msg.h, price_precision)?;
652    let low = parse_price(&msg.l, price_precision)?;
653    let close = parse_price(&msg.c, price_precision)?;
654    let volume = parse_quantity(&msg.vol, size_precision)?;
655    let ts_event = parse_millisecond_timestamp(msg.ts);
656
657    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
658}
659
660/// Parses vector of OKX order messages into Nautilus execution reports.
661///
662/// # Errors
663///
664/// Returns an error if any contained order messages cannot be parsed.
665pub fn parse_order_msg_vec(
666    data: Vec<OKXOrderMsg>,
667    account_id: AccountId,
668    instruments: &AHashMap<Ustr, InstrumentAny>,
669    fee_cache: &AHashMap<Ustr, Money>,
670    filled_qty_cache: &AHashMap<Ustr, Quantity>,
671    ts_init: UnixNanos,
672) -> anyhow::Result<Vec<ExecutionReport>> {
673    let mut order_reports = Vec::with_capacity(data.len());
674
675    for msg in data {
676        match parse_order_msg(
677            &msg,
678            account_id,
679            instruments,
680            fee_cache,
681            filled_qty_cache,
682            ts_init,
683        ) {
684            Ok(report) => order_reports.push(report),
685            Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
686        }
687    }
688
689    Ok(order_reports)
690}
691
692/// Checks if acc_fill_sz has increased compared to the previous filled quantity.
693fn has_acc_fill_sz_increased(
694    acc_fill_sz: &Option<String>,
695    previous_filled_qty: Option<Quantity>,
696    size_precision: u8,
697) -> bool {
698    if let Some(acc_str) = acc_fill_sz {
699        if acc_str.is_empty() || acc_str == "0" {
700            return false;
701        }
702        if let Ok(current_filled) = parse_quantity(acc_str, size_precision) {
703            if let Some(prev_qty) = previous_filled_qty {
704                return current_filled > prev_qty;
705            }
706            return !current_filled.is_zero();
707        }
708    }
709    false
710}
711
712/// Parses a single OKX order message into an [`ExecutionReport`].
713///
714/// # Errors
715///
716/// Returns an error if the instrument cannot be found or if parsing the
717/// underlying order payload fails.
718pub fn parse_order_msg(
719    msg: &OKXOrderMsg,
720    account_id: AccountId,
721    instruments: &AHashMap<Ustr, InstrumentAny>,
722    fee_cache: &AHashMap<Ustr, Money>,
723    filled_qty_cache: &AHashMap<Ustr, Quantity>,
724    ts_init: UnixNanos,
725) -> anyhow::Result<ExecutionReport> {
726    let instrument = instruments
727        .get(&msg.inst_id)
728        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
729
730    let previous_fee = fee_cache.get(&msg.ord_id).copied();
731    let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
732
733    let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0")
734        || !msg.trade_id.is_empty()
735        || has_acc_fill_sz_increased(
736            &msg.acc_fill_sz,
737            previous_filled_qty,
738            instrument.size_precision(),
739        );
740
741    match msg.state {
742        OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
743            parse_fill_report(
744                msg,
745                instrument,
746                account_id,
747                previous_fee,
748                previous_filled_qty,
749                ts_init,
750            )
751            .map(ExecutionReport::Fill)
752        }
753        _ => parse_order_status_report(msg, instrument, account_id, ts_init)
754            .map(ExecutionReport::Order),
755    }
756}
757
758/// Parses an OKX algo order message into a Nautilus execution report.
759///
760/// # Errors
761///
762/// Returns an error if the instrument cannot be found or if message fields
763/// fail to parse.
764pub fn parse_algo_order_msg(
765    msg: OKXAlgoOrderMsg,
766    account_id: AccountId,
767    instruments: &AHashMap<Ustr, InstrumentAny>,
768    ts_init: UnixNanos,
769) -> anyhow::Result<ExecutionReport> {
770    let inst = instruments
771        .get(&msg.inst_id)
772        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
773
774    // Algo orders primarily return status reports (not fills since they haven't been triggered yet)
775    parse_algo_order_status_report(&msg, inst, account_id, ts_init).map(ExecutionReport::Order)
776}
777
778/// Parses an OKX algo order message into a Nautilus order status report.
779///
780/// # Errors
781///
782/// Returns an error if any order identifiers or numeric fields cannot be
783/// parsed.
784pub fn parse_algo_order_status_report(
785    msg: &OKXAlgoOrderMsg,
786    instrument: &InstrumentAny,
787    account_id: AccountId,
788    ts_init: UnixNanos,
789) -> anyhow::Result<OrderStatusReport> {
790    // For algo orders, use algo_cl_ord_id if cl_ord_id is empty
791    let client_order_id = if msg.cl_ord_id.is_empty() {
792        parse_client_order_id(&msg.algo_cl_ord_id)
793    } else {
794        parse_client_order_id(&msg.cl_ord_id)
795    };
796
797    // For algo orders that haven't triggered, ord_id will be empty, use algo_id instead
798    let venue_order_id = if msg.ord_id.is_empty() {
799        VenueOrderId::new(msg.algo_id.as_str())
800    } else {
801        VenueOrderId::new(msg.ord_id.as_str())
802    };
803
804    let order_side: OrderSide = msg.side.into();
805
806    // Determine order type based on ord_px for conditional/stop orders
807    let order_type = if is_market_price(&msg.ord_px) {
808        OrderType::StopMarket
809    } else {
810        OrderType::StopLimit
811    };
812
813    let status: OrderStatus = msg.state.into();
814
815    let quantity = parse_quantity(msg.sz.as_str(), instrument.size_precision())?;
816
817    // For algo orders, actual_sz represents filled quantity (if any)
818    let filled_qty = if msg.actual_sz.is_empty() || msg.actual_sz == "0" {
819        Quantity::zero(instrument.size_precision())
820    } else {
821        parse_quantity(msg.actual_sz.as_str(), instrument.size_precision())?
822    };
823
824    let trigger_px = parse_price(msg.trigger_px.as_str(), instrument.price_precision())?;
825
826    // Parse limit price if it exists (not -1)
827    let price = if msg.ord_px != "-1" {
828        Some(parse_price(
829            msg.ord_px.as_str(),
830            instrument.price_precision(),
831        )?)
832    } else {
833        None
834    };
835
836    let trigger_type = match msg.trigger_px_type {
837        OKXTriggerType::Last => TriggerType::LastPrice,
838        OKXTriggerType::Mark => TriggerType::MarkPrice,
839        OKXTriggerType::Index => TriggerType::IndexPrice,
840        OKXTriggerType::None => TriggerType::Default,
841    };
842
843    let mut report = OrderStatusReport::new(
844        account_id,
845        instrument.id(),
846        client_order_id,
847        venue_order_id,
848        order_side,
849        order_type,
850        TimeInForce::Gtc, // Algo orders are typically GTC
851        status,
852        quantity,
853        filled_qty,
854        msg.c_time.into(), // ts_accepted
855        msg.u_time.into(), // ts_last
856        ts_init,
857        None, // report_id - auto-generated
858    );
859
860    report.trigger_price = Some(trigger_px);
861    report.trigger_type = Some(trigger_type);
862
863    if let Some(limit_price) = price {
864        report.price = Some(limit_price);
865    }
866
867    Ok(report)
868}
869
870/// Parses an OKX order message into a Nautilus order status report.
871///
872/// # Errors
873///
874/// Returns an error if order metadata or numeric values cannot be parsed.
875pub fn parse_order_status_report(
876    msg: &OKXOrderMsg,
877    instrument: &InstrumentAny,
878    account_id: AccountId,
879    ts_init: UnixNanos,
880) -> anyhow::Result<OrderStatusReport> {
881    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
882    let venue_order_id = VenueOrderId::new(msg.ord_id);
883    let order_side: OrderSide = msg.side.into();
884
885    let okx_order_type = msg.ord_type;
886
887    // Determine order type based on presence of limit price for certain OKX order types
888    let order_type = match okx_order_type {
889        // Trigger orders: check if they have a price
890        OKXOrderType::Trigger => {
891            if is_market_price(&msg.px) {
892                OrderType::StopMarket
893            } else {
894                OrderType::StopLimit
895            }
896        }
897        // FOK/IOC orders: check if they have a price
898        // Without a price, they're market orders with TIF
899        // With a price, they're limit orders with TIF
900        OKXOrderType::Fok | OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
901            if is_market_price(&msg.px) {
902                OrderType::Market
903            } else {
904                OrderType::Limit
905            }
906        }
907        // All other order types use standard mapping
908        _ => msg.ord_type.into(),
909    };
910    let order_status: OrderStatus = msg.state.into();
911
912    let time_in_force = match okx_order_type {
913        OKXOrderType::Fok => TimeInForce::Fok,
914        OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
915        _ => TimeInForce::Gtc,
916    };
917
918    let size_precision = instrument.size_precision();
919
920    // Parse quantities based on target currency
921    // OKX always returns acc_fill_sz in base currency, but sz depends on tgt_ccy
922
923    // Determine if this is a quote-quantity order
924    // Method 1: Explicit tgt_ccy field set to QuoteCcy
925    let is_quote_qty_explicit = msg.tgt_ccy == Some(OKXTargetCurrency::QuoteCcy);
926
927    // Method 2: Use OKX defaults when tgt_ccy is None (old orders or missing field)
928    // OKX API defaults for SPOT market orders: BUY orders use quote_ccy, SELL orders use base_ccy
929    // Note: tgtCcy only applies to SPOT market orders (not limit orders)
930    // For limit orders, sz is always in base currency regardless of side
931    let is_quote_qty_heuristic = msg.tgt_ccy.is_none()
932        && (msg.inst_type == OKXInstrumentType::Spot || msg.inst_type == OKXInstrumentType::Margin)
933        && msg.side == OKXSide::Buy
934        && msg.ord_type == OKXOrderType::Market;
935
936    let (quantity, filled_qty) = if is_quote_qty_explicit || is_quote_qty_heuristic {
937        // Quote-quantity order: sz is in quote currency, need to convert to base
938        let sz_quote = msg.sz.parse::<f64>().map_err(|e| {
939            anyhow::anyhow!("Failed to parse sz='{}' as quote quantity: {}", msg.sz, e)
940        })?;
941
942        // Determine the price to use for conversion
943        // Priority: 1) limit price (px) for limit orders, 2) avg_px for market orders
944        let conversion_price = if !is_market_price(&msg.px) {
945            // Limit order: use the limit price (msg.px)
946            msg.px
947                .parse::<f64>()
948                .map_err(|e| anyhow::anyhow!("Failed to parse px='{}': {}", msg.px, e))?
949        } else if !msg.avg_px.is_empty() && msg.avg_px != "0" {
950            // Market order with fills: use average fill price
951            msg.avg_px
952                .parse::<f64>()
953                .map_err(|e| anyhow::anyhow!("Failed to parse avg_px='{}': {}", msg.avg_px, e))?
954        } else {
955            0.0
956        };
957
958        // Convert quote quantity to base: quantity_base = sz_quote / price
959        let quantity_base = if conversion_price > 0.0 {
960            Quantity::new(sz_quote / conversion_price, size_precision)
961        } else {
962            // No price available, can't convert - use sz as-is temporarily
963            // This will be corrected once the order gets filled and price is available
964            parse_quantity(&msg.sz, size_precision)?
965        };
966
967        let filled_qty =
968            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
969
970        (quantity_base, filled_qty)
971    } else {
972        // Base-quantity order: both sz and acc_fill_sz are in base currency
973        let quantity = parse_quantity(&msg.sz, size_precision)?;
974        let filled_qty =
975            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
976
977        (quantity, filled_qty)
978    };
979
980    // For quote-quantity orders marked as FILLED, adjust quantity to match filled_qty
981    // to avoid precision mismatches from quote-to-base conversion
982    let (quantity, filled_qty) = if (is_quote_qty_explicit || is_quote_qty_heuristic)
983        && msg.state == OKXOrderStatus::Filled
984        && filled_qty.is_positive()
985    {
986        (filled_qty, filled_qty)
987    } else {
988        (quantity, filled_qty)
989    };
990
991    let ts_accepted = parse_millisecond_timestamp(msg.c_time);
992    let ts_last = parse_millisecond_timestamp(msg.u_time);
993
994    let is_liquidation = matches!(
995        msg.category,
996        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
997    );
998
999    let is_adl = msg.category == OKXOrderCategory::Adl;
1000
1001    if is_liquidation {
1002        tracing::warn!(
1003            order_id = msg.ord_id.as_str(),
1004            category = ?msg.category,
1005            inst_id = msg.inst_id.as_str(),
1006            state = ?msg.state,
1007            "Liquidation order status update"
1008        );
1009    }
1010
1011    if is_adl {
1012        tracing::warn!(
1013            order_id = msg.ord_id.as_str(),
1014            inst_id = msg.inst_id.as_str(),
1015            state = ?msg.state,
1016            "ADL (Auto-Deleveraging) order status update"
1017        );
1018    }
1019
1020    let mut report = OrderStatusReport::new(
1021        account_id,
1022        instrument.id(),
1023        client_order_id,
1024        venue_order_id,
1025        order_side,
1026        order_type,
1027        time_in_force,
1028        order_status,
1029        quantity,
1030        filled_qty,
1031        ts_accepted,
1032        ts_init,
1033        ts_last,
1034        None, // Generate UUID4 automatically
1035    );
1036
1037    let price_precision = instrument.price_precision();
1038
1039    if okx_order_type == OKXOrderType::Trigger {
1040        // For triggered orders coming through regular orders channel,
1041        // set the price if it's a stop-limit order
1042        if !is_market_price(&msg.px)
1043            && let Ok(price) = parse_price(&msg.px, price_precision)
1044        {
1045            report = report.with_price(price);
1046        }
1047    } else {
1048        // For regular orders, use px field
1049        if !is_market_price(&msg.px)
1050            && let Ok(price) = parse_price(&msg.px, price_precision)
1051        {
1052            report = report.with_price(price);
1053        }
1054    }
1055
1056    if !msg.avg_px.is_empty()
1057        && let Ok(avg_px) = msg.avg_px.parse::<f64>()
1058    {
1059        report = report.with_avg_px(avg_px);
1060    }
1061
1062    if matches!(
1063        msg.ord_type,
1064        OKXOrderType::PostOnly | OKXOrderType::MmpAndPostOnly
1065    ) || matches!(
1066        msg.cancel_source.as_deref(),
1067        Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
1068    ) || matches!(
1069        msg.cancel_source_reason.as_deref(),
1070        Some(reason) if reason.contains("POST_ONLY")
1071    ) {
1072        report = report.with_post_only(true);
1073    }
1074
1075    if msg.reduce_only == "true" {
1076        report = report.with_reduce_only(true);
1077    }
1078
1079    if let Some(reason) = msg
1080        .cancel_source_reason
1081        .as_ref()
1082        .filter(|reason| !reason.is_empty())
1083    {
1084        report = report.with_cancel_reason(reason.clone());
1085    } else if let Some(source) = msg
1086        .cancel_source
1087        .as_ref()
1088        .filter(|source| !source.is_empty())
1089    {
1090        let reason = if source == OKX_POST_ONLY_CANCEL_SOURCE {
1091            OKX_POST_ONLY_CANCEL_REASON.to_string()
1092        } else {
1093            format!("cancel_source={source}")
1094        };
1095        report = report.with_cancel_reason(reason);
1096    }
1097
1098    Ok(report)
1099}
1100
1101/// Parses an OKX order message into a Nautilus fill report.
1102///
1103/// # Errors
1104///
1105/// Returns an error if order quantities, prices, or fees cannot be parsed.
1106pub fn parse_fill_report(
1107    msg: &OKXOrderMsg,
1108    instrument: &InstrumentAny,
1109    account_id: AccountId,
1110    previous_fee: Option<Money>,
1111    previous_filled_qty: Option<Quantity>,
1112    ts_init: UnixNanos,
1113) -> anyhow::Result<FillReport> {
1114    let client_order_id = parse_client_order_id(&msg.cl_ord_id);
1115    let venue_order_id = VenueOrderId::new(msg.ord_id);
1116
1117    // TODO: Extract to dedicated function:
1118    // OKX may not provide a trade_id, so generate a UUID4 as fallback
1119    let trade_id = if msg.trade_id.is_empty() {
1120        TradeId::from(UUID4::new().to_string().as_str())
1121    } else {
1122        TradeId::from(msg.trade_id.as_str())
1123    };
1124
1125    let order_side: OrderSide = msg.side.into();
1126
1127    let price_precision = instrument.price_precision();
1128    let size_precision = instrument.size_precision();
1129
1130    let price_str = if !msg.fill_px.is_empty() {
1131        &msg.fill_px
1132    } else if !msg.avg_px.is_empty() {
1133        &msg.avg_px
1134    } else {
1135        &msg.px
1136    };
1137    let last_px = parse_price(price_str, price_precision).map_err(|e| {
1138        anyhow::anyhow!(
1139            "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}",
1140            msg.fill_px,
1141            msg.avg_px,
1142            msg.px,
1143            e
1144        )
1145    })?;
1146
1147    // OKX provides fillSz (incremental fill) or accFillSz (cumulative total)
1148    // If fillSz is provided, use it directly as the incremental fill quantity
1149    let last_qty = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" {
1150        parse_quantity(&msg.fill_sz, size_precision)
1151            .map_err(|e| anyhow::anyhow!("Failed to parse fill_sz='{}': {e}", msg.fill_sz,))?
1152    } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz {
1153        // If fillSz is missing but accFillSz is available, calculate incremental fill
1154        if !acc_fill_sz.is_empty() && acc_fill_sz != "0" {
1155            let current_filled = parse_quantity(acc_fill_sz, size_precision).map_err(|e| {
1156                anyhow::anyhow!("Failed to parse acc_fill_sz='{}': {e}", acc_fill_sz,)
1157            })?;
1158
1159            // Calculate incremental fill as: current_total - previous_total
1160            if let Some(prev_qty) = previous_filled_qty {
1161                let incremental = current_filled - prev_qty;
1162                if incremental.is_zero() {
1163                    anyhow::bail!(
1164                        "Incremental fill quantity is zero (acc_fill_sz='{}', previous_filled_qty={})",
1165                        acc_fill_sz,
1166                        prev_qty
1167                    );
1168                }
1169                incremental
1170            } else {
1171                // First fill, use accumulated as incremental
1172                current_filled
1173            }
1174        } else {
1175            anyhow::bail!(
1176                "Cannot determine fill quantity: fill_sz is empty/zero and acc_fill_sz is empty/zero"
1177            );
1178        }
1179    } else {
1180        anyhow::bail!(
1181            "Cannot determine fill quantity: fill_sz='{}' and acc_fill_sz is None",
1182            msg.fill_sz
1183        );
1184    };
1185
1186    let fee_str = msg.fee.as_deref().unwrap_or("0");
1187    let fee_value = fee_str
1188        .parse::<f64>()
1189        .map_err(|e| anyhow::anyhow!("Failed to parse fee '{}': {}", fee_str, e))?;
1190
1191    let fee_currency = parse_fee_currency(msg.fee_ccy.as_str(), fee_value, || {
1192        format!("fill report for inst_id={}", msg.inst_id)
1193    });
1194
1195    // OKX sends fees as negative numbers (e.g., "-2.5" for a $2.5 charge), parse_fee negates to positive
1196    let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)
1197        .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?;
1198
1199    // OKX sends cumulative fees, so we subtract the previous total to get this fill's fee
1200    let commission = if let Some(previous_fee) = previous_fee {
1201        let incremental = total_fee - previous_fee;
1202
1203        if incremental < Money::zero(fee_currency) {
1204            tracing::debug!(
1205                order_id = msg.ord_id.as_str(),
1206                total_fee = %total_fee,
1207                previous_fee = %previous_fee,
1208                incremental = %incremental,
1209                "Negative incremental fee detected - likely a maker rebate or fee refund"
1210            );
1211        }
1212
1213        // Skip corruption check when previous is negative (rebate), as transitions from
1214        // rebate to charge legitimately have incremental > total (e.g., -1 → +2 gives +3)
1215        if previous_fee >= Money::zero(fee_currency)
1216            && total_fee > Money::zero(fee_currency)
1217            && incremental > total_fee
1218        {
1219            tracing::error!(
1220                order_id = msg.ord_id.as_str(),
1221                total_fee = %total_fee,
1222                previous_fee = %previous_fee,
1223                incremental = %incremental,
1224                "Incremental fee exceeds total fee - likely fee cache corruption, using total fee as fallback"
1225            );
1226            total_fee
1227        } else {
1228            incremental
1229        }
1230    } else {
1231        total_fee
1232    };
1233
1234    let liquidity_side: LiquiditySide = msg.exec_type.into();
1235    let ts_event = parse_millisecond_timestamp(msg.fill_time);
1236
1237    let is_liquidation = matches!(
1238        msg.category,
1239        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
1240    );
1241
1242    let is_adl = msg.category == OKXOrderCategory::Adl;
1243
1244    if is_liquidation {
1245        tracing::warn!(
1246            order_id = msg.ord_id.as_str(),
1247            category = ?msg.category,
1248            inst_id = msg.inst_id.as_str(),
1249            side = ?msg.side,
1250            fill_sz = %msg.fill_sz,
1251            fill_px = %msg.fill_px,
1252            "Liquidation order detected"
1253        );
1254    }
1255
1256    if is_adl {
1257        tracing::warn!(
1258            order_id = msg.ord_id.as_str(),
1259            inst_id = msg.inst_id.as_str(),
1260            side = ?msg.side,
1261            fill_sz = %msg.fill_sz,
1262            fill_px = %msg.fill_px,
1263            "ADL (Auto-Deleveraging) order detected"
1264        );
1265    }
1266
1267    let report = FillReport::new(
1268        account_id,
1269        instrument.id(),
1270        venue_order_id,
1271        trade_id,
1272        order_side,
1273        last_qty,
1274        last_px,
1275        commission,
1276        liquidity_side,
1277        client_order_id,
1278        None,
1279        ts_event,
1280        ts_init,
1281        None, // Generate UUID4 automatically
1282    );
1283
1284    Ok(report)
1285}
1286
1287/// Parses OKX WebSocket message payloads into Nautilus data structures.
1288///
1289/// # Errors
1290///
1291/// Returns an error if the payload cannot be deserialized or if downstream
1292/// parsing routines fail.
1293///
1294/// # Panics
1295///
1296/// Panics only in the case where `okx_channel_to_bar_spec(channel)` returns
1297/// `None` after a prior `is_some` check – an unreachable scenario indicating a
1298/// logic error.
1299#[allow(clippy::too_many_arguments)]
1300pub fn parse_ws_message_data(
1301    channel: &OKXWsChannel,
1302    data: serde_json::Value,
1303    instrument_id: &InstrumentId,
1304    price_precision: u8,
1305    size_precision: u8,
1306    ts_init: UnixNanos,
1307    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1308    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1309) -> anyhow::Result<Option<NautilusWsMessage>> {
1310    match channel {
1311        OKXWsChannel::Instruments => {
1312            if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
1313                // Look up cached instrument to extract existing fees
1314                let (margin_init, margin_maint, maker_fee, taker_fee) =
1315                    instruments_cache.get(&Ustr::from(&msg.inst_id)).map_or(
1316                        (None, None, None, None),
1317                        extract_fees_from_cached_instrument,
1318                    );
1319
1320                match parse_instrument_any(
1321                    &msg,
1322                    margin_init,
1323                    margin_maint,
1324                    maker_fee,
1325                    taker_fee,
1326                    ts_init,
1327                )? {
1328                    Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
1329                    None => {
1330                        tracing::warn!("Empty instrument payload: {:?}", msg);
1331                        Ok(None)
1332                    }
1333                }
1334            } else {
1335                anyhow::bail!("Failed to deserialize instrument payload")
1336            }
1337        }
1338        OKXWsChannel::BboTbt => {
1339            let data_vec = parse_quote_msg_vec(
1340                data,
1341                instrument_id,
1342                price_precision,
1343                size_precision,
1344                ts_init,
1345            )?;
1346            Ok(Some(NautilusWsMessage::Data(data_vec)))
1347        }
1348        OKXWsChannel::Tickers => {
1349            let data_vec = parse_ticker_msg_vec(
1350                data,
1351                instrument_id,
1352                price_precision,
1353                size_precision,
1354                ts_init,
1355            )?;
1356            Ok(Some(NautilusWsMessage::Data(data_vec)))
1357        }
1358        OKXWsChannel::Trades => {
1359            let data_vec = parse_trade_msg_vec(
1360                data,
1361                instrument_id,
1362                price_precision,
1363                size_precision,
1364                ts_init,
1365            )?;
1366            Ok(Some(NautilusWsMessage::Data(data_vec)))
1367        }
1368        OKXWsChannel::MarkPrice => {
1369            let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1370            Ok(Some(NautilusWsMessage::Data(data_vec)))
1371        }
1372        OKXWsChannel::IndexTickers => {
1373            let data_vec =
1374                parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1375            Ok(Some(NautilusWsMessage::Data(data_vec)))
1376        }
1377        OKXWsChannel::FundingRate => {
1378            let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
1379            Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
1380        }
1381        channel if okx_channel_to_bar_spec(channel).is_some() => {
1382            let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
1383            let data_vec = parse_candle_msg_vec(
1384                data,
1385                instrument_id,
1386                price_precision,
1387                size_precision,
1388                bar_spec,
1389                ts_init,
1390            )?;
1391            Ok(Some(NautilusWsMessage::Data(data_vec)))
1392        }
1393        OKXWsChannel::Books
1394        | OKXWsChannel::BooksTbt
1395        | OKXWsChannel::Books5
1396        | OKXWsChannel::Books50Tbt => {
1397            if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
1398                let data_vec = parse_book10_msg_vec(
1399                    book_msgs,
1400                    instrument_id,
1401                    price_precision,
1402                    size_precision,
1403                    ts_init,
1404                )?;
1405                Ok(Some(NautilusWsMessage::Data(data_vec)))
1406            } else {
1407                anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
1408            }
1409        }
1410        _ => {
1411            tracing::warn!("Unsupported channel for message parsing: {channel:?}");
1412            Ok(None)
1413        }
1414    }
1415}
1416
1417////////////////////////////////////////////////////////////////////////////////
1418// Tests
1419////////////////////////////////////////////////////////////////////////////////
1420#[cfg(test)]
1421mod tests {
1422    use ahash::AHashMap;
1423    use nautilus_core::nanos::UnixNanos;
1424    use nautilus_model::{
1425        data::bar::BAR_SPEC_1_DAY_LAST,
1426        identifiers::{ClientOrderId, Symbol},
1427        instruments::CryptoPerpetual,
1428        types::Currency,
1429    };
1430    use rstest::rstest;
1431    use rust_decimal::Decimal;
1432    use ustr::Ustr;
1433
1434    use super::*;
1435    use crate::{
1436        OKXPositionSide,
1437        common::{
1438            enums::{OKXExecType, OKXInstrumentType, OKXOrderType, OKXSide, OKXTradeMode},
1439            parse::parse_account_state,
1440            testing::load_test_json,
1441        },
1442        http::models::OKXAccount,
1443        websocket::messages::{OKXWebSocketArg, OKXWebSocketEvent},
1444    };
1445
1446    fn create_stub_instrument() -> CryptoPerpetual {
1447        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1448        CryptoPerpetual::new(
1449            instrument_id,
1450            Symbol::from("BTC-USDT-SWAP"),
1451            Currency::BTC(),
1452            Currency::USDT(),
1453            Currency::USDT(),
1454            false,
1455            2,
1456            8,
1457            Price::from("0.01"),
1458            Quantity::from("0.00000001"),
1459            None,
1460            None,
1461            None,
1462            None,
1463            None,
1464            None,
1465            None,
1466            None,
1467            None,
1468            None,
1469            None,
1470            None,
1471            UnixNanos::default(),
1472            UnixNanos::default(),
1473        )
1474    }
1475
1476    fn create_stub_order_msg(
1477        fill_sz: &str,
1478        acc_fill_sz: Option<String>,
1479        order_id: &str,
1480        trade_id: &str,
1481    ) -> OKXOrderMsg {
1482        OKXOrderMsg {
1483            acc_fill_sz,
1484            avg_px: "50000.0".to_string(),
1485            c_time: 1746947317401,
1486            cancel_source: None,
1487            cancel_source_reason: None,
1488            category: OKXOrderCategory::Normal,
1489            ccy: Ustr::from("USDT"),
1490            cl_ord_id: "test_order_1".to_string(),
1491            algo_cl_ord_id: None,
1492            fee: Some("-1.0".to_string()),
1493            fee_ccy: Ustr::from("USDT"),
1494            fill_px: "50000.0".to_string(),
1495            fill_sz: fill_sz.to_string(),
1496            fill_time: 1746947317402,
1497            inst_id: Ustr::from("BTC-USDT-SWAP"),
1498            inst_type: OKXInstrumentType::Swap,
1499            lever: "2.0".to_string(),
1500            ord_id: Ustr::from(order_id),
1501            ord_type: OKXOrderType::Market,
1502            pnl: "0".to_string(),
1503            pos_side: OKXPositionSide::Long,
1504            px: "".to_string(),
1505            reduce_only: "false".to_string(),
1506            side: OKXSide::Buy,
1507            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
1508            exec_type: OKXExecType::Taker,
1509            sz: "0.03".to_string(),
1510            td_mode: OKXTradeMode::Isolated,
1511            tgt_ccy: None,
1512            trade_id: trade_id.to_string(),
1513            u_time: 1746947317402,
1514        }
1515    }
1516
1517    #[rstest]
1518    fn test_parse_books_snapshot() {
1519        let json_data = load_test_json("ws_books_snapshot.json");
1520        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1521        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1522            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1523            _ => panic!("Expected a `BookData` variant"),
1524        };
1525
1526        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1527        let deltas = parse_book_msg(
1528            &okx_books[0],
1529            instrument_id,
1530            2,
1531            1,
1532            &action,
1533            UnixNanos::default(),
1534        )
1535        .unwrap();
1536
1537        assert_eq!(deltas.instrument_id, instrument_id);
1538        assert_eq!(deltas.deltas.len(), 16);
1539        assert_eq!(deltas.flags, 32);
1540        assert_eq!(deltas.sequence, 123456);
1541        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1542        assert_eq!(deltas.ts_init, UnixNanos::default());
1543
1544        // Verify some individual deltas are parsed correctly
1545        assert!(!deltas.deltas.is_empty());
1546        // Snapshot should have both bid and ask deltas
1547        let bid_deltas: Vec<_> = deltas
1548            .deltas
1549            .iter()
1550            .filter(|d| d.order.side == OrderSide::Buy)
1551            .collect();
1552        let ask_deltas: Vec<_> = deltas
1553            .deltas
1554            .iter()
1555            .filter(|d| d.order.side == OrderSide::Sell)
1556            .collect();
1557        assert!(!bid_deltas.is_empty());
1558        assert!(!ask_deltas.is_empty());
1559    }
1560
1561    #[rstest]
1562    fn test_parse_books_update() {
1563        let json_data = load_test_json("ws_books_update.json");
1564        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1565        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1566        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1567            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1568            _ => panic!("Expected a `BookData` variant"),
1569        };
1570
1571        let deltas = parse_book_msg(
1572            &okx_books[0],
1573            instrument_id,
1574            2,
1575            1,
1576            &action,
1577            UnixNanos::default(),
1578        )
1579        .unwrap();
1580
1581        assert_eq!(deltas.instrument_id, instrument_id);
1582        assert_eq!(deltas.deltas.len(), 16);
1583        assert_eq!(deltas.flags, 0);
1584        assert_eq!(deltas.sequence, 123457);
1585        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1586        assert_eq!(deltas.ts_init, UnixNanos::default());
1587
1588        // Verify some individual deltas are parsed correctly
1589        assert!(!deltas.deltas.is_empty());
1590        // Update should also have both bid and ask deltas
1591        let bid_deltas: Vec<_> = deltas
1592            .deltas
1593            .iter()
1594            .filter(|d| d.order.side == OrderSide::Buy)
1595            .collect();
1596        let ask_deltas: Vec<_> = deltas
1597            .deltas
1598            .iter()
1599            .filter(|d| d.order.side == OrderSide::Sell)
1600            .collect();
1601        assert!(!bid_deltas.is_empty());
1602        assert!(!ask_deltas.is_empty());
1603    }
1604
1605    #[rstest]
1606    fn test_parse_tickers() {
1607        let json_data = load_test_json("ws_tickers.json");
1608        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1609        let okx_tickers: Vec<OKXTickerMsg> = match msg {
1610            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1611            _ => panic!("Expected a `Data` variant"),
1612        };
1613
1614        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1615        let trade =
1616            parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1617
1618        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1619        assert_eq!(trade.bid_price, Price::from("8888.88"));
1620        assert_eq!(trade.ask_price, Price::from("9999.99"));
1621        assert_eq!(trade.bid_size, Quantity::from(5));
1622        assert_eq!(trade.ask_size, Quantity::from(11));
1623        assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
1624        assert_eq!(trade.ts_init, UnixNanos::default());
1625    }
1626
1627    #[rstest]
1628    fn test_parse_quotes() {
1629        let json_data = load_test_json("ws_bbo_tbt.json");
1630        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1631        let okx_quotes: Vec<OKXBookMsg> = match msg {
1632            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1633            _ => panic!("Expected a `Data` variant"),
1634        };
1635        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1636
1637        let quote =
1638            parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1639
1640        assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1641        assert_eq!(quote.bid_price, Price::from("8476.97"));
1642        assert_eq!(quote.ask_price, Price::from("8476.98"));
1643        assert_eq!(quote.bid_size, Quantity::from(256));
1644        assert_eq!(quote.ask_size, Quantity::from(415));
1645        assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
1646        assert_eq!(quote.ts_init, UnixNanos::default());
1647    }
1648
1649    #[rstest]
1650    fn test_parse_trades() {
1651        let json_data = load_test_json("ws_trades.json");
1652        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1653        let okx_trades: Vec<OKXTradeMsg> = match msg {
1654            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1655            _ => panic!("Expected a `Data` variant"),
1656        };
1657
1658        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1659        let trade =
1660            parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
1661
1662        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1663        assert_eq!(trade.price, Price::from("42219.9"));
1664        assert_eq!(trade.size, Quantity::from("0.12060306"));
1665        assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
1666        assert_eq!(trade.trade_id, TradeId::from("130639474"));
1667        assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
1668        assert_eq!(trade.ts_init, UnixNanos::default());
1669    }
1670
1671    #[rstest]
1672    fn test_parse_candle() {
1673        let json_data = load_test_json("ws_candle.json");
1674        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1675        let okx_candles: Vec<OKXCandleMsg> = match msg {
1676            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1677            _ => panic!("Expected a `Data` variant"),
1678        };
1679
1680        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1681        let bar_type = BarType::new(
1682            instrument_id,
1683            BAR_SPEC_1_DAY_LAST,
1684            AggregationSource::External,
1685        );
1686        let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
1687
1688        assert_eq!(bar.bar_type, bar_type);
1689        assert_eq!(bar.open, Price::from("8533.02"));
1690        assert_eq!(bar.high, Price::from("8553.74"));
1691        assert_eq!(bar.low, Price::from("8527.17"));
1692        assert_eq!(bar.close, Price::from("8548.26"));
1693        assert_eq!(bar.volume, Quantity::from(45247));
1694        assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1695        assert_eq!(bar.ts_init, UnixNanos::default());
1696    }
1697
1698    #[rstest]
1699    fn test_parse_funding_rate() {
1700        let json_data = load_test_json("ws_funding_rate.json");
1701        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1702
1703        let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1704            OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1705            _ => panic!("Expected a `Data` variant"),
1706        };
1707
1708        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1709        let funding_rate =
1710            parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1711                .unwrap();
1712
1713        assert_eq!(funding_rate.instrument_id, instrument_id);
1714        assert_eq!(funding_rate.rate, Decimal::new(1, 4));
1715        assert_eq!(
1716            funding_rate.next_funding_ns,
1717            Some(UnixNanos::from(1744590349506000000))
1718        );
1719        assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1720        assert_eq!(funding_rate.ts_init, UnixNanos::default());
1721    }
1722
1723    #[rstest]
1724    fn test_parse_book_vec() {
1725        let json_data = load_test_json("ws_books_snapshot.json");
1726        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1727        let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1728            OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1729            _ => panic!("Expected BookData"),
1730        };
1731
1732        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1733        let deltas_vec =
1734            parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1735
1736        assert_eq!(deltas_vec.len(), 1);
1737
1738        if let Data::Deltas(d) = &deltas_vec[0] {
1739            assert_eq!(d.sequence, 123456);
1740        } else {
1741            panic!("Expected Deltas");
1742        }
1743    }
1744
1745    #[rstest]
1746    fn test_parse_ticker_vec() {
1747        let json_data = load_test_json("ws_tickers.json");
1748        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1749        let data_val: serde_json::Value = match event {
1750            OKXWebSocketEvent::Data { data, .. } => data,
1751            _ => panic!("Expected Data"),
1752        };
1753
1754        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1755        let quotes_vec =
1756            parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1757
1758        assert_eq!(quotes_vec.len(), 1);
1759
1760        if let Data::Quote(q) = &quotes_vec[0] {
1761            assert_eq!(q.bid_price, Price::from("8888.88000000"));
1762            assert_eq!(q.ask_price, Price::from("9999.99"));
1763        } else {
1764            panic!("Expected Quote");
1765        }
1766    }
1767
1768    #[rstest]
1769    fn test_parse_trade_vec() {
1770        let json_data = load_test_json("ws_trades.json");
1771        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1772        let data_val: serde_json::Value = match event {
1773            OKXWebSocketEvent::Data { data, .. } => data,
1774            _ => panic!("Expected Data"),
1775        };
1776
1777        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1778        let trades_vec =
1779            parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1780
1781        assert_eq!(trades_vec.len(), 1);
1782
1783        if let Data::Trade(t) = &trades_vec[0] {
1784            assert_eq!(t.trade_id, TradeId::new("130639474"));
1785        } else {
1786            panic!("Expected Trade");
1787        }
1788    }
1789
1790    #[rstest]
1791    fn test_parse_candle_vec() {
1792        let json_data = load_test_json("ws_candle.json");
1793        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1794        let data_val: serde_json::Value = match event {
1795            OKXWebSocketEvent::Data { data, .. } => data,
1796            _ => panic!("Expected Data"),
1797        };
1798
1799        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1800        let bars_vec = parse_candle_msg_vec(
1801            data_val,
1802            &instrument_id,
1803            2,
1804            1,
1805            BAR_SPEC_1_DAY_LAST,
1806            UnixNanos::default(),
1807        )
1808        .unwrap();
1809
1810        assert_eq!(bars_vec.len(), 1);
1811
1812        if let Data::Bar(b) = &bars_vec[0] {
1813            assert_eq!(b.open, Price::from("8533.02"));
1814        } else {
1815            panic!("Expected Bar");
1816        }
1817    }
1818
1819    #[rstest]
1820    fn test_parse_book_message() {
1821        let json_data = load_test_json("ws_bbo_tbt.json");
1822        let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1823        let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1824            OKXWebSocketEvent::Data { data, arg, .. } => {
1825                (serde_json::from_value(data).unwrap(), arg)
1826            }
1827            _ => panic!("Expected a `Data` variant"),
1828        };
1829
1830        assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1831        assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1832        assert_eq!(arg.inst_type, None);
1833        assert_eq!(okx_books.len(), 1);
1834
1835        let book_msg = &okx_books[0];
1836
1837        // Check asks
1838        assert_eq!(book_msg.asks.len(), 1);
1839        let ask = &book_msg.asks[0];
1840        assert_eq!(ask.price, "8476.98");
1841        assert_eq!(ask.size, "415");
1842        assert_eq!(ask.liquidated_orders_count, "0");
1843        assert_eq!(ask.orders_count, "13");
1844
1845        // Check bids
1846        assert_eq!(book_msg.bids.len(), 1);
1847        let bid = &book_msg.bids[0];
1848        assert_eq!(bid.price, "8476.97");
1849        assert_eq!(bid.size, "256");
1850        assert_eq!(bid.liquidated_orders_count, "0");
1851        assert_eq!(bid.orders_count, "12");
1852        assert_eq!(book_msg.ts, 1597026383085);
1853        assert_eq!(book_msg.seq_id, 123456);
1854        assert_eq!(book_msg.checksum, None);
1855        assert_eq!(book_msg.prev_seq_id, None);
1856    }
1857
1858    #[rstest]
1859    fn test_parse_ws_account_message() {
1860        let json_data = load_test_json("ws_account.json");
1861        let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1862
1863        assert_eq!(accounts.len(), 1);
1864        let account = &accounts[0];
1865
1866        assert_eq!(account.total_eq, "100.56089404807182");
1867        assert_eq!(account.details.len(), 3);
1868
1869        let usdt_detail = &account.details[0];
1870        assert_eq!(usdt_detail.ccy, "USDT");
1871        assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1872        assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1873
1874        let btc_detail = &account.details[1];
1875        assert_eq!(btc_detail.ccy, "BTC");
1876        assert_eq!(btc_detail.avail_bal, "0.0000000051");
1877
1878        let eth_detail = &account.details[2];
1879        assert_eq!(eth_detail.ccy, "ETH");
1880        assert_eq!(eth_detail.avail_bal, "0.000000185");
1881
1882        let account_id = AccountId::new("OKX-001");
1883        let ts_init = nautilus_core::nanos::UnixNanos::default();
1884        let account_state = parse_account_state(account, account_id, ts_init);
1885
1886        assert!(account_state.is_ok());
1887        let state = account_state.unwrap();
1888        assert_eq!(state.account_id, account_id);
1889        assert_eq!(state.balances.len(), 3);
1890    }
1891
1892    #[rstest]
1893    fn test_parse_order_msg() {
1894        let json_data = load_test_json("ws_orders.json");
1895        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1896
1897        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1898
1899        let account_id = AccountId::new("OKX-001");
1900        let mut instruments = AHashMap::new();
1901
1902        // Create a mock instrument for testing
1903        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1904        let instrument = CryptoPerpetual::new(
1905            instrument_id,
1906            Symbol::from("BTC-USDT-SWAP"),
1907            Currency::BTC(),
1908            Currency::USDT(),
1909            Currency::USDT(),
1910            false, // is_inverse
1911            2,     // price_precision
1912            8,     // size_precision
1913            Price::from("0.01"),
1914            Quantity::from("0.00000001"),
1915            None, // multiplier
1916            None, // lot_size
1917            None, // max_quantity
1918            None, // min_quantity
1919            None, // max_notional
1920            None, // min_notional
1921            None, // max_price
1922            None, // min_price
1923            None, // margin_init
1924            None, // margin_maint
1925            None, // maker_fee
1926            None, // taker_fee
1927            UnixNanos::default(),
1928            UnixNanos::default(),
1929        );
1930
1931        instruments.insert(
1932            Ustr::from("BTC-USDT-SWAP"),
1933            InstrumentAny::CryptoPerpetual(instrument),
1934        );
1935
1936        let ts_init = UnixNanos::default();
1937        let fee_cache = AHashMap::new();
1938        let filled_qty_cache = AHashMap::new();
1939
1940        let result = parse_order_msg_vec(
1941            data,
1942            account_id,
1943            &instruments,
1944            &fee_cache,
1945            &filled_qty_cache,
1946            ts_init,
1947        );
1948
1949        assert!(result.is_ok());
1950        let order_reports = result.unwrap();
1951        assert_eq!(order_reports.len(), 1);
1952
1953        // Verify the parsed order report
1954        let report = &order_reports[0];
1955
1956        if let ExecutionReport::Fill(fill_report) = report {
1957            assert_eq!(fill_report.account_id, account_id);
1958            assert_eq!(fill_report.instrument_id, instrument_id);
1959            assert_eq!(
1960                fill_report.client_order_id,
1961                Some(ClientOrderId::new("001BTCUSDT20250106001"))
1962            );
1963            assert_eq!(
1964                fill_report.venue_order_id,
1965                VenueOrderId::new("2497956918703120384")
1966            );
1967            assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1968            assert_eq!(fill_report.order_side, OrderSide::Buy);
1969            assert_eq!(fill_report.last_px, Price::from("103698.90"));
1970            assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1971            assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1972        } else {
1973            panic!("Expected Fill report for filled order");
1974        }
1975    }
1976
1977    #[rstest]
1978    fn test_parse_order_status_report() {
1979        let json_data = load_test_json("ws_orders.json");
1980        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1981        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1982        let order_msg = &data[0];
1983
1984        let account_id = AccountId::new("OKX-001");
1985        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1986        let instrument = CryptoPerpetual::new(
1987            instrument_id,
1988            Symbol::from("BTC-USDT-SWAP"),
1989            Currency::BTC(),
1990            Currency::USDT(),
1991            Currency::USDT(),
1992            false, // is_inverse
1993            2,     // price_precision
1994            8,     // size_precision
1995            Price::from("0.01"),
1996            Quantity::from("0.00000001"),
1997            None,
1998            None,
1999            None,
2000            None,
2001            None,
2002            None,
2003            None,
2004            None,
2005            None,
2006            None,
2007            None,
2008            None,
2009            UnixNanos::default(),
2010            UnixNanos::default(),
2011        );
2012
2013        let ts_init = UnixNanos::default();
2014
2015        let result = parse_order_status_report(
2016            order_msg,
2017            &InstrumentAny::CryptoPerpetual(instrument),
2018            account_id,
2019            ts_init,
2020        );
2021
2022        assert!(result.is_ok());
2023        let order_status_report = result.unwrap();
2024
2025        assert_eq!(order_status_report.account_id, account_id);
2026        assert_eq!(order_status_report.instrument_id, instrument_id);
2027        assert_eq!(
2028            order_status_report.client_order_id,
2029            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2030        );
2031        assert_eq!(
2032            order_status_report.venue_order_id,
2033            VenueOrderId::new("2497956918703120384")
2034        );
2035        assert_eq!(order_status_report.order_side, OrderSide::Buy);
2036        assert_eq!(order_status_report.order_status, OrderStatus::Filled);
2037        assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
2038        assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
2039    }
2040
2041    #[rstest]
2042    fn test_parse_fill_report() {
2043        let json_data = load_test_json("ws_orders.json");
2044        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2045        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2046        let order_msg = &data[0];
2047
2048        let account_id = AccountId::new("OKX-001");
2049        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2050        let instrument = CryptoPerpetual::new(
2051            instrument_id,
2052            Symbol::from("BTC-USDT-SWAP"),
2053            Currency::BTC(),
2054            Currency::USDT(),
2055            Currency::USDT(),
2056            false, // is_inverse
2057            2,     // price_precision
2058            8,     // size_precision
2059            Price::from("0.01"),
2060            Quantity::from("0.00000001"),
2061            None,
2062            None,
2063            None,
2064            None,
2065            None,
2066            None,
2067            None,
2068            None,
2069            None,
2070            None,
2071            None,
2072            None,
2073            UnixNanos::default(),
2074            UnixNanos::default(),
2075        );
2076
2077        let ts_init = UnixNanos::default();
2078
2079        let result = parse_fill_report(
2080            order_msg,
2081            &InstrumentAny::CryptoPerpetual(instrument),
2082            account_id,
2083            None,
2084            None,
2085            ts_init,
2086        );
2087
2088        assert!(result.is_ok());
2089        let fill_report = result.unwrap();
2090
2091        assert_eq!(fill_report.account_id, account_id);
2092        assert_eq!(fill_report.instrument_id, instrument_id);
2093        assert_eq!(
2094            fill_report.client_order_id,
2095            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2096        );
2097        assert_eq!(
2098            fill_report.venue_order_id,
2099            VenueOrderId::new("2497956918703120384")
2100        );
2101        assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
2102        assert_eq!(fill_report.order_side, OrderSide::Buy);
2103        assert_eq!(fill_report.last_px, Price::from("103698.90"));
2104        assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
2105        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
2106    }
2107
2108    #[rstest]
2109    fn test_parse_book10_msg() {
2110        let json_data = load_test_json("ws_books_snapshot.json");
2111        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
2112        let msgs: Vec<OKXBookMsg> = match event {
2113            OKXWebSocketEvent::BookData { data, .. } => data,
2114            _ => panic!("Expected BookData"),
2115        };
2116
2117        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2118        let depth10 =
2119            parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
2120
2121        assert_eq!(depth10.instrument_id, instrument_id);
2122        assert_eq!(depth10.sequence, 123456);
2123        assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
2124        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
2125
2126        // Check bid levels (available in test data: 8 levels)
2127        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2128        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2129        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
2130        assert_eq!(depth10.bid_counts[0], 12);
2131
2132        assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
2133        assert_eq!(depth10.bids[1].size, Quantity::from("101"));
2134        assert_eq!(depth10.bid_counts[1], 1);
2135
2136        // Check that levels beyond available data are padded with empty orders
2137        assert_eq!(depth10.bids[8].price, Price::from("0"));
2138        assert_eq!(depth10.bids[8].size, Quantity::from("0"));
2139        assert_eq!(depth10.bid_counts[8], 0);
2140
2141        // Check ask levels (available in test data: 8 levels)
2142        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2143        assert_eq!(depth10.asks[0].size, Quantity::from("415"));
2144        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
2145        assert_eq!(depth10.ask_counts[0], 13);
2146
2147        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2148        assert_eq!(depth10.asks[1].size, Quantity::from("7"));
2149        assert_eq!(depth10.ask_counts[1], 2);
2150
2151        // Check that levels beyond available data are padded with empty orders
2152        assert_eq!(depth10.asks[8].price, Price::from("0"));
2153        assert_eq!(depth10.asks[8].size, Quantity::from("0"));
2154        assert_eq!(depth10.ask_counts[8], 0);
2155    }
2156
2157    #[rstest]
2158    fn test_parse_book10_msg_vec() {
2159        let json_data = load_test_json("ws_books_snapshot.json");
2160        let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
2161        let msgs: Vec<OKXBookMsg> = match event {
2162            OKXWebSocketEvent::BookData { data, .. } => data,
2163            _ => panic!("Expected BookData"),
2164        };
2165
2166        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2167        let depth10_vec =
2168            parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
2169
2170        assert_eq!(depth10_vec.len(), 1);
2171
2172        if let Data::Depth10(d) = &depth10_vec[0] {
2173            assert_eq!(d.instrument_id, instrument_id);
2174            assert_eq!(d.sequence, 123456);
2175            assert_eq!(d.bids[0].price, Price::from("8476.97"));
2176            assert_eq!(d.asks[0].price, Price::from("8476.98"));
2177        } else {
2178            panic!("Expected Depth10");
2179        }
2180    }
2181
2182    #[rstest]
2183    fn test_parse_fill_report_with_fee_cache() {
2184        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2185        let instrument = CryptoPerpetual::new(
2186            instrument_id,
2187            Symbol::from("BTC-USDT-SWAP"),
2188            Currency::BTC(),
2189            Currency::USDT(),
2190            Currency::USDT(),
2191            false, // is_inverse
2192            2,     // price_precision
2193            8,     // size_precision
2194            Price::from("0.01"),
2195            Quantity::from("0.00000001"),
2196            None, // multiplier
2197            None, // lot_size
2198            None, // max_quantity
2199            None, // min_quantity
2200            None, // max_notional
2201            None, // min_notional
2202            None, // max_price
2203            None, // min_price
2204            None, // margin_init
2205            None, // margin_maint
2206            None, // maker_fee
2207            None, // taker_fee
2208            UnixNanos::default(),
2209            UnixNanos::default(),
2210        );
2211
2212        let account_id = AccountId::new("OKX-001");
2213        let ts_init = UnixNanos::default();
2214
2215        // First fill: 0.01 BTC out of 0.03 BTC total (1/3)
2216        let order_msg_1 = OKXOrderMsg {
2217            acc_fill_sz: Some("0.01".to_string()),
2218            avg_px: "50000.0".to_string(),
2219            c_time: 1746947317401,
2220            cancel_source: None,
2221            cancel_source_reason: None,
2222            category: OKXOrderCategory::Normal,
2223            ccy: Ustr::from("USDT"),
2224            cl_ord_id: "test_order_1".to_string(),
2225            algo_cl_ord_id: None,
2226            fee: Some("-1.0".to_string()), // Total fee so far
2227            fee_ccy: Ustr::from("USDT"),
2228            fill_px: "50000.0".to_string(),
2229            fill_sz: "0.01".to_string(),
2230            fill_time: 1746947317402,
2231            inst_id: Ustr::from("BTC-USDT-SWAP"),
2232            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2233            lever: "2.0".to_string(),
2234            ord_id: Ustr::from("1234567890"),
2235            ord_type: OKXOrderType::Market,
2236            pnl: "0".to_string(),
2237            pos_side: OKXPositionSide::Long,
2238            px: "".to_string(),
2239            reduce_only: "false".to_string(),
2240            side: crate::common::enums::OKXSide::Buy,
2241            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2242            exec_type: crate::common::enums::OKXExecType::Maker,
2243            sz: "0.03".to_string(), // Total order size
2244            td_mode: OKXTradeMode::Isolated,
2245            tgt_ccy: None,
2246            trade_id: "trade_1".to_string(),
2247            u_time: 1746947317402,
2248        };
2249
2250        let fill_report_1 = parse_fill_report(
2251            &order_msg_1,
2252            &InstrumentAny::CryptoPerpetual(instrument),
2253            account_id,
2254            None,
2255            None,
2256            ts_init,
2257        )
2258        .unwrap();
2259
2260        // First fill should get the full fee since there's no previous fee
2261        assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
2262
2263        // Second fill: 0.02 BTC more, now 0.03 BTC total (completely filled)
2264        let order_msg_2 = OKXOrderMsg {
2265            acc_fill_sz: Some("0.03".to_string()),
2266            avg_px: "50000.0".to_string(),
2267            c_time: 1746947317401,
2268            cancel_source: None,
2269            cancel_source_reason: None,
2270            category: OKXOrderCategory::Normal,
2271            ccy: Ustr::from("USDT"),
2272            cl_ord_id: "test_order_1".to_string(),
2273            algo_cl_ord_id: None,
2274            fee: Some("-3.0".to_string()), // Same total fee
2275            fee_ccy: Ustr::from("USDT"),
2276            fill_px: "50000.0".to_string(),
2277            fill_sz: "0.02".to_string(),
2278            fill_time: 1746947317403,
2279            inst_id: Ustr::from("BTC-USDT-SWAP"),
2280            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2281            lever: "2.0".to_string(),
2282            ord_id: Ustr::from("1234567890"),
2283            ord_type: OKXOrderType::Market,
2284            pnl: "0".to_string(),
2285            pos_side: OKXPositionSide::Long,
2286            px: "".to_string(),
2287            reduce_only: "false".to_string(),
2288            side: crate::common::enums::OKXSide::Buy,
2289            state: crate::common::enums::OKXOrderStatus::Filled,
2290            exec_type: crate::common::enums::OKXExecType::Maker,
2291            sz: "0.03".to_string(), // Same total order size
2292            td_mode: OKXTradeMode::Isolated,
2293            tgt_ccy: None,
2294            trade_id: "trade_2".to_string(),
2295            u_time: 1746947317403,
2296        };
2297
2298        let fill_report_2 = parse_fill_report(
2299            &order_msg_2,
2300            &InstrumentAny::CryptoPerpetual(instrument),
2301            account_id,
2302            Some(fill_report_1.commission),
2303            Some(fill_report_1.last_qty),
2304            ts_init,
2305        )
2306        .unwrap();
2307
2308        // Second fill should get total_fee - previous_fee = 3.0 - 1.0 = 2.0
2309        assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
2310
2311        // Test passed - fee was correctly split proportionally
2312    }
2313
2314    #[rstest]
2315    fn test_parse_fill_report_with_maker_rebates() {
2316        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2317        let instrument = CryptoPerpetual::new(
2318            instrument_id,
2319            Symbol::from("BTC-USDT-SWAP"),
2320            Currency::BTC(),
2321            Currency::USDT(),
2322            Currency::USDT(),
2323            false,
2324            2,
2325            8,
2326            Price::from("0.01"),
2327            Quantity::from("0.00000001"),
2328            None,
2329            None,
2330            None,
2331            None,
2332            None,
2333            None,
2334            None,
2335            None,
2336            None,
2337            None,
2338            None,
2339            None,
2340            UnixNanos::default(),
2341            UnixNanos::default(),
2342        );
2343
2344        let account_id = AccountId::new("OKX-001");
2345        let ts_init = UnixNanos::default();
2346
2347        // First fill: maker rebate of $0.5 (OKX sends as "0.5", parse_fee makes it -0.5)
2348        let order_msg_1 = OKXOrderMsg {
2349            acc_fill_sz: Some("0.01".to_string()),
2350            avg_px: "50000.0".to_string(),
2351            c_time: 1746947317401,
2352            cancel_source: None,
2353            cancel_source_reason: None,
2354            category: OKXOrderCategory::Normal,
2355            ccy: Ustr::from("USDT"),
2356            cl_ord_id: "test_order_rebate".to_string(),
2357            algo_cl_ord_id: None,
2358            fee: Some("0.5".to_string()), // Rebate: positive value from OKX
2359            fee_ccy: Ustr::from("USDT"),
2360            fill_px: "50000.0".to_string(),
2361            fill_sz: "0.01".to_string(),
2362            fill_time: 1746947317402,
2363            inst_id: Ustr::from("BTC-USDT-SWAP"),
2364            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2365            lever: "2.0".to_string(),
2366            ord_id: Ustr::from("rebate_order_123"),
2367            ord_type: OKXOrderType::Market,
2368            pnl: "0".to_string(),
2369            pos_side: OKXPositionSide::Long,
2370            px: "".to_string(),
2371            reduce_only: "false".to_string(),
2372            side: crate::common::enums::OKXSide::Buy,
2373            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2374            exec_type: crate::common::enums::OKXExecType::Maker,
2375            sz: "0.02".to_string(),
2376            td_mode: OKXTradeMode::Isolated,
2377            tgt_ccy: None,
2378            trade_id: "trade_rebate_1".to_string(),
2379            u_time: 1746947317402,
2380        };
2381
2382        let fill_report_1 = parse_fill_report(
2383            &order_msg_1,
2384            &InstrumentAny::CryptoPerpetual(instrument),
2385            account_id,
2386            None,
2387            None,
2388            ts_init,
2389        )
2390        .unwrap();
2391
2392        // First fill gets the full rebate (negative commission)
2393        assert_eq!(fill_report_1.commission, Money::new(-0.5, Currency::USDT()));
2394
2395        // Second fill: another maker rebate of $0.3, cumulative now $0.8
2396        let order_msg_2 = OKXOrderMsg {
2397            acc_fill_sz: Some("0.02".to_string()),
2398            avg_px: "50000.0".to_string(),
2399            c_time: 1746947317401,
2400            cancel_source: None,
2401            cancel_source_reason: None,
2402            category: OKXOrderCategory::Normal,
2403            ccy: Ustr::from("USDT"),
2404            cl_ord_id: "test_order_rebate".to_string(),
2405            algo_cl_ord_id: None,
2406            fee: Some("0.8".to_string()), // Cumulative rebate
2407            fee_ccy: Ustr::from("USDT"),
2408            fill_px: "50000.0".to_string(),
2409            fill_sz: "0.01".to_string(),
2410            fill_time: 1746947317403,
2411            inst_id: Ustr::from("BTC-USDT-SWAP"),
2412            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2413            lever: "2.0".to_string(),
2414            ord_id: Ustr::from("rebate_order_123"),
2415            ord_type: OKXOrderType::Market,
2416            pnl: "0".to_string(),
2417            pos_side: OKXPositionSide::Long,
2418            px: "".to_string(),
2419            reduce_only: "false".to_string(),
2420            side: crate::common::enums::OKXSide::Buy,
2421            state: crate::common::enums::OKXOrderStatus::Filled,
2422            exec_type: crate::common::enums::OKXExecType::Maker,
2423            sz: "0.02".to_string(),
2424            td_mode: OKXTradeMode::Isolated,
2425            tgt_ccy: None,
2426            trade_id: "trade_rebate_2".to_string(),
2427            u_time: 1746947317403,
2428        };
2429
2430        let fill_report_2 = parse_fill_report(
2431            &order_msg_2,
2432            &InstrumentAny::CryptoPerpetual(instrument),
2433            account_id,
2434            Some(fill_report_1.commission),
2435            Some(fill_report_1.last_qty),
2436            ts_init,
2437        )
2438        .unwrap();
2439
2440        // Second fill: incremental = -0.8 - (-0.5) = -0.3
2441        assert_eq!(fill_report_2.commission, Money::new(-0.3, Currency::USDT()));
2442    }
2443
2444    #[rstest]
2445    fn test_parse_fill_report_rebate_to_charge_transition() {
2446        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2447        let instrument = CryptoPerpetual::new(
2448            instrument_id,
2449            Symbol::from("BTC-USDT-SWAP"),
2450            Currency::BTC(),
2451            Currency::USDT(),
2452            Currency::USDT(),
2453            false,
2454            2,
2455            8,
2456            Price::from("0.01"),
2457            Quantity::from("0.00000001"),
2458            None,
2459            None,
2460            None,
2461            None,
2462            None,
2463            None,
2464            None,
2465            None,
2466            None,
2467            None,
2468            None,
2469            None,
2470            UnixNanos::default(),
2471            UnixNanos::default(),
2472        );
2473
2474        let account_id = AccountId::new("OKX-001");
2475        let ts_init = UnixNanos::default();
2476
2477        // First fill: maker rebate of $1.0
2478        let order_msg_1 = OKXOrderMsg {
2479            acc_fill_sz: Some("0.01".to_string()),
2480            avg_px: "50000.0".to_string(),
2481            c_time: 1746947317401,
2482            cancel_source: None,
2483            cancel_source_reason: None,
2484            category: OKXOrderCategory::Normal,
2485            ccy: Ustr::from("USDT"),
2486            cl_ord_id: "test_order_transition".to_string(),
2487            algo_cl_ord_id: None,
2488            fee: Some("1.0".to_string()), // Rebate from OKX
2489            fee_ccy: Ustr::from("USDT"),
2490            fill_px: "50000.0".to_string(),
2491            fill_sz: "0.01".to_string(),
2492            fill_time: 1746947317402,
2493            inst_id: Ustr::from("BTC-USDT-SWAP"),
2494            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2495            lever: "2.0".to_string(),
2496            ord_id: Ustr::from("transition_order_456"),
2497            ord_type: OKXOrderType::Market,
2498            pnl: "0".to_string(),
2499            pos_side: OKXPositionSide::Long,
2500            px: "".to_string(),
2501            reduce_only: "false".to_string(),
2502            side: crate::common::enums::OKXSide::Buy,
2503            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2504            exec_type: crate::common::enums::OKXExecType::Maker,
2505            sz: "0.02".to_string(),
2506            td_mode: OKXTradeMode::Isolated,
2507            tgt_ccy: None,
2508            trade_id: "trade_transition_1".to_string(),
2509            u_time: 1746947317402,
2510        };
2511
2512        let fill_report_1 = parse_fill_report(
2513            &order_msg_1,
2514            &InstrumentAny::CryptoPerpetual(instrument),
2515            account_id,
2516            None,
2517            None,
2518            ts_init,
2519        )
2520        .unwrap();
2521
2522        // First fill gets rebate (negative)
2523        assert_eq!(fill_report_1.commission, Money::new(-1.0, Currency::USDT()));
2524
2525        // Second fill: taker charge of $5.0, net cumulative is now $2.0 charge
2526        // This is the edge case: incremental = 2.0 - (-1.0) = 3.0, which exceeds total (2.0)
2527        // But it's legitimate, not corruption
2528        let order_msg_2 = OKXOrderMsg {
2529            acc_fill_sz: Some("0.02".to_string()),
2530            avg_px: "50000.0".to_string(),
2531            c_time: 1746947317401,
2532            cancel_source: None,
2533            cancel_source_reason: None,
2534            category: OKXOrderCategory::Normal,
2535            ccy: Ustr::from("USDT"),
2536            cl_ord_id: "test_order_transition".to_string(),
2537            algo_cl_ord_id: None,
2538            fee: Some("-2.0".to_string()), // Now a charge (negative from OKX)
2539            fee_ccy: Ustr::from("USDT"),
2540            fill_px: "50000.0".to_string(),
2541            fill_sz: "0.01".to_string(),
2542            fill_time: 1746947317403,
2543            inst_id: Ustr::from("BTC-USDT-SWAP"),
2544            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2545            lever: "2.0".to_string(),
2546            ord_id: Ustr::from("transition_order_456"),
2547            ord_type: OKXOrderType::Market,
2548            pnl: "0".to_string(),
2549            pos_side: OKXPositionSide::Long,
2550            px: "".to_string(),
2551            reduce_only: "false".to_string(),
2552            side: crate::common::enums::OKXSide::Buy,
2553            state: crate::common::enums::OKXOrderStatus::Filled,
2554            exec_type: crate::common::enums::OKXExecType::Taker,
2555            sz: "0.02".to_string(),
2556            td_mode: OKXTradeMode::Isolated,
2557            tgt_ccy: None,
2558            trade_id: "trade_transition_2".to_string(),
2559            u_time: 1746947317403,
2560        };
2561
2562        let fill_report_2 = parse_fill_report(
2563            &order_msg_2,
2564            &InstrumentAny::CryptoPerpetual(instrument),
2565            account_id,
2566            Some(fill_report_1.commission),
2567            Some(fill_report_1.last_qty),
2568            ts_init,
2569        )
2570        .unwrap();
2571
2572        // Second fill: incremental = 2.0 - (-1.0) = 3.0
2573        // This should NOT trigger corruption detection because previous was negative
2574        assert_eq!(fill_report_2.commission, Money::new(3.0, Currency::USDT()));
2575    }
2576
2577    #[rstest]
2578    fn test_parse_fill_report_negative_incremental() {
2579        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2580        let instrument = CryptoPerpetual::new(
2581            instrument_id,
2582            Symbol::from("BTC-USDT-SWAP"),
2583            Currency::BTC(),
2584            Currency::USDT(),
2585            Currency::USDT(),
2586            false,
2587            2,
2588            8,
2589            Price::from("0.01"),
2590            Quantity::from("0.00000001"),
2591            None,
2592            None,
2593            None,
2594            None,
2595            None,
2596            None,
2597            None,
2598            None,
2599            None,
2600            None,
2601            None,
2602            None,
2603            UnixNanos::default(),
2604            UnixNanos::default(),
2605        );
2606
2607        let account_id = AccountId::new("OKX-001");
2608        let ts_init = UnixNanos::default();
2609
2610        // First fill: charge of $2.0
2611        let order_msg_1 = OKXOrderMsg {
2612            acc_fill_sz: Some("0.01".to_string()),
2613            avg_px: "50000.0".to_string(),
2614            c_time: 1746947317401,
2615            cancel_source: None,
2616            cancel_source_reason: None,
2617            category: OKXOrderCategory::Normal,
2618            ccy: Ustr::from("USDT"),
2619            cl_ord_id: "test_order_neg_inc".to_string(),
2620            algo_cl_ord_id: None,
2621            fee: Some("-2.0".to_string()),
2622            fee_ccy: Ustr::from("USDT"),
2623            fill_px: "50000.0".to_string(),
2624            fill_sz: "0.01".to_string(),
2625            fill_time: 1746947317402,
2626            inst_id: Ustr::from("BTC-USDT-SWAP"),
2627            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2628            lever: "2.0".to_string(),
2629            ord_id: Ustr::from("neg_inc_order_789"),
2630            ord_type: OKXOrderType::Market,
2631            pnl: "0".to_string(),
2632            pos_side: OKXPositionSide::Long,
2633            px: "".to_string(),
2634            reduce_only: "false".to_string(),
2635            side: crate::common::enums::OKXSide::Buy,
2636            state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2637            exec_type: crate::common::enums::OKXExecType::Taker,
2638            sz: "0.02".to_string(),
2639            td_mode: OKXTradeMode::Isolated,
2640            tgt_ccy: None,
2641            trade_id: "trade_neg_inc_1".to_string(),
2642            u_time: 1746947317402,
2643        };
2644
2645        let fill_report_1 = parse_fill_report(
2646            &order_msg_1,
2647            &InstrumentAny::CryptoPerpetual(instrument),
2648            account_id,
2649            None,
2650            None,
2651            ts_init,
2652        )
2653        .unwrap();
2654
2655        assert_eq!(fill_report_1.commission, Money::new(2.0, Currency::USDT()));
2656
2657        // Second fill: charge reduced to $1.5 total (refund or maker rebate on this fill)
2658        // Incremental = 1.5 - 2.0 = -0.5 (negative incremental triggers debug log)
2659        let order_msg_2 = OKXOrderMsg {
2660            acc_fill_sz: Some("0.02".to_string()),
2661            avg_px: "50000.0".to_string(),
2662            c_time: 1746947317401,
2663            cancel_source: None,
2664            cancel_source_reason: None,
2665            category: OKXOrderCategory::Normal,
2666            ccy: Ustr::from("USDT"),
2667            cl_ord_id: "test_order_neg_inc".to_string(),
2668            algo_cl_ord_id: None,
2669            fee: Some("-1.5".to_string()), // Total reduced
2670            fee_ccy: Ustr::from("USDT"),
2671            fill_px: "50000.0".to_string(),
2672            fill_sz: "0.01".to_string(),
2673            fill_time: 1746947317403,
2674            inst_id: Ustr::from("BTC-USDT-SWAP"),
2675            inst_type: crate::common::enums::OKXInstrumentType::Swap,
2676            lever: "2.0".to_string(),
2677            ord_id: Ustr::from("neg_inc_order_789"),
2678            ord_type: OKXOrderType::Market,
2679            pnl: "0".to_string(),
2680            pos_side: OKXPositionSide::Long,
2681            px: "".to_string(),
2682            reduce_only: "false".to_string(),
2683            side: crate::common::enums::OKXSide::Buy,
2684            state: crate::common::enums::OKXOrderStatus::Filled,
2685            exec_type: crate::common::enums::OKXExecType::Maker,
2686            sz: "0.02".to_string(),
2687            td_mode: OKXTradeMode::Isolated,
2688            tgt_ccy: None,
2689            trade_id: "trade_neg_inc_2".to_string(),
2690            u_time: 1746947317403,
2691        };
2692
2693        let fill_report_2 = parse_fill_report(
2694            &order_msg_2,
2695            &InstrumentAny::CryptoPerpetual(instrument),
2696            account_id,
2697            Some(fill_report_1.commission),
2698            Some(fill_report_1.last_qty),
2699            ts_init,
2700        )
2701        .unwrap();
2702
2703        // Incremental is negative: 1.5 - 2.0 = -0.5
2704        assert_eq!(fill_report_2.commission, Money::new(-0.5, Currency::USDT()));
2705    }
2706
2707    #[rstest]
2708    fn test_parse_fill_report_empty_fill_sz_first_fill() {
2709        let instrument = create_stub_instrument();
2710        let account_id = AccountId::new("OKX-001");
2711        let ts_init = UnixNanos::default();
2712
2713        let order_msg =
2714            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2715
2716        let fill_report = parse_fill_report(
2717            &order_msg,
2718            &InstrumentAny::CryptoPerpetual(instrument),
2719            account_id,
2720            None,
2721            None,
2722            ts_init,
2723        )
2724        .unwrap();
2725
2726        assert_eq!(fill_report.last_qty, Quantity::from("0.01"));
2727    }
2728
2729    #[rstest]
2730    fn test_parse_fill_report_empty_fill_sz_subsequent_fills() {
2731        let instrument = create_stub_instrument();
2732        let account_id = AccountId::new("OKX-001");
2733        let ts_init = UnixNanos::default();
2734
2735        let order_msg_1 =
2736            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2737
2738        let fill_report_1 = parse_fill_report(
2739            &order_msg_1,
2740            &InstrumentAny::CryptoPerpetual(instrument),
2741            account_id,
2742            None,
2743            None,
2744            ts_init,
2745        )
2746        .unwrap();
2747
2748        assert_eq!(fill_report_1.last_qty, Quantity::from("0.01"));
2749
2750        let order_msg_2 =
2751            create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "trade_2");
2752
2753        let fill_report_2 = parse_fill_report(
2754            &order_msg_2,
2755            &InstrumentAny::CryptoPerpetual(instrument),
2756            account_id,
2757            Some(fill_report_1.commission),
2758            Some(fill_report_1.last_qty),
2759            ts_init,
2760        )
2761        .unwrap();
2762
2763        assert_eq!(fill_report_2.last_qty, Quantity::from("0.02"));
2764    }
2765
2766    #[rstest]
2767    fn test_parse_fill_report_error_both_empty() {
2768        let instrument = create_stub_instrument();
2769        let account_id = AccountId::new("OKX-001");
2770        let ts_init = UnixNanos::default();
2771
2772        let order_msg = create_stub_order_msg("", Some("".to_string()), "1234567890", "trade_1");
2773
2774        let result = parse_fill_report(
2775            &order_msg,
2776            &InstrumentAny::CryptoPerpetual(instrument),
2777            account_id,
2778            None,
2779            None,
2780            ts_init,
2781        );
2782
2783        assert!(result.is_err());
2784        let err_msg = result.unwrap_err().to_string();
2785        assert!(err_msg.contains("Cannot determine fill quantity"));
2786        assert!(err_msg.contains("empty/zero"));
2787    }
2788
2789    #[rstest]
2790    fn test_parse_fill_report_error_acc_fill_sz_none() {
2791        let instrument = create_stub_instrument();
2792        let account_id = AccountId::new("OKX-001");
2793        let ts_init = UnixNanos::default();
2794
2795        let order_msg = create_stub_order_msg("", None, "1234567890", "trade_1");
2796
2797        let result = parse_fill_report(
2798            &order_msg,
2799            &InstrumentAny::CryptoPerpetual(instrument),
2800            account_id,
2801            None,
2802            None,
2803            ts_init,
2804        );
2805
2806        assert!(result.is_err());
2807        let err_msg = result.unwrap_err().to_string();
2808        assert!(err_msg.contains("Cannot determine fill quantity"));
2809        assert!(err_msg.contains("acc_fill_sz is None"));
2810    }
2811
2812    #[rstest]
2813    fn test_parse_order_msg_acc_fill_sz_only_update() {
2814        // Test that we emit fill reports when OKX only updates acc_fill_sz without fill_sz or trade_id
2815        let instrument = create_stub_instrument();
2816        let account_id = AccountId::new("OKX-001");
2817        let ts_init = UnixNanos::default();
2818
2819        let mut instruments = AHashMap::new();
2820        instruments.insert(
2821            Ustr::from("BTC-USDT-SWAP"),
2822            InstrumentAny::CryptoPerpetual(instrument),
2823        );
2824
2825        let fee_cache = AHashMap::new();
2826        let mut filled_qty_cache = AHashMap::new();
2827
2828        // First update: acc_fill_sz = 0.01, no fill_sz, no trade_id
2829        let msg_1 = create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "");
2830
2831        let report_1 = parse_order_msg(
2832            &msg_1,
2833            account_id,
2834            &instruments,
2835            &fee_cache,
2836            &filled_qty_cache,
2837            ts_init,
2838        )
2839        .unwrap();
2840
2841        // Should generate a fill report (not a status report)
2842        assert!(matches!(report_1, ExecutionReport::Fill(_)));
2843        if let ExecutionReport::Fill(fill) = &report_1 {
2844            assert_eq!(fill.last_qty, Quantity::from("0.01"));
2845        }
2846
2847        // Update cache
2848        filled_qty_cache.insert(Ustr::from("1234567890"), Quantity::from("0.01"));
2849
2850        // Second update: acc_fill_sz increased to 0.03, still no fill_sz or trade_id
2851        let msg_2 = create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "");
2852
2853        let report_2 = parse_order_msg(
2854            &msg_2,
2855            account_id,
2856            &instruments,
2857            &fee_cache,
2858            &filled_qty_cache,
2859            ts_init,
2860        )
2861        .unwrap();
2862
2863        // Should still generate a fill report for the incremental 0.02
2864        assert!(matches!(report_2, ExecutionReport::Fill(_)));
2865        if let ExecutionReport::Fill(fill) = &report_2 {
2866            assert_eq!(fill.last_qty, Quantity::from("0.02"));
2867        }
2868    }
2869
2870    #[rstest]
2871    fn test_parse_book10_msg_partial_levels() {
2872        // Test with fewer than 10 levels - should pad with empty orders
2873        let book_msg = OKXBookMsg {
2874            asks: vec![
2875                OrderBookEntry {
2876                    price: "8476.98".to_string(),
2877                    size: "415".to_string(),
2878                    liquidated_orders_count: "0".to_string(),
2879                    orders_count: "13".to_string(),
2880                },
2881                OrderBookEntry {
2882                    price: "8477.00".to_string(),
2883                    size: "7".to_string(),
2884                    liquidated_orders_count: "0".to_string(),
2885                    orders_count: "2".to_string(),
2886                },
2887            ],
2888            bids: vec![OrderBookEntry {
2889                price: "8476.97".to_string(),
2890                size: "256".to_string(),
2891                liquidated_orders_count: "0".to_string(),
2892                orders_count: "12".to_string(),
2893            }],
2894            ts: 1597026383085,
2895            checksum: None,
2896            prev_seq_id: None,
2897            seq_id: 123456,
2898        };
2899
2900        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2901        let depth10 =
2902            parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
2903
2904        // Check that first levels have data
2905        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2906        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2907        assert_eq!(depth10.bid_counts[0], 12);
2908
2909        // Check that remaining levels are padded with default (empty) orders
2910        assert_eq!(depth10.bids[1].price, Price::from("0"));
2911        assert_eq!(depth10.bids[1].size, Quantity::from("0"));
2912        assert_eq!(depth10.bid_counts[1], 0);
2913
2914        // Check asks
2915        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2916        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2917        assert_eq!(depth10.asks[2].price, Price::from("0")); // padded with empty
2918    }
2919
2920    #[rstest]
2921    fn test_parse_algo_order_msg_stop_market() {
2922        let json_data = load_test_json("ws_orders_algo.json");
2923        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2924        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2925
2926        // Test first algo order (stop market sell)
2927        let msg = &data[0];
2928        assert_eq!(msg.algo_id, "706620792746729472");
2929        assert_eq!(msg.algo_cl_ord_id, "STOP001BTCUSDT20250120");
2930        assert_eq!(msg.state, OKXOrderStatus::Live);
2931        assert_eq!(msg.ord_px, "-1"); // Market order indicator
2932
2933        let account_id = AccountId::new("OKX-001");
2934        let mut instruments = AHashMap::new();
2935
2936        // Create mock instrument
2937        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2938        let instrument = CryptoPerpetual::new(
2939            instrument_id,
2940            Symbol::from("BTC-USDT-SWAP"),
2941            Currency::BTC(),
2942            Currency::USDT(),
2943            Currency::USDT(),
2944            false, // is_inverse
2945            2,     // price_precision
2946            8,     // size_precision
2947            Price::from("0.01"),
2948            Quantity::from("0.00000001"),
2949            None,
2950            None,
2951            None,
2952            None,
2953            None,
2954            None,
2955            None,
2956            None,
2957            None,
2958            None,
2959            None,
2960            None,
2961            0.into(), // ts_event
2962            0.into(), // ts_init
2963        );
2964        instruments.insert(
2965            Ustr::from("BTC-USDT-SWAP"),
2966            InstrumentAny::CryptoPerpetual(instrument),
2967        );
2968
2969        let result =
2970            parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
2971
2972        assert!(result.is_ok());
2973        let report = result.unwrap();
2974
2975        if let ExecutionReport::Order(status_report) = report {
2976            assert_eq!(status_report.order_type, OrderType::StopMarket);
2977            assert_eq!(status_report.order_side, OrderSide::Sell);
2978            assert_eq!(status_report.quantity, Quantity::from("0.01000000"));
2979            assert_eq!(status_report.trigger_price, Some(Price::from("95000.00")));
2980            assert_eq!(status_report.trigger_type, Some(TriggerType::LastPrice));
2981            assert_eq!(status_report.price, None); // No limit price for market orders
2982        } else {
2983            panic!("Expected Order report");
2984        }
2985    }
2986
2987    #[rstest]
2988    fn test_parse_algo_order_msg_stop_limit() {
2989        let json_data = load_test_json("ws_orders_algo.json");
2990        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2991        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2992
2993        // Test second algo order (stop limit buy)
2994        let msg = &data[1];
2995        assert_eq!(msg.algo_id, "706620792746729473");
2996        assert_eq!(msg.state, OKXOrderStatus::Live);
2997        assert_eq!(msg.ord_px, "106000"); // Limit price
2998
2999        let account_id = AccountId::new("OKX-001");
3000        let mut instruments = AHashMap::new();
3001
3002        // Create mock instrument
3003        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3004        let instrument = CryptoPerpetual::new(
3005            instrument_id,
3006            Symbol::from("BTC-USDT-SWAP"),
3007            Currency::BTC(),
3008            Currency::USDT(),
3009            Currency::USDT(),
3010            false, // is_inverse
3011            2,     // price_precision
3012            8,     // size_precision
3013            Price::from("0.01"),
3014            Quantity::from("0.00000001"),
3015            None,
3016            None,
3017            None,
3018            None,
3019            None,
3020            None,
3021            None,
3022            None,
3023            None,
3024            None,
3025            None,
3026            None,
3027            0.into(), // ts_event
3028            0.into(), // ts_init
3029        );
3030        instruments.insert(
3031            Ustr::from("BTC-USDT-SWAP"),
3032            InstrumentAny::CryptoPerpetual(instrument),
3033        );
3034
3035        let result =
3036            parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
3037
3038        assert!(result.is_ok());
3039        let report = result.unwrap();
3040
3041        if let ExecutionReport::Order(status_report) = report {
3042            assert_eq!(status_report.order_type, OrderType::StopLimit);
3043            assert_eq!(status_report.order_side, OrderSide::Buy);
3044            assert_eq!(status_report.quantity, Quantity::from("0.02000000"));
3045            assert_eq!(status_report.trigger_price, Some(Price::from("105000.00")));
3046            assert_eq!(status_report.trigger_type, Some(TriggerType::MarkPrice));
3047            assert_eq!(status_report.price, Some(Price::from("106000.00"))); // Has limit price
3048        } else {
3049            panic!("Expected Order report");
3050        }
3051    }
3052
3053    #[rstest]
3054    fn test_parse_trigger_order_from_regular_channel() {
3055        let json_data = load_test_json("ws_orders_trigger.json");
3056        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3057        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3058
3059        // Test triggered order that came through regular orders channel
3060        let msg = &data[0];
3061        assert_eq!(msg.ord_type, OKXOrderType::Trigger);
3062        assert_eq!(msg.state, OKXOrderStatus::Filled);
3063
3064        let account_id = AccountId::new("OKX-001");
3065        let mut instruments = AHashMap::new();
3066
3067        // Create mock instrument
3068        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3069        let instrument = CryptoPerpetual::new(
3070            instrument_id,
3071            Symbol::from("BTC-USDT-SWAP"),
3072            Currency::BTC(),
3073            Currency::USDT(),
3074            Currency::USDT(),
3075            false, // is_inverse
3076            2,     // price_precision
3077            8,     // size_precision
3078            Price::from("0.01"),
3079            Quantity::from("0.00000001"),
3080            None,
3081            None,
3082            None,
3083            None,
3084            None,
3085            None,
3086            None,
3087            None,
3088            None,
3089            None,
3090            None,
3091            None,
3092            0.into(), // ts_event
3093            0.into(), // ts_init
3094        );
3095        instruments.insert(
3096            Ustr::from("BTC-USDT-SWAP"),
3097            InstrumentAny::CryptoPerpetual(instrument),
3098        );
3099        let fee_cache = AHashMap::new();
3100        let filled_qty_cache = AHashMap::new();
3101
3102        let result = parse_order_msg_vec(
3103            vec![msg.clone()],
3104            account_id,
3105            &instruments,
3106            &fee_cache,
3107            &filled_qty_cache,
3108            UnixNanos::default(),
3109        );
3110
3111        assert!(result.is_ok());
3112        let reports = result.unwrap();
3113        assert_eq!(reports.len(), 1);
3114
3115        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3116            assert_eq!(fill_report.order_side, OrderSide::Sell);
3117            assert_eq!(fill_report.last_qty, Quantity::from("0.01000000"));
3118            assert_eq!(fill_report.last_px, Price::from("101950.00"));
3119        } else {
3120            panic!("Expected Fill report for filled trigger order");
3121        }
3122    }
3123
3124    #[rstest]
3125    fn test_parse_liquidation_order() {
3126        let json_data = load_test_json("ws_orders_liquidation.json");
3127        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3128        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3129
3130        // Test liquidation order
3131        let msg = &data[0];
3132        assert_eq!(msg.category, OKXOrderCategory::FullLiquidation);
3133        assert_eq!(msg.state, OKXOrderStatus::Filled);
3134        assert_eq!(msg.inst_id.as_str(), "BTC-USDT-SWAP");
3135
3136        let account_id = AccountId::new("OKX-001");
3137        let mut instruments = AHashMap::new();
3138
3139        // Create mock instrument
3140        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3141        let instrument = CryptoPerpetual::new(
3142            instrument_id,
3143            Symbol::from("BTC-USDT-SWAP"),
3144            Currency::BTC(),
3145            Currency::USDT(),
3146            Currency::USDT(),
3147            false, // is_inverse
3148            2,     // price_precision
3149            8,     // size_precision
3150            Price::from("0.01"),
3151            Quantity::from("0.00000001"),
3152            None,
3153            None,
3154            None,
3155            None,
3156            None,
3157            None,
3158            None,
3159            None,
3160            None,
3161            None,
3162            None,
3163            None,
3164            0.into(), // ts_event
3165            0.into(), // ts_init
3166        );
3167        instruments.insert(
3168            Ustr::from("BTC-USDT-SWAP"),
3169            InstrumentAny::CryptoPerpetual(instrument),
3170        );
3171        let fee_cache = AHashMap::new();
3172        let filled_qty_cache = AHashMap::new();
3173
3174        let result = parse_order_msg_vec(
3175            vec![msg.clone()],
3176            account_id,
3177            &instruments,
3178            &fee_cache,
3179            &filled_qty_cache,
3180            UnixNanos::default(),
3181        );
3182
3183        assert!(result.is_ok());
3184        let reports = result.unwrap();
3185        assert_eq!(reports.len(), 1);
3186
3187        // Verify it's a fill report for a liquidation
3188        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3189            assert_eq!(fill_report.order_side, OrderSide::Sell);
3190            assert_eq!(fill_report.last_qty, Quantity::from("0.50000000"));
3191            assert_eq!(fill_report.last_px, Price::from("40000.00"));
3192            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3193        } else {
3194            panic!("Expected Fill report for liquidation order");
3195        }
3196    }
3197
3198    #[rstest]
3199    fn test_parse_adl_order() {
3200        let json_data = load_test_json("ws_orders_adl.json");
3201        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3202        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3203
3204        // Test ADL order
3205        let msg = &data[0];
3206        assert_eq!(msg.category, OKXOrderCategory::Adl);
3207        assert_eq!(msg.state, OKXOrderStatus::Filled);
3208        assert_eq!(msg.inst_id.as_str(), "ETH-USDT-SWAP");
3209
3210        let account_id = AccountId::new("OKX-001");
3211        let mut instruments = AHashMap::new();
3212
3213        // Create mock instrument
3214        let instrument_id = InstrumentId::from("ETH-USDT-SWAP.OKX");
3215        let instrument = CryptoPerpetual::new(
3216            instrument_id,
3217            Symbol::from("ETH-USDT-SWAP"),
3218            Currency::ETH(),
3219            Currency::USDT(),
3220            Currency::USDT(),
3221            false, // is_inverse
3222            2,     // price_precision
3223            8,     // size_precision
3224            Price::from("0.01"),
3225            Quantity::from("0.00000001"),
3226            None,
3227            None,
3228            None,
3229            None,
3230            None,
3231            None,
3232            None,
3233            None,
3234            None,
3235            None,
3236            None,
3237            None,
3238            0.into(), // ts_event
3239            0.into(), // ts_init
3240        );
3241        instruments.insert(
3242            Ustr::from("ETH-USDT-SWAP"),
3243            InstrumentAny::CryptoPerpetual(instrument),
3244        );
3245        let fee_cache = AHashMap::new();
3246        let filled_qty_cache = AHashMap::new();
3247
3248        let result = parse_order_msg_vec(
3249            vec![msg.clone()],
3250            account_id,
3251            &instruments,
3252            &fee_cache,
3253            &filled_qty_cache,
3254            UnixNanos::default(),
3255        );
3256
3257        assert!(result.is_ok());
3258        let reports = result.unwrap();
3259        assert_eq!(reports.len(), 1);
3260
3261        // Verify it's a fill report for ADL
3262        if let ExecutionReport::Fill(fill_report) = &reports[0] {
3263            assert_eq!(fill_report.order_side, OrderSide::Buy);
3264            assert_eq!(fill_report.last_qty, Quantity::from("0.30000000"));
3265            assert_eq!(fill_report.last_px, Price::from("41000.00"));
3266            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3267        } else {
3268            panic!("Expected Fill report for ADL order");
3269        }
3270    }
3271
3272    #[rstest]
3273    fn test_parse_unknown_category_graceful_fallback() {
3274        // Test that unknown/future category values deserialize as Other instead of failing
3275        let json_with_unknown_category = r#"{
3276            "category": "some_future_category_we_dont_know"
3277        }"#;
3278
3279        let result: Result<serde_json::Value, _> = serde_json::from_str(json_with_unknown_category);
3280        assert!(result.is_ok());
3281
3282        // Test deserialization of the category field directly
3283        let category_result: Result<OKXOrderCategory, _> =
3284            serde_json::from_str(r#""some_future_category""#);
3285        assert!(category_result.is_ok());
3286        assert_eq!(category_result.unwrap(), OKXOrderCategory::Other);
3287
3288        // Verify known categories still work
3289        let normal: OKXOrderCategory = serde_json::from_str(r#""normal""#).unwrap();
3290        assert_eq!(normal, OKXOrderCategory::Normal);
3291
3292        let twap: OKXOrderCategory = serde_json::from_str(r#""twap""#).unwrap();
3293        assert_eq!(twap, OKXOrderCategory::Twap);
3294    }
3295
3296    #[rstest]
3297    fn test_parse_partial_liquidation_order() {
3298        // Create a test message with partial liquidation category
3299        let account_id = AccountId::new("OKX-001");
3300        let mut instruments = AHashMap::new();
3301
3302        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3303        let instrument = CryptoPerpetual::new(
3304            instrument_id,
3305            Symbol::from("BTC-USDT-SWAP"),
3306            Currency::BTC(),
3307            Currency::USDT(),
3308            Currency::USDT(),
3309            false,
3310            2,
3311            8,
3312            Price::from("0.01"),
3313            Quantity::from("0.00000001"),
3314            None,
3315            None,
3316            None,
3317            None,
3318            None,
3319            None,
3320            None,
3321            None,
3322            None,
3323            None,
3324            None,
3325            None,
3326            0.into(),
3327            0.into(),
3328        );
3329        instruments.insert(
3330            Ustr::from("BTC-USDT-SWAP"),
3331            InstrumentAny::CryptoPerpetual(instrument),
3332        );
3333
3334        let partial_liq_msg = OKXOrderMsg {
3335            acc_fill_sz: Some("0.25".to_string()),
3336            avg_px: "39000.0".to_string(),
3337            c_time: 1746947317401,
3338            cancel_source: None,
3339            cancel_source_reason: None,
3340            category: OKXOrderCategory::PartialLiquidation,
3341            ccy: Ustr::from("USDT"),
3342            cl_ord_id: "".to_string(),
3343            algo_cl_ord_id: None,
3344            fee: Some("-9.75".to_string()),
3345            fee_ccy: Ustr::from("USDT"),
3346            fill_px: "39000.0".to_string(),
3347            fill_sz: "0.25".to_string(),
3348            fill_time: 1746947317402,
3349            inst_id: Ustr::from("BTC-USDT-SWAP"),
3350            inst_type: OKXInstrumentType::Swap,
3351            lever: "10.0".to_string(),
3352            ord_id: Ustr::from("2497956918703120888"),
3353            ord_type: OKXOrderType::Market,
3354            pnl: "-2500".to_string(),
3355            pos_side: OKXPositionSide::Long,
3356            px: "".to_string(),
3357            reduce_only: "false".to_string(),
3358            side: OKXSide::Sell,
3359            state: OKXOrderStatus::Filled,
3360            exec_type: OKXExecType::Taker,
3361            sz: "0.25".to_string(),
3362            td_mode: OKXTradeMode::Isolated,
3363            tgt_ccy: None,
3364            trade_id: "1518905888".to_string(),
3365            u_time: 1746947317402,
3366        };
3367
3368        let fee_cache = AHashMap::new();
3369        let filled_qty_cache = AHashMap::new();
3370        let result = parse_order_msg(
3371            &partial_liq_msg,
3372            account_id,
3373            &instruments,
3374            &fee_cache,
3375            &filled_qty_cache,
3376            UnixNanos::default(),
3377        );
3378
3379        assert!(result.is_ok());
3380        let report = result.unwrap();
3381
3382        // Verify it's a fill report for partial liquidation
3383        if let ExecutionReport::Fill(fill_report) = report {
3384            assert_eq!(fill_report.order_side, OrderSide::Sell);
3385            assert_eq!(fill_report.last_qty, Quantity::from("0.25000000"));
3386            assert_eq!(fill_report.last_px, Price::from("39000.00"));
3387        } else {
3388            panic!("Expected Fill report for partial liquidation order");
3389        }
3390    }
3391
3392    #[rstest]
3393    fn test_websocket_instrument_update_preserves_cached_fees() {
3394        use nautilus_model::{identifiers::InstrumentId, instruments::InstrumentAny};
3395
3396        use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
3397
3398        let ts_init = UnixNanos::default();
3399
3400        // Create initial instrument with fees (simulating HTTP load)
3401        let initial_fees = (
3402            Some(Decimal::new(8, 4)),  // maker_fee = 0.0008
3403            Some(Decimal::new(10, 4)), // taker_fee = 0.0010
3404        );
3405
3406        // Deserialize initial instrument from JSON
3407        let initial_inst_json = serde_json::json!({
3408            "instType": "SPOT",
3409            "instId": "BTC-USD",
3410            "baseCcy": "BTC",
3411            "quoteCcy": "USD",
3412            "settleCcy": "",
3413            "ctVal": "",
3414            "ctMult": "",
3415            "ctValCcy": "",
3416            "optType": "",
3417            "stk": "",
3418            "listTime": "1733454000000",
3419            "expTime": "",
3420            "lever": "",
3421            "tickSz": "0.1",
3422            "lotSz": "0.00000001",
3423            "minSz": "0.00001",
3424            "ctType": "linear",
3425            "alias": "",
3426            "state": "live",
3427            "maxLmtSz": "9999999999",
3428            "maxMktSz": "1000000",
3429            "maxTwapSz": "9999999999.0000000000000000",
3430            "maxIcebergSz": "9999999999.0000000000000000",
3431            "maxTriggerSz": "9999999999.0000000000000000",
3432            "maxStopSz": "1000000",
3433            "uly": "",
3434            "instFamily": "",
3435            "ruleType": "normal",
3436            "maxLmtAmt": "20000000",
3437            "maxMktAmt": "1000000"
3438        });
3439
3440        let initial_inst: OKXInstrument = serde_json::from_value(initial_inst_json)
3441            .expect("Failed to deserialize initial instrument");
3442
3443        // Parse initial instrument with fees
3444        let parsed_initial = parse_instrument_any(
3445            &initial_inst,
3446            None,
3447            None,
3448            initial_fees.0,
3449            initial_fees.1,
3450            ts_init,
3451        )
3452        .expect("Failed to parse initial instrument")
3453        .expect("Initial instrument should not be None");
3454
3455        // Verify fees were applied
3456        if let InstrumentAny::CurrencyPair(ref pair) = parsed_initial {
3457            assert_eq!(pair.maker_fee, Decimal::new(8, 4));
3458            assert_eq!(pair.taker_fee, Decimal::new(10, 4));
3459        } else {
3460            panic!("Expected CurrencyPair instrument");
3461        }
3462
3463        // Build instrument cache with the initial instrument
3464        let mut instruments_cache = AHashMap::new();
3465        instruments_cache.insert(Ustr::from("BTC-USD"), parsed_initial);
3466
3467        // Create WebSocket update message (same structure as initial, simulating a WebSocket update)
3468        let ws_update = serde_json::json!({
3469            "instType": "SPOT",
3470            "instId": "BTC-USD",
3471            "baseCcy": "BTC",
3472            "quoteCcy": "USD",
3473            "settleCcy": "",
3474            "ctVal": "",
3475            "ctMult": "",
3476            "ctValCcy": "",
3477            "optType": "",
3478            "stk": "",
3479            "listTime": "1733454000000",
3480            "expTime": "",
3481            "lever": "",
3482            "tickSz": "0.1",
3483            "lotSz": "0.00000001",
3484            "minSz": "0.00001",
3485            "ctType": "linear",
3486            "alias": "",
3487            "state": "live",
3488            "maxLmtSz": "9999999999",
3489            "maxMktSz": "1000000",
3490            "maxTwapSz": "9999999999.0000000000000000",
3491            "maxIcebergSz": "9999999999.0000000000000000",
3492            "maxTriggerSz": "9999999999.0000000000000000",
3493            "maxStopSz": "1000000",
3494            "uly": "",
3495            "instFamily": "",
3496            "ruleType": "normal",
3497            "maxLmtAmt": "20000000",
3498            "maxMktAmt": "1000000"
3499        });
3500
3501        let instrument_id = InstrumentId::from("BTC-USD.OKX");
3502        let mut funding_cache = AHashMap::new();
3503
3504        // Parse WebSocket update with cache
3505        let result = parse_ws_message_data(
3506            &OKXWsChannel::Instruments,
3507            ws_update,
3508            &instrument_id,
3509            2,
3510            8,
3511            ts_init,
3512            &mut funding_cache,
3513            &instruments_cache,
3514        )
3515        .expect("Failed to parse WebSocket instrument update");
3516
3517        // Verify the update preserves the cached fees
3518        if let Some(NautilusWsMessage::Instrument(boxed_inst)) = result {
3519            if let InstrumentAny::CurrencyPair(pair) = *boxed_inst {
3520                assert_eq!(
3521                    pair.maker_fee,
3522                    Decimal::new(8, 4),
3523                    "Maker fee should be preserved from cache"
3524                );
3525                assert_eq!(
3526                    pair.taker_fee,
3527                    Decimal::new(10, 4),
3528                    "Taker fee should be preserved from cache"
3529                );
3530            } else {
3531                panic!("Expected CurrencyPair instrument from WebSocket update");
3532            }
3533        } else {
3534            panic!("Expected Instrument message from WebSocket update");
3535        }
3536    }
3537}