1use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23use nautilus_model::{
24 data::{
25 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
26 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
27 depth::DEPTH10_LEN,
28 },
29 enums::{
30 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
31 PriceType, RecordFlag, TimeInForce, TriggerType,
32 },
33 events::{OrderUpdated, account::state::AccountState},
34 identifiers::{
35 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
36 VenueOrderId,
37 },
38 instruments::{Instrument, InstrumentAny},
39 reports::{FillReport, OrderStatusReport, PositionStatusReport},
40 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
41};
42use rust_decimal::{Decimal, prelude::FromPrimitive};
43use ustr::Ustr;
44use uuid::Uuid;
45
46use super::{
47 enums::{BitmexAction, BitmexWsTopic},
48 messages::{
49 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
50 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
51 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
52 },
53};
54use crate::{
55 common::{
56 consts::BITMEX_VENUE,
57 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
58 parse::{
59 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
60 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
61 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
62 parse_position_side, parse_signed_contracts_quantity,
63 },
64 },
65 websocket::messages::BitmexOrderUpdateMsg,
66};
67
68const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
69 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
70 aggregation: BarAggregation::Minute,
71 price_type: PriceType::Last,
72};
73const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
74 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
75 aggregation: BarAggregation::Minute,
76 price_type: PriceType::Last,
77};
78const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
79 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
80 aggregation: BarAggregation::Hour,
81 price_type: PriceType::Last,
82};
83const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
84 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
85 aggregation: BarAggregation::Day,
86 price_type: PriceType::Last,
87};
88
89#[inline]
97#[must_use]
98pub fn is_index_symbol(symbol: &Ustr) -> bool {
99 symbol.starts_with('.')
100}
101
102#[must_use]
104pub fn parse_book_msg_vec(
105 data: Vec<BitmexOrderBookMsg>,
106 action: BitmexAction,
107 instruments: &AHashMap<Ustr, InstrumentAny>,
108 ts_init: UnixNanos,
109) -> Vec<Data> {
110 let mut deltas = Vec::with_capacity(data.len());
111
112 for msg in data {
113 if let Some(instrument) = instruments.get(&msg.symbol) {
114 let instrument_id = instrument.id();
115 let price_precision = instrument.price_precision();
116 deltas.push(Data::Delta(parse_book_msg(
117 &msg,
118 &action,
119 instrument,
120 instrument_id,
121 price_precision,
122 ts_init,
123 )));
124 } else {
125 tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for book delta");
126 }
127 }
128 deltas
129}
130
131#[must_use]
133pub fn parse_book10_msg_vec(
134 data: Vec<BitmexOrderBook10Msg>,
135 instruments: &AHashMap<Ustr, InstrumentAny>,
136 ts_init: UnixNanos,
137) -> Vec<Data> {
138 let mut depths = Vec::with_capacity(data.len());
139
140 for msg in data {
141 if let Some(instrument) = instruments.get(&msg.symbol) {
142 let instrument_id = instrument.id();
143 let price_precision = instrument.price_precision();
144 depths.push(Data::Depth10(Box::new(parse_book10_msg(
145 &msg,
146 instrument,
147 instrument_id,
148 price_precision,
149 ts_init,
150 ))));
151 } else {
152 tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for depth10");
153 }
154 }
155 depths
156}
157
158#[must_use]
160pub fn parse_trade_msg_vec(
161 data: Vec<BitmexTradeMsg>,
162 instruments: &AHashMap<Ustr, InstrumentAny>,
163 ts_init: UnixNanos,
164) -> Vec<Data> {
165 let mut trades = Vec::with_capacity(data.len());
166
167 for msg in data {
168 if let Some(instrument) = instruments.get(&msg.symbol) {
169 let instrument_id = instrument.id();
170 let price_precision = instrument.price_precision();
171 trades.push(Data::Trade(parse_trade_msg(
172 &msg,
173 instrument,
174 instrument_id,
175 price_precision,
176 ts_init,
177 )));
178 } else {
179 tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for trade");
180 }
181 }
182 trades
183}
184
185#[must_use]
187pub fn parse_trade_bin_msg_vec(
188 data: Vec<BitmexTradeBinMsg>,
189 topic: BitmexWsTopic,
190 instruments: &AHashMap<Ustr, InstrumentAny>,
191 ts_init: UnixNanos,
192) -> Vec<Data> {
193 let mut trades = Vec::with_capacity(data.len());
194
195 for msg in data {
196 if let Some(instrument) = instruments.get(&msg.symbol) {
197 let instrument_id = instrument.id();
198 let price_precision = instrument.price_precision();
199 trades.push(Data::Bar(parse_trade_bin_msg(
200 &msg,
201 &topic,
202 instrument,
203 instrument_id,
204 price_precision,
205 ts_init,
206 )));
207 } else {
208 tracing::warn!(symbol = %msg.symbol, "Instrument not found in cache for trade bin");
209 }
210 }
211 trades
212}
213
214#[allow(clippy::too_many_arguments)]
216#[must_use]
217pub fn parse_book_msg(
218 msg: &BitmexOrderBookMsg,
219 action: &BitmexAction,
220 instrument: &InstrumentAny,
221 instrument_id: InstrumentId,
222 price_precision: u8,
223 ts_init: UnixNanos,
224) -> OrderBookDelta {
225 let flags = if action == &BitmexAction::Insert {
226 RecordFlag::F_SNAPSHOT as u8
227 } else {
228 0
229 };
230
231 let action = action.as_book_action();
232 let price = Price::new(msg.price, price_precision);
233 let side = msg.side.as_order_side();
234 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
235 let order_id = msg.id;
236 let order = BookOrder::new(side, price, size, order_id);
237 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
239
240 OrderBookDelta::new(
241 instrument_id,
242 action,
243 order,
244 flags,
245 sequence,
246 ts_event,
247 ts_init,
248 )
249}
250
251#[allow(clippy::too_many_arguments)]
257#[must_use]
258pub fn parse_book10_msg(
259 msg: &BitmexOrderBook10Msg,
260 instrument: &InstrumentAny,
261 instrument_id: InstrumentId,
262 price_precision: u8,
263 ts_init: UnixNanos,
264) -> OrderBookDepth10 {
265 let mut bids = Vec::with_capacity(DEPTH10_LEN);
266 let mut asks = Vec::with_capacity(DEPTH10_LEN);
267
268 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
270 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
271
272 for (i, level) in msg.bids.iter().enumerate() {
273 let bid_order = BookOrder::new(
274 OrderSide::Buy,
275 Price::new(level[0], price_precision),
276 parse_fractional_quantity(level[1], instrument),
277 0,
278 );
279
280 bids.push(bid_order);
281 bid_counts[i] = 1;
282 }
283
284 for (i, level) in msg.asks.iter().enumerate() {
285 let ask_order = BookOrder::new(
286 OrderSide::Sell,
287 Price::new(level[0], price_precision),
288 parse_fractional_quantity(level[1], instrument),
289 0,
290 );
291
292 asks.push(ask_order);
293 ask_counts[i] = 1;
294 }
295
296 let bids: [BookOrder; DEPTH10_LEN] = bids
297 .try_into()
298 .inspect_err(|v: &Vec<BookOrder>| {
299 tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
300 })
301 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
302 let asks: [BookOrder; DEPTH10_LEN] = asks
303 .try_into()
304 .inspect_err(|v: &Vec<BookOrder>| {
305 tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
306 })
307 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
308
309 let ts_event = UnixNanos::from(msg.timestamp);
310
311 OrderBookDepth10::new(
312 instrument_id,
313 bids,
314 asks,
315 bid_counts,
316 ask_counts,
317 RecordFlag::F_SNAPSHOT as u8,
318 0, ts_event,
320 ts_init,
321 )
322}
323
324#[must_use]
326pub fn parse_quote_msg(
327 msg: &BitmexQuoteMsg,
328 last_quote: &QuoteTick,
329 instrument: &InstrumentAny,
330 instrument_id: InstrumentId,
331 price_precision: u8,
332 ts_init: UnixNanos,
333) -> QuoteTick {
334 let bid_price = match msg.bid_price {
335 Some(price) => Price::new(price, price_precision),
336 None => last_quote.bid_price,
337 };
338
339 let ask_price = match msg.ask_price {
340 Some(price) => Price::new(price, price_precision),
341 None => last_quote.ask_price,
342 };
343
344 let bid_size = match msg.bid_size {
345 Some(size) => parse_contracts_quantity(size, instrument),
346 None => last_quote.bid_size,
347 };
348
349 let ask_size = match msg.ask_size {
350 Some(size) => parse_contracts_quantity(size, instrument),
351 None => last_quote.ask_size,
352 };
353
354 let ts_event = UnixNanos::from(msg.timestamp);
355
356 QuoteTick::new(
357 instrument_id,
358 bid_price,
359 ask_price,
360 bid_size,
361 ask_size,
362 ts_event,
363 ts_init,
364 )
365}
366
367#[must_use]
369pub fn parse_trade_msg(
370 msg: &BitmexTradeMsg,
371 instrument: &InstrumentAny,
372 instrument_id: InstrumentId,
373 price_precision: u8,
374 ts_init: UnixNanos,
375) -> TradeTick {
376 let price = Price::new(msg.price, price_precision);
377 let size = parse_contracts_quantity(msg.size, instrument);
378 let aggressor_side = msg.side.as_aggressor_side();
379 let trade_id = TradeId::new(
380 msg.trd_match_id
381 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
382 );
383 let ts_event = UnixNanos::from(msg.timestamp);
384
385 TradeTick::new(
386 instrument_id,
387 price,
388 size,
389 aggressor_side,
390 trade_id,
391 ts_event,
392 ts_init,
393 )
394}
395
396#[must_use]
398pub fn parse_trade_bin_msg(
399 msg: &BitmexTradeBinMsg,
400 topic: &BitmexWsTopic,
401 instrument: &InstrumentAny,
402 instrument_id: InstrumentId,
403 price_precision: u8,
404 ts_init: UnixNanos,
405) -> Bar {
406 let spec = bar_spec_from_topic(topic);
407 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
408
409 let open = Price::new(msg.open, price_precision);
410 let high = Price::new(msg.high, price_precision);
411 let low = Price::new(msg.low, price_precision);
412 let close = Price::new(msg.close, price_precision);
413
414 let (open, high, low, close) =
415 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
416
417 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
418 let volume = parse_contracts_quantity(volume_contracts, instrument);
419 let ts_event = UnixNanos::from(msg.timestamp);
420
421 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
422}
423
424#[must_use]
430pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
431 match topic {
432 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
433 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
434 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
435 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
436 _ => {
437 tracing::error!(topic = ?topic, "Bar specification not supported");
438 BAR_SPEC_1_MINUTE
439 }
440 }
441}
442
443#[must_use]
449pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
450 match spec {
451 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
452 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
453 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
454 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
455 _ => {
456 tracing::error!(spec = ?spec, "Bar specification not supported");
457 BitmexWsTopic::TradeBin1m
458 }
459 }
460}
461
462fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
463 if msg.stop_px.is_some() {
464 if msg.price.is_some() {
465 Some(OrderType::StopLimit)
466 } else {
467 Some(OrderType::StopMarket)
468 }
469 } else if msg.price.is_some() {
470 Some(OrderType::Limit)
471 } else {
472 Some(OrderType::Market)
473 }
474}
475
476pub fn parse_order_msg(
490 msg: &BitmexOrderMsg,
491 instrument: &InstrumentAny,
492 order_type_cache: &DashMap<ClientOrderId, OrderType>,
493) -> anyhow::Result<OrderStatusReport> {
494 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
496 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
497 let common_side: BitmexSide = msg.side.into();
498 let order_side: OrderSide = common_side.into();
499
500 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
501 ord_type.into()
502 } else if let Some(client_order_id) = msg.cl_ord_id {
503 let client_order_id = ClientOrderId::new(client_order_id);
504 if let Some(entry) = order_type_cache.get(&client_order_id) {
505 *entry.value()
506 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
507 order_type_cache.insert(client_order_id, inferred);
508 inferred
509 } else {
510 anyhow::bail!(
511 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
512 );
513 }
514 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
515 inferred
516 } else {
517 anyhow::bail!("Order missing both ord_type and cl_ord_id");
518 };
519
520 let time_in_force: TimeInForce = match msg.time_in_force {
521 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
522 None => TimeInForce::Gtc,
523 };
524 let order_status: OrderStatus = msg.ord_status.into();
525 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
526 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
527 let report_id = UUID4::new();
528 let ts_accepted =
529 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
530 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
531 let ts_init = get_atomic_clock_realtime().get_time_ns();
532
533 let mut report = OrderStatusReport::new(
534 account_id,
535 instrument_id,
536 None, venue_order_id,
538 order_side,
539 order_type,
540 time_in_force,
541 order_status,
542 quantity,
543 filled_qty,
544 ts_accepted,
545 ts_last,
546 ts_init,
547 Some(report_id),
548 );
549
550 if let Some(cl_ord_id) = &msg.cl_ord_id {
551 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
552 }
553
554 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
555 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
556 }
557
558 if let Some(price) = msg.price {
559 report = report.with_price(Price::new(price, instrument.price_precision()));
560 }
561
562 if let Some(avg_px) = msg.avg_px {
563 report = report.with_avg_px(avg_px);
564 }
565
566 if let Some(trigger_price) = msg.stop_px {
567 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
568 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
570 TriggerType::MarkPrice
571 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
572 TriggerType::IndexPrice
573 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
574 TriggerType::LastPrice
575 } else {
576 TriggerType::Default
577 }
578 } else {
579 TriggerType::Default };
581
582 report = report
583 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
584 .with_trigger_type(trigger_type);
585 }
586
587 if let Some(exec_insts) = &msg.exec_inst {
588 for exec_inst in exec_insts {
589 match exec_inst {
590 BitmexExecInstruction::ParticipateDoNotInitiate => {
591 report = report.with_post_only(true);
592 }
593 BitmexExecInstruction::ReduceOnly => {
594 report = report.with_reduce_only(true);
595 }
596 _ => {}
597 }
598 }
599 }
600
601 if order_status == OrderStatus::Rejected {
603 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
604 tracing::debug!(
605 order_id = ?venue_order_id,
606 client_order_id = ?msg.cl_ord_id,
607 reason = ?reason_str,
608 "Order rejected with reason"
609 );
610 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
611 } else {
612 tracing::debug!(
613 order_id = ?venue_order_id,
614 client_order_id = ?msg.cl_ord_id,
615 ord_status = ?msg.ord_status,
616 ord_rej_reason = ?msg.ord_rej_reason,
617 text = ?msg.text,
618 "Order rejected without reason from BitMEX"
619 );
620 }
621 }
622
623 if order_status == OrderStatus::Canceled
626 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
627 {
628 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
629 }
630
631 Ok(report)
632}
633
634pub fn parse_order_update_msg(
638 msg: &BitmexOrderUpdateMsg,
639 instrument: &InstrumentAny,
640 account_id: AccountId,
641) -> Option<OrderUpdated> {
642 let trader_id = TraderId::default();
645 let strategy_id = StrategyId::default();
646 let instrument_id = parse_instrument_id(msg.symbol);
647 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
648 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
649 let quantity = Quantity::zero(instrument.size_precision());
650 let price = msg
651 .price
652 .map(|p| Price::new(p, instrument.price_precision()));
653
654 let trigger_price = None;
656
657 let event_id = UUID4::new();
658 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
659 let ts_init = get_atomic_clock_realtime().get_time_ns();
660
661 Some(nautilus_model::events::OrderUpdated::new(
662 trader_id,
663 strategy_id,
664 instrument_id,
665 client_order_id,
666 quantity,
667 event_id,
668 ts_event,
669 ts_init,
670 false, venue_order_id,
672 Some(account_id),
673 price,
674 trigger_price,
675 ))
676}
677
678pub fn parse_execution_msg(
689 msg: BitmexExecutionMsg,
690 instrument: &InstrumentAny,
691) -> Option<FillReport> {
692 if msg.exec_type != Some(BitmexExecType::Trade) {
693 return None;
694 }
695
696 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
697 let instrument_id = parse_instrument_id(msg.symbol?);
698 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
699 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
700 let order_side: OrderSide = msg
701 .side
702 .map(|s| {
703 let side: BitmexSide = s.into();
704 side.into()
705 })
706 .unwrap_or(OrderSide::NoOrderSide);
707 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
708 let last_px = Price::new(msg.last_px?, instrument.price_precision());
709 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
710 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
711 let commission = Money::new(
712 msg.commission.unwrap_or(0.0),
713 Currency::from(mapped_currency.as_str()),
714 );
715 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
716 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
717 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
719 let ts_init = get_atomic_clock_realtime().get_time_ns();
720
721 Some(FillReport::new(
722 account_id,
723 instrument_id,
724 venue_order_id,
725 trade_id,
726 order_side,
727 last_qty,
728 last_px,
729 commission,
730 liquidity_side,
731 client_order_id,
732 venue_position_id,
733 ts_event,
734 ts_init,
735 None,
736 ))
737}
738
739#[must_use]
745pub fn parse_position_msg(
746 msg: BitmexPositionMsg,
747 instrument: &InstrumentAny,
748) -> PositionStatusReport {
749 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
750 let instrument_id = parse_instrument_id(msg.symbol);
751 let position_side = parse_position_side(msg.current_qty).as_specified();
752 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
753 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
755 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
756 let ts_init = get_atomic_clock_realtime().get_time_ns();
757
758 PositionStatusReport::new(
759 account_id,
760 instrument_id,
761 position_side,
762 quantity,
763 ts_last,
764 ts_init,
765 None, venue_position_id, avg_px_open, )
769}
770
771#[must_use]
784pub fn parse_instrument_msg(
785 msg: BitmexInstrumentMsg,
786 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
787 ts_init: UnixNanos,
788) -> Vec<Data> {
789 let mut updates = Vec::new();
790 let is_index = is_index_symbol(&msg.symbol);
791
792 let effective_index_price = if is_index {
795 msg.last_price
796 } else {
797 msg.index_price
798 };
799
800 if msg.mark_price.is_none() && effective_index_price.is_none() {
804 return updates;
805 }
806
807 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
808 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
809
810 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
812 Some(instrument) => instrument.price_precision(),
813 None => {
814 if is_index {
818 tracing::trace!(
819 "Index instrument {} not in cache, skipping update",
820 msg.symbol
821 );
822 } else {
823 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
824 }
825 return updates;
826 }
827 };
828
829 if let Some(mark_price) = msg.mark_price {
832 let price = Price::new(mark_price, price_precision);
833 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
834 instrument_id,
835 price,
836 ts_event,
837 ts_init,
838 )));
839 }
840
841 if let Some(index_price) = effective_index_price {
843 let price = Price::new(index_price, price_precision);
844 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
845 instrument_id,
846 price,
847 ts_event,
848 ts_init,
849 )));
850 }
851
852 updates
853}
854
855pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
861 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
862 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
863
864 let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
866 Ok(rate) => rate,
867 Err(e) => {
868 tracing::error!("Failed to parse funding rate: {e}");
869 return None;
870 }
871 };
872
873 Some(FundingRateUpdate::new(
874 instrument_id,
875 rate,
876 None, ts_event,
878 ts_init,
879 ))
880}
881
882#[must_use]
891pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
892 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
893
894 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
896 let currency = Currency::from(currency_str.as_str());
897
898 let divisor = if msg.currency == "XBt" {
900 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
902 1_000_000.0 } else {
904 1.0
905 };
906 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
907
908 let total = Money::new(amount, currency);
909 let locked = Money::new(0.0, currency); let free = total - locked;
911
912 let balance = AccountBalance::new_checked(total, locked, free)
913 .expect("Balance calculation should be valid");
914
915 AccountState::new(
916 account_id,
917 AccountType::Margin,
918 vec![balance],
919 vec![], true, UUID4::new(),
922 ts_init,
923 ts_init,
924 None,
925 )
926}
927
928#[must_use]
932pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
933 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
935 let currency = Currency::from(currency_str.as_str());
936
937 let divisor = if msg.currency == "XBt" {
939 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
941 1_000_000.0 } else {
943 1.0
944 };
945
946 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
947 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
948 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
949
950 MarginBalance::new(
951 Money::new(initial, currency),
952 Money::new(maintenance, currency),
953 instrument_id,
954 )
955}
956
957#[cfg(test)]
962mod tests {
963 use chrono::{DateTime, Utc};
964 use nautilus_model::{
965 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
966 identifiers::Symbol,
967 instruments::crypto_perpetual::CryptoPerpetual,
968 };
969 use rstest::rstest;
970 use ustr::Ustr;
971
972 use super::*;
973 use crate::common::{
974 enums::{BitmexExecType, BitmexOrderStatus},
975 testing::load_test_json,
976 };
977
978 fn create_test_perpetual_instrument_with_precisions(
980 price_precision: u8,
981 size_precision: u8,
982 ) -> InstrumentAny {
983 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
984 InstrumentId::from("XBTUSD.BITMEX"),
985 Symbol::new("XBTUSD"),
986 Currency::BTC(),
987 Currency::USD(),
988 Currency::BTC(),
989 true, price_precision,
991 size_precision,
992 Price::new(0.5, price_precision),
993 Quantity::new(1.0, size_precision),
994 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1007 UnixNanos::default(),
1008 ))
1009 }
1010
1011 fn create_test_perpetual_instrument() -> InstrumentAny {
1012 create_test_perpetual_instrument_with_precisions(1, 0)
1013 }
1014
1015 #[rstest]
1016 fn test_orderbook_l2_message() {
1017 let json_data = load_test_json("ws_orderbook_l2.json");
1018
1019 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1020 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1021
1022 let instrument = create_test_perpetual_instrument();
1024 let delta = parse_book_msg(
1025 &msg,
1026 &BitmexAction::Insert,
1027 &instrument,
1028 instrument.id(),
1029 instrument.price_precision(),
1030 UnixNanos::from(3),
1031 );
1032 assert_eq!(delta.instrument_id, instrument_id);
1033 assert_eq!(delta.order.price, Price::from("98459.9"));
1034 assert_eq!(delta.order.size, Quantity::from(33000));
1035 assert_eq!(delta.order.side, OrderSide::Sell);
1036 assert_eq!(delta.order.order_id, 62400580205);
1037 assert_eq!(delta.action, BookAction::Add);
1038 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1039 assert_eq!(delta.sequence, 0);
1040 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1042
1043 let delta = parse_book_msg(
1045 &msg,
1046 &BitmexAction::Update,
1047 &instrument,
1048 instrument.id(),
1049 instrument.price_precision(),
1050 UnixNanos::from(3),
1051 );
1052 assert_eq!(delta.flags, 0);
1053 assert_eq!(delta.action, BookAction::Update);
1054 }
1055
1056 #[rstest]
1057 fn test_orderbook10_message() {
1058 let json_data = load_test_json("ws_orderbook_10.json");
1059 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1060 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1061 let instrument = create_test_perpetual_instrument();
1062 let depth10 = parse_book10_msg(
1063 &msg,
1064 &instrument,
1065 instrument.id(),
1066 instrument.price_precision(),
1067 UnixNanos::from(3),
1068 );
1069
1070 assert_eq!(depth10.instrument_id, instrument_id);
1071
1072 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1074 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1075 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1076
1077 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1079 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1080 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1081
1082 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1084 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1085
1086 assert_eq!(depth10.sequence, 0);
1088 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1089 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1091 }
1092
1093 #[rstest]
1094 fn test_quote_message() {
1095 let json_data = load_test_json("ws_quote.json");
1096
1097 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1098 let last_quote = QuoteTick::new(
1099 instrument_id,
1100 Price::new(487.50, 2),
1101 Price::new(488.20, 2),
1102 Quantity::from(100_000),
1103 Quantity::from(100_000),
1104 UnixNanos::from(1),
1105 UnixNanos::from(2),
1106 );
1107 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1108 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1109 let quote = parse_quote_msg(
1110 &msg,
1111 &last_quote,
1112 &instrument,
1113 instrument_id,
1114 instrument.price_precision(),
1115 UnixNanos::from(3),
1116 );
1117
1118 assert_eq!(quote.instrument_id, instrument_id);
1119 assert_eq!(quote.bid_price, Price::from("487.55"));
1120 assert_eq!(quote.ask_price, Price::from("488.25"));
1121 assert_eq!(quote.bid_size, Quantity::from(103_000));
1122 assert_eq!(quote.ask_size, Quantity::from(50_000));
1123 assert_eq!(quote.ts_event, 1732315465085000000);
1124 assert_eq!(quote.ts_init, 3);
1125 }
1126
1127 #[rstest]
1128 fn test_trade_message() {
1129 let json_data = load_test_json("ws_trade.json");
1130
1131 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1132 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1133 let instrument = create_test_perpetual_instrument();
1134 let trade = parse_trade_msg(
1135 &msg,
1136 &instrument,
1137 instrument.id(),
1138 instrument.price_precision(),
1139 UnixNanos::from(3),
1140 );
1141
1142 assert_eq!(trade.instrument_id, instrument_id);
1143 assert_eq!(trade.price, Price::from("98570.9"));
1144 assert_eq!(trade.size, Quantity::from(100));
1145 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1146 assert_eq!(
1147 trade.trade_id.to_string(),
1148 "00000000-006d-1000-0000-000e8737d536"
1149 );
1150 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1152 }
1153
1154 #[rstest]
1155 fn test_trade_bin_message() {
1156 let json_data = load_test_json("ws_trade_bin_1m.json");
1157
1158 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1159 let topic = BitmexWsTopic::TradeBin1m;
1160
1161 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1162 let instrument = create_test_perpetual_instrument();
1163 let bar = parse_trade_bin_msg(
1164 &msg,
1165 &topic,
1166 &instrument,
1167 instrument.id(),
1168 instrument.price_precision(),
1169 UnixNanos::from(3),
1170 );
1171
1172 assert_eq!(bar.instrument_id(), instrument_id);
1173 assert_eq!(
1174 bar.bar_type.spec(),
1175 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1176 );
1177 assert_eq!(bar.open, Price::from("97550.0"));
1178 assert_eq!(bar.high, Price::from("97584.4"));
1179 assert_eq!(bar.low, Price::from("97550.0"));
1180 assert_eq!(bar.close, Price::from("97570.1"));
1181 assert_eq!(bar.volume, Quantity::from(84_000));
1182 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1184 }
1185
1186 #[rstest]
1187 fn test_trade_bin_message_extreme_adjustment() {
1188 let topic = BitmexWsTopic::TradeBin1m;
1189 let instrument = create_test_perpetual_instrument();
1190
1191 let msg = BitmexTradeBinMsg {
1192 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1193 .unwrap()
1194 .with_timezone(&Utc),
1195 symbol: Ustr::from("XBTUSD"),
1196 open: 50_000.0,
1197 high: 49_990.0,
1198 low: 50_010.0,
1199 close: 50_005.0,
1200 trades: 10,
1201 volume: 1_000,
1202 vwap: 0.0,
1203 last_size: 0,
1204 turnover: 0,
1205 home_notional: 0.0,
1206 foreign_notional: 0.0,
1207 };
1208
1209 let bar = parse_trade_bin_msg(
1210 &msg,
1211 &topic,
1212 &instrument,
1213 instrument.id(),
1214 instrument.price_precision(),
1215 UnixNanos::from(3),
1216 );
1217
1218 assert_eq!(bar.high, Price::from("50010.0"));
1219 assert_eq!(bar.low, Price::from("49990.0"));
1220 assert_eq!(bar.open, Price::from("50000.0"));
1221 assert_eq!(bar.close, Price::from("50005.0"));
1222 assert_eq!(bar.volume, Quantity::from(1_000));
1223 }
1224
1225 #[rstest]
1226 fn test_parse_order_msg() {
1227 let json_data = load_test_json("ws_order.json");
1228 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1229 let cache = dashmap::DashMap::new();
1230 let instrument = create_test_perpetual_instrument();
1231 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1232
1233 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1234 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1235 assert_eq!(
1236 report.venue_order_id.to_string(),
1237 "550e8400-e29b-41d4-a716-446655440001"
1238 );
1239 assert_eq!(
1240 report.client_order_id.unwrap().to_string(),
1241 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1242 );
1243 assert_eq!(report.order_side, OrderSide::Buy);
1244 assert_eq!(report.order_type, OrderType::Limit);
1245 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1246 assert_eq!(report.order_status, OrderStatus::Accepted);
1247 assert_eq!(report.quantity, Quantity::from(100));
1248 assert_eq!(report.filled_qty, Quantity::from(0));
1249 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1250 assert_eq!(report.ts_accepted, 1732530600000000000); }
1252
1253 #[rstest]
1254 fn test_parse_order_msg_infers_type_when_missing() {
1255 let json_data = load_test_json("ws_order.json");
1256 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1257 msg.ord_type = None;
1258 msg.cl_ord_id = None;
1259 msg.price = Some(98_000.0);
1260 msg.stop_px = None;
1261
1262 let cache = dashmap::DashMap::new();
1263 let instrument = create_test_perpetual_instrument();
1264
1265 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1266
1267 assert_eq!(report.order_type, OrderType::Limit);
1268 }
1269
1270 #[rstest]
1271 fn test_parse_order_msg_rejected_with_reason() {
1272 let mut msg: BitmexOrderMsg =
1273 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1274 msg.ord_status = BitmexOrderStatus::Rejected;
1275 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1276 msg.text = None;
1277 msg.cum_qty = 0;
1278
1279 let cache = dashmap::DashMap::new();
1280 let instrument = create_test_perpetual_instrument();
1281 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1282
1283 assert_eq!(report.order_status, OrderStatus::Rejected);
1284 assert_eq!(
1285 report.cancel_reason,
1286 Some("Insufficient available balance".to_string())
1287 );
1288 }
1289
1290 #[rstest]
1291 fn test_parse_order_msg_rejected_with_text_fallback() {
1292 let mut msg: BitmexOrderMsg =
1293 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1294 msg.ord_status = BitmexOrderStatus::Rejected;
1295 msg.ord_rej_reason = None;
1296 msg.text = Some(Ustr::from("Order would execute immediately"));
1297 msg.cum_qty = 0;
1298
1299 let cache = dashmap::DashMap::new();
1300 let instrument = create_test_perpetual_instrument();
1301 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1302
1303 assert_eq!(report.order_status, OrderStatus::Rejected);
1304 assert_eq!(
1305 report.cancel_reason,
1306 Some("Order would execute immediately".to_string())
1307 );
1308 }
1309
1310 #[rstest]
1311 fn test_parse_order_msg_rejected_without_reason() {
1312 let mut msg: BitmexOrderMsg =
1313 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1314 msg.ord_status = BitmexOrderStatus::Rejected;
1315 msg.ord_rej_reason = None;
1316 msg.text = None;
1317 msg.cum_qty = 0;
1318
1319 let cache = dashmap::DashMap::new();
1320 let instrument = create_test_perpetual_instrument();
1321 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1322
1323 assert_eq!(report.order_status, OrderStatus::Rejected);
1324 assert_eq!(report.cancel_reason, None);
1325 }
1326
1327 #[rstest]
1328 fn test_parse_execution_msg() {
1329 let json_data = load_test_json("ws_execution.json");
1330 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1331 let instrument = create_test_perpetual_instrument();
1332 let fill = parse_execution_msg(msg, &instrument).unwrap();
1333
1334 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1335 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1336 assert_eq!(
1337 fill.venue_order_id.to_string(),
1338 "550e8400-e29b-41d4-a716-446655440002"
1339 );
1340 assert_eq!(
1341 fill.trade_id.to_string(),
1342 "00000000-006d-1000-0000-000e8737d540"
1343 );
1344 assert_eq!(
1345 fill.client_order_id.unwrap().to_string(),
1346 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1347 );
1348 assert_eq!(fill.order_side, OrderSide::Sell);
1349 assert_eq!(fill.last_qty, Quantity::from(100));
1350 assert_eq!(fill.last_px, Price::from("98950.0"));
1351 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1352 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1353 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1354 assert_eq!(fill.ts_event, 1732530900789000000); }
1356
1357 #[rstest]
1358 fn test_parse_execution_msg_non_trade() {
1359 let mut msg: BitmexExecutionMsg =
1361 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1362 msg.exec_type = Some(BitmexExecType::Settlement);
1363
1364 let instrument = create_test_perpetual_instrument();
1365 let result = parse_execution_msg(msg, &instrument);
1366 assert!(result.is_none());
1367 }
1368
1369 #[rstest]
1370 fn test_parse_cancel_reject_execution() {
1371 let json = load_test_json("ws_execution_cancel_reject.json");
1373
1374 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1375 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1376 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1377 assert_eq!(msg.symbol, None);
1378
1379 let instrument = create_test_perpetual_instrument();
1381 let result = parse_execution_msg(msg, &instrument);
1382 assert!(result.is_none());
1383 }
1384
1385 #[rstest]
1386 fn test_parse_position_msg() {
1387 let json_data = load_test_json("ws_position.json");
1388 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1389 let instrument = create_test_perpetual_instrument();
1390 let report = parse_position_msg(msg, &instrument);
1391
1392 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1393 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1394 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1395 assert_eq!(report.quantity, Quantity::from(1000));
1396 assert!(report.venue_position_id.is_none());
1397 assert_eq!(report.ts_last, 1732530900789000000); }
1399
1400 #[rstest]
1401 fn test_parse_position_msg_short() {
1402 let mut msg: BitmexPositionMsg =
1403 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1404 msg.current_qty = Some(-500);
1405
1406 let instrument = create_test_perpetual_instrument();
1407 let report = parse_position_msg(msg, &instrument);
1408 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1409 assert_eq!(report.quantity, Quantity::from(500));
1410 }
1411
1412 #[rstest]
1413 fn test_parse_position_msg_flat() {
1414 let mut msg: BitmexPositionMsg =
1415 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1416 msg.current_qty = Some(0);
1417
1418 let instrument = create_test_perpetual_instrument();
1419 let report = parse_position_msg(msg, &instrument);
1420 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1421 assert_eq!(report.quantity, Quantity::from(0));
1422 }
1423
1424 #[rstest]
1425 fn test_parse_wallet_msg() {
1426 let json_data = load_test_json("ws_wallet.json");
1427 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1428 let ts_init = UnixNanos::from(1);
1429 let account_state = parse_wallet_msg(msg, ts_init);
1430
1431 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1432 assert!(!account_state.balances.is_empty());
1433 let balance = &account_state.balances[0];
1434 assert_eq!(balance.currency.code.to_string(), "XBT");
1435 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1437 }
1438
1439 #[rstest]
1440 fn test_parse_wallet_msg_no_amount() {
1441 let mut msg: BitmexWalletMsg =
1442 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1443 msg.amount = None;
1444
1445 let ts_init = UnixNanos::from(1);
1446 let account_state = parse_wallet_msg(msg, ts_init);
1447 let balance = &account_state.balances[0];
1448 assert_eq!(balance.total.as_f64(), 0.0);
1449 }
1450
1451 #[rstest]
1452 fn test_parse_margin_msg() {
1453 let json_data = load_test_json("ws_margin.json");
1454 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1455 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1456 let margin_balance = parse_margin_msg(msg, instrument_id);
1457
1458 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1459 assert_eq!(margin_balance.instrument_id, instrument_id);
1460 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1463 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1465 }
1466
1467 #[rstest]
1468 fn test_parse_margin_msg_no_available() {
1469 let mut msg: BitmexMarginMsg =
1470 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1471 msg.available_margin = None;
1472
1473 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1474 let margin_balance = parse_margin_msg(msg, instrument_id);
1475 assert!(margin_balance.initial.as_f64() >= 0.0);
1477 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1478 }
1479
1480 #[rstest]
1481 fn test_parse_instrument_msg_both_prices() {
1482 let json_data = load_test_json("ws_instrument.json");
1483 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1484
1485 let mut instruments_cache = AHashMap::new();
1487 let test_instrument = create_test_perpetual_instrument();
1488 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1489
1490 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1491
1492 assert_eq!(updates.len(), 2);
1494
1495 match &updates[0] {
1497 Data::MarkPriceUpdate(update) => {
1498 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1499 assert_eq!(update.value.as_f64(), 95125.7);
1500 }
1501 _ => panic!("Expected MarkPriceUpdate at index 0"),
1502 }
1503
1504 match &updates[1] {
1506 Data::IndexPriceUpdate(update) => {
1507 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1508 assert_eq!(update.value.as_f64(), 95124.3);
1509 }
1510 _ => panic!("Expected IndexPriceUpdate at index 1"),
1511 }
1512 }
1513
1514 #[rstest]
1515 fn test_parse_instrument_msg_mark_price_only() {
1516 let mut msg: BitmexInstrumentMsg =
1517 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1518 msg.index_price = None;
1519
1520 let mut instruments_cache = AHashMap::new();
1522 let test_instrument = create_test_perpetual_instrument();
1523 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1524
1525 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1526
1527 assert_eq!(updates.len(), 1);
1528 match &updates[0] {
1529 Data::MarkPriceUpdate(update) => {
1530 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1531 assert_eq!(update.value.as_f64(), 95125.7);
1532 }
1533 _ => panic!("Expected MarkPriceUpdate"),
1534 }
1535 }
1536
1537 #[rstest]
1538 fn test_parse_instrument_msg_index_price_only() {
1539 let mut msg: BitmexInstrumentMsg =
1540 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1541 msg.mark_price = None;
1542
1543 let mut instruments_cache = AHashMap::new();
1545 let test_instrument = create_test_perpetual_instrument();
1546 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1547
1548 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1549
1550 assert_eq!(updates.len(), 1);
1551 match &updates[0] {
1552 Data::IndexPriceUpdate(update) => {
1553 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1554 assert_eq!(update.value.as_f64(), 95124.3);
1555 }
1556 _ => panic!("Expected IndexPriceUpdate"),
1557 }
1558 }
1559
1560 #[rstest]
1561 fn test_parse_instrument_msg_no_prices() {
1562 let mut msg: BitmexInstrumentMsg =
1563 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1564 msg.mark_price = None;
1565 msg.index_price = None;
1566 msg.last_price = None;
1567
1568 let mut instruments_cache = AHashMap::new();
1570 let test_instrument = create_test_perpetual_instrument();
1571 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1572
1573 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1574 assert_eq!(updates.len(), 0);
1575 }
1576
1577 #[rstest]
1578 fn test_parse_instrument_msg_index_symbol() {
1579 let mut msg: BitmexInstrumentMsg =
1582 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1583 msg.symbol = Ustr::from(".BXBT");
1584 msg.last_price = Some(119163.05);
1585 msg.mark_price = Some(119163.05); msg.index_price = None;
1587
1588 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1590 let instrument = CryptoPerpetual::new(
1591 instrument_id,
1592 Symbol::from(".BXBT"),
1593 Currency::BTC(),
1594 Currency::USD(),
1595 Currency::USD(),
1596 false, 2, 8, Price::from("0.01"),
1600 Quantity::from("0.00000001"),
1601 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1616 let mut instruments_cache = AHashMap::new();
1617 instruments_cache.insert(
1618 Ustr::from(".BXBT"),
1619 InstrumentAny::CryptoPerpetual(instrument),
1620 );
1621
1622 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1623
1624 assert_eq!(updates.len(), 2);
1625
1626 match &updates[0] {
1628 Data::MarkPriceUpdate(update) => {
1629 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1630 assert_eq!(update.value, Price::from("119163.05"));
1631 }
1632 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1633 }
1634
1635 match &updates[1] {
1637 Data::IndexPriceUpdate(update) => {
1638 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1639 assert_eq!(update.value, Price::from("119163.05"));
1640 assert_eq!(update.ts_init, UnixNanos::from(1));
1641 }
1642 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1643 }
1644 }
1645
1646 #[rstest]
1647 fn test_parse_funding_msg() {
1648 let json_data = load_test_json("ws_funding_rate.json");
1649 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1650 let update = parse_funding_msg(msg, UnixNanos::from(1));
1651
1652 assert!(update.is_some());
1653 let update = update.unwrap();
1654
1655 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1656 assert_eq!(update.rate.to_string(), "0.0001");
1657 assert!(update.next_funding_ns.is_none());
1658 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1659 assert_eq!(update.ts_init, UnixNanos::from(1));
1660 }
1661}