nautilus_bitmex/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{num::NonZero, str::FromStr};
17
18use ahash::AHashMap;
19use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
20use nautilus_model::{
21    data::{
22        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
23        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24        depth::DEPTH10_LEN,
25    },
26    enums::{
27        AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
28        PriceType, RecordFlag, TimeInForce,
29    },
30    events::{OrderUpdated, account::state::AccountState},
31    identifiers::{
32        AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
33        VenueOrderId,
34    },
35    instruments::{Instrument, InstrumentAny},
36    reports::{FillReport, OrderStatusReport, PositionStatusReport},
37    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
38};
39use rust_decimal::Decimal;
40use ustr::Ustr;
41use uuid::Uuid;
42
43use super::{
44    enums::{BitmexAction, BitmexWsTopic},
45    messages::{
46        BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
47        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
48        BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
49    },
50};
51use crate::{
52    common::{
53        consts::BITMEX_VENUE,
54        enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
55        parse::{
56            map_bitmex_currency, parse_contracts_quantity, parse_frac_quantity,
57            parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
58            parse_position_side,
59        },
60    },
61    websocket::messages::BitmexOrderUpdateMsg,
62};
63
/// Bar specification for 1-minute bars aggregated on last-traded prices.
const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 5-minute bars aggregated on last-traded prices.
const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 1-hour bars aggregated on last-traded prices.
const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// Bar specification for 1-day bars aggregated on last-traded prices.
const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};
84
85/// Check if a symbol is an index symbol (starts with '.').
86///
87/// Index symbols in BitMEX represent indices like `.BXBT` and have different
88/// behavior from regular instruments:
89/// - They only have a single price value (no bid/ask spread).
90/// - They don't have trades or quotes.
91/// - Their price is delivered via the `lastPrice` field.
92#[inline]
93#[must_use]
94pub fn is_index_symbol(symbol: &Ustr) -> bool {
95    symbol.starts_with('.')
96}
97
98#[must_use]
99pub fn parse_book_msg_vec(
100    data: Vec<BitmexOrderBookMsg>,
101    action: BitmexAction,
102    price_precision: u8,
103    ts_init: UnixNanos,
104) -> Vec<Data> {
105    let mut deltas = Vec::with_capacity(data.len());
106
107    for msg in data {
108        deltas.push(Data::Delta(parse_book_msg(
109            &msg,
110            &action,
111            price_precision,
112            ts_init,
113        )));
114    }
115    deltas
116}
117
118#[must_use]
119pub fn parse_book10_msg_vec(
120    data: Vec<BitmexOrderBook10Msg>,
121    price_precision: u8,
122    ts_init: UnixNanos,
123) -> Vec<Data> {
124    let mut depths = Vec::with_capacity(data.len());
125
126    for msg in data {
127        depths.push(Data::Depth10(Box::new(parse_book10_msg(
128            &msg,
129            price_precision,
130            ts_init,
131        ))));
132    }
133    depths
134}
135
136#[must_use]
137pub fn parse_trade_msg_vec(
138    data: Vec<BitmexTradeMsg>,
139    price_precision: u8,
140    ts_init: UnixNanos,
141) -> Vec<Data> {
142    let mut trades = Vec::with_capacity(data.len());
143
144    for msg in data {
145        trades.push(Data::Trade(parse_trade_msg(&msg, price_precision, ts_init)));
146    }
147    trades
148}
149
150#[must_use]
151pub fn parse_trade_bin_msg_vec(
152    data: Vec<BitmexTradeBinMsg>,
153    topic: BitmexWsTopic,
154    price_precision: u8,
155    ts_init: UnixNanos,
156) -> Vec<Data> {
157    let mut trades = Vec::with_capacity(data.len());
158
159    for msg in data {
160        trades.push(Data::Bar(parse_trade_bin_msg(
161            &msg,
162            &topic,
163            price_precision,
164            ts_init,
165        )));
166    }
167    trades
168}
169
170#[allow(clippy::too_many_arguments)]
171#[must_use]
172pub fn parse_book_msg(
173    msg: &BitmexOrderBookMsg,
174    action: &BitmexAction,
175    price_precision: u8,
176    ts_init: UnixNanos,
177) -> OrderBookDelta {
178    let flags = if action == &BitmexAction::Insert {
179        RecordFlag::F_SNAPSHOT as u8
180    } else {
181        0
182    };
183
184    let instrument_id = parse_instrument_id(msg.symbol);
185    let action = action.as_book_action();
186    let price = Price::new(msg.price, price_precision);
187    let side = msg.side.as_order_side();
188    let size = parse_contracts_quantity(msg.size.unwrap_or(0));
189    let order_id = msg.id;
190    let order = BookOrder::new(side, price, size, order_id);
191    let sequence = 0; // Not available
192    let ts_event = UnixNanos::from(msg.transact_time);
193
194    OrderBookDelta::new(
195        instrument_id,
196        action,
197        order,
198        flags,
199        sequence,
200        ts_event,
201        ts_init,
202    )
203}
204
205/// Parses an `OrderBook10` message into an `OrderBookDepth10` object.
206///
207/// # Panics
208///
209/// Panics if the bid or ask arrays cannot be converted to exactly 10 elements.
210#[allow(clippy::too_many_arguments)]
211#[must_use]
212pub fn parse_book10_msg(
213    msg: &BitmexOrderBook10Msg,
214    price_precision: u8,
215    ts_init: UnixNanos,
216) -> OrderBookDepth10 {
217    let instrument_id = parse_instrument_id(msg.symbol);
218
219    let mut bids = Vec::with_capacity(DEPTH10_LEN);
220    let mut asks = Vec::with_capacity(DEPTH10_LEN);
221
222    // Initialized with zeros
223    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
224    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
225
226    for (i, level) in msg.bids.iter().enumerate() {
227        let bid_order = BookOrder::new(
228            OrderSide::Buy,
229            Price::new(level[0], price_precision),
230            parse_frac_quantity(level[1], 0),
231            0,
232        );
233
234        bids.push(bid_order);
235        bid_counts[i] = 1;
236    }
237
238    for (i, level) in msg.asks.iter().enumerate() {
239        let ask_order = BookOrder::new(
240            OrderSide::Sell,
241            Price::new(level[0], price_precision),
242            parse_frac_quantity(level[1], 0),
243            0,
244        );
245
246        asks.push(ask_order);
247        ask_counts[i] = 1;
248    }
249
250    let bids: [BookOrder; DEPTH10_LEN] = bids
251        .try_into()
252        .inspect_err(|v: &Vec<BookOrder>| {
253            tracing::error!("Bids length mismatch: expected 10, got {}", v.len());
254        })
255        .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
256    let asks: [BookOrder; DEPTH10_LEN] = asks
257        .try_into()
258        .inspect_err(|v: &Vec<BookOrder>| {
259            tracing::error!("Asks length mismatch: expected 10, got {}", v.len());
260        })
261        .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
262
263    let ts_event = UnixNanos::from(msg.timestamp);
264
265    OrderBookDepth10::new(
266        instrument_id,
267        bids,
268        asks,
269        bid_counts,
270        ask_counts,
271        RecordFlag::F_SNAPSHOT as u8,
272        0, // Not applicable for BitMEX L2 books
273        ts_event,
274        ts_init,
275    )
276}
277
278#[must_use]
279pub fn parse_quote_msg(
280    msg: &BitmexQuoteMsg,
281    last_quote: &QuoteTick,
282    price_precision: u8,
283    ts_init: UnixNanos,
284) -> QuoteTick {
285    let instrument_id = parse_instrument_id(msg.symbol);
286
287    let bid_price = match msg.bid_price {
288        Some(price) => Price::new(price, price_precision),
289        None => last_quote.bid_price,
290    };
291
292    let ask_price = match msg.ask_price {
293        Some(price) => Price::new(price, price_precision),
294        None => last_quote.ask_price,
295    };
296
297    let bid_size = match msg.bid_size {
298        Some(size) => parse_contracts_quantity(size),
299        None => last_quote.bid_size,
300    };
301
302    let ask_size = match msg.ask_size {
303        Some(size) => parse_contracts_quantity(size),
304        None => last_quote.ask_size,
305    };
306
307    let ts_event = UnixNanos::from(msg.timestamp);
308
309    QuoteTick::new(
310        instrument_id,
311        bid_price,
312        ask_price,
313        bid_size,
314        ask_size,
315        ts_event,
316        ts_init,
317    )
318}
319
320#[must_use]
321pub fn parse_trade_msg(msg: &BitmexTradeMsg, price_precision: u8, ts_init: UnixNanos) -> TradeTick {
322    let instrument_id = parse_instrument_id(msg.symbol);
323    let price = Price::new(msg.price, price_precision);
324    let size = parse_contracts_quantity(msg.size);
325    let aggressor_side = msg.side.as_aggressor_side();
326    let trade_id = TradeId::new(
327        msg.trd_match_id
328            .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
329    );
330    let ts_event = UnixNanos::from(msg.timestamp);
331
332    TradeTick::new(
333        instrument_id,
334        price,
335        size,
336        aggressor_side,
337        trade_id,
338        ts_event,
339        ts_init,
340    )
341}
342
343#[must_use]
344pub fn parse_trade_bin_msg(
345    msg: &BitmexTradeBinMsg,
346    topic: &BitmexWsTopic,
347    price_precision: u8,
348    ts_init: UnixNanos,
349) -> Bar {
350    let instrument_id = parse_instrument_id(msg.symbol);
351    let spec = bar_spec_from_topic(topic);
352    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
353
354    let open = Price::new(msg.open, price_precision);
355    let high = Price::new(msg.high, price_precision);
356    let low = Price::new(msg.low, price_precision);
357    let close = Price::new(msg.close, price_precision);
358    let volume = parse_contracts_quantity(msg.volume as u64);
359    let ts_event = UnixNanos::from(msg.timestamp);
360
361    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
362}
363
364/// Converts a WebSocket topic to a bar specification.
365///
366/// # Panics
367///
368/// Panics if the topic is not a valid bar topic (`TradeBin1m`, `TradeBin5m`, `TradeBin1h`, or `TradeBin1d`).
369#[must_use]
370pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
371    match topic {
372        BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
373        BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
374        BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
375        BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
376        _ => {
377            tracing::error!(topic = ?topic, "Bar specification not supported");
378            BAR_SPEC_1_MINUTE
379        }
380    }
381}
382
383/// Converts a bar specification to a WebSocket topic.
384///
385/// # Panics
386///
387/// Panics if the specification is not one of the supported values (1m, 5m, 1h, or 1d).
388#[must_use]
389pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
390    match spec {
391        BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
392        BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
393        BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
394        BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
395        _ => {
396            tracing::error!(spec = ?spec, "Bar specification not supported");
397            BitmexWsTopic::TradeBin1m
398        }
399    }
400}
401
402/// Parse a BitMEX WebSocket order message into a Nautilus `OrderStatusReport`.
403///
404/// # Panics
405///
406/// Panics if required fields are missing or invalid.
407///
408/// # References
409///
410/// <https://www.bitmex.com/app/wsAPI#Order>
411///
412/// # Errors
413///
414/// Returns an error if the time in force conversion fails.
415pub fn parse_order_msg(
416    msg: &BitmexOrderMsg,
417    price_precision: u8,
418) -> anyhow::Result<OrderStatusReport> {
419    let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); // TODO: Revisit
420    let instrument_id = parse_instrument_id(msg.symbol);
421    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
422    let common_side: BitmexSide = msg.side.into();
423    let order_side: OrderSide = common_side.into();
424    let order_type: OrderType = msg.ord_type.into();
425    let time_in_force: TimeInForce = msg
426        .time_in_force
427        .try_into()
428        .map_err(|e| anyhow::anyhow!("{e}"))?;
429    let order_status: OrderStatus = msg.ord_status.into();
430    let quantity = Quantity::from(msg.order_qty);
431    let filled_qty = Quantity::from(msg.cum_qty);
432    let report_id = UUID4::new();
433    let ts_accepted =
434        parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
435    let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
436    let ts_init = get_atomic_clock_realtime().get_time_ns();
437
438    let mut report = OrderStatusReport::new(
439        account_id,
440        instrument_id,
441        None, // client_order_id - will be set later if present
442        venue_order_id,
443        order_side,
444        order_type,
445        time_in_force,
446        order_status,
447        quantity,
448        filled_qty,
449        ts_accepted,
450        ts_last,
451        ts_init,
452        Some(report_id),
453    );
454
455    if let Some(cl_ord_id) = &msg.cl_ord_id {
456        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
457    }
458
459    if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
460        report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
461    }
462
463    if let Some(price) = msg.price {
464        report = report.with_price(Price::new(price, price_precision));
465    }
466
467    if let Some(avg_px) = msg.avg_px {
468        report = report.with_avg_px(avg_px);
469    }
470
471    if let Some(trigger_price) = msg.stop_px {
472        report = report.with_trigger_price(Price::new(trigger_price, price_precision));
473    }
474
475    if let Some(exec_inst) = &msg.exec_inst {
476        match exec_inst {
477            BitmexExecInstruction::ParticipateDoNotInitiate => {
478                report = report.with_post_only(true);
479            }
480            BitmexExecInstruction::ReduceOnly => {
481                report = report.with_reduce_only(true);
482            }
483            _ => {}
484        }
485    }
486
487    // Check if this is a canceled post-only order (BitMEX cancels instead of rejecting)
488    // We need to preserve the rejection reason for the execution client to handle
489    if order_status == OrderStatus::Canceled
490        && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
491    {
492        report = report.with_cancel_reason(reason_str.to_string());
493    }
494
495    Ok(report)
496}
497
498/// Parse a BitMEX WebSocket order update message into a Nautilus `OrderUpdated` event.
499///
500/// This handles partial updates where only changed fields are present.
501pub fn parse_order_update_msg(
502    msg: &BitmexOrderUpdateMsg,
503    price_precision: u8,
504    account_id: AccountId,
505) -> Option<OrderUpdated> {
506    // For BitMEX updates, we don't have trader_id or strategy_id from the exchange
507    // These will be populated by the execution engine when it matches the venue_order_id
508    let trader_id = TraderId::default();
509    let strategy_id = StrategyId::default();
510    let instrument_id = parse_instrument_id(msg.symbol);
511    let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
512    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
513    let quantity = Quantity::zero(0); // Sentinel to signal no quantity update
514    let price = msg.price.map(|p| Price::new(p, price_precision));
515
516    // BitMEX doesn't send trigger price in regular order updates?
517    let trigger_price = None;
518
519    let event_id = UUID4::new();
520    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
521    let ts_init = get_atomic_clock_realtime().get_time_ns();
522
523    Some(nautilus_model::events::OrderUpdated::new(
524        trader_id,
525        strategy_id,
526        instrument_id,
527        client_order_id,
528        quantity,
529        event_id,
530        ts_event,
531        ts_init,
532        false, // reconciliation
533        venue_order_id,
534        Some(account_id),
535        price,
536        trigger_price,
537    ))
538}
539
540/// Parse a BitMEX WebSocket execution message into a Nautilus `FillReport`.
541///
542/// # Panics
543///
544/// Panics if required fields are missing or invalid.
545///
546/// # References
547///
548/// <https://www.bitmex.com/app/wsAPI#Execution>
549///
550pub fn parse_execution_msg(msg: BitmexExecutionMsg, price_precision: u8) -> Option<FillReport> {
551    if msg.exec_type != Some(BitmexExecType::Trade) {
552        return None;
553    }
554
555    let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
556    let instrument_id = parse_instrument_id(msg.symbol?);
557    let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
558    let trade_id = TradeId::new(msg.trd_match_id?.to_string());
559    let order_side: OrderSide = msg
560        .side
561        .map(|s| {
562            let side: BitmexSide = s.into();
563            side.into()
564        })
565        .unwrap_or(OrderSide::NoOrderSide);
566    let last_qty = Quantity::from(msg.last_qty?);
567    let last_px = Price::new(msg.last_px?, price_precision);
568    let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
569    let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
570    let commission = Money::new(
571        msg.commission.unwrap_or(0.0),
572        Currency::from(mapped_currency.as_str()),
573    );
574    let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
575    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
576    let venue_position_id = None; // Not applicable on BitMEX
577    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
578    let ts_init = get_atomic_clock_realtime().get_time_ns();
579
580    Some(FillReport::new(
581        account_id,
582        instrument_id,
583        venue_order_id,
584        trade_id,
585        order_side,
586        last_qty,
587        last_px,
588        commission,
589        liquidity_side,
590        client_order_id,
591        venue_position_id,
592        ts_event,
593        ts_init,
594        None,
595    ))
596}
597
598/// Parse a BitMEX WebSocket position message into a Nautilus `PositionStatusReport`.
599///
600/// # References
601///
602/// <https://www.bitmex.com/app/wsAPI#Position>
603#[must_use]
604pub fn parse_position_msg(msg: BitmexPositionMsg) -> PositionStatusReport {
605    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
606    let instrument_id = parse_instrument_id(msg.symbol);
607    let position_side = parse_position_side(msg.current_qty).as_specified();
608    let quantity = Quantity::from(msg.current_qty.map_or(0, i64::abs));
609    let venue_position_id = None; // Not applicable on BitMEX
610    let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
611    let ts_init = get_atomic_clock_realtime().get_time_ns();
612
613    PositionStatusReport::new(
614        account_id,
615        instrument_id,
616        position_side,
617        quantity,
618        venue_position_id,
619        ts_last,
620        ts_init,
621        None,
622    )
623}
624
625/// Parse a BitMEX WebSocket instrument message for mark and index prices.
626///
627/// For index symbols (e.g., `.BXBT`):
628/// - Uses the `lastPrice` field as the index price.
629/// - Also emits the `markPrice` field (which equals `lastPrice` for indices).
630///
631/// For regular instruments:
632/// - Uses the `index_price` field for index price updates.
633/// - Uses the `mark_price` field for mark price updates.
634///
635/// Returns a Vec of Data containing mark and/or index price updates
636/// or an empty Vec if no relevant price is present.
637#[must_use]
638pub fn parse_instrument_msg(
639    msg: BitmexInstrumentMsg,
640    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
641    ts_init: UnixNanos,
642) -> Vec<Data> {
643    let mut updates = Vec::new();
644    let is_index = is_index_symbol(&msg.symbol);
645
646    // For index symbols (like .BXBT), the lastPrice field contains the index price
647    // For regular instruments, use the explicit index_price field if present
648    let effective_index_price = if is_index {
649        msg.last_price
650    } else {
651        msg.index_price
652    };
653
654    // Return early if no relevant prices present (mark_price or effective_index_price)
655    // Note: effective_index_price uses lastPrice for index symbols, index_price for others
656    // (Funding rates come through a separate Funding channel)
657    if msg.mark_price.is_none() && effective_index_price.is_none() {
658        return updates;
659    }
660
661    let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
662    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
663
664    // Look up instrument for proper precision
665    let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
666        Some(instrument) => instrument.price_precision(),
667        None => {
668            // BitMEX sends updates for all instruments on the instrument channel,
669            // but we only cache instruments that are explicitly requested.
670            // Index instruments (starting with '.') are not loaded via regular API endpoints.
671            if is_index {
672                tracing::trace!(
673                    "Index instrument {} not in cache, skipping update",
674                    msg.symbol
675                );
676            } else {
677                tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
678            }
679            return updates;
680        }
681    };
682
683    // Add mark price update if present
684    // For index symbols, markPrice equals lastPrice and is valid to emit
685    if let Some(mark_price) = msg.mark_price {
686        let price = Price::new(mark_price, price_precision);
687        updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
688            instrument_id,
689            price,
690            ts_event,
691            ts_init,
692        )));
693    }
694
695    // Add index price update if present
696    if let Some(index_price) = effective_index_price {
697        let price = Price::new(index_price, price_precision);
698        updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
699            instrument_id,
700            price,
701            ts_event,
702            ts_init,
703        )));
704    }
705
706    updates
707}
708
709/// Parse a BitMEX WebSocket funding message.
710///
711/// Returns `Some(FundingRateUpdate)` containing funding rate information.
712/// Note: This returns `FundingRateUpdate` directly, not wrapped in Data enum,
713/// to keep it separate from the FFI layer.
714pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
715    let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
716    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
717
718    // Convert funding rate to Decimal
719    let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
720        Ok(rate) => rate,
721        Err(e) => {
722            tracing::error!("Failed to parse funding rate: {e}");
723            return None;
724        }
725    };
726
727    Some(FundingRateUpdate::new(
728        instrument_id,
729        rate,
730        None, // Next funding time not provided in this message
731        ts_event,
732        ts_init,
733    ))
734}
735
736/// Parse a BitMEX wallet message into an AccountState.
737///
738/// BitMEX uses XBT (satoshis) as the base unit for Bitcoin.
739/// 1 XBT = 0.00000001 BTC (1 satoshi).
740///
741/// # Panics
742///
743/// Panics if the balance calculation is invalid (total != locked + free).
744#[must_use]
745pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
746    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
747
748    // Map BitMEX currency to standard currency code
749    let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
750    let currency = Currency::from(currency_str.as_str());
751
752    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
753    let divisor = if msg.currency == "XBt" {
754        100_000_000.0 // Satoshis to BTC
755    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
756        1_000_000.0 // Microunits to units
757    } else {
758        1.0
759    };
760    let amount = msg.amount.unwrap_or(0) as f64 / divisor;
761
762    let total = Money::new(amount, currency);
763    let locked = Money::new(0.0, currency); // No locked amount info available
764    let free = total - locked;
765
766    let balance = AccountBalance::new_checked(total, locked, free)
767        .expect("Balance calculation should be valid");
768
769    AccountState::new(
770        account_id,
771        AccountType::Margin,
772        vec![balance],
773        vec![], // margins will be added separately
774        true,   // is_reported
775        UUID4::new(),
776        ts_init,
777        ts_init,
778        None,
779    )
780}
781
782/// Parse a BitMEX margin message into margin balance information.
783///
784/// This creates a MarginBalance that can be added to an AccountState.
785#[must_use]
786pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
787    // Map BitMEX currency to standard currency code
788    let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
789    let currency = Currency::from(currency_str.as_str());
790
791    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
792    let divisor = if msg.currency == "XBt" {
793        100_000_000.0 // Satoshis to BTC
794    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
795        1_000_000.0 // Microunits to units
796    } else {
797        1.0
798    };
799
800    let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
801    let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
802    let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
803
804    MarginBalance::new(
805        Money::new(initial, currency),
806        Money::new(maintenance, currency),
807        instrument_id,
808    )
809}
810
811////////////////////////////////////////////////////////////////////////////////
812// Tests
813////////////////////////////////////////////////////////////////////////////////
814
815#[cfg(test)]
816mod tests {
817    use nautilus_model::{
818        data::quote::QuoteTick,
819        enums::{
820            AggressorSide, BookAction, LiquiditySide, OrderStatus, OrderType, PositionSide,
821            TimeInForce,
822        },
823        identifiers::{InstrumentId, Symbol},
824        instruments::{CryptoPerpetual, any::InstrumentAny},
825        types::{Currency, Price, Quantity},
826    };
827    use rstest::rstest;
828
829    use super::*;
830    use crate::common::{
831        enums::{BitmexExecType, BitmexOrderStatus},
832        testing::load_test_json,
833    };
834
835    // Helper function to create a test perpetual instrument for tests
836    fn create_test_perpetual_instrument() -> InstrumentAny {
837        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
838            InstrumentId::from("XBTUSD.BITMEX"),
839            Symbol::new("XBTUSD"),
840            Currency::BTC(),
841            Currency::USD(),
842            Currency::BTC(),
843            true, // is_inverse
844            1,    // price_precision
845            0,    // size_precision
846            Price::from("0.5"),
847            Quantity::from(1),
848            None, // multiplier
849            None, // lot_size
850            None, // max_quantity
851            None, // min_quantity
852            None, // max_notional
853            None, // min_notional
854            None, // max_price
855            None, // min_price
856            None, // margin_init
857            None, // margin_maint
858            None, // maker_fee
859            None, // taker_fee
860            UnixNanos::default(),
861            UnixNanos::default(),
862        ))
863    }
864
865    #[rstest]
866    fn test_orderbook_l2_message() {
867        let json_data = load_test_json("ws_orderbook_l2.json");
868
869        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
870        let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
871
872        // Test Insert action
873        let delta = parse_book_msg(&msg, &BitmexAction::Insert, 1, UnixNanos::from(3));
874        assert_eq!(delta.instrument_id, instrument_id);
875        assert_eq!(delta.order.price, Price::from("98459.9"));
876        assert_eq!(delta.order.size, Quantity::from(33000));
877        assert_eq!(delta.order.side, OrderSide::Sell);
878        assert_eq!(delta.order.order_id, 62400580205);
879        assert_eq!(delta.action, BookAction::Add);
880        assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
881        assert_eq!(delta.sequence, 0);
882        assert_eq!(delta.ts_event, 1732436782275000000); // 2024-11-24T08:26:22.275Z in nanos
883        assert_eq!(delta.ts_init, 3);
884
885        // Test Update action (should have different flags)
886        let delta = parse_book_msg(&msg, &BitmexAction::Update, 1, UnixNanos::from(3));
887        assert_eq!(delta.flags, 0);
888        assert_eq!(delta.action, BookAction::Update);
889    }
890
891    #[rstest]
892    fn test_orderbook10_message() {
893        let json_data = load_test_json("ws_orderbook_10.json");
894        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
895        let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
896        let depth10 = parse_book10_msg(&msg, 1, UnixNanos::from(3));
897
898        assert_eq!(depth10.instrument_id, instrument_id);
899
900        // Check first bid level
901        assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
902        assert_eq!(depth10.bids[0].size, Quantity::from(22400));
903        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
904
905        // Check first ask level
906        assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
907        assert_eq!(depth10.asks[0].size, Quantity::from(17600));
908        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
909
910        // Check counts (should be 1 for each populated level)
911        assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
912        assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
913
914        // Check flags and timestamps
915        assert_eq!(depth10.sequence, 0);
916        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
917        assert_eq!(depth10.ts_event, 1732436353513000000); // 2024-11-24T08:19:13.513Z in nanos
918        assert_eq!(depth10.ts_init, 3);
919    }
920
921    #[rstest]
922    fn test_quote_message() {
923        let json_data = load_test_json("ws_quote.json");
924
925        let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
926        let last_quote = QuoteTick::new(
927            instrument_id,
928            Price::new(487.50, 2),
929            Price::new(488.20, 2),
930            Quantity::from(100_000),
931            Quantity::from(100_000),
932            UnixNanos::from(1),
933            UnixNanos::from(2),
934        );
935        let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
936        let quote = parse_quote_msg(&msg, &last_quote, 2, UnixNanos::from(3));
937
938        assert_eq!(quote.instrument_id, instrument_id);
939        assert_eq!(quote.bid_price, Price::from("487.55"));
940        assert_eq!(quote.ask_price, Price::from("488.25"));
941        assert_eq!(quote.bid_size, Quantity::from(103_000));
942        assert_eq!(quote.ask_size, Quantity::from(50_000));
943        assert_eq!(quote.ts_event, 1732315465085000000);
944        assert_eq!(quote.ts_init, 3);
945    }
946
947    #[rstest]
948    fn test_trade_message() {
949        let json_data = load_test_json("ws_trade.json");
950
951        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
952        let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
953        let trade = parse_trade_msg(&msg, 1, UnixNanos::from(3));
954
955        assert_eq!(trade.instrument_id, instrument_id);
956        assert_eq!(trade.price, Price::from("98570.9"));
957        assert_eq!(trade.size, Quantity::from(100));
958        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
959        assert_eq!(
960            trade.trade_id.to_string(),
961            "00000000-006d-1000-0000-000e8737d536"
962        );
963        assert_eq!(trade.ts_event, 1732436138704000000); // 2024-11-24T08:15:38.704Z in nanos
964        assert_eq!(trade.ts_init, 3);
965    }
966
967    #[rstest]
968    fn test_trade_bin_message() {
969        let json_data = load_test_json("ws_trade_bin_1m.json");
970
971        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
972        let topic = BitmexWsTopic::TradeBin1m;
973
974        let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
975        let bar = parse_trade_bin_msg(&msg, &topic, 1, UnixNanos::from(3));
976
977        assert_eq!(bar.instrument_id(), instrument_id);
978        assert_eq!(
979            bar.bar_type.spec(),
980            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
981        );
982        assert_eq!(bar.open, Price::from("97550.0"));
983        assert_eq!(bar.high, Price::from("97584.4"));
984        assert_eq!(bar.low, Price::from("97550.0"));
985        assert_eq!(bar.close, Price::from("97570.1"));
986        assert_eq!(bar.volume, Quantity::from(84_000));
987        assert_eq!(bar.ts_event, 1732392420000000000); // 2024-11-23T20:07:00.000Z in nanos
988        assert_eq!(bar.ts_init, 3);
989    }
990
991    #[rstest]
992    fn test_parse_order_msg() {
993        let json_data = load_test_json("ws_order.json");
994        let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
995        let report = parse_order_msg(&msg, 1).unwrap();
996
997        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
998        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
999        assert_eq!(
1000            report.venue_order_id.to_string(),
1001            "550e8400-e29b-41d4-a716-446655440001"
1002        );
1003        assert_eq!(
1004            report.client_order_id.unwrap().to_string(),
1005            "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1006        );
1007        assert_eq!(report.order_side, OrderSide::Buy);
1008        assert_eq!(report.order_type, OrderType::Limit);
1009        assert_eq!(report.time_in_force, TimeInForce::Gtc);
1010        assert_eq!(report.order_status, OrderStatus::Accepted);
1011        assert_eq!(report.quantity, Quantity::from(100));
1012        assert_eq!(report.filled_qty, Quantity::from(0));
1013        assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1014        assert_eq!(report.ts_accepted, 1732530600000000000); // 2024-11-25T10:30:00.000Z
1015    }
1016
1017    #[rstest]
1018    fn test_parse_execution_msg() {
1019        let json_data = load_test_json("ws_execution.json");
1020        let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1021        let fill = parse_execution_msg(msg, 1).unwrap();
1022
1023        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1024        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1025        assert_eq!(
1026            fill.venue_order_id.to_string(),
1027            "550e8400-e29b-41d4-a716-446655440002"
1028        );
1029        assert_eq!(
1030            fill.trade_id.to_string(),
1031            "00000000-006d-1000-0000-000e8737d540"
1032        );
1033        assert_eq!(
1034            fill.client_order_id.unwrap().to_string(),
1035            "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1036        );
1037        assert_eq!(fill.order_side, OrderSide::Sell);
1038        assert_eq!(fill.last_qty, Quantity::from(100));
1039        assert_eq!(fill.last_px, Price::from("98950.0"));
1040        assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1041        assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1042        assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1043        assert_eq!(fill.ts_event, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1044    }
1045
1046    #[rstest]
1047    fn test_parse_execution_msg_non_trade() {
1048        // Test that non-trade executions return None
1049        let mut msg: BitmexExecutionMsg =
1050            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1051        msg.exec_type = Some(BitmexExecType::Settlement);
1052
1053        let result = parse_execution_msg(msg, 1);
1054        assert!(result.is_none());
1055    }
1056
1057    #[rstest]
1058    fn test_parse_cancel_reject_execution() {
1059        // Test that CancelReject messages can be parsed (even without symbol)
1060        let json = r#"{
1061            "execID":"00000000-006d-1000-0000-001e7f5081ad",
1062            "orderID":"ece0a2cc-7729-4f4c-bc6c-65d7c723e75b",
1063            "account":1667725,
1064            "execType":"CancelReject",
1065            "ordStatus":"Rejected",
1066            "workingIndicator":false,
1067            "ordRejReason":"Invalid orderID",
1068            "text":"Invalid orderID",
1069            "transactTime":"2025-09-05T05:38:28.001Z",
1070            "timestamp":"2025-09-05T05:38:28.001Z"
1071        }"#;
1072
1073        let msg: BitmexExecutionMsg = serde_json::from_str(json).unwrap();
1074        assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1075        assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1076        assert_eq!(msg.symbol, None);
1077
1078        // Should return None since it's not a Trade
1079        let result = parse_execution_msg(msg, 1);
1080        assert!(result.is_none());
1081    }
1082
1083    #[rstest]
1084    fn test_parse_position_msg() {
1085        let json_data = load_test_json("ws_position.json");
1086        let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1087        let report = parse_position_msg(msg);
1088
1089        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1090        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1091        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1092        assert_eq!(report.quantity, Quantity::from(1000));
1093        assert!(report.venue_position_id.is_none());
1094        assert_eq!(report.ts_last, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1095    }
1096
1097    #[rstest]
1098    fn test_parse_position_msg_short() {
1099        let mut msg: BitmexPositionMsg =
1100            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1101        msg.current_qty = Some(-500);
1102
1103        let report = parse_position_msg(msg);
1104        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1105        assert_eq!(report.quantity, Quantity::from(500));
1106    }
1107
1108    #[rstest]
1109    fn test_parse_position_msg_flat() {
1110        let mut msg: BitmexPositionMsg =
1111            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1112        msg.current_qty = Some(0);
1113
1114        let report = parse_position_msg(msg);
1115        assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1116        assert_eq!(report.quantity, Quantity::from(0));
1117    }
1118
1119    #[rstest]
1120    fn test_parse_wallet_msg() {
1121        let json_data = load_test_json("ws_wallet.json");
1122        let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1123        let ts_init = UnixNanos::from(1);
1124        let account_state = parse_wallet_msg(msg, ts_init);
1125
1126        assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1127        assert!(!account_state.balances.is_empty());
1128        let balance = &account_state.balances[0];
1129        assert_eq!(balance.currency.code.to_string(), "XBT");
1130        // Amount should be converted from satoshis (100005180 / 100_000_000.0 = 1.0000518)
1131        assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1132    }
1133
1134    #[rstest]
1135    fn test_parse_wallet_msg_no_amount() {
1136        let mut msg: BitmexWalletMsg =
1137            serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1138        msg.amount = None;
1139
1140        let ts_init = UnixNanos::from(1);
1141        let account_state = parse_wallet_msg(msg, ts_init);
1142        let balance = &account_state.balances[0];
1143        assert_eq!(balance.total.as_f64(), 0.0);
1144    }
1145
1146    #[rstest]
1147    fn test_parse_margin_msg() {
1148        let json_data = load_test_json("ws_margin.json");
1149        let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1150        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1151        let margin_balance = parse_margin_msg(msg, instrument_id);
1152
1153        assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1154        assert_eq!(margin_balance.instrument_id, instrument_id);
1155        // Values should be converted from satoshis to BTC
1156        // initMargin is 0 in test data, so should be 0.0
1157        assert_eq!(margin_balance.initial.as_f64(), 0.0);
1158        // maintMargin is 15949 satoshis = 0.00015949 BTC
1159        assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1160    }
1161
1162    #[rstest]
1163    fn test_parse_margin_msg_no_available() {
1164        let mut msg: BitmexMarginMsg =
1165            serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1166        msg.available_margin = None;
1167
1168        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1169        let margin_balance = parse_margin_msg(msg, instrument_id);
1170        // Should still have valid margin values even if available_margin is None
1171        assert!(margin_balance.initial.as_f64() >= 0.0);
1172        assert!(margin_balance.maintenance.as_f64() >= 0.0);
1173    }
1174
1175    #[rstest]
1176    fn test_parse_instrument_msg_both_prices() {
1177        let json_data = load_test_json("ws_instrument.json");
1178        let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1179
1180        // Create cache with test instrument
1181        let mut instruments_cache = AHashMap::new();
1182        let test_instrument = create_test_perpetual_instrument();
1183        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1184
1185        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1186
1187        // XBTUSD is not an index symbol, so it should have both mark and index prices
1188        assert_eq!(updates.len(), 2);
1189
1190        // Check mark price update
1191        match &updates[0] {
1192            Data::MarkPriceUpdate(update) => {
1193                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1194                assert_eq!(update.value.as_f64(), 95125.7);
1195            }
1196            _ => panic!("Expected MarkPriceUpdate at index 0"),
1197        }
1198
1199        // Check index price update
1200        match &updates[1] {
1201            Data::IndexPriceUpdate(update) => {
1202                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1203                assert_eq!(update.value.as_f64(), 95124.3);
1204            }
1205            _ => panic!("Expected IndexPriceUpdate at index 1"),
1206        }
1207    }
1208
1209    #[rstest]
1210    fn test_parse_instrument_msg_mark_price_only() {
1211        let mut msg: BitmexInstrumentMsg =
1212            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1213        msg.index_price = None;
1214
1215        // Create cache with test instrument
1216        let mut instruments_cache = AHashMap::new();
1217        let test_instrument = create_test_perpetual_instrument();
1218        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1219
1220        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1221
1222        assert_eq!(updates.len(), 1);
1223        match &updates[0] {
1224            Data::MarkPriceUpdate(update) => {
1225                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1226                assert_eq!(update.value.as_f64(), 95125.7);
1227            }
1228            _ => panic!("Expected MarkPriceUpdate"),
1229        }
1230    }
1231
1232    #[rstest]
1233    fn test_parse_instrument_msg_index_price_only() {
1234        let mut msg: BitmexInstrumentMsg =
1235            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1236        msg.mark_price = None;
1237
1238        // Create cache with test instrument
1239        let mut instruments_cache = AHashMap::new();
1240        let test_instrument = create_test_perpetual_instrument();
1241        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1242
1243        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1244
1245        assert_eq!(updates.len(), 1);
1246        match &updates[0] {
1247            Data::IndexPriceUpdate(update) => {
1248                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1249                assert_eq!(update.value.as_f64(), 95124.3);
1250            }
1251            _ => panic!("Expected IndexPriceUpdate"),
1252        }
1253    }
1254
1255    #[rstest]
1256    fn test_parse_instrument_msg_no_prices() {
1257        let mut msg: BitmexInstrumentMsg =
1258            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1259        msg.mark_price = None;
1260        msg.index_price = None;
1261        msg.last_price = None;
1262
1263        // Create cache with test instrument
1264        let mut instruments_cache = AHashMap::new();
1265        let test_instrument = create_test_perpetual_instrument();
1266        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1267
1268        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1269        assert_eq!(updates.len(), 0);
1270    }
1271
1272    #[rstest]
1273    fn test_parse_instrument_msg_index_symbol() {
1274        // Test for index symbols like .BXBT where lastPrice is the index price
1275        // and markPrice equals lastPrice
1276        let mut msg: BitmexInstrumentMsg =
1277            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1278        msg.symbol = Ustr::from(".BXBT");
1279        msg.last_price = Some(119163.05);
1280        msg.mark_price = Some(119163.05); // Index symbols have mark price equal to last price
1281        msg.index_price = None;
1282
1283        // Create instruments cache with proper precision for .BXBT
1284        let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1285        let instrument = CryptoPerpetual::new(
1286            instrument_id,
1287            Symbol::from(".BXBT"),
1288            Currency::BTC(),
1289            Currency::USD(),
1290            Currency::USD(),
1291            false, // is_inverse
1292            2,     // price_precision (for 119163.05)
1293            8,     // size_precision
1294            Price::from("0.01"),
1295            Quantity::from("0.00000001"),
1296            None,                 // multiplier
1297            None,                 // lot_size
1298            None,                 // max_quantity
1299            None,                 // min_quantity
1300            None,                 // max_notional
1301            None,                 // min_notional
1302            None,                 // max_price
1303            None,                 // min_price
1304            None,                 // margin_init
1305            None,                 // margin_maint
1306            None,                 // maker_fee
1307            None,                 // taker_fee
1308            UnixNanos::default(), // ts_event
1309            UnixNanos::default(), // ts_init
1310        );
1311        let mut instruments_cache = AHashMap::new();
1312        instruments_cache.insert(
1313            Ustr::from(".BXBT"),
1314            InstrumentAny::CryptoPerpetual(instrument),
1315        );
1316
1317        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1318
1319        assert_eq!(updates.len(), 2);
1320
1321        // Check mark price update
1322        match &updates[0] {
1323            Data::MarkPriceUpdate(update) => {
1324                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1325                assert_eq!(update.value, Price::from("119163.05"));
1326            }
1327            _ => panic!("Expected MarkPriceUpdate for index symbol"),
1328        }
1329
1330        // Check index price update
1331        match &updates[1] {
1332            Data::IndexPriceUpdate(update) => {
1333                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1334                assert_eq!(update.value, Price::from("119163.05"));
1335                assert_eq!(update.ts_init, UnixNanos::from(1));
1336            }
1337            _ => panic!("Expected IndexPriceUpdate for index symbol"),
1338        }
1339    }
1340
1341    #[rstest]
1342    fn test_parse_funding_msg() {
1343        let json_data = load_test_json("ws_funding_rate.json");
1344        let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1345        let update = parse_funding_msg(msg, UnixNanos::from(1));
1346
1347        assert!(update.is_some());
1348        let update = update.unwrap();
1349
1350        assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1351        assert_eq!(update.rate.to_string(), "0.0001");
1352        assert!(update.next_funding_ns.is_none());
1353        assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1354        assert_eq!(update.ts_init, UnixNanos::from(1));
1355    }
1356}