nautilus_bitmex/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsers that convert BitMEX WebSocket payloads into Nautilus data structures.
17
18use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23use nautilus_model::{
24    data::{
25        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
26        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
27        depth::DEPTH10_LEN,
28    },
29    enums::{
30        AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
31        PriceType, RecordFlag, TimeInForce, TriggerType,
32    },
33    events::{OrderUpdated, account::state::AccountState},
34    identifiers::{
35        AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
36        VenueOrderId,
37    },
38    instruments::{Instrument, InstrumentAny},
39    reports::{FillReport, OrderStatusReport, PositionStatusReport},
40    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
41};
42use rust_decimal::{Decimal, prelude::FromPrimitive};
43use ustr::Ustr;
44use uuid::Uuid;
45
46use super::{
47    enums::{BitmexAction, BitmexWsTopic},
48    messages::{
49        BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
50        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
51        BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
52    },
53};
54use crate::{
55    common::{
56        consts::BITMEX_VENUE,
57        enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
58        parse::{
59            clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
60            normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
61            parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
62            parse_position_side, parse_signed_contracts_quantity,
63        },
64    },
65    websocket::messages::BitmexOrderUpdateMsg,
66};
67
68const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
69    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
70    aggregation: BarAggregation::Minute,
71    price_type: PriceType::Last,
72};
73const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
74    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
75    aggregation: BarAggregation::Minute,
76    price_type: PriceType::Last,
77};
78const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
79    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
80    aggregation: BarAggregation::Hour,
81    price_type: PriceType::Last,
82};
83const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
84    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
85    aggregation: BarAggregation::Day,
86    price_type: PriceType::Last,
87};
88
89/// Check if a symbol is an index symbol (starts with '.').
90///
91/// Index symbols in BitMEX represent indices like `.BXBT` and have different
92/// behavior from regular instruments:
93/// - They only have a single price value (no bid/ask spread).
94/// - They don't have trades or quotes.
95/// - Their price is delivered via the `lastPrice` field.
96#[inline]
97#[must_use]
98pub fn is_index_symbol(symbol: &Ustr) -> bool {
99    symbol.starts_with('.')
100}
101
102/// Converts a batch of BitMEX order-book rows into Nautilus delta events.
103#[must_use]
104pub fn parse_book_msg_vec(
105    data: Vec<BitmexOrderBookMsg>,
106    action: BitmexAction,
107    instruments: &AHashMap<Ustr, InstrumentAny>,
108    ts_init: UnixNanos,
109) -> Vec<Data> {
110    let mut deltas = Vec::with_capacity(data.len());
111
112    for msg in data {
113        if let Some(instrument) = instruments.get(&msg.symbol) {
114            let instrument_id = instrument.id();
115            let price_precision = instrument.price_precision();
116            deltas.push(Data::Delta(parse_book_msg(
117                &msg,
118                &action,
119                instrument,
120                instrument_id,
121                price_precision,
122                ts_init,
123            )));
124        } else {
125            tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for book delta");
126        }
127    }
128    deltas
129}
130
131/// Converts BitMEX level-10 snapshots into Nautilus depth events.
132#[must_use]
133pub fn parse_book10_msg_vec(
134    data: Vec<BitmexOrderBook10Msg>,
135    instruments: &AHashMap<Ustr, InstrumentAny>,
136    ts_init: UnixNanos,
137) -> Vec<Data> {
138    let mut depths = Vec::with_capacity(data.len());
139
140    for msg in data {
141        if let Some(instrument) = instruments.get(&msg.symbol) {
142            let instrument_id = instrument.id();
143            let price_precision = instrument.price_precision();
144            depths.push(Data::Depth10(Box::new(parse_book10_msg(
145                &msg,
146                instrument,
147                instrument_id,
148                price_precision,
149                ts_init,
150            ))));
151        } else {
152            tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for depth10");
153        }
154    }
155    depths
156}
157
158/// Converts BitMEX trade messages into Nautilus trade data events.
159#[must_use]
160pub fn parse_trade_msg_vec(
161    data: Vec<BitmexTradeMsg>,
162    instruments: &AHashMap<Ustr, InstrumentAny>,
163    ts_init: UnixNanos,
164) -> Vec<Data> {
165    let mut trades = Vec::with_capacity(data.len());
166
167    for msg in data {
168        if let Some(instrument) = instruments.get(&msg.symbol) {
169            let instrument_id = instrument.id();
170            let price_precision = instrument.price_precision();
171            trades.push(Data::Trade(parse_trade_msg(
172                &msg,
173                instrument,
174                instrument_id,
175                price_precision,
176                ts_init,
177            )));
178        } else {
179            tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for trade");
180        }
181    }
182    trades
183}
184
185/// Converts aggregated trade-bin messages into Nautilus data events.
186#[must_use]
187pub fn parse_trade_bin_msg_vec(
188    data: Vec<BitmexTradeBinMsg>,
189    topic: BitmexWsTopic,
190    instruments: &AHashMap<Ustr, InstrumentAny>,
191    ts_init: UnixNanos,
192) -> Vec<Data> {
193    let mut trades = Vec::with_capacity(data.len());
194
195    for msg in data {
196        if let Some(instrument) = instruments.get(&msg.symbol) {
197            let instrument_id = instrument.id();
198            let price_precision = instrument.price_precision();
199            trades.push(Data::Bar(parse_trade_bin_msg(
200                &msg,
201                &topic,
202                instrument,
203                instrument_id,
204                price_precision,
205                ts_init,
206            )));
207        } else {
208            tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for trade bin");
209        }
210    }
211    trades
212}
213
214/// Converts a BitMEX order book row into a Nautilus order-book delta.
215#[allow(clippy::too_many_arguments)]
216#[must_use]
217pub fn parse_book_msg(
218    msg: &BitmexOrderBookMsg,
219    action: &BitmexAction,
220    instrument: &InstrumentAny,
221    instrument_id: InstrumentId,
222    price_precision: u8,
223    ts_init: UnixNanos,
224) -> OrderBookDelta {
225    let flags = if action == &BitmexAction::Insert {
226        RecordFlag::F_SNAPSHOT as u8
227    } else {
228        0
229    };
230
231    let action = action.as_book_action();
232    let price = Price::new(msg.price, price_precision);
233    let side = msg.side.as_order_side();
234    let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
235    let order_id = msg.id;
236    let order = BookOrder::new(side, price, size, order_id);
237    let sequence = 0; // Not available
238    let ts_event = UnixNanos::from(msg.timestamp);
239
240    OrderBookDelta::new(
241        instrument_id,
242        action,
243        order,
244        flags,
245        sequence,
246        ts_event,
247        ts_init,
248    )
249}
250
251/// Parses an `OrderBook10` message into an `OrderBookDepth10` object.
252///
253/// # Panics
254///
255/// Panics if the bid or ask arrays cannot be converted to exactly 10 elements.
256#[allow(clippy::too_many_arguments)]
257#[must_use]
258pub fn parse_book10_msg(
259    msg: &BitmexOrderBook10Msg,
260    instrument: &InstrumentAny,
261    instrument_id: InstrumentId,
262    price_precision: u8,
263    ts_init: UnixNanos,
264) -> OrderBookDepth10 {
265    let mut bids = Vec::with_capacity(DEPTH10_LEN);
266    let mut asks = Vec::with_capacity(DEPTH10_LEN);
267
268    // Initialized with zeros
269    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
270    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
271
272    for (i, level) in msg.bids.iter().enumerate() {
273        let bid_order = BookOrder::new(
274            OrderSide::Buy,
275            Price::new(level[0], price_precision),
276            parse_fractional_quantity(level[1], instrument),
277            0,
278        );
279
280        bids.push(bid_order);
281        bid_counts[i] = 1;
282    }
283
284    for (i, level) in msg.asks.iter().enumerate() {
285        let ask_order = BookOrder::new(
286            OrderSide::Sell,
287            Price::new(level[0], price_precision),
288            parse_fractional_quantity(level[1], instrument),
289            0,
290        );
291
292        asks.push(ask_order);
293        ask_counts[i] = 1;
294    }
295
296    let bids: [BookOrder; DEPTH10_LEN] = bids
297        .try_into()
298        .inspect_err(|v: &Vec<BookOrder>| {
299            tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
300        })
301        .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
302    let asks: [BookOrder; DEPTH10_LEN] = asks
303        .try_into()
304        .inspect_err(|v: &Vec<BookOrder>| {
305            tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
306        })
307        .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
308
309    let ts_event = UnixNanos::from(msg.timestamp);
310
311    OrderBookDepth10::new(
312        instrument_id,
313        bids,
314        asks,
315        bid_counts,
316        ask_counts,
317        RecordFlag::F_SNAPSHOT as u8,
318        0, // Not applicable for BitMEX L2 books
319        ts_event,
320        ts_init,
321    )
322}
323
324/// Converts a BitMEX quote message into a `QuoteTick`, filling missing data from cache.
325#[must_use]
326pub fn parse_quote_msg(
327    msg: &BitmexQuoteMsg,
328    last_quote: &QuoteTick,
329    instrument: &InstrumentAny,
330    instrument_id: InstrumentId,
331    price_precision: u8,
332    ts_init: UnixNanos,
333) -> QuoteTick {
334    let bid_price = match msg.bid_price {
335        Some(price) => Price::new(price, price_precision),
336        None => last_quote.bid_price,
337    };
338
339    let ask_price = match msg.ask_price {
340        Some(price) => Price::new(price, price_precision),
341        None => last_quote.ask_price,
342    };
343
344    let bid_size = match msg.bid_size {
345        Some(size) => parse_contracts_quantity(size, instrument),
346        None => last_quote.bid_size,
347    };
348
349    let ask_size = match msg.ask_size {
350        Some(size) => parse_contracts_quantity(size, instrument),
351        None => last_quote.ask_size,
352    };
353
354    let ts_event = UnixNanos::from(msg.timestamp);
355
356    QuoteTick::new(
357        instrument_id,
358        bid_price,
359        ask_price,
360        bid_size,
361        ask_size,
362        ts_event,
363        ts_init,
364    )
365}
366
367/// Converts a BitMEX trade message into a `TradeTick`.
368#[must_use]
369pub fn parse_trade_msg(
370    msg: &BitmexTradeMsg,
371    instrument: &InstrumentAny,
372    instrument_id: InstrumentId,
373    price_precision: u8,
374    ts_init: UnixNanos,
375) -> TradeTick {
376    let price = Price::new(msg.price, price_precision);
377    let size = parse_contracts_quantity(msg.size, instrument);
378    let aggressor_side = msg.side.as_aggressor_side();
379    let trade_id = TradeId::new(
380        msg.trd_match_id
381            .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
382    );
383    let ts_event = UnixNanos::from(msg.timestamp);
384
385    TradeTick::new(
386        instrument_id,
387        price,
388        size,
389        aggressor_side,
390        trade_id,
391        ts_event,
392        ts_init,
393    )
394}
395
396/// Converts a BitMEX trade-bin summary into a `Bar` for the matching topic.
397#[must_use]
398pub fn parse_trade_bin_msg(
399    msg: &BitmexTradeBinMsg,
400    topic: &BitmexWsTopic,
401    instrument: &InstrumentAny,
402    instrument_id: InstrumentId,
403    price_precision: u8,
404    ts_init: UnixNanos,
405) -> Bar {
406    let spec = bar_spec_from_topic(topic);
407    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
408
409    let open = Price::new(msg.open, price_precision);
410    let high = Price::new(msg.high, price_precision);
411    let low = Price::new(msg.low, price_precision);
412    let close = Price::new(msg.close, price_precision);
413
414    let (open, high, low, close) =
415        normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
416
417    let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
418    let volume = parse_contracts_quantity(volume_contracts, instrument);
419    let ts_event = UnixNanos::from(msg.timestamp);
420
421    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
422}
423
424/// Converts a WebSocket topic to a bar specification.
425///
426/// # Panics
427///
428/// Panics if the topic is not a valid bar topic (`TradeBin1m`, `TradeBin5m`, `TradeBin1h`, or `TradeBin1d`).
429#[must_use]
430pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
431    match topic {
432        BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
433        BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
434        BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
435        BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
436        _ => {
437            tracing::error!(topic = ?topic, "Bar specification not supported");
438            BAR_SPEC_1_MINUTE
439        }
440    }
441}
442
443/// Converts a bar specification to a WebSocket topic.
444///
445/// # Panics
446///
447/// Panics if the specification is not one of the supported values (1m, 5m, 1h, or 1d).
448#[must_use]
449pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
450    match spec {
451        BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
452        BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
453        BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
454        BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
455        _ => {
456            tracing::error!(spec = ?spec, "Bar specification not supported");
457            BitmexWsTopic::TradeBin1m
458        }
459    }
460}
461
462fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
463    if msg.stop_px.is_some() {
464        if msg.price.is_some() {
465            Some(OrderType::StopLimit)
466        } else {
467            Some(OrderType::StopMarket)
468        }
469    } else if msg.price.is_some() {
470        Some(OrderType::Limit)
471    } else {
472        Some(OrderType::Market)
473    }
474}
475
476/// Parse a BitMEX WebSocket order message into a Nautilus `OrderStatusReport`.
477///
478/// # Panics
479///
480/// Panics if required fields are missing or invalid.
481///
482/// # References
483///
484/// <https://www.bitmex.com/app/wsAPI#Order>
485///
486/// # Errors
487///
488/// Returns an error if the time in force conversion fails.
489pub fn parse_order_msg(
490    msg: &BitmexOrderMsg,
491    instrument: &InstrumentAny,
492    order_type_cache: &DashMap<ClientOrderId, OrderType>,
493) -> anyhow::Result<OrderStatusReport> {
494    let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); // TODO: Revisit
495    let instrument_id = parse_instrument_id(msg.symbol);
496    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
497    let common_side: BitmexSide = msg.side.into();
498    let order_side: OrderSide = common_side.into();
499
500    let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
501        ord_type.into()
502    } else if let Some(client_order_id) = msg.cl_ord_id {
503        let client_order_id = ClientOrderId::new(client_order_id);
504        if let Some(entry) = order_type_cache.get(&client_order_id) {
505            *entry.value()
506        } else if let Some(inferred) = infer_order_type_from_msg(msg) {
507            order_type_cache.insert(client_order_id, inferred);
508            inferred
509        } else {
510            anyhow::bail!(
511                "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
512            );
513        }
514    } else if let Some(inferred) = infer_order_type_from_msg(msg) {
515        inferred
516    } else {
517        anyhow::bail!("Order missing both ord_type and cl_ord_id");
518    };
519
520    let time_in_force: TimeInForce = match msg.time_in_force {
521        Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
522        None => TimeInForce::Gtc,
523    };
524    let order_status: OrderStatus = msg.ord_status.into();
525    let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
526    let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
527    let report_id = UUID4::new();
528    let ts_accepted =
529        parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
530    let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
531    let ts_init = get_atomic_clock_realtime().get_time_ns();
532
533    let mut report = OrderStatusReport::new(
534        account_id,
535        instrument_id,
536        None, // client_order_id - will be set later if present
537        venue_order_id,
538        order_side,
539        order_type,
540        time_in_force,
541        order_status,
542        quantity,
543        filled_qty,
544        ts_accepted,
545        ts_last,
546        ts_init,
547        Some(report_id),
548    );
549
550    if let Some(cl_ord_id) = &msg.cl_ord_id {
551        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
552    }
553
554    if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
555        report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
556    }
557
558    if let Some(price) = msg.price {
559        report = report.with_price(Price::new(price, instrument.price_precision()));
560    }
561
562    if let Some(avg_px) = msg.avg_px {
563        report = report.with_avg_px(avg_px);
564    }
565
566    if let Some(trigger_price) = msg.stop_px {
567        let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
568            // Check if any trigger type instruction is present
569            if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
570                TriggerType::MarkPrice
571            } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
572                TriggerType::IndexPrice
573            } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
574                TriggerType::LastPrice
575            } else {
576                TriggerType::Default
577            }
578        } else {
579            TriggerType::Default // BitMEX defaults to LastPrice when not specified
580        };
581
582        report = report
583            .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
584            .with_trigger_type(trigger_type);
585    }
586
587    if let Some(exec_insts) = &msg.exec_inst {
588        for exec_inst in exec_insts {
589            match exec_inst {
590                BitmexExecInstruction::ParticipateDoNotInitiate => {
591                    report = report.with_post_only(true);
592                }
593                BitmexExecInstruction::ReduceOnly => {
594                    report = report.with_reduce_only(true);
595                }
596                _ => {}
597            }
598        }
599    }
600
601    // Extract rejection reason for rejected orders
602    if order_status == OrderStatus::Rejected {
603        if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
604            tracing::debug!(
605                order_id = ?venue_order_id,
606                client_order_id = ?msg.cl_ord_id,
607                reason = ?reason_str,
608                "Order rejected with reason"
609            );
610            report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
611        } else {
612            tracing::debug!(
613                order_id = ?venue_order_id,
614                client_order_id = ?msg.cl_ord_id,
615                ord_status = ?msg.ord_status,
616                ord_rej_reason = ?msg.ord_rej_reason,
617                text = ?msg.text,
618                "Order rejected without reason from BitMEX"
619            );
620        }
621    }
622
623    // Check if this is a canceled post-only order (BitMEX cancels instead of rejecting)
624    // We need to preserve the rejection reason for the execution client to handle
625    if order_status == OrderStatus::Canceled
626        && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
627    {
628        report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
629    }
630
631    Ok(report)
632}
633
634/// Parse a BitMEX WebSocket order update message into a Nautilus `OrderUpdated` event.
635///
636/// This handles partial updates where only changed fields are present.
637pub fn parse_order_update_msg(
638    msg: &BitmexOrderUpdateMsg,
639    instrument: &InstrumentAny,
640    account_id: AccountId,
641) -> Option<OrderUpdated> {
642    // For BitMEX updates, we don't have trader_id or strategy_id from the exchange
643    // These will be populated by the execution engine when it matches the venue_order_id
644    let trader_id = TraderId::default();
645    let strategy_id = StrategyId::default();
646    let instrument_id = parse_instrument_id(msg.symbol);
647    let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
648    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
649    let quantity = Quantity::zero(instrument.size_precision());
650    let price = msg
651        .price
652        .map(|p| Price::new(p, instrument.price_precision()));
653
654    // BitMEX doesn't send trigger price in regular order updates?
655    let trigger_price = None;
656
657    let event_id = UUID4::new();
658    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
659    let ts_init = get_atomic_clock_realtime().get_time_ns();
660
661    Some(nautilus_model::events::OrderUpdated::new(
662        trader_id,
663        strategy_id,
664        instrument_id,
665        client_order_id,
666        quantity,
667        event_id,
668        ts_event,
669        ts_init,
670        false, // reconciliation
671        venue_order_id,
672        Some(account_id),
673        price,
674        trigger_price,
675    ))
676}
677
678/// Parse a BitMEX WebSocket execution message into a Nautilus `FillReport`.
679///
680/// # Panics
681///
682/// Panics if required fields are missing or invalid.
683///
684/// # References
685///
686/// <https://www.bitmex.com/app/wsAPI#Execution>
687///
688pub fn parse_execution_msg(
689    msg: BitmexExecutionMsg,
690    instrument: &InstrumentAny,
691) -> Option<FillReport> {
692    if msg.exec_type != Some(BitmexExecType::Trade) {
693        return None;
694    }
695
696    let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
697    let instrument_id = parse_instrument_id(msg.symbol?);
698    let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
699    let trade_id = TradeId::new(msg.trd_match_id?.to_string());
700    let order_side: OrderSide = msg
701        .side
702        .map(|s| {
703            let side: BitmexSide = s.into();
704            side.into()
705        })
706        .unwrap_or(OrderSide::NoOrderSide);
707    let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
708    let last_px = Price::new(msg.last_px?, instrument.price_precision());
709    let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
710    let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
711    let commission = Money::new(
712        msg.commission.unwrap_or(0.0),
713        Currency::from(mapped_currency.as_str()),
714    );
715    let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
716    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
717    let venue_position_id = None; // Not applicable on BitMEX
718    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
719    let ts_init = get_atomic_clock_realtime().get_time_ns();
720
721    Some(FillReport::new(
722        account_id,
723        instrument_id,
724        venue_order_id,
725        trade_id,
726        order_side,
727        last_qty,
728        last_px,
729        commission,
730        liquidity_side,
731        client_order_id,
732        venue_position_id,
733        ts_event,
734        ts_init,
735        None,
736    ))
737}
738
739/// Parse a BitMEX WebSocket position message into a Nautilus `PositionStatusReport`.
740///
741/// # References
742///
743/// <https://www.bitmex.com/app/wsAPI#Position>
744#[must_use]
745pub fn parse_position_msg(
746    msg: BitmexPositionMsg,
747    instrument: &InstrumentAny,
748) -> PositionStatusReport {
749    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
750    let instrument_id = parse_instrument_id(msg.symbol);
751    let position_side = parse_position_side(msg.current_qty).as_specified();
752    let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
753    let venue_position_id = None; // Not applicable on BitMEX
754    let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
755    let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
756    let ts_init = get_atomic_clock_realtime().get_time_ns();
757
758    PositionStatusReport::new(
759        account_id,
760        instrument_id,
761        position_side,
762        quantity,
763        ts_last,
764        ts_init,
765        None,              // report_id
766        venue_position_id, // venue_position_id
767        avg_px_open,       // avg_px_open
768    )
769}
770
771/// Parse a BitMEX WebSocket instrument message for mark and index prices.
772///
773/// For index symbols (e.g., `.BXBT`):
774/// - Uses the `lastPrice` field as the index price.
775/// - Also emits the `markPrice` field (which equals `lastPrice` for indices).
776///
777/// For regular instruments:
778/// - Uses the `index_price` field for index price updates.
779/// - Uses the `mark_price` field for mark price updates.
780///
781/// Returns a Vec of Data containing mark and/or index price updates
782/// or an empty Vec if no relevant price is present.
783#[must_use]
784pub fn parse_instrument_msg(
785    msg: BitmexInstrumentMsg,
786    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
787    ts_init: UnixNanos,
788) -> Vec<Data> {
789    let mut updates = Vec::new();
790    let is_index = is_index_symbol(&msg.symbol);
791
792    // For index symbols (like .BXBT), the lastPrice field contains the index price
793    // For regular instruments, use the explicit index_price field if present
794    let effective_index_price = if is_index {
795        msg.last_price
796    } else {
797        msg.index_price
798    };
799
800    // Return early if no relevant prices present (mark_price or effective_index_price)
801    // Note: effective_index_price uses lastPrice for index symbols, index_price for others
802    // (Funding rates come through a separate Funding channel)
803    if msg.mark_price.is_none() && effective_index_price.is_none() {
804        return updates;
805    }
806
807    let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
808    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
809
810    // Look up instrument for proper precision
811    let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
812        Some(instrument) => instrument.price_precision(),
813        None => {
814            // BitMEX sends updates for all instruments on the instrument channel,
815            // but we only cache instruments that are explicitly requested.
816            // Index instruments (starting with '.') are not loaded via regular API endpoints.
817            if is_index {
818                tracing::trace!(
819                    "Index instrument {} not in cache, skipping update",
820                    msg.symbol
821                );
822            } else {
823                tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
824            }
825            return updates;
826        }
827    };
828
829    // Add mark price update if present
830    // For index symbols, markPrice equals lastPrice and is valid to emit
831    if let Some(mark_price) = msg.mark_price {
832        let price = Price::new(mark_price, price_precision);
833        updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
834            instrument_id,
835            price,
836            ts_event,
837            ts_init,
838        )));
839    }
840
841    // Add index price update if present
842    if let Some(index_price) = effective_index_price {
843        let price = Price::new(index_price, price_precision);
844        updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
845            instrument_id,
846            price,
847            ts_event,
848            ts_init,
849        )));
850    }
851
852    updates
853}
854
855/// Parse a BitMEX WebSocket funding message.
856///
857/// Returns `Some(FundingRateUpdate)` containing funding rate information.
858/// Note: This returns `FundingRateUpdate` directly, not wrapped in Data enum,
859/// to keep it separate from the FFI layer.
860pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
861    let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
862    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
863
864    // Convert funding rate to Decimal
865    let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
866        Ok(rate) => rate,
867        Err(e) => {
868            tracing::error!("Failed to parse funding rate: {e}");
869            return None;
870        }
871    };
872
873    Some(FundingRateUpdate::new(
874        instrument_id,
875        rate,
876        None, // Next funding time not provided in this message
877        ts_event,
878        ts_init,
879    ))
880}
881
882/// Parse a BitMEX wallet message into an AccountState.
883///
884/// BitMEX uses XBT (satoshis) as the base unit for Bitcoin.
885/// 1 XBT = 0.00000001 BTC (1 satoshi).
886///
887/// # Panics
888///
889/// Panics if the balance calculation is invalid (total != locked + free).
890#[must_use]
891pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
892    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
893
894    // Map BitMEX currency to standard currency code
895    let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
896    let currency = Currency::from(currency_str.as_str());
897
898    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
899    let divisor = if msg.currency == "XBt" {
900        100_000_000.0 // Satoshis to BTC
901    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
902        1_000_000.0 // Microunits to units
903    } else {
904        1.0
905    };
906    let amount = msg.amount.unwrap_or(0) as f64 / divisor;
907
908    let total = Money::new(amount, currency);
909    let locked = Money::new(0.0, currency); // No locked amount info available
910    let free = total - locked;
911
912    let balance = AccountBalance::new_checked(total, locked, free)
913        .expect("Balance calculation should be valid");
914
915    AccountState::new(
916        account_id,
917        AccountType::Margin,
918        vec![balance],
919        vec![], // margins will be added separately
920        true,   // is_reported
921        UUID4::new(),
922        ts_init,
923        ts_init,
924        None,
925    )
926}
927
928/// Parse a BitMEX margin message into margin balance information.
929///
930/// This creates a MarginBalance that can be added to an AccountState.
931#[must_use]
932pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
933    // Map BitMEX currency to standard currency code
934    let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
935    let currency = Currency::from(currency_str.as_str());
936
937    // BitMEX returns values in satoshis for BTC (XBt) or microunits for USDT/LAMp
938    let divisor = if msg.currency == "XBt" {
939        100_000_000.0 // Satoshis to BTC
940    } else if msg.currency == "USDt" || msg.currency == "LAMp" {
941        1_000_000.0 // Microunits to units
942    } else {
943        1.0
944    };
945
946    let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
947    let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
948    let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
949
950    MarginBalance::new(
951        Money::new(initial, currency),
952        Money::new(maintenance, currency),
953        instrument_id,
954    )
955}
956
957////////////////////////////////////////////////////////////////////////////////
958// Tests
959////////////////////////////////////////////////////////////////////////////////
960
961#[cfg(test)]
962mod tests {
963    use chrono::{DateTime, Utc};
964    use nautilus_model::{
965        enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
966        identifiers::Symbol,
967        instruments::crypto_perpetual::CryptoPerpetual,
968    };
969    use rstest::rstest;
970    use ustr::Ustr;
971
972    use super::*;
973    use crate::common::{
974        enums::{BitmexExecType, BitmexOrderStatus},
975        testing::load_test_json,
976    };
977
978    // Helper function to create a test perpetual instrument for tests
979    fn create_test_perpetual_instrument_with_precisions(
980        price_precision: u8,
981        size_precision: u8,
982    ) -> InstrumentAny {
983        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
984            InstrumentId::from("XBTUSD.BITMEX"),
985            Symbol::new("XBTUSD"),
986            Currency::BTC(),
987            Currency::USD(),
988            Currency::BTC(),
989            true, // is_inverse
990            price_precision,
991            size_precision,
992            Price::new(0.5, price_precision),
993            Quantity::new(1.0, size_precision),
994            None, // multiplier
995            None, // lot_size
996            None, // max_quantity
997            None, // min_quantity
998            None, // max_notional
999            None, // min_notional
1000            None, // max_price
1001            None, // min_price
1002            None, // margin_init
1003            None, // margin_maint
1004            None, // maker_fee
1005            None, // taker_fee
1006            UnixNanos::default(),
1007            UnixNanos::default(),
1008        ))
1009    }
1010
1011    fn create_test_perpetual_instrument() -> InstrumentAny {
1012        create_test_perpetual_instrument_with_precisions(1, 0)
1013    }
1014
1015    #[rstest]
1016    fn test_orderbook_l2_message() {
1017        let json_data = load_test_json("ws_orderbook_l2.json");
1018
1019        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1020        let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1021
1022        // Test Insert action
1023        let instrument = create_test_perpetual_instrument();
1024        let delta = parse_book_msg(
1025            &msg,
1026            &BitmexAction::Insert,
1027            &instrument,
1028            instrument.id(),
1029            instrument.price_precision(),
1030            UnixNanos::from(3),
1031        );
1032        assert_eq!(delta.instrument_id, instrument_id);
1033        assert_eq!(delta.order.price, Price::from("98459.9"));
1034        assert_eq!(delta.order.size, Quantity::from(33000));
1035        assert_eq!(delta.order.side, OrderSide::Sell);
1036        assert_eq!(delta.order.order_id, 62400580205);
1037        assert_eq!(delta.action, BookAction::Add);
1038        assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1039        assert_eq!(delta.sequence, 0);
1040        assert_eq!(delta.ts_event, 1732436782356000000); // 2024-11-24T08:26:22.356Z in nanos
1041        assert_eq!(delta.ts_init, 3);
1042
1043        // Test Update action (should have different flags)
1044        let delta = parse_book_msg(
1045            &msg,
1046            &BitmexAction::Update,
1047            &instrument,
1048            instrument.id(),
1049            instrument.price_precision(),
1050            UnixNanos::from(3),
1051        );
1052        assert_eq!(delta.flags, 0);
1053        assert_eq!(delta.action, BookAction::Update);
1054    }
1055
1056    #[rstest]
1057    fn test_orderbook10_message() {
1058        let json_data = load_test_json("ws_orderbook_10.json");
1059        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1060        let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1061        let instrument = create_test_perpetual_instrument();
1062        let depth10 = parse_book10_msg(
1063            &msg,
1064            &instrument,
1065            instrument.id(),
1066            instrument.price_precision(),
1067            UnixNanos::from(3),
1068        );
1069
1070        assert_eq!(depth10.instrument_id, instrument_id);
1071
1072        // Check first bid level
1073        assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1074        assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1075        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1076
1077        // Check first ask level
1078        assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1079        assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1080        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1081
1082        // Check counts (should be 1 for each populated level)
1083        assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1084        assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1085
1086        // Check flags and timestamps
1087        assert_eq!(depth10.sequence, 0);
1088        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1089        assert_eq!(depth10.ts_event, 1732436353513000000); // 2024-11-24T08:19:13.513Z in nanos
1090        assert_eq!(depth10.ts_init, 3);
1091    }
1092
1093    #[rstest]
1094    fn test_quote_message() {
1095        let json_data = load_test_json("ws_quote.json");
1096
1097        let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1098        let last_quote = QuoteTick::new(
1099            instrument_id,
1100            Price::new(487.50, 2),
1101            Price::new(488.20, 2),
1102            Quantity::from(100_000),
1103            Quantity::from(100_000),
1104            UnixNanos::from(1),
1105            UnixNanos::from(2),
1106        );
1107        let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1108        let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1109        let quote = parse_quote_msg(
1110            &msg,
1111            &last_quote,
1112            &instrument,
1113            instrument_id,
1114            instrument.price_precision(),
1115            UnixNanos::from(3),
1116        );
1117
1118        assert_eq!(quote.instrument_id, instrument_id);
1119        assert_eq!(quote.bid_price, Price::from("487.55"));
1120        assert_eq!(quote.ask_price, Price::from("488.25"));
1121        assert_eq!(quote.bid_size, Quantity::from(103_000));
1122        assert_eq!(quote.ask_size, Quantity::from(50_000));
1123        assert_eq!(quote.ts_event, 1732315465085000000);
1124        assert_eq!(quote.ts_init, 3);
1125    }
1126
1127    #[rstest]
1128    fn test_trade_message() {
1129        let json_data = load_test_json("ws_trade.json");
1130
1131        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1132        let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1133        let instrument = create_test_perpetual_instrument();
1134        let trade = parse_trade_msg(
1135            &msg,
1136            &instrument,
1137            instrument.id(),
1138            instrument.price_precision(),
1139            UnixNanos::from(3),
1140        );
1141
1142        assert_eq!(trade.instrument_id, instrument_id);
1143        assert_eq!(trade.price, Price::from("98570.9"));
1144        assert_eq!(trade.size, Quantity::from(100));
1145        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1146        assert_eq!(
1147            trade.trade_id.to_string(),
1148            "00000000-006d-1000-0000-000e8737d536"
1149        );
1150        assert_eq!(trade.ts_event, 1732436138704000000); // 2024-11-24T08:15:38.704Z in nanos
1151        assert_eq!(trade.ts_init, 3);
1152    }
1153
1154    #[rstest]
1155    fn test_trade_bin_message() {
1156        let json_data = load_test_json("ws_trade_bin_1m.json");
1157
1158        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1159        let topic = BitmexWsTopic::TradeBin1m;
1160
1161        let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1162        let instrument = create_test_perpetual_instrument();
1163        let bar = parse_trade_bin_msg(
1164            &msg,
1165            &topic,
1166            &instrument,
1167            instrument.id(),
1168            instrument.price_precision(),
1169            UnixNanos::from(3),
1170        );
1171
1172        assert_eq!(bar.instrument_id(), instrument_id);
1173        assert_eq!(
1174            bar.bar_type.spec(),
1175            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1176        );
1177        assert_eq!(bar.open, Price::from("97550.0"));
1178        assert_eq!(bar.high, Price::from("97584.4"));
1179        assert_eq!(bar.low, Price::from("97550.0"));
1180        assert_eq!(bar.close, Price::from("97570.1"));
1181        assert_eq!(bar.volume, Quantity::from(84_000));
1182        assert_eq!(bar.ts_event, 1732392420000000000); // 2024-11-23T20:07:00.000Z in nanos
1183        assert_eq!(bar.ts_init, 3);
1184    }
1185
1186    #[rstest]
1187    fn test_trade_bin_message_extreme_adjustment() {
1188        let topic = BitmexWsTopic::TradeBin1m;
1189        let instrument = create_test_perpetual_instrument();
1190
1191        let msg = BitmexTradeBinMsg {
1192            timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1193                .unwrap()
1194                .with_timezone(&Utc),
1195            symbol: Ustr::from("XBTUSD"),
1196            open: 50_000.0,
1197            high: 49_990.0,
1198            low: 50_010.0,
1199            close: 50_005.0,
1200            trades: 10,
1201            volume: 1_000,
1202            vwap: 0.0,
1203            last_size: 0,
1204            turnover: 0,
1205            home_notional: 0.0,
1206            foreign_notional: 0.0,
1207        };
1208
1209        let bar = parse_trade_bin_msg(
1210            &msg,
1211            &topic,
1212            &instrument,
1213            instrument.id(),
1214            instrument.price_precision(),
1215            UnixNanos::from(3),
1216        );
1217
1218        assert_eq!(bar.high, Price::from("50010.0"));
1219        assert_eq!(bar.low, Price::from("49990.0"));
1220        assert_eq!(bar.open, Price::from("50000.0"));
1221        assert_eq!(bar.close, Price::from("50005.0"));
1222        assert_eq!(bar.volume, Quantity::from(1_000));
1223    }
1224
1225    #[rstest]
1226    fn test_parse_order_msg() {
1227        let json_data = load_test_json("ws_order.json");
1228        let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1229        let cache = dashmap::DashMap::new();
1230        let instrument = create_test_perpetual_instrument();
1231        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1232
1233        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1234        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1235        assert_eq!(
1236            report.venue_order_id.to_string(),
1237            "550e8400-e29b-41d4-a716-446655440001"
1238        );
1239        assert_eq!(
1240            report.client_order_id.unwrap().to_string(),
1241            "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1242        );
1243        assert_eq!(report.order_side, OrderSide::Buy);
1244        assert_eq!(report.order_type, OrderType::Limit);
1245        assert_eq!(report.time_in_force, TimeInForce::Gtc);
1246        assert_eq!(report.order_status, OrderStatus::Accepted);
1247        assert_eq!(report.quantity, Quantity::from(100));
1248        assert_eq!(report.filled_qty, Quantity::from(0));
1249        assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1250        assert_eq!(report.ts_accepted, 1732530600000000000); // 2024-11-25T10:30:00.000Z
1251    }
1252
1253    #[rstest]
1254    fn test_parse_order_msg_infers_type_when_missing() {
1255        let json_data = load_test_json("ws_order.json");
1256        let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1257        msg.ord_type = None;
1258        msg.cl_ord_id = None;
1259        msg.price = Some(98_000.0);
1260        msg.stop_px = None;
1261
1262        let cache = dashmap::DashMap::new();
1263        let instrument = create_test_perpetual_instrument();
1264
1265        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1266
1267        assert_eq!(report.order_type, OrderType::Limit);
1268    }
1269
1270    #[rstest]
1271    fn test_parse_order_msg_rejected_with_reason() {
1272        let mut msg: BitmexOrderMsg =
1273            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1274        msg.ord_status = BitmexOrderStatus::Rejected;
1275        msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1276        msg.text = None;
1277        msg.cum_qty = 0;
1278
1279        let cache = dashmap::DashMap::new();
1280        let instrument = create_test_perpetual_instrument();
1281        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1282
1283        assert_eq!(report.order_status, OrderStatus::Rejected);
1284        assert_eq!(
1285            report.cancel_reason,
1286            Some("Insufficient available balance".to_string())
1287        );
1288    }
1289
1290    #[rstest]
1291    fn test_parse_order_msg_rejected_with_text_fallback() {
1292        let mut msg: BitmexOrderMsg =
1293            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1294        msg.ord_status = BitmexOrderStatus::Rejected;
1295        msg.ord_rej_reason = None;
1296        msg.text = Some(Ustr::from("Order would execute immediately"));
1297        msg.cum_qty = 0;
1298
1299        let cache = dashmap::DashMap::new();
1300        let instrument = create_test_perpetual_instrument();
1301        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1302
1303        assert_eq!(report.order_status, OrderStatus::Rejected);
1304        assert_eq!(
1305            report.cancel_reason,
1306            Some("Order would execute immediately".to_string())
1307        );
1308    }
1309
1310    #[rstest]
1311    fn test_parse_order_msg_rejected_without_reason() {
1312        let mut msg: BitmexOrderMsg =
1313            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1314        msg.ord_status = BitmexOrderStatus::Rejected;
1315        msg.ord_rej_reason = None;
1316        msg.text = None;
1317        msg.cum_qty = 0;
1318
1319        let cache = dashmap::DashMap::new();
1320        let instrument = create_test_perpetual_instrument();
1321        let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1322
1323        assert_eq!(report.order_status, OrderStatus::Rejected);
1324        assert_eq!(report.cancel_reason, None);
1325    }
1326
1327    #[rstest]
1328    fn test_parse_execution_msg() {
1329        let json_data = load_test_json("ws_execution.json");
1330        let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1331        let instrument = create_test_perpetual_instrument();
1332        let fill = parse_execution_msg(msg, &instrument).unwrap();
1333
1334        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1335        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1336        assert_eq!(
1337            fill.venue_order_id.to_string(),
1338            "550e8400-e29b-41d4-a716-446655440002"
1339        );
1340        assert_eq!(
1341            fill.trade_id.to_string(),
1342            "00000000-006d-1000-0000-000e8737d540"
1343        );
1344        assert_eq!(
1345            fill.client_order_id.unwrap().to_string(),
1346            "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1347        );
1348        assert_eq!(fill.order_side, OrderSide::Sell);
1349        assert_eq!(fill.last_qty, Quantity::from(100));
1350        assert_eq!(fill.last_px, Price::from("98950.0"));
1351        assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1352        assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1353        assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1354        assert_eq!(fill.ts_event, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1355    }
1356
1357    #[rstest]
1358    fn test_parse_execution_msg_non_trade() {
1359        // Test that non-trade executions return None
1360        let mut msg: BitmexExecutionMsg =
1361            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1362        msg.exec_type = Some(BitmexExecType::Settlement);
1363
1364        let instrument = create_test_perpetual_instrument();
1365        let result = parse_execution_msg(msg, &instrument);
1366        assert!(result.is_none());
1367    }
1368
1369    #[rstest]
1370    fn test_parse_cancel_reject_execution() {
1371        // Test that CancelReject messages can be parsed (even without symbol)
1372        let json = load_test_json("ws_execution_cancel_reject.json");
1373
1374        let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1375        assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1376        assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1377        assert_eq!(msg.symbol, None);
1378
1379        // Should return None since it's not a Trade
1380        let instrument = create_test_perpetual_instrument();
1381        let result = parse_execution_msg(msg, &instrument);
1382        assert!(result.is_none());
1383    }
1384
1385    #[rstest]
1386    fn test_parse_position_msg() {
1387        let json_data = load_test_json("ws_position.json");
1388        let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1389        let instrument = create_test_perpetual_instrument();
1390        let report = parse_position_msg(msg, &instrument);
1391
1392        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1393        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1394        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1395        assert_eq!(report.quantity, Quantity::from(1000));
1396        assert!(report.venue_position_id.is_none());
1397        assert_eq!(report.ts_last, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1398    }
1399
1400    #[rstest]
1401    fn test_parse_position_msg_short() {
1402        let mut msg: BitmexPositionMsg =
1403            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1404        msg.current_qty = Some(-500);
1405
1406        let instrument = create_test_perpetual_instrument();
1407        let report = parse_position_msg(msg, &instrument);
1408        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1409        assert_eq!(report.quantity, Quantity::from(500));
1410    }
1411
1412    #[rstest]
1413    fn test_parse_position_msg_flat() {
1414        let mut msg: BitmexPositionMsg =
1415            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1416        msg.current_qty = Some(0);
1417
1418        let instrument = create_test_perpetual_instrument();
1419        let report = parse_position_msg(msg, &instrument);
1420        assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1421        assert_eq!(report.quantity, Quantity::from(0));
1422    }
1423
1424    #[rstest]
1425    fn test_parse_wallet_msg() {
1426        let json_data = load_test_json("ws_wallet.json");
1427        let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1428        let ts_init = UnixNanos::from(1);
1429        let account_state = parse_wallet_msg(msg, ts_init);
1430
1431        assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1432        assert!(!account_state.balances.is_empty());
1433        let balance = &account_state.balances[0];
1434        assert_eq!(balance.currency.code.to_string(), "XBT");
1435        // Amount should be converted from satoshis (100005180 / 100_000_000.0 = 1.0000518)
1436        assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1437    }
1438
1439    #[rstest]
1440    fn test_parse_wallet_msg_no_amount() {
1441        let mut msg: BitmexWalletMsg =
1442            serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1443        msg.amount = None;
1444
1445        let ts_init = UnixNanos::from(1);
1446        let account_state = parse_wallet_msg(msg, ts_init);
1447        let balance = &account_state.balances[0];
1448        assert_eq!(balance.total.as_f64(), 0.0);
1449    }
1450
1451    #[rstest]
1452    fn test_parse_margin_msg() {
1453        let json_data = load_test_json("ws_margin.json");
1454        let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1455        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1456        let margin_balance = parse_margin_msg(msg, instrument_id);
1457
1458        assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1459        assert_eq!(margin_balance.instrument_id, instrument_id);
1460        // Values should be converted from satoshis to BTC
1461        // initMargin is 0 in test data, so should be 0.0
1462        assert_eq!(margin_balance.initial.as_f64(), 0.0);
1463        // maintMargin is 15949 satoshis = 0.00015949 BTC
1464        assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1465    }
1466
1467    #[rstest]
1468    fn test_parse_margin_msg_no_available() {
1469        let mut msg: BitmexMarginMsg =
1470            serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1471        msg.available_margin = None;
1472
1473        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1474        let margin_balance = parse_margin_msg(msg, instrument_id);
1475        // Should still have valid margin values even if available_margin is None
1476        assert!(margin_balance.initial.as_f64() >= 0.0);
1477        assert!(margin_balance.maintenance.as_f64() >= 0.0);
1478    }
1479
1480    #[rstest]
1481    fn test_parse_instrument_msg_both_prices() {
1482        let json_data = load_test_json("ws_instrument.json");
1483        let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1484
1485        // Create cache with test instrument
1486        let mut instruments_cache = AHashMap::new();
1487        let test_instrument = create_test_perpetual_instrument();
1488        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1489
1490        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1491
1492        // XBTUSD is not an index symbol, so it should have both mark and index prices
1493        assert_eq!(updates.len(), 2);
1494
1495        // Check mark price update
1496        match &updates[0] {
1497            Data::MarkPriceUpdate(update) => {
1498                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1499                assert_eq!(update.value.as_f64(), 95125.7);
1500            }
1501            _ => panic!("Expected MarkPriceUpdate at index 0"),
1502        }
1503
1504        // Check index price update
1505        match &updates[1] {
1506            Data::IndexPriceUpdate(update) => {
1507                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1508                assert_eq!(update.value.as_f64(), 95124.3);
1509            }
1510            _ => panic!("Expected IndexPriceUpdate at index 1"),
1511        }
1512    }
1513
1514    #[rstest]
1515    fn test_parse_instrument_msg_mark_price_only() {
1516        let mut msg: BitmexInstrumentMsg =
1517            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1518        msg.index_price = None;
1519
1520        // Create cache with test instrument
1521        let mut instruments_cache = AHashMap::new();
1522        let test_instrument = create_test_perpetual_instrument();
1523        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1524
1525        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1526
1527        assert_eq!(updates.len(), 1);
1528        match &updates[0] {
1529            Data::MarkPriceUpdate(update) => {
1530                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1531                assert_eq!(update.value.as_f64(), 95125.7);
1532            }
1533            _ => panic!("Expected MarkPriceUpdate"),
1534        }
1535    }
1536
1537    #[rstest]
1538    fn test_parse_instrument_msg_index_price_only() {
1539        let mut msg: BitmexInstrumentMsg =
1540            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1541        msg.mark_price = None;
1542
1543        // Create cache with test instrument
1544        let mut instruments_cache = AHashMap::new();
1545        let test_instrument = create_test_perpetual_instrument();
1546        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1547
1548        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1549
1550        assert_eq!(updates.len(), 1);
1551        match &updates[0] {
1552            Data::IndexPriceUpdate(update) => {
1553                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1554                assert_eq!(update.value.as_f64(), 95124.3);
1555            }
1556            _ => panic!("Expected IndexPriceUpdate"),
1557        }
1558    }
1559
1560    #[rstest]
1561    fn test_parse_instrument_msg_no_prices() {
1562        let mut msg: BitmexInstrumentMsg =
1563            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1564        msg.mark_price = None;
1565        msg.index_price = None;
1566        msg.last_price = None;
1567
1568        // Create cache with test instrument
1569        let mut instruments_cache = AHashMap::new();
1570        let test_instrument = create_test_perpetual_instrument();
1571        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1572
1573        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1574        assert_eq!(updates.len(), 0);
1575    }
1576
1577    #[rstest]
1578    fn test_parse_instrument_msg_index_symbol() {
1579        // Test for index symbols like .BXBT where lastPrice is the index price
1580        // and markPrice equals lastPrice
1581        let mut msg: BitmexInstrumentMsg =
1582            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1583        msg.symbol = Ustr::from(".BXBT");
1584        msg.last_price = Some(119163.05);
1585        msg.mark_price = Some(119163.05); // Index symbols have mark price equal to last price
1586        msg.index_price = None;
1587
1588        // Create instruments cache with proper precision for .BXBT
1589        let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1590        let instrument = CryptoPerpetual::new(
1591            instrument_id,
1592            Symbol::from(".BXBT"),
1593            Currency::BTC(),
1594            Currency::USD(),
1595            Currency::USD(),
1596            false, // is_inverse
1597            2,     // price_precision (for 119163.05)
1598            8,     // size_precision
1599            Price::from("0.01"),
1600            Quantity::from("0.00000001"),
1601            None,                 // multiplier
1602            None,                 // lot_size
1603            None,                 // max_quantity
1604            None,                 // min_quantity
1605            None,                 // max_notional
1606            None,                 // min_notional
1607            None,                 // max_price
1608            None,                 // min_price
1609            None,                 // margin_init
1610            None,                 // margin_maint
1611            None,                 // maker_fee
1612            None,                 // taker_fee
1613            UnixNanos::default(), // ts_event
1614            UnixNanos::default(), // ts_init
1615        );
1616        let mut instruments_cache = AHashMap::new();
1617        instruments_cache.insert(
1618            Ustr::from(".BXBT"),
1619            InstrumentAny::CryptoPerpetual(instrument),
1620        );
1621
1622        let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1623
1624        assert_eq!(updates.len(), 2);
1625
1626        // Check mark price update
1627        match &updates[0] {
1628            Data::MarkPriceUpdate(update) => {
1629                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1630                assert_eq!(update.value, Price::from("119163.05"));
1631            }
1632            _ => panic!("Expected MarkPriceUpdate for index symbol"),
1633        }
1634
1635        // Check index price update
1636        match &updates[1] {
1637            Data::IndexPriceUpdate(update) => {
1638                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1639                assert_eq!(update.value, Price::from("119163.05"));
1640                assert_eq!(update.ts_init, UnixNanos::from(1));
1641            }
1642            _ => panic!("Expected IndexPriceUpdate for index symbol"),
1643        }
1644    }
1645
1646    #[rstest]
1647    fn test_parse_funding_msg() {
1648        let json_data = load_test_json("ws_funding_rate.json");
1649        let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1650        let update = parse_funding_msg(msg, UnixNanos::from(1));
1651
1652        assert!(update.is_some());
1653        let update = update.unwrap();
1654
1655        assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1656        assert_eq!(update.rate.to_string(), "0.0001");
1657        assert!(update.next_funding_ns.is_none());
1658        assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1659        assert_eq!(update.ts_init, UnixNanos::from(1));
1660    }
1661}