Skip to main content

nautilus_bitmex/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsers that convert BitMEX WebSocket payloads into Nautilus data structures.
17
18use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26    data::{
27        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29        depth::DEPTH10_LEN,
30    },
31    enums::{
32        AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33        PriceType, RecordFlag, TimeInForce, TrailingOffsetType,
34    },
35    events::{OrderUpdated, account::state::AccountState},
36    identifiers::{
37        AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38        VenueOrderId,
39    },
40    instruments::{Instrument, InstrumentAny},
41    reports::{FillReport, OrderStatusReport, PositionStatusReport},
42    types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::Decimal;
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49    enums::{BitmexAction, BitmexWsTopic},
50    messages::{
51        BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53        BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54    },
55};
56use crate::{
57    common::{
58        consts::BITMEX_VENUE,
59        enums::{
60            BitmexExecInstruction, BitmexExecType, BitmexOrderType, BitmexPegPriceType, BitmexSide,
61        },
62        parse::{
63            clean_reason, extract_trigger_type, map_bitmex_currency, normalize_trade_bin_prices,
64            normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
65            parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
66            parse_position_side, parse_signed_contracts_quantity,
67        },
68    },
69    http::parse::get_currency,
70    websocket::messages::BitmexOrderUpdateMsg,
71};
72
73const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
74    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
75    aggregation: BarAggregation::Minute,
76    price_type: PriceType::Last,
77};
78const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
79    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
80    aggregation: BarAggregation::Minute,
81    price_type: PriceType::Last,
82};
83const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
84    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
85    aggregation: BarAggregation::Hour,
86    price_type: PriceType::Last,
87};
88const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
89    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
90    aggregation: BarAggregation::Day,
91    price_type: PriceType::Last,
92};
93
94/// Check if a symbol is an index symbol (starts with '.').
95///
96/// Index symbols in BitMEX represent indices like `.BXBT` and have different
97/// behavior from regular instruments:
98/// - They only have a single price value (no bid/ask spread).
99/// - They don't have trades or quotes.
100/// - Their price is delivered via the `lastPrice` field.
101#[inline]
102#[must_use]
103pub fn is_index_symbol(symbol: &Ustr) -> bool {
104    symbol.starts_with('.')
105}
106
107/// Converts a batch of BitMEX order-book rows into Nautilus delta events.
108#[must_use]
109pub fn parse_book_msg_vec(
110    data: Vec<BitmexOrderBookMsg>,
111    action: BitmexAction,
112    instruments: &AHashMap<Ustr, InstrumentAny>,
113    ts_init: UnixNanos,
114) -> Vec<Data> {
115    let mut deltas = Vec::with_capacity(data.len());
116
117    for msg in data {
118        if let Some(instrument) = instruments.get(&msg.symbol) {
119            let instrument_id = instrument.id();
120            let price_precision = instrument.price_precision();
121            deltas.push(Data::Delta(parse_book_msg(
122                &msg,
123                &action,
124                instrument,
125                instrument_id,
126                price_precision,
127                ts_init,
128            )));
129        } else {
130            log::error!(
131                "Instrument cache miss: book delta dropped for symbol={}",
132                msg.symbol
133            );
134        }
135    }
136
137    // Set F_LAST on the last delta so data engine knows the batch is complete
138    if let Some(Data::Delta(last_delta)) = deltas.last_mut() {
139        *last_delta = OrderBookDelta::new(
140            last_delta.instrument_id,
141            last_delta.action,
142            last_delta.order,
143            last_delta.flags | RecordFlag::F_LAST as u8,
144            last_delta.sequence,
145            last_delta.ts_event,
146            last_delta.ts_init,
147        );
148    }
149
150    deltas
151}
152
153/// Converts BitMEX level-10 snapshots into Nautilus depth events.
154#[must_use]
155pub fn parse_book10_msg_vec(
156    data: Vec<BitmexOrderBook10Msg>,
157    instruments: &AHashMap<Ustr, InstrumentAny>,
158    ts_init: UnixNanos,
159) -> Vec<Data> {
160    let mut depths = Vec::with_capacity(data.len());
161
162    for msg in data {
163        if let Some(instrument) = instruments.get(&msg.symbol) {
164            let instrument_id = instrument.id();
165            let price_precision = instrument.price_precision();
166            match parse_book10_msg(&msg, instrument, instrument_id, price_precision, ts_init) {
167                Ok(depth) => depths.push(Data::Depth10(Box::new(depth))),
168                Err(e) => {
169                    log::error!("Failed to parse orderBook10 for symbol={}: {e}", msg.symbol);
170                }
171            }
172        } else {
173            log::error!(
174                "Instrument cache miss: depth10 message dropped for symbol={}",
175                msg.symbol
176            );
177        }
178    }
179    depths
180}
181
182/// Converts BitMEX trade messages into Nautilus trade data events.
183#[must_use]
184pub fn parse_trade_msg_vec(
185    data: Vec<BitmexTradeMsg>,
186    instruments: &AHashMap<Ustr, InstrumentAny>,
187    ts_init: UnixNanos,
188) -> Vec<Data> {
189    let mut trades = Vec::with_capacity(data.len());
190
191    for msg in data {
192        if let Some(instrument) = instruments.get(&msg.symbol) {
193            let instrument_id = instrument.id();
194            let price_precision = instrument.price_precision();
195            trades.push(Data::Trade(parse_trade_msg(
196                &msg,
197                instrument,
198                instrument_id,
199                price_precision,
200                ts_init,
201            )));
202        } else {
203            log::error!(
204                "Instrument cache miss: trade message dropped for symbol={}",
205                msg.symbol
206            );
207        }
208    }
209    trades
210}
211
212/// Converts aggregated trade-bin messages into Nautilus data events.
213#[must_use]
214pub fn parse_trade_bin_msg_vec(
215    data: Vec<BitmexTradeBinMsg>,
216    topic: BitmexWsTopic,
217    instruments: &AHashMap<Ustr, InstrumentAny>,
218    ts_init: UnixNanos,
219) -> Vec<Data> {
220    let mut trades = Vec::with_capacity(data.len());
221
222    for msg in data {
223        if let Some(instrument) = instruments.get(&msg.symbol) {
224            let instrument_id = instrument.id();
225            let price_precision = instrument.price_precision();
226            trades.push(Data::Bar(parse_trade_bin_msg(
227                &msg,
228                &topic,
229                instrument,
230                instrument_id,
231                price_precision,
232                ts_init,
233            )));
234        } else {
235            log::error!(
236                "Instrument cache miss: trade bin (bar) dropped for symbol={}",
237                msg.symbol
238            );
239        }
240    }
241    trades
242}
243
244/// Converts a BitMEX order book row into a Nautilus order-book delta.
245#[allow(clippy::too_many_arguments)]
246#[must_use]
247pub fn parse_book_msg(
248    msg: &BitmexOrderBookMsg,
249    action: &BitmexAction,
250    instrument: &InstrumentAny,
251    instrument_id: InstrumentId,
252    price_precision: u8,
253    ts_init: UnixNanos,
254) -> OrderBookDelta {
255    let flags = if action == &BitmexAction::Partial {
256        RecordFlag::F_SNAPSHOT as u8
257    } else {
258        0
259    };
260
261    let action = action.as_book_action();
262    let price = Price::new(msg.price, price_precision);
263    let side = msg.side.as_order_side();
264    let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
265    let order_id = msg.id;
266    let order = BookOrder::new(side, price, size, order_id);
267    let sequence = 0; // Not available
268    let ts_event = UnixNanos::from(msg.timestamp);
269
270    OrderBookDelta::new(
271        instrument_id,
272        action,
273        order,
274        flags,
275        sequence,
276        ts_event,
277        ts_init,
278    )
279}
280
281/// Parses an `OrderBook10` message into an `OrderBookDepth10` object.
282///
283/// # Errors
284///
285/// Returns an error if the bid or ask arrays are not exactly 10 elements.
286#[allow(clippy::too_many_arguments)]
287pub fn parse_book10_msg(
288    msg: &BitmexOrderBook10Msg,
289    instrument: &InstrumentAny,
290    instrument_id: InstrumentId,
291    price_precision: u8,
292    ts_init: UnixNanos,
293) -> anyhow::Result<OrderBookDepth10> {
294    let mut bids = Vec::with_capacity(DEPTH10_LEN);
295    let mut asks = Vec::with_capacity(DEPTH10_LEN);
296
297    // Initialized with zeros
298    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
299    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
300
301    for (i, level) in msg.bids.iter().enumerate() {
302        let bid_order = BookOrder::new(
303            OrderSide::Buy,
304            Price::new(level[0], price_precision),
305            parse_fractional_quantity(level[1], instrument),
306            0,
307        );
308
309        bids.push(bid_order);
310        bid_counts[i] = 1;
311    }
312
313    for (i, level) in msg.asks.iter().enumerate() {
314        let ask_order = BookOrder::new(
315            OrderSide::Sell,
316            Price::new(level[0], price_precision),
317            parse_fractional_quantity(level[1], instrument),
318            0,
319        );
320
321        asks.push(ask_order);
322        ask_counts[i] = 1;
323    }
324
325    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
326        anyhow::anyhow!(
327            "Bids length mismatch: expected {DEPTH10_LEN}, was {}",
328            v.len()
329        )
330    })?;
331    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
332        anyhow::anyhow!(
333            "Asks length mismatch: expected {DEPTH10_LEN}, was {}",
334            v.len()
335        )
336    })?;
337
338    let ts_event = UnixNanos::from(msg.timestamp);
339
340    Ok(OrderBookDepth10::new(
341        instrument_id,
342        bids,
343        asks,
344        bid_counts,
345        ask_counts,
346        RecordFlag::F_SNAPSHOT as u8,
347        0, // Not applicable for BitMEX L2 books
348        ts_event,
349        ts_init,
350    ))
351}
352
353/// Converts a BitMEX quote message into a `QuoteTick`, filling missing data from cache.
354#[must_use]
355pub fn parse_quote_msg(
356    msg: &BitmexQuoteMsg,
357    last_quote: &QuoteTick,
358    instrument: &InstrumentAny,
359    instrument_id: InstrumentId,
360    price_precision: u8,
361    ts_init: UnixNanos,
362) -> QuoteTick {
363    let bid_price = match msg.bid_price {
364        Some(price) => Price::new(price, price_precision),
365        None => last_quote.bid_price,
366    };
367
368    let ask_price = match msg.ask_price {
369        Some(price) => Price::new(price, price_precision),
370        None => last_quote.ask_price,
371    };
372
373    let bid_size = match msg.bid_size {
374        Some(size) => parse_contracts_quantity(size, instrument),
375        None => last_quote.bid_size,
376    };
377
378    let ask_size = match msg.ask_size {
379        Some(size) => parse_contracts_quantity(size, instrument),
380        None => last_quote.ask_size,
381    };
382
383    let ts_event = UnixNanos::from(msg.timestamp);
384
385    QuoteTick::new(
386        instrument_id,
387        bid_price,
388        ask_price,
389        bid_size,
390        ask_size,
391        ts_event,
392        ts_init,
393    )
394}
395
396/// Converts a BitMEX trade message into a `TradeTick`.
397#[must_use]
398pub fn parse_trade_msg(
399    msg: &BitmexTradeMsg,
400    instrument: &InstrumentAny,
401    instrument_id: InstrumentId,
402    price_precision: u8,
403    ts_init: UnixNanos,
404) -> TradeTick {
405    let price = Price::new(msg.price, price_precision);
406    let size = parse_contracts_quantity(msg.size, instrument);
407    let aggressor_side = msg.side.as_aggressor_side();
408    let trade_id = TradeId::new(
409        msg.trd_match_id
410            .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
411    );
412    let ts_event = UnixNanos::from(msg.timestamp);
413
414    TradeTick::new(
415        instrument_id,
416        price,
417        size,
418        aggressor_side,
419        trade_id,
420        ts_event,
421        ts_init,
422    )
423}
424
425/// Converts a BitMEX trade-bin summary into a `Bar` for the matching topic.
426#[must_use]
427pub fn parse_trade_bin_msg(
428    msg: &BitmexTradeBinMsg,
429    topic: &BitmexWsTopic,
430    instrument: &InstrumentAny,
431    instrument_id: InstrumentId,
432    price_precision: u8,
433    ts_init: UnixNanos,
434) -> Bar {
435    let spec = bar_spec_from_topic(topic);
436    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
437
438    let open = Price::new(msg.open, price_precision);
439    let high = Price::new(msg.high, price_precision);
440    let low = Price::new(msg.low, price_precision);
441    let close = Price::new(msg.close, price_precision);
442
443    let (open, high, low, close) =
444        normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
445
446    let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
447    let volume = parse_contracts_quantity(volume_contracts, instrument);
448    let ts_event = UnixNanos::from(msg.timestamp);
449
450    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
451}
452
453/// Converts a WebSocket topic to a bar specification.
454///
455/// Returns `BAR_SPEC_1_MINUTE` and logs an error for unsupported topics.
456#[must_use]
457pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
458    match topic {
459        BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
460        BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
461        BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
462        BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
463        _ => {
464            log::error!("Bar specification not supported: topic={topic:?}");
465            BAR_SPEC_1_MINUTE
466        }
467    }
468}
469
470/// Converts a bar specification to a WebSocket topic.
471///
472/// Returns `TradeBin1m` and logs an error for unsupported specifications.
473#[must_use]
474pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
475    match spec {
476        BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
477        BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
478        BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
479        BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
480        _ => {
481            log::error!("Bar specification not supported: spec={spec:?}");
482            BitmexWsTopic::TradeBin1m
483        }
484    }
485}
486
487fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
488    if msg.stop_px.is_some() {
489        if msg.price.is_some() {
490            Some(OrderType::StopLimit)
491        } else {
492            Some(OrderType::StopMarket)
493        }
494    } else if msg.price.is_some() {
495        Some(OrderType::Limit)
496    } else {
497        Some(OrderType::Market)
498    }
499}
500
501/// Parse a BitMEX WebSocket order message into a Nautilus `OrderStatusReport`.
502///
503/// # Panics
504///
505/// Panics if required fields are missing or invalid.
506///
507/// # References
508///
509/// <https://www.bitmex.com/app/wsAPI#Order>
510///
511/// # Errors
512///
513/// Returns an error if the time in force conversion fails.
514pub fn parse_order_msg(
515    msg: &BitmexOrderMsg,
516    instrument: &InstrumentAny,
517    order_type_cache: &DashMap<ClientOrderId, OrderType>,
518) -> anyhow::Result<OrderStatusReport> {
519    let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); // TODO: Revisit
520    let instrument_id = parse_instrument_id(msg.symbol);
521    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
522    let common_side: BitmexSide = msg.side.into();
523    let order_side: OrderSide = common_side.into();
524
525    let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
526        // Pegged orders with TrailingStopPeg are trailing stop orders
527        if ord_type == BitmexOrderType::Pegged
528            && msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
529        {
530            if msg.price.is_some() {
531                OrderType::TrailingStopLimit
532            } else {
533                OrderType::TrailingStopMarket
534            }
535        } else {
536            ord_type.into()
537        }
538    } else if let Some(client_order_id) = msg.cl_ord_id {
539        let client_order_id = ClientOrderId::new(client_order_id);
540        if let Some(entry) = order_type_cache.get(&client_order_id) {
541            *entry.value()
542        } else if let Some(inferred) = infer_order_type_from_msg(msg) {
543            order_type_cache.insert(client_order_id, inferred);
544            inferred
545        } else {
546            anyhow::bail!(
547                "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
548            );
549        }
550    } else if let Some(inferred) = infer_order_type_from_msg(msg) {
551        inferred
552    } else {
553        anyhow::bail!("Order missing both ord_type and cl_ord_id");
554    };
555
556    let time_in_force: TimeInForce = match msg.time_in_force {
557        Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
558        None => TimeInForce::Gtc,
559    };
560    let order_status: OrderStatus = msg.ord_status.into();
561    let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
562    let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
563    let report_id = UUID4::new();
564    let ts_accepted =
565        parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
566    let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
567    let ts_init = get_atomic_clock_realtime().get_time_ns();
568
569    let mut report = OrderStatusReport::new(
570        account_id,
571        instrument_id,
572        None, // client_order_id - will be set later if present
573        venue_order_id,
574        order_side,
575        order_type,
576        time_in_force,
577        order_status,
578        quantity,
579        filled_qty,
580        ts_accepted,
581        ts_last,
582        ts_init,
583        Some(report_id),
584    );
585
586    if let Some(cl_ord_id) = &msg.cl_ord_id {
587        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
588    }
589
590    if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
591        report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
592    }
593
594    if let Some(price) = msg.price {
595        report = report.with_price(Price::new(price, instrument.price_precision()));
596    }
597
598    if let Some(avg_px) = msg.avg_px {
599        report = report.with_avg_px(avg_px)?;
600    }
601
602    if let Some(trigger_price) = msg.stop_px {
603        report = report
604            .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
605            .with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
606    }
607
608    // Populate trailing offset for trailing stop orders
609    if matches!(
610        order_type,
611        OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
612    ) && let Some(peg_offset) = msg.peg_offset_value
613    {
614        let trailing_offset = Decimal::try_from(peg_offset.abs())
615            .unwrap_or_else(|_| Decimal::new(peg_offset.abs() as i64, 0));
616        report = report
617            .with_trailing_offset(trailing_offset)
618            .with_trailing_offset_type(TrailingOffsetType::Price);
619
620        if msg.stop_px.is_none() {
621            report = report.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
622        }
623    }
624
625    if let Some(exec_insts) = &msg.exec_inst {
626        for exec_inst in exec_insts {
627            match exec_inst {
628                BitmexExecInstruction::ParticipateDoNotInitiate => {
629                    report = report.with_post_only(true);
630                }
631                BitmexExecInstruction::ReduceOnly => {
632                    report = report.with_reduce_only(true);
633                }
634                _ => {}
635            }
636        }
637    }
638
639    // Extract rejection reason for rejected orders
640    if order_status == OrderStatus::Rejected {
641        if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
642            log::debug!(
643                "Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
644                venue_order_id,
645                msg.cl_ord_id,
646                reason_str,
647            );
648            report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
649        } else {
650            log::debug!(
651                "Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
652                venue_order_id,
653                msg.cl_ord_id,
654                msg.ord_status,
655                msg.ord_rej_reason,
656                msg.text,
657            );
658        }
659    }
660
661    // Check if this is a canceled post-only order (BitMEX cancels instead of rejecting)
662    // We need to preserve the rejection reason for the execution client to handle
663    if order_status == OrderStatus::Canceled
664        && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
665    {
666        report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
667    }
668
669    Ok(report)
670}
671
672/// Parse a BitMEX WebSocket order update message into a Nautilus `OrderUpdated` event.
673///
674/// This handles partial updates where only changed fields are present.
675pub fn parse_order_update_msg(
676    msg: &BitmexOrderUpdateMsg,
677    instrument: &InstrumentAny,
678    account_id: AccountId,
679) -> Option<OrderUpdated> {
680    // For BitMEX updates, we don't have trader_id or strategy_id from the exchange
681    // These will be populated by the execution engine when it matches the venue_order_id
682    let trader_id = TraderId::external();
683    let strategy_id = StrategyId::external();
684    let instrument_id = parse_instrument_id(msg.symbol);
685    let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
686    let client_order_id = msg
687        .cl_ord_id
688        .as_ref()
689        .map_or_else(ClientOrderId::external, ClientOrderId::new);
690
691    // BitMEX partial updates may omit leaves_qty/cum_qty. When missing, we fall back
692    // to zero which signals the execution engine to use the cached order quantity.
693    let quantity = match (msg.leaves_qty, msg.cum_qty) {
694        (Some(leaves), Some(cum)) => parse_contracts_quantity((leaves + cum) as u64, instrument),
695        _ => Quantity::zero(instrument.size_precision()),
696    };
697    let price = msg
698        .price
699        .map(|p| Price::new(p, instrument.price_precision()));
700
701    // BitMEX doesn't send trigger price in regular order updates?
702    let trigger_price = None;
703    // BitMEX doesn't send protection price in regular order updates
704    let protection_price = None;
705
706    let event_id = UUID4::new();
707    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
708    let ts_init = get_atomic_clock_realtime().get_time_ns();
709
710    Some(OrderUpdated::new(
711        trader_id,
712        strategy_id,
713        instrument_id,
714        client_order_id,
715        quantity,
716        event_id,
717        ts_event,
718        ts_init,
719        false, // reconciliation
720        venue_order_id,
721        Some(account_id),
722        price,
723        trigger_price,
724        protection_price,
725    ))
726}
727
728/// Parse a BitMEX WebSocket execution message into a Nautilus `FillReport`.
729///
730/// Handles different execution types appropriately:
731/// - `Trade`: Normal trade execution → FillReport
732/// - `Liquidation`: Auto-deleveraging or liquidation → FillReport
733/// - `Bankruptcy`: Bankruptcy execution → FillReport (with warning)
734/// - `Settlement`, `TrialFill`: Non-obvious cases → None (with warning)
735/// - `Funding`, `Insurance`, `Rebalance`: Expected non-fills → None (debug log)
736/// - Order state changes (`New`, `Canceled`, etc.): → None (debug log)
737///
738/// # Panics
739///
740/// Panics if required fields are missing or invalid.
741///
742/// # References
743///
744/// <https://www.bitmex.com/app/wsAPI#Execution>
745///
746pub fn parse_execution_msg(
747    msg: BitmexExecutionMsg,
748    instrument: &InstrumentAny,
749) -> Option<FillReport> {
750    let exec_type = msg.exec_type?;
751
752    match exec_type {
753        // Position-affecting executions that generate fills
754        BitmexExecType::Trade | BitmexExecType::Liquidation => {}
755        BitmexExecType::Bankruptcy => {
756            log::warn!(
757                "Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
758                msg.order_id,
759                msg.symbol,
760            );
761        }
762
763        // Settlement executions are mark-to-market events, not fills
764        BitmexExecType::Settlement => {
765            log::debug!(
766                "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
767                msg.order_id,
768                msg.symbol,
769            );
770            return None;
771        }
772        BitmexExecType::TrialFill => {
773            log::warn!(
774                "Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
775                msg.order_id,
776                msg.symbol,
777            );
778            return None;
779        }
780
781        // Expected non-fill executions
782        BitmexExecType::Funding => {
783            log::debug!(
784                "Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
785                msg.order_id,
786                msg.symbol,
787            );
788            return None;
789        }
790        BitmexExecType::Insurance => {
791            log::debug!(
792                "Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
793                msg.order_id,
794                msg.symbol,
795            );
796            return None;
797        }
798        BitmexExecType::Rebalance => {
799            log::debug!(
800                "Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
801                msg.order_id,
802                msg.symbol,
803            );
804            return None;
805        }
806
807        // Order state changes (not fills)
808        BitmexExecType::New
809        | BitmexExecType::Canceled
810        | BitmexExecType::CancelReject
811        | BitmexExecType::Replaced
812        | BitmexExecType::Rejected
813        | BitmexExecType::AmendReject
814        | BitmexExecType::Suspended
815        | BitmexExecType::Released
816        | BitmexExecType::TriggeredOrActivatedBySystem => {
817            log::debug!(
818                "Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
819                msg.order_id,
820            );
821            return None;
822        }
823
824        BitmexExecType::Unknown(ref type_str) => {
825            log::warn!(
826                "Unknown execution type received, skipping: exec_type={type_str}, order_id={:?}, symbol={:?}",
827                msg.order_id,
828                msg.symbol,
829            );
830            return None;
831        }
832    }
833
834    let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
835    let instrument_id = parse_instrument_id(msg.symbol?);
836    let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
837    let trade_id = TradeId::new(msg.trd_match_id?.to_string());
838    let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
839        let side: BitmexSide = s.into();
840        side.into()
841    });
842    let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
843    let last_px = Price::new(msg.last_px?, instrument.price_precision());
844    let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
845    let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
846    let currency = get_currency(&mapped_currency);
847    let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
848    let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
849    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
850    let venue_position_id = None; // Not applicable on BitMEX
851    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
852    let ts_init = get_atomic_clock_realtime().get_time_ns();
853
854    Some(FillReport::new(
855        account_id,
856        instrument_id,
857        venue_order_id,
858        trade_id,
859        order_side,
860        last_qty,
861        last_px,
862        commission,
863        liquidity_side,
864        client_order_id,
865        venue_position_id,
866        ts_event,
867        ts_init,
868        None,
869    ))
870}
871
872/// Parse a BitMEX WebSocket position message into a Nautilus `PositionStatusReport`.
873///
874/// # References
875///
876/// <https://www.bitmex.com/app/wsAPI#Position>
877#[must_use]
878pub fn parse_position_msg(
879    msg: BitmexPositionMsg,
880    instrument: &InstrumentAny,
881) -> PositionStatusReport {
882    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
883    let instrument_id = parse_instrument_id(msg.symbol);
884    let position_side = parse_position_side(msg.current_qty).as_specified();
885    let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
886    let venue_position_id = None; // Not applicable on BitMEX
887    let avg_px_open = msg
888        .avg_entry_price
889        .and_then(|p| Decimal::from_str(&p.to_string()).ok());
890    let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
891    let ts_init = get_atomic_clock_realtime().get_time_ns();
892
893    PositionStatusReport::new(
894        account_id,
895        instrument_id,
896        position_side,
897        quantity,
898        ts_last,
899        ts_init,
900        None,              // report_id
901        venue_position_id, // venue_position_id
902        avg_px_open,       // avg_px_open
903    )
904}
905
906/// Parse a BitMEX WebSocket instrument message for mark and index prices.
907///
908/// For index symbols (e.g., `.BXBT`):
909/// - Uses the `lastPrice` field as the index price.
910/// - Also emits the `markPrice` field (which equals `lastPrice` for indices).
911///
912/// For regular instruments:
913/// - Uses the `index_price` field for index price updates.
914/// - Uses the `mark_price` field for mark price updates.
915///
916/// Returns a Vec of Data containing mark and/or index price updates
917/// or an empty Vec if no relevant price is present.
918#[must_use]
919pub fn parse_instrument_msg(
920    msg: BitmexInstrumentMsg,
921    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
922    ts_init: UnixNanos,
923) -> Vec<Data> {
924    let mut updates = Vec::new();
925    let is_index = is_index_symbol(&msg.symbol);
926
927    // For index symbols (like .BXBT), the lastPrice field contains the index price
928    // For regular instruments, use the explicit index_price field if present
929    let effective_index_price = if is_index {
930        msg.last_price
931    } else {
932        msg.index_price
933    };
934
935    // Return early if no relevant prices present (mark_price or effective_index_price)
936    // Note: effective_index_price uses lastPrice for index symbols, index_price for others
937    // (Funding rates come through a separate Funding channel)
938    if msg.mark_price.is_none() && effective_index_price.is_none() {
939        return updates;
940    }
941
942    let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
943    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
944
945    // Look up instrument for proper precision
946    let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
947        Some(instrument) => instrument.price_precision(),
948        None => {
949            // BitMEX sends updates for all instruments on the instrument channel,
950            // but we only cache instruments that are explicitly requested.
951            // Index instruments (starting with '.') are not loaded via regular API endpoints.
952            if is_index {
953                log::trace!(
954                    "Index instrument {} not in cache, skipping update",
955                    msg.symbol
956                );
957            } else {
958                log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
959            }
960            return updates;
961        }
962    };
963
964    // Add mark price update if present
965    // For index symbols, markPrice equals lastPrice and is valid to emit
966    if let Some(mark_price) = msg.mark_price {
967        let price = Price::new(mark_price, price_precision);
968        updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
969            instrument_id,
970            price,
971            ts_event,
972            ts_init,
973        )));
974    }
975
976    // Add index price update if present
977    if let Some(index_price) = effective_index_price {
978        let price = Price::new(index_price, price_precision);
979        updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
980            instrument_id,
981            price,
982            ts_event,
983            ts_init,
984        )));
985    }
986
987    updates
988}
989
990/// Parse a BitMEX WebSocket funding message.
991///
992/// Returns `FundingRateUpdate` containing funding rate information.
993/// Note: This returns `FundingRateUpdate` directly, not wrapped in Data enum,
994/// to keep it separate from the FFI layer.
995#[must_use]
996pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> FundingRateUpdate {
997    let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol));
998    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
999
1000    FundingRateUpdate::new(
1001        instrument_id,
1002        msg.funding_rate,
1003        None, // Next funding time not provided in this message
1004        ts_event,
1005        ts_init,
1006    )
1007}
1008
1009/// Parse a BitMEX wallet message into an AccountState.
1010///
1011/// BitMEX uses XBT (satoshis) as the base unit for Bitcoin.
1012/// 1 XBT = 0.00000001 BTC (1 satoshi).
1013///
1014/// # Panics
1015///
1016/// Panics if the balance calculation is invalid (total != locked + free).
1017#[must_use]
1018pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
1019    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1020
1021    // Map BitMEX currency to standard currency code
1022    let currency_str = map_bitmex_currency(msg.currency.as_str());
1023    let currency = get_currency(&currency_str);
1024
1025    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
1026    let divisor = if msg.currency == "XBt" {
1027        100_000_000.0 // Satoshis to BTC
1028    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1029        1_000_000.0 // Microunits to units
1030    } else {
1031        1.0
1032    };
1033    let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1034
1035    let total = Money::new(amount, currency);
1036    let locked = Money::new(0.0, currency); // No locked amount info available
1037    let free = total - locked;
1038
1039    let balance = AccountBalance::new_checked(total, locked, free)
1040        .expect("Balance calculation should be valid");
1041
1042    AccountState::new(
1043        account_id,
1044        AccountType::Margin,
1045        vec![balance],
1046        vec![], // margins will be added separately
1047        true,   // is_reported
1048        UUID4::new(),
1049        ts_init,
1050        ts_init,
1051        None,
1052    )
1053}
1054
1055/// Parse a BitMEX margin message into margin balance information.
1056///
1057/// This creates a MarginBalance that can be added to an AccountState.
1058#[must_use]
1059pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1060    // Map BitMEX currency to standard currency code
1061    let currency_str = map_bitmex_currency(msg.currency.as_str());
1062    let currency = get_currency(&currency_str);
1063
1064    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
1065    let divisor = if msg.currency == "XBt" {
1066        100_000_000.0 // Satoshis to BTC
1067    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1068        1_000_000.0 // Microunits to units
1069    } else {
1070        1.0
1071    };
1072
1073    let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1074    let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1075    let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1076
1077    MarginBalance::new(
1078        Money::new(initial, currency),
1079        Money::new(maintenance, currency),
1080        instrument_id,
1081    )
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086    use chrono::{DateTime, Utc};
1087    use nautilus_model::{
1088        enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1089        identifiers::Symbol,
1090        instruments::crypto_perpetual::CryptoPerpetual,
1091    };
1092    use rstest::rstest;
1093    use ustr::Ustr;
1094
1095    use super::*;
1096    use crate::common::{
1097        enums::{BitmexExecType, BitmexOrderStatus},
1098        testing::load_test_json,
1099    };
1100
1101    // Helper function to create a test perpetual instrument for tests
1102    fn create_test_perpetual_instrument_with_precisions(
1103        price_precision: u8,
1104        size_precision: u8,
1105    ) -> InstrumentAny {
1106        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1107            InstrumentId::from("XBTUSD.BITMEX"),
1108            Symbol::new("XBTUSD"),
1109            Currency::BTC(),
1110            Currency::USD(),
1111            Currency::BTC(),
1112            true, // is_inverse
1113            price_precision,
1114            size_precision,
1115            Price::new(0.5, price_precision),
1116            Quantity::new(1.0, size_precision),
1117            None, // multiplier
1118            None, // lot_size
1119            None, // max_quantity
1120            None, // min_quantity
1121            None, // max_notional
1122            None, // min_notional
1123            None, // max_price
1124            None, // min_price
1125            None, // margin_init
1126            None, // margin_maint
1127            None, // maker_fee
1128            None, // taker_fee
1129            UnixNanos::default(),
1130            UnixNanos::default(),
1131        ))
1132    }
1133
1134    fn create_test_perpetual_instrument() -> InstrumentAny {
1135        create_test_perpetual_instrument_with_precisions(1, 0)
1136    }
1137
1138    #[rstest]
1139    fn test_orderbook_l2_message() {
1140        let json_data = load_test_json("ws_orderbook_l2.json");
1141
1142        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1143        let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1144
1145        // Test Insert action
1146        let instrument = create_test_perpetual_instrument();
1147
1148        // Test Insert action (no snapshot flag)
1149        let delta = parse_book_msg(
1150            &msg,
1151            &BitmexAction::Insert,
1152            &instrument,
1153            instrument.id(),
1154            instrument.price_precision(),
1155            UnixNanos::from(3),
1156        );
1157        assert_eq!(delta.instrument_id, instrument_id);
1158        assert_eq!(delta.order.price, Price::from("98459.9"));
1159        assert_eq!(delta.order.size, Quantity::from(33000));
1160        assert_eq!(delta.order.side, OrderSide::Sell);
1161        assert_eq!(delta.order.order_id, 62400580205);
1162        assert_eq!(delta.action, BookAction::Add);
1163        assert_eq!(delta.flags, 0);
1164        assert_eq!(delta.sequence, 0);
1165        assert_eq!(delta.ts_event, 1732436782356000000); // 2024-11-24T08:26:22.356Z in nanos
1166        assert_eq!(delta.ts_init, 3);
1167
1168        // Test Partial action (should have F_SNAPSHOT flag)
1169        let delta = parse_book_msg(
1170            &msg,
1171            &BitmexAction::Partial,
1172            &instrument,
1173            instrument.id(),
1174            instrument.price_precision(),
1175            UnixNanos::from(3),
1176        );
1177        assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1178        assert_eq!(delta.action, BookAction::Add);
1179
1180        // Test Update action (no flags)
1181        let delta = parse_book_msg(
1182            &msg,
1183            &BitmexAction::Update,
1184            &instrument,
1185            instrument.id(),
1186            instrument.price_precision(),
1187            UnixNanos::from(3),
1188        );
1189        assert_eq!(delta.flags, 0);
1190        assert_eq!(delta.action, BookAction::Update);
1191    }
1192
1193    #[rstest]
1194    fn test_orderbook10_message() {
1195        let json_data = load_test_json("ws_orderbook_10.json");
1196        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1197        let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1198        let instrument = create_test_perpetual_instrument();
1199        let depth10 = parse_book10_msg(
1200            &msg,
1201            &instrument,
1202            instrument.id(),
1203            instrument.price_precision(),
1204            UnixNanos::from(3),
1205        )
1206        .unwrap();
1207
1208        assert_eq!(depth10.instrument_id, instrument_id);
1209
1210        // Check first bid level
1211        assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1212        assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1213        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1214
1215        // Check first ask level
1216        assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1217        assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1218        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1219
1220        // Check counts (should be 1 for each populated level)
1221        assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1222        assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1223
1224        // Check flags and timestamps
1225        assert_eq!(depth10.sequence, 0);
1226        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1227        assert_eq!(depth10.ts_event, 1732436353513000000); // 2024-11-24T08:19:13.513Z in nanos
1228        assert_eq!(depth10.ts_init, 3);
1229    }
1230
1231    #[rstest]
1232    fn test_quote_message() {
1233        let json_data = load_test_json("ws_quote.json");
1234
1235        let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1236        let last_quote = QuoteTick::new(
1237            instrument_id,
1238            Price::new(487.50, 2),
1239            Price::new(488.20, 2),
1240            Quantity::from(100_000),
1241            Quantity::from(100_000),
1242            UnixNanos::from(1),
1243            UnixNanos::from(2),
1244        );
1245        let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1246        let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1247        let quote = parse_quote_msg(
1248            &msg,
1249            &last_quote,
1250            &instrument,
1251            instrument_id,
1252            instrument.price_precision(),
1253            UnixNanos::from(3),
1254        );
1255
1256        assert_eq!(quote.instrument_id, instrument_id);
1257        assert_eq!(quote.bid_price, Price::from("487.55"));
1258        assert_eq!(quote.ask_price, Price::from("488.25"));
1259        assert_eq!(quote.bid_size, Quantity::from(103_000));
1260        assert_eq!(quote.ask_size, Quantity::from(50_000));
1261        assert_eq!(quote.ts_event, 1732315465085000000);
1262        assert_eq!(quote.ts_init, 3);
1263    }
1264
1265    #[rstest]
1266    fn test_trade_message() {
1267        let json_data = load_test_json("ws_trade.json");
1268
1269        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1270        let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1271        let instrument = create_test_perpetual_instrument();
1272        let trade = parse_trade_msg(
1273            &msg,
1274            &instrument,
1275            instrument.id(),
1276            instrument.price_precision(),
1277            UnixNanos::from(3),
1278        );
1279
1280        assert_eq!(trade.instrument_id, instrument_id);
1281        assert_eq!(trade.price, Price::from("98570.9"));
1282        assert_eq!(trade.size, Quantity::from(100));
1283        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1284        assert_eq!(
1285            trade.trade_id.to_string(),
1286            "00000000-006d-1000-0000-000e8737d536"
1287        );
1288        assert_eq!(trade.ts_event, 1732436138704000000); // 2024-11-24T08:15:38.704Z in nanos
1289        assert_eq!(trade.ts_init, 3);
1290    }
1291
1292    #[rstest]
1293    fn test_trade_bin_message() {
1294        let json_data = load_test_json("ws_trade_bin_1m.json");
1295
1296        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1297        let topic = BitmexWsTopic::TradeBin1m;
1298
1299        let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1300        let instrument = create_test_perpetual_instrument();
1301        let bar = parse_trade_bin_msg(
1302            &msg,
1303            &topic,
1304            &instrument,
1305            instrument.id(),
1306            instrument.price_precision(),
1307            UnixNanos::from(3),
1308        );
1309
1310        assert_eq!(bar.instrument_id(), instrument_id);
1311        assert_eq!(
1312            bar.bar_type.spec(),
1313            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1314        );
1315        assert_eq!(bar.open, Price::from("97550.0"));
1316        assert_eq!(bar.high, Price::from("97584.4"));
1317        assert_eq!(bar.low, Price::from("97550.0"));
1318        assert_eq!(bar.close, Price::from("97570.1"));
1319        assert_eq!(bar.volume, Quantity::from(84_000));
1320        assert_eq!(bar.ts_event, 1732392420000000000); // 2024-11-23T20:07:00.000Z in nanos
1321        assert_eq!(bar.ts_init, 3);
1322    }
1323
1324    #[rstest]
1325    fn test_trade_bin_message_extreme_adjustment() {
1326        let topic = BitmexWsTopic::TradeBin1m;
1327        let instrument = create_test_perpetual_instrument();
1328
1329        let msg = BitmexTradeBinMsg {
1330            timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1331                .unwrap()
1332                .with_timezone(&Utc),
1333            symbol: Ustr::from("XBTUSD"),
1334            open: 50_000.0,
1335            high: 49_990.0,
1336            low: 50_010.0,
1337            close: 50_005.0,
1338            trades: 10,
1339            volume: 1_000,
1340            vwap: Some(0.0),
1341            last_size: Some(0),
1342            turnover: 0,
1343            home_notional: 0.0,
1344            foreign_notional: 0.0,
1345            pool: None,
1346        };
1347
1348        let bar = parse_trade_bin_msg(
1349            &msg,
1350            &topic,
1351            &instrument,
1352            instrument.id(),
1353            instrument.price_precision(),
1354            UnixNanos::from(3),
1355        );
1356
1357        assert_eq!(bar.high, Price::from("50010.0"));
1358        assert_eq!(bar.low, Price::from("49990.0"));
1359        assert_eq!(bar.open, Price::from("50000.0"));
1360        assert_eq!(bar.close, Price::from("50005.0"));
1361        assert_eq!(bar.volume, Quantity::from(1_000));
1362    }
1363
1364    #[rstest]
1365    fn test_parse_order_msg() {
1366        let json_data = load_test_json("ws_order.json");
1367        let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1368        let cache = DashMap::new();
1369        let instrument = create_test_perpetual_instrument();
1370        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1371
1372        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1373        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1374        assert_eq!(
1375            report.venue_order_id.to_string(),
1376            "550e8400-e29b-41d4-a716-446655440001"
1377        );
1378        assert_eq!(
1379            report.client_order_id.unwrap().to_string(),
1380            "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1381        );
1382        assert_eq!(report.order_side, OrderSide::Buy);
1383        assert_eq!(report.order_type, OrderType::Limit);
1384        assert_eq!(report.time_in_force, TimeInForce::Gtc);
1385        assert_eq!(report.order_status, OrderStatus::Accepted);
1386        assert_eq!(report.quantity, Quantity::from(100));
1387        assert_eq!(report.filled_qty, Quantity::from(0));
1388        assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1389        assert_eq!(report.ts_accepted, 1732530600000000000); // 2024-11-25T10:30:00.000Z
1390    }
1391
1392    #[rstest]
1393    fn test_parse_order_msg_infers_type_when_missing() {
1394        let json_data = load_test_json("ws_order.json");
1395        let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1396        msg.ord_type = None;
1397        msg.cl_ord_id = None;
1398        msg.price = Some(98_000.0);
1399        msg.stop_px = None;
1400
1401        let cache = DashMap::new();
1402        let instrument = create_test_perpetual_instrument();
1403
1404        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1405
1406        assert_eq!(report.order_type, OrderType::Limit);
1407    }
1408
1409    #[rstest]
1410    fn test_parse_order_msg_rejected_with_reason() {
1411        let mut msg: BitmexOrderMsg =
1412            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1413        msg.ord_status = BitmexOrderStatus::Rejected;
1414        msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1415        msg.text = None;
1416        msg.cum_qty = 0;
1417
1418        let cache = DashMap::new();
1419        let instrument = create_test_perpetual_instrument();
1420        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1421
1422        assert_eq!(report.order_status, OrderStatus::Rejected);
1423        assert_eq!(
1424            report.cancel_reason,
1425            Some("Insufficient available balance".to_string())
1426        );
1427    }
1428
1429    #[rstest]
1430    fn test_parse_order_msg_rejected_with_text_fallback() {
1431        let mut msg: BitmexOrderMsg =
1432            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1433        msg.ord_status = BitmexOrderStatus::Rejected;
1434        msg.ord_rej_reason = None;
1435        msg.text = Some(Ustr::from("Order would execute immediately"));
1436        msg.cum_qty = 0;
1437
1438        let cache = DashMap::new();
1439        let instrument = create_test_perpetual_instrument();
1440        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1441
1442        assert_eq!(report.order_status, OrderStatus::Rejected);
1443        assert_eq!(
1444            report.cancel_reason,
1445            Some("Order would execute immediately".to_string())
1446        );
1447    }
1448
1449    #[rstest]
1450    fn test_parse_order_msg_rejected_without_reason() {
1451        let mut msg: BitmexOrderMsg =
1452            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1453        msg.ord_status = BitmexOrderStatus::Rejected;
1454        msg.ord_rej_reason = None;
1455        msg.text = None;
1456        msg.cum_qty = 0;
1457
1458        let cache = DashMap::new();
1459        let instrument = create_test_perpetual_instrument();
1460        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1461
1462        assert_eq!(report.order_status, OrderStatus::Rejected);
1463        assert_eq!(report.cancel_reason, None);
1464    }
1465
1466    #[rstest]
1467    fn test_parse_execution_msg() {
1468        let json_data = load_test_json("ws_execution.json");
1469        let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1470        let instrument = create_test_perpetual_instrument();
1471        let fill = parse_execution_msg(msg, &instrument).unwrap();
1472
1473        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1474        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1475        assert_eq!(
1476            fill.venue_order_id.to_string(),
1477            "550e8400-e29b-41d4-a716-446655440002"
1478        );
1479        assert_eq!(
1480            fill.trade_id.to_string(),
1481            "00000000-006d-1000-0000-000e8737d540"
1482        );
1483        assert_eq!(
1484            fill.client_order_id.unwrap().to_string(),
1485            "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1486        );
1487        assert_eq!(fill.order_side, OrderSide::Sell);
1488        assert_eq!(fill.last_qty, Quantity::from(100));
1489        assert_eq!(fill.last_px, Price::from("98950.0"));
1490        assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1491        assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1492        assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1493        assert_eq!(fill.ts_event, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1494    }
1495
1496    #[rstest]
1497    fn test_parse_execution_msg_non_trade() {
1498        // Test that non-trade executions return None
1499        let mut msg: BitmexExecutionMsg =
1500            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1501        msg.exec_type = Some(BitmexExecType::Settlement);
1502
1503        let instrument = create_test_perpetual_instrument();
1504        let result = parse_execution_msg(msg, &instrument);
1505        assert!(result.is_none());
1506    }
1507
1508    #[rstest]
1509    fn test_parse_cancel_reject_execution() {
1510        // Test that CancelReject messages can be parsed (even without symbol)
1511        let json = load_test_json("ws_execution_cancel_reject.json");
1512
1513        let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1514        assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1515        assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1516        assert_eq!(msg.symbol, None);
1517
1518        // Should return None since it's not a Trade
1519        let instrument = create_test_perpetual_instrument();
1520        let result = parse_execution_msg(msg, &instrument);
1521        assert!(result.is_none());
1522    }
1523
1524    #[rstest]
1525    fn test_parse_execution_msg_liquidation() {
1526        // Critical for ADL/hedge tracking
1527        let mut msg: BitmexExecutionMsg =
1528            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1529        msg.exec_type = Some(BitmexExecType::Liquidation);
1530
1531        let instrument = create_test_perpetual_instrument();
1532        let fill = parse_execution_msg(msg, &instrument).unwrap();
1533
1534        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1535        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1536        assert_eq!(fill.order_side, OrderSide::Sell);
1537        assert_eq!(fill.last_qty, Quantity::from(100));
1538        assert_eq!(fill.last_px, Price::from("98950.0"));
1539    }
1540
1541    #[rstest]
1542    fn test_parse_execution_msg_bankruptcy() {
1543        let mut msg: BitmexExecutionMsg =
1544            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1545        msg.exec_type = Some(BitmexExecType::Bankruptcy);
1546
1547        let instrument = create_test_perpetual_instrument();
1548        let fill = parse_execution_msg(msg, &instrument).unwrap();
1549
1550        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1551        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1552        assert_eq!(fill.order_side, OrderSide::Sell);
1553        assert_eq!(fill.last_qty, Quantity::from(100));
1554    }
1555
1556    #[rstest]
1557    fn test_parse_execution_msg_settlement() {
1558        let mut msg: BitmexExecutionMsg =
1559            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1560        msg.exec_type = Some(BitmexExecType::Settlement);
1561
1562        let instrument = create_test_perpetual_instrument();
1563        let result = parse_execution_msg(msg, &instrument);
1564        assert!(result.is_none());
1565    }
1566
1567    #[rstest]
1568    fn test_parse_execution_msg_trial_fill() {
1569        let mut msg: BitmexExecutionMsg =
1570            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1571        msg.exec_type = Some(BitmexExecType::TrialFill);
1572
1573        let instrument = create_test_perpetual_instrument();
1574        let result = parse_execution_msg(msg, &instrument);
1575        assert!(result.is_none());
1576    }
1577
1578    #[rstest]
1579    fn test_parse_execution_msg_funding() {
1580        let mut msg: BitmexExecutionMsg =
1581            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1582        msg.exec_type = Some(BitmexExecType::Funding);
1583
1584        let instrument = create_test_perpetual_instrument();
1585        let result = parse_execution_msg(msg, &instrument);
1586        assert!(result.is_none());
1587    }
1588
1589    #[rstest]
1590    fn test_parse_execution_msg_insurance() {
1591        let mut msg: BitmexExecutionMsg =
1592            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1593        msg.exec_type = Some(BitmexExecType::Insurance);
1594
1595        let instrument = create_test_perpetual_instrument();
1596        let result = parse_execution_msg(msg, &instrument);
1597        assert!(result.is_none());
1598    }
1599
1600    #[rstest]
1601    fn test_parse_execution_msg_rebalance() {
1602        let mut msg: BitmexExecutionMsg =
1603            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1604        msg.exec_type = Some(BitmexExecType::Rebalance);
1605
1606        let instrument = create_test_perpetual_instrument();
1607        let result = parse_execution_msg(msg, &instrument);
1608        assert!(result.is_none());
1609    }
1610
1611    #[rstest]
1612    fn test_parse_execution_msg_order_state_changes() {
1613        let instrument = create_test_perpetual_instrument();
1614
1615        let order_state_types = vec![
1616            BitmexExecType::New,
1617            BitmexExecType::Canceled,
1618            BitmexExecType::CancelReject,
1619            BitmexExecType::Replaced,
1620            BitmexExecType::Rejected,
1621            BitmexExecType::AmendReject,
1622            BitmexExecType::Suspended,
1623            BitmexExecType::Released,
1624            BitmexExecType::TriggeredOrActivatedBySystem,
1625        ];
1626
1627        for exec_type in order_state_types {
1628            let mut msg: BitmexExecutionMsg =
1629                serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1630            msg.exec_type = Some(exec_type.clone());
1631
1632            let result = parse_execution_msg(msg, &instrument);
1633            assert!(
1634                result.is_none(),
1635                "Expected None for exec_type {exec_type:?}"
1636            );
1637        }
1638    }
1639
1640    #[rstest]
1641    fn test_parse_position_msg() {
1642        let json_data = load_test_json("ws_position.json");
1643        let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1644        let instrument = create_test_perpetual_instrument();
1645        let report = parse_position_msg(msg, &instrument);
1646
1647        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1648        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1649        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1650        assert_eq!(report.quantity, Quantity::from(1000));
1651        assert!(report.venue_position_id.is_none());
1652        assert_eq!(report.ts_last, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1653    }
1654
1655    #[rstest]
1656    fn test_parse_position_msg_short() {
1657        let mut msg: BitmexPositionMsg =
1658            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1659        msg.current_qty = Some(-500);
1660
1661        let instrument = create_test_perpetual_instrument();
1662        let report = parse_position_msg(msg, &instrument);
1663        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1664        assert_eq!(report.quantity, Quantity::from(500));
1665    }
1666
1667    #[rstest]
1668    fn test_parse_position_msg_flat() {
1669        let mut msg: BitmexPositionMsg =
1670            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1671        msg.current_qty = Some(0);
1672
1673        let instrument = create_test_perpetual_instrument();
1674        let report = parse_position_msg(msg, &instrument);
1675        assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1676        assert_eq!(report.quantity, Quantity::from(0));
1677    }
1678
1679    #[rstest]
1680    fn test_parse_wallet_msg() {
1681        let json_data = load_test_json("ws_wallet.json");
1682        let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1683        let ts_init = UnixNanos::from(1);
1684        let account_state = parse_wallet_msg(msg, ts_init);
1685
1686        assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1687        assert!(!account_state.balances.is_empty());
1688        let balance = &account_state.balances[0];
1689        assert_eq!(balance.currency.code.to_string(), "XBT");
1690        // Amount should be converted from satoshis (100005180 / 100_000_000.0 = 1.0000518)
1691        assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1692    }
1693
1694    #[rstest]
1695    fn test_parse_wallet_msg_no_amount() {
1696        let mut msg: BitmexWalletMsg =
1697            serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1698        msg.amount = None;
1699
1700        let ts_init = UnixNanos::from(1);
1701        let account_state = parse_wallet_msg(msg, ts_init);
1702        let balance = &account_state.balances[0];
1703        assert_eq!(balance.total.as_f64(), 0.0);
1704    }
1705
1706    #[rstest]
1707    fn test_parse_margin_msg() {
1708        let json_data = load_test_json("ws_margin.json");
1709        let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1710        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1711        let margin_balance = parse_margin_msg(msg, instrument_id);
1712
1713        assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1714        assert_eq!(margin_balance.instrument_id, instrument_id);
1715        // Values should be converted from satoshis to BTC
1716        // initMargin is 0 in test data, so should be 0.0
1717        assert_eq!(margin_balance.initial.as_f64(), 0.0);
1718        // maintMargin is 15949 satoshis = 0.00015949 BTC
1719        assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1720    }
1721
1722    #[rstest]
1723    fn test_parse_margin_msg_no_available() {
1724        let mut msg: BitmexMarginMsg =
1725            serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1726        msg.available_margin = None;
1727
1728        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1729        let margin_balance = parse_margin_msg(msg, instrument_id);
1730        // Should still have valid margin values even if available_margin is None
1731        assert!(margin_balance.initial.as_f64() >= 0.0);
1732        assert!(margin_balance.maintenance.as_f64() >= 0.0);
1733    }
1734
1735    #[rstest]
1736    fn test_parse_instrument_msg_both_prices() {
1737        let json_data = load_test_json("ws_instrument.json");
1738        let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1739
1740        // Create cache with test instrument
1741        let mut instruments_cache = AHashMap::new();
1742        let test_instrument = create_test_perpetual_instrument();
1743        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1744
1745        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1746
1747        // XBTUSD is not an index symbol, so it should have both mark and index prices
1748        assert_eq!(updates.len(), 2);
1749
1750        // Check mark price update
1751        match &updates[0] {
1752            Data::MarkPriceUpdate(update) => {
1753                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1754                assert_eq!(update.value.as_f64(), 95125.7);
1755            }
1756            _ => panic!("Expected MarkPriceUpdate at index 0"),
1757        }
1758
1759        // Check index price update
1760        match &updates[1] {
1761            Data::IndexPriceUpdate(update) => {
1762                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1763                assert_eq!(update.value.as_f64(), 95124.3);
1764            }
1765            _ => panic!("Expected IndexPriceUpdate at index 1"),
1766        }
1767    }
1768
1769    #[rstest]
1770    fn test_parse_instrument_msg_mark_price_only() {
1771        let mut msg: BitmexInstrumentMsg =
1772            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1773        msg.index_price = None;
1774
1775        // Create cache with test instrument
1776        let mut instruments_cache = AHashMap::new();
1777        let test_instrument = create_test_perpetual_instrument();
1778        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1779
1780        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1781
1782        assert_eq!(updates.len(), 1);
1783        match &updates[0] {
1784            Data::MarkPriceUpdate(update) => {
1785                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1786                assert_eq!(update.value.as_f64(), 95125.7);
1787            }
1788            _ => panic!("Expected MarkPriceUpdate"),
1789        }
1790    }
1791
1792    #[rstest]
1793    fn test_parse_instrument_msg_index_price_only() {
1794        let mut msg: BitmexInstrumentMsg =
1795            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1796        msg.mark_price = None;
1797
1798        // Create cache with test instrument
1799        let mut instruments_cache = AHashMap::new();
1800        let test_instrument = create_test_perpetual_instrument();
1801        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1802
1803        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1804
1805        assert_eq!(updates.len(), 1);
1806        match &updates[0] {
1807            Data::IndexPriceUpdate(update) => {
1808                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1809                assert_eq!(update.value.as_f64(), 95124.3);
1810            }
1811            _ => panic!("Expected IndexPriceUpdate"),
1812        }
1813    }
1814
1815    #[rstest]
1816    fn test_parse_instrument_msg_no_prices() {
1817        let mut msg: BitmexInstrumentMsg =
1818            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1819        msg.mark_price = None;
1820        msg.index_price = None;
1821        msg.last_price = None;
1822
1823        // Create cache with test instrument
1824        let mut instruments_cache = AHashMap::new();
1825        let test_instrument = create_test_perpetual_instrument();
1826        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1827
1828        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1829        assert_eq!(updates.len(), 0);
1830    }
1831
1832    #[rstest]
1833    fn test_parse_instrument_msg_index_symbol() {
1834        // Test for index symbols like .BXBT where lastPrice is the index price
1835        // and markPrice equals lastPrice
1836        let mut msg: BitmexInstrumentMsg =
1837            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1838        msg.symbol = Ustr::from(".BXBT");
1839        msg.last_price = Some(119163.05);
1840        msg.mark_price = Some(119163.05); // Index symbols have mark price equal to last price
1841        msg.index_price = None;
1842
1843        // Create instruments cache with proper precision for .BXBT
1844        let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1845        let instrument = CryptoPerpetual::new(
1846            instrument_id,
1847            Symbol::from(".BXBT"),
1848            Currency::BTC(),
1849            Currency::USD(),
1850            Currency::USD(),
1851            false, // is_inverse
1852            2,     // price_precision (for 119163.05)
1853            8,     // size_precision
1854            Price::from("0.01"),
1855            Quantity::from("0.00000001"),
1856            None,                 // multiplier
1857            None,                 // lot_size
1858            None,                 // max_quantity
1859            None,                 // min_quantity
1860            None,                 // max_notional
1861            None,                 // min_notional
1862            None,                 // max_price
1863            None,                 // min_price
1864            None,                 // margin_init
1865            None,                 // margin_maint
1866            None,                 // maker_fee
1867            None,                 // taker_fee
1868            UnixNanos::default(), // ts_event
1869            UnixNanos::default(), // ts_init
1870        );
1871        let mut instruments_cache = AHashMap::new();
1872        instruments_cache.insert(
1873            Ustr::from(".BXBT"),
1874            InstrumentAny::CryptoPerpetual(instrument),
1875        );
1876
1877        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1878
1879        assert_eq!(updates.len(), 2);
1880
1881        // Check mark price update
1882        match &updates[0] {
1883            Data::MarkPriceUpdate(update) => {
1884                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1885                assert_eq!(update.value, Price::from("119163.05"));
1886            }
1887            _ => panic!("Expected MarkPriceUpdate for index symbol"),
1888        }
1889
1890        // Check index price update
1891        match &updates[1] {
1892            Data::IndexPriceUpdate(update) => {
1893                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1894                assert_eq!(update.value, Price::from("119163.05"));
1895                assert_eq!(update.ts_init, UnixNanos::from(1));
1896            }
1897            _ => panic!("Expected IndexPriceUpdate for index symbol"),
1898        }
1899    }
1900
1901    #[rstest]
1902    fn test_parse_funding_msg() {
1903        let json_data = load_test_json("ws_funding_rate.json");
1904        let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1905        let update = parse_funding_msg(msg, UnixNanos::from(1));
1906
1907        assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1908        assert_eq!(update.rate.to_string(), "0.0001");
1909        assert!(update.next_funding_ns.is_none());
1910        assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1911        assert_eq!(update.ts_init, UnixNanos::from(1));
1912    }
1913}