1use std::{num::NonZero, str::FromStr};
17
18use ahash::AHashMap;
19use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
23 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
24 depth::DEPTH10_LEN,
25 },
26 enums::{
27 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
28 PriceType, RecordFlag, TimeInForce,
29 },
30 events::{OrderUpdated, account::state::AccountState},
31 identifiers::{
32 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
33 VenueOrderId,
34 },
35 instruments::{Instrument, InstrumentAny},
36 reports::{FillReport, OrderStatusReport, PositionStatusReport},
37 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
38};
39use rust_decimal::Decimal;
40use ustr::Ustr;
41use uuid::Uuid;
42
43use super::{
44 enums::{BitmexAction, BitmexWsTopic},
45 messages::{
46 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
47 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
48 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
49 },
50};
51use crate::{
52 common::{
53 consts::BITMEX_VENUE,
54 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
55 parse::{
56 map_bitmex_currency, parse_contracts_quantity, parse_frac_quantity,
57 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
58 parse_position_side,
59 },
60 },
61 websocket::messages::BitmexOrderUpdateMsg,
62};
63
64const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
65 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
66 aggregation: BarAggregation::Minute,
67 price_type: PriceType::Last,
68};
69const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
70 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
71 aggregation: BarAggregation::Minute,
72 price_type: PriceType::Last,
73};
74const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
75 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
76 aggregation: BarAggregation::Hour,
77 price_type: PriceType::Last,
78};
79const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
80 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
81 aggregation: BarAggregation::Day,
82 price_type: PriceType::Last,
83};
84
85#[inline]
93#[must_use]
94pub fn is_index_symbol(symbol: &Ustr) -> bool {
95 symbol.starts_with('.')
96}
97
98#[must_use]
99pub fn parse_book_msg_vec(
100 data: Vec<BitmexOrderBookMsg>,
101 action: BitmexAction,
102 price_precision: u8,
103 ts_init: UnixNanos,
104) -> Vec<Data> {
105 let mut deltas = Vec::with_capacity(data.len());
106
107 for msg in data {
108 deltas.push(Data::Delta(parse_book_msg(
109 &msg,
110 &action,
111 price_precision,
112 ts_init,
113 )));
114 }
115 deltas
116}
117
118#[must_use]
119pub fn parse_book10_msg_vec(
120 data: Vec<BitmexOrderBook10Msg>,
121 price_precision: u8,
122 ts_init: UnixNanos,
123) -> Vec<Data> {
124 let mut depths = Vec::with_capacity(data.len());
125
126 for msg in data {
127 depths.push(Data::Depth10(Box::new(parse_book10_msg(
128 &msg,
129 price_precision,
130 ts_init,
131 ))));
132 }
133 depths
134}
135
136#[must_use]
137pub fn parse_trade_msg_vec(
138 data: Vec<BitmexTradeMsg>,
139 price_precision: u8,
140 ts_init: UnixNanos,
141) -> Vec<Data> {
142 let mut trades = Vec::with_capacity(data.len());
143
144 for msg in data {
145 trades.push(Data::Trade(parse_trade_msg(&msg, price_precision, ts_init)));
146 }
147 trades
148}
149
150#[must_use]
151pub fn parse_trade_bin_msg_vec(
152 data: Vec<BitmexTradeBinMsg>,
153 topic: BitmexWsTopic,
154 price_precision: u8,
155 ts_init: UnixNanos,
156) -> Vec<Data> {
157 let mut trades = Vec::with_capacity(data.len());
158
159 for msg in data {
160 trades.push(Data::Bar(parse_trade_bin_msg(
161 &msg,
162 &topic,
163 price_precision,
164 ts_init,
165 )));
166 }
167 trades
168}
169
170#[allow(clippy::too_many_arguments)]
171#[must_use]
172pub fn parse_book_msg(
173 msg: &BitmexOrderBookMsg,
174 action: &BitmexAction,
175 price_precision: u8,
176 ts_init: UnixNanos,
177) -> OrderBookDelta {
178 let flags = if action == &BitmexAction::Insert {
179 RecordFlag::F_SNAPSHOT as u8
180 } else {
181 0
182 };
183
184 let instrument_id = parse_instrument_id(msg.symbol);
185 let action = action.as_book_action();
186 let price = Price::new(msg.price, price_precision);
187 let side = msg.side.as_order_side();
188 let size = parse_contracts_quantity(msg.size.unwrap_or(0));
189 let order_id = msg.id;
190 let order = BookOrder::new(side, price, size, order_id);
191 let sequence = 0; let ts_event = UnixNanos::from(msg.transact_time);
193
194 OrderBookDelta::new(
195 instrument_id,
196 action,
197 order,
198 flags,
199 sequence,
200 ts_event,
201 ts_init,
202 )
203}
204
205#[allow(clippy::too_many_arguments)]
211#[must_use]
212pub fn parse_book10_msg(
213 msg: &BitmexOrderBook10Msg,
214 price_precision: u8,
215 ts_init: UnixNanos,
216) -> OrderBookDepth10 {
217 let instrument_id = parse_instrument_id(msg.symbol);
218
219 let mut bids = Vec::with_capacity(DEPTH10_LEN);
220 let mut asks = Vec::with_capacity(DEPTH10_LEN);
221
222 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
224 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
225
226 for (i, level) in msg.bids.iter().enumerate() {
227 let bid_order = BookOrder::new(
228 OrderSide::Buy,
229 Price::new(level[0], price_precision),
230 parse_frac_quantity(level[1], 0),
231 0,
232 );
233
234 bids.push(bid_order);
235 bid_counts[i] = 1;
236 }
237
238 for (i, level) in msg.asks.iter().enumerate() {
239 let ask_order = BookOrder::new(
240 OrderSide::Sell,
241 Price::new(level[0], price_precision),
242 parse_frac_quantity(level[1], 0),
243 0,
244 );
245
246 asks.push(ask_order);
247 ask_counts[i] = 1;
248 }
249
250 let bids: [BookOrder; DEPTH10_LEN] = bids
251 .try_into()
252 .inspect_err(|v: &Vec<BookOrder>| {
253 tracing::error!("Bids length mismatch: expected 10, got {}", v.len());
254 })
255 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
256 let asks: [BookOrder; DEPTH10_LEN] = asks
257 .try_into()
258 .inspect_err(|v: &Vec<BookOrder>| {
259 tracing::error!("Asks length mismatch: expected 10, got {}", v.len());
260 })
261 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
262
263 let ts_event = UnixNanos::from(msg.timestamp);
264
265 OrderBookDepth10::new(
266 instrument_id,
267 bids,
268 asks,
269 bid_counts,
270 ask_counts,
271 RecordFlag::F_SNAPSHOT as u8,
272 0, ts_event,
274 ts_init,
275 )
276}
277
278#[must_use]
279pub fn parse_quote_msg(
280 msg: &BitmexQuoteMsg,
281 last_quote: &QuoteTick,
282 price_precision: u8,
283 ts_init: UnixNanos,
284) -> QuoteTick {
285 let instrument_id = parse_instrument_id(msg.symbol);
286
287 let bid_price = match msg.bid_price {
288 Some(price) => Price::new(price, price_precision),
289 None => last_quote.bid_price,
290 };
291
292 let ask_price = match msg.ask_price {
293 Some(price) => Price::new(price, price_precision),
294 None => last_quote.ask_price,
295 };
296
297 let bid_size = match msg.bid_size {
298 Some(size) => parse_contracts_quantity(size),
299 None => last_quote.bid_size,
300 };
301
302 let ask_size = match msg.ask_size {
303 Some(size) => parse_contracts_quantity(size),
304 None => last_quote.ask_size,
305 };
306
307 let ts_event = UnixNanos::from(msg.timestamp);
308
309 QuoteTick::new(
310 instrument_id,
311 bid_price,
312 ask_price,
313 bid_size,
314 ask_size,
315 ts_event,
316 ts_init,
317 )
318}
319
320#[must_use]
321pub fn parse_trade_msg(msg: &BitmexTradeMsg, price_precision: u8, ts_init: UnixNanos) -> TradeTick {
322 let instrument_id = parse_instrument_id(msg.symbol);
323 let price = Price::new(msg.price, price_precision);
324 let size = parse_contracts_quantity(msg.size);
325 let aggressor_side = msg.side.as_aggressor_side();
326 let trade_id = TradeId::new(
327 msg.trd_match_id
328 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
329 );
330 let ts_event = UnixNanos::from(msg.timestamp);
331
332 TradeTick::new(
333 instrument_id,
334 price,
335 size,
336 aggressor_side,
337 trade_id,
338 ts_event,
339 ts_init,
340 )
341}
342
343#[must_use]
344pub fn parse_trade_bin_msg(
345 msg: &BitmexTradeBinMsg,
346 topic: &BitmexWsTopic,
347 price_precision: u8,
348 ts_init: UnixNanos,
349) -> Bar {
350 let instrument_id = parse_instrument_id(msg.symbol);
351 let spec = bar_spec_from_topic(topic);
352 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
353
354 let open = Price::new(msg.open, price_precision);
355 let high = Price::new(msg.high, price_precision);
356 let low = Price::new(msg.low, price_precision);
357 let close = Price::new(msg.close, price_precision);
358 let volume = parse_contracts_quantity(msg.volume as u64);
359 let ts_event = UnixNanos::from(msg.timestamp);
360
361 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
362}
363
364#[must_use]
370pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
371 match topic {
372 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
373 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
374 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
375 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
376 _ => {
377 tracing::error!(topic = ?topic, "Bar specification not supported");
378 BAR_SPEC_1_MINUTE
379 }
380 }
381}
382
383#[must_use]
389pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
390 match spec {
391 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
392 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
393 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
394 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
395 _ => {
396 tracing::error!(spec = ?spec, "Bar specification not supported");
397 BitmexWsTopic::TradeBin1m
398 }
399 }
400}
401
402pub fn parse_order_msg(
416 msg: &BitmexOrderMsg,
417 price_precision: u8,
418) -> anyhow::Result<OrderStatusReport> {
419 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
421 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
422 let common_side: BitmexSide = msg.side.into();
423 let order_side: OrderSide = common_side.into();
424 let order_type: OrderType = msg.ord_type.into();
425 let time_in_force: TimeInForce = msg
426 .time_in_force
427 .try_into()
428 .map_err(|e| anyhow::anyhow!("{e}"))?;
429 let order_status: OrderStatus = msg.ord_status.into();
430 let quantity = Quantity::from(msg.order_qty);
431 let filled_qty = Quantity::from(msg.cum_qty);
432 let report_id = UUID4::new();
433 let ts_accepted =
434 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
435 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
436 let ts_init = get_atomic_clock_realtime().get_time_ns();
437
438 let mut report = OrderStatusReport::new(
439 account_id,
440 instrument_id,
441 None, venue_order_id,
443 order_side,
444 order_type,
445 time_in_force,
446 order_status,
447 quantity,
448 filled_qty,
449 ts_accepted,
450 ts_last,
451 ts_init,
452 Some(report_id),
453 );
454
455 if let Some(cl_ord_id) = &msg.cl_ord_id {
456 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
457 }
458
459 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
460 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
461 }
462
463 if let Some(price) = msg.price {
464 report = report.with_price(Price::new(price, price_precision));
465 }
466
467 if let Some(avg_px) = msg.avg_px {
468 report = report.with_avg_px(avg_px);
469 }
470
471 if let Some(trigger_price) = msg.stop_px {
472 report = report.with_trigger_price(Price::new(trigger_price, price_precision));
473 }
474
475 if let Some(exec_inst) = &msg.exec_inst {
476 match exec_inst {
477 BitmexExecInstruction::ParticipateDoNotInitiate => {
478 report = report.with_post_only(true);
479 }
480 BitmexExecInstruction::ReduceOnly => {
481 report = report.with_reduce_only(true);
482 }
483 _ => {}
484 }
485 }
486
487 if order_status == OrderStatus::Canceled
490 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
491 {
492 report = report.with_cancel_reason(reason_str.to_string());
493 }
494
495 Ok(report)
496}
497
498pub fn parse_order_update_msg(
502 msg: &BitmexOrderUpdateMsg,
503 price_precision: u8,
504 account_id: AccountId,
505) -> Option<OrderUpdated> {
506 let trader_id = TraderId::default();
509 let strategy_id = StrategyId::default();
510 let instrument_id = parse_instrument_id(msg.symbol);
511 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
512 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
513 let quantity = Quantity::zero(0); let price = msg.price.map(|p| Price::new(p, price_precision));
515
516 let trigger_price = None;
518
519 let event_id = UUID4::new();
520 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
521 let ts_init = get_atomic_clock_realtime().get_time_ns();
522
523 Some(nautilus_model::events::OrderUpdated::new(
524 trader_id,
525 strategy_id,
526 instrument_id,
527 client_order_id,
528 quantity,
529 event_id,
530 ts_event,
531 ts_init,
532 false, venue_order_id,
534 Some(account_id),
535 price,
536 trigger_price,
537 ))
538}
539
540pub fn parse_execution_msg(msg: BitmexExecutionMsg, price_precision: u8) -> Option<FillReport> {
551 if msg.exec_type != Some(BitmexExecType::Trade) {
552 return None;
553 }
554
555 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
556 let instrument_id = parse_instrument_id(msg.symbol?);
557 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
558 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
559 let order_side: OrderSide = msg
560 .side
561 .map(|s| {
562 let side: BitmexSide = s.into();
563 side.into()
564 })
565 .unwrap_or(OrderSide::NoOrderSide);
566 let last_qty = Quantity::from(msg.last_qty?);
567 let last_px = Price::new(msg.last_px?, price_precision);
568 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
569 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
570 let commission = Money::new(
571 msg.commission.unwrap_or(0.0),
572 Currency::from(mapped_currency.as_str()),
573 );
574 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
575 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
576 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
578 let ts_init = get_atomic_clock_realtime().get_time_ns();
579
580 Some(FillReport::new(
581 account_id,
582 instrument_id,
583 venue_order_id,
584 trade_id,
585 order_side,
586 last_qty,
587 last_px,
588 commission,
589 liquidity_side,
590 client_order_id,
591 venue_position_id,
592 ts_event,
593 ts_init,
594 None,
595 ))
596}
597
598#[must_use]
604pub fn parse_position_msg(msg: BitmexPositionMsg) -> PositionStatusReport {
605 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
606 let instrument_id = parse_instrument_id(msg.symbol);
607 let position_side = parse_position_side(msg.current_qty).as_specified();
608 let quantity = Quantity::from(msg.current_qty.map_or(0, i64::abs));
609 let venue_position_id = None; let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
611 let ts_init = get_atomic_clock_realtime().get_time_ns();
612
613 PositionStatusReport::new(
614 account_id,
615 instrument_id,
616 position_side,
617 quantity,
618 venue_position_id,
619 ts_last,
620 ts_init,
621 None,
622 )
623}
624
625#[must_use]
638pub fn parse_instrument_msg(
639 msg: BitmexInstrumentMsg,
640 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
641 ts_init: UnixNanos,
642) -> Vec<Data> {
643 let mut updates = Vec::new();
644 let is_index = is_index_symbol(&msg.symbol);
645
646 let effective_index_price = if is_index {
649 msg.last_price
650 } else {
651 msg.index_price
652 };
653
654 if msg.mark_price.is_none() && effective_index_price.is_none() {
658 return updates;
659 }
660
661 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
662 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
663
664 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
666 Some(instrument) => instrument.price_precision(),
667 None => {
668 if is_index {
672 tracing::trace!(
673 "Index instrument {} not in cache, skipping update",
674 msg.symbol
675 );
676 } else {
677 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
678 }
679 return updates;
680 }
681 };
682
683 if let Some(mark_price) = msg.mark_price {
686 let price = Price::new(mark_price, price_precision);
687 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
688 instrument_id,
689 price,
690 ts_event,
691 ts_init,
692 )));
693 }
694
695 if let Some(index_price) = effective_index_price {
697 let price = Price::new(index_price, price_precision);
698 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
699 instrument_id,
700 price,
701 ts_event,
702 ts_init,
703 )));
704 }
705
706 updates
707}
708
709pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
715 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
716 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
717
718 let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
720 Ok(rate) => rate,
721 Err(e) => {
722 tracing::error!("Failed to parse funding rate: {e}");
723 return None;
724 }
725 };
726
727 Some(FundingRateUpdate::new(
728 instrument_id,
729 rate,
730 None, ts_event,
732 ts_init,
733 ))
734}
735
736#[must_use]
745pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
746 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
747
748 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
750 let currency = Currency::from(currency_str.as_str());
751
752 let divisor = if msg.currency == "XBt" {
754 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
756 1_000_000.0 } else {
758 1.0
759 };
760 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
761
762 let total = Money::new(amount, currency);
763 let locked = Money::new(0.0, currency); let free = total - locked;
765
766 let balance = AccountBalance::new_checked(total, locked, free)
767 .expect("Balance calculation should be valid");
768
769 AccountState::new(
770 account_id,
771 AccountType::Margin,
772 vec![balance],
773 vec![], true, UUID4::new(),
776 ts_init,
777 ts_init,
778 None,
779 )
780}
781
782#[must_use]
786pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
787 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
789 let currency = Currency::from(currency_str.as_str());
790
791 let divisor = if msg.currency == "XBt" {
793 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
795 1_000_000.0 } else {
797 1.0
798 };
799
800 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
801 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
802 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
803
804 MarginBalance::new(
805 Money::new(initial, currency),
806 Money::new(maintenance, currency),
807 instrument_id,
808 )
809}
810
811#[cfg(test)]
816mod tests {
817 use nautilus_model::{
818 data::quote::QuoteTick,
819 enums::{
820 AggressorSide, BookAction, LiquiditySide, OrderStatus, OrderType, PositionSide,
821 TimeInForce,
822 },
823 identifiers::{InstrumentId, Symbol},
824 instruments::{CryptoPerpetual, any::InstrumentAny},
825 types::{Currency, Price, Quantity},
826 };
827 use rstest::rstest;
828
829 use super::*;
830 use crate::common::{
831 enums::{BitmexExecType, BitmexOrderStatus},
832 testing::load_test_json,
833 };
834
835 fn create_test_perpetual_instrument() -> InstrumentAny {
837 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
838 InstrumentId::from("XBTUSD.BITMEX"),
839 Symbol::new("XBTUSD"),
840 Currency::BTC(),
841 Currency::USD(),
842 Currency::BTC(),
843 true, 1, 0, Price::from("0.5"),
847 Quantity::from(1),
848 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
861 UnixNanos::default(),
862 ))
863 }
864
865 #[rstest]
866 fn test_orderbook_l2_message() {
867 let json_data = load_test_json("ws_orderbook_l2.json");
868
869 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
870 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
871
872 let delta = parse_book_msg(&msg, &BitmexAction::Insert, 1, UnixNanos::from(3));
874 assert_eq!(delta.instrument_id, instrument_id);
875 assert_eq!(delta.order.price, Price::from("98459.9"));
876 assert_eq!(delta.order.size, Quantity::from(33000));
877 assert_eq!(delta.order.side, OrderSide::Sell);
878 assert_eq!(delta.order.order_id, 62400580205);
879 assert_eq!(delta.action, BookAction::Add);
880 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
881 assert_eq!(delta.sequence, 0);
882 assert_eq!(delta.ts_event, 1732436782275000000); assert_eq!(delta.ts_init, 3);
884
885 let delta = parse_book_msg(&msg, &BitmexAction::Update, 1, UnixNanos::from(3));
887 assert_eq!(delta.flags, 0);
888 assert_eq!(delta.action, BookAction::Update);
889 }
890
891 #[rstest]
892 fn test_orderbook10_message() {
893 let json_data = load_test_json("ws_orderbook_10.json");
894 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
895 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
896 let depth10 = parse_book10_msg(&msg, 1, UnixNanos::from(3));
897
898 assert_eq!(depth10.instrument_id, instrument_id);
899
900 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
902 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
903 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
904
905 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
907 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
908 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
909
910 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
912 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
913
914 assert_eq!(depth10.sequence, 0);
916 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
917 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
919 }
920
921 #[rstest]
922 fn test_quote_message() {
923 let json_data = load_test_json("ws_quote.json");
924
925 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
926 let last_quote = QuoteTick::new(
927 instrument_id,
928 Price::new(487.50, 2),
929 Price::new(488.20, 2),
930 Quantity::from(100_000),
931 Quantity::from(100_000),
932 UnixNanos::from(1),
933 UnixNanos::from(2),
934 );
935 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
936 let quote = parse_quote_msg(&msg, &last_quote, 2, UnixNanos::from(3));
937
938 assert_eq!(quote.instrument_id, instrument_id);
939 assert_eq!(quote.bid_price, Price::from("487.55"));
940 assert_eq!(quote.ask_price, Price::from("488.25"));
941 assert_eq!(quote.bid_size, Quantity::from(103_000));
942 assert_eq!(quote.ask_size, Quantity::from(50_000));
943 assert_eq!(quote.ts_event, 1732315465085000000);
944 assert_eq!(quote.ts_init, 3);
945 }
946
947 #[rstest]
948 fn test_trade_message() {
949 let json_data = load_test_json("ws_trade.json");
950
951 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
952 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
953 let trade = parse_trade_msg(&msg, 1, UnixNanos::from(3));
954
955 assert_eq!(trade.instrument_id, instrument_id);
956 assert_eq!(trade.price, Price::from("98570.9"));
957 assert_eq!(trade.size, Quantity::from(100));
958 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
959 assert_eq!(
960 trade.trade_id.to_string(),
961 "00000000-006d-1000-0000-000e8737d536"
962 );
963 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
965 }
966
967 #[rstest]
968 fn test_trade_bin_message() {
969 let json_data = load_test_json("ws_trade_bin_1m.json");
970
971 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
972 let topic = BitmexWsTopic::TradeBin1m;
973
974 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
975 let bar = parse_trade_bin_msg(&msg, &topic, 1, UnixNanos::from(3));
976
977 assert_eq!(bar.instrument_id(), instrument_id);
978 assert_eq!(
979 bar.bar_type.spec(),
980 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
981 );
982 assert_eq!(bar.open, Price::from("97550.0"));
983 assert_eq!(bar.high, Price::from("97584.4"));
984 assert_eq!(bar.low, Price::from("97550.0"));
985 assert_eq!(bar.close, Price::from("97570.1"));
986 assert_eq!(bar.volume, Quantity::from(84_000));
987 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
989 }
990
991 #[rstest]
992 fn test_parse_order_msg() {
993 let json_data = load_test_json("ws_order.json");
994 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
995 let report = parse_order_msg(&msg, 1).unwrap();
996
997 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
998 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
999 assert_eq!(
1000 report.venue_order_id.to_string(),
1001 "550e8400-e29b-41d4-a716-446655440001"
1002 );
1003 assert_eq!(
1004 report.client_order_id.unwrap().to_string(),
1005 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1006 );
1007 assert_eq!(report.order_side, OrderSide::Buy);
1008 assert_eq!(report.order_type, OrderType::Limit);
1009 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1010 assert_eq!(report.order_status, OrderStatus::Accepted);
1011 assert_eq!(report.quantity, Quantity::from(100));
1012 assert_eq!(report.filled_qty, Quantity::from(0));
1013 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1014 assert_eq!(report.ts_accepted, 1732530600000000000); }
1016
1017 #[rstest]
1018 fn test_parse_execution_msg() {
1019 let json_data = load_test_json("ws_execution.json");
1020 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1021 let fill = parse_execution_msg(msg, 1).unwrap();
1022
1023 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1024 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1025 assert_eq!(
1026 fill.venue_order_id.to_string(),
1027 "550e8400-e29b-41d4-a716-446655440002"
1028 );
1029 assert_eq!(
1030 fill.trade_id.to_string(),
1031 "00000000-006d-1000-0000-000e8737d540"
1032 );
1033 assert_eq!(
1034 fill.client_order_id.unwrap().to_string(),
1035 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1036 );
1037 assert_eq!(fill.order_side, OrderSide::Sell);
1038 assert_eq!(fill.last_qty, Quantity::from(100));
1039 assert_eq!(fill.last_px, Price::from("98950.0"));
1040 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1041 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1042 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1043 assert_eq!(fill.ts_event, 1732530900789000000); }
1045
1046 #[rstest]
1047 fn test_parse_execution_msg_non_trade() {
1048 let mut msg: BitmexExecutionMsg =
1050 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1051 msg.exec_type = Some(BitmexExecType::Settlement);
1052
1053 let result = parse_execution_msg(msg, 1);
1054 assert!(result.is_none());
1055 }
1056
1057 #[rstest]
1058 fn test_parse_cancel_reject_execution() {
1059 let json = r#"{
1061 "execID":"00000000-006d-1000-0000-001e7f5081ad",
1062 "orderID":"ece0a2cc-7729-4f4c-bc6c-65d7c723e75b",
1063 "account":1667725,
1064 "execType":"CancelReject",
1065 "ordStatus":"Rejected",
1066 "workingIndicator":false,
1067 "ordRejReason":"Invalid orderID",
1068 "text":"Invalid orderID",
1069 "transactTime":"2025-09-05T05:38:28.001Z",
1070 "timestamp":"2025-09-05T05:38:28.001Z"
1071 }"#;
1072
1073 let msg: BitmexExecutionMsg = serde_json::from_str(json).unwrap();
1074 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1075 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1076 assert_eq!(msg.symbol, None);
1077
1078 let result = parse_execution_msg(msg, 1);
1080 assert!(result.is_none());
1081 }
1082
1083 #[rstest]
1084 fn test_parse_position_msg() {
1085 let json_data = load_test_json("ws_position.json");
1086 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1087 let report = parse_position_msg(msg);
1088
1089 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1090 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1091 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1092 assert_eq!(report.quantity, Quantity::from(1000));
1093 assert!(report.venue_position_id.is_none());
1094 assert_eq!(report.ts_last, 1732530900789000000); }
1096
1097 #[rstest]
1098 fn test_parse_position_msg_short() {
1099 let mut msg: BitmexPositionMsg =
1100 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1101 msg.current_qty = Some(-500);
1102
1103 let report = parse_position_msg(msg);
1104 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1105 assert_eq!(report.quantity, Quantity::from(500));
1106 }
1107
1108 #[rstest]
1109 fn test_parse_position_msg_flat() {
1110 let mut msg: BitmexPositionMsg =
1111 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1112 msg.current_qty = Some(0);
1113
1114 let report = parse_position_msg(msg);
1115 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1116 assert_eq!(report.quantity, Quantity::from(0));
1117 }
1118
1119 #[rstest]
1120 fn test_parse_wallet_msg() {
1121 let json_data = load_test_json("ws_wallet.json");
1122 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1123 let ts_init = UnixNanos::from(1);
1124 let account_state = parse_wallet_msg(msg, ts_init);
1125
1126 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1127 assert!(!account_state.balances.is_empty());
1128 let balance = &account_state.balances[0];
1129 assert_eq!(balance.currency.code.to_string(), "XBT");
1130 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1132 }
1133
1134 #[rstest]
1135 fn test_parse_wallet_msg_no_amount() {
1136 let mut msg: BitmexWalletMsg =
1137 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1138 msg.amount = None;
1139
1140 let ts_init = UnixNanos::from(1);
1141 let account_state = parse_wallet_msg(msg, ts_init);
1142 let balance = &account_state.balances[0];
1143 assert_eq!(balance.total.as_f64(), 0.0);
1144 }
1145
1146 #[rstest]
1147 fn test_parse_margin_msg() {
1148 let json_data = load_test_json("ws_margin.json");
1149 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1150 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1151 let margin_balance = parse_margin_msg(msg, instrument_id);
1152
1153 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1154 assert_eq!(margin_balance.instrument_id, instrument_id);
1155 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1158 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1160 }
1161
1162 #[rstest]
1163 fn test_parse_margin_msg_no_available() {
1164 let mut msg: BitmexMarginMsg =
1165 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1166 msg.available_margin = None;
1167
1168 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1169 let margin_balance = parse_margin_msg(msg, instrument_id);
1170 assert!(margin_balance.initial.as_f64() >= 0.0);
1172 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1173 }
1174
1175 #[rstest]
1176 fn test_parse_instrument_msg_both_prices() {
1177 let json_data = load_test_json("ws_instrument.json");
1178 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1179
1180 let mut instruments_cache = AHashMap::new();
1182 let test_instrument = create_test_perpetual_instrument();
1183 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1184
1185 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1186
1187 assert_eq!(updates.len(), 2);
1189
1190 match &updates[0] {
1192 Data::MarkPriceUpdate(update) => {
1193 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1194 assert_eq!(update.value.as_f64(), 95125.7);
1195 }
1196 _ => panic!("Expected MarkPriceUpdate at index 0"),
1197 }
1198
1199 match &updates[1] {
1201 Data::IndexPriceUpdate(update) => {
1202 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1203 assert_eq!(update.value.as_f64(), 95124.3);
1204 }
1205 _ => panic!("Expected IndexPriceUpdate at index 1"),
1206 }
1207 }
1208
1209 #[rstest]
1210 fn test_parse_instrument_msg_mark_price_only() {
1211 let mut msg: BitmexInstrumentMsg =
1212 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1213 msg.index_price = None;
1214
1215 let mut instruments_cache = AHashMap::new();
1217 let test_instrument = create_test_perpetual_instrument();
1218 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1219
1220 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1221
1222 assert_eq!(updates.len(), 1);
1223 match &updates[0] {
1224 Data::MarkPriceUpdate(update) => {
1225 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1226 assert_eq!(update.value.as_f64(), 95125.7);
1227 }
1228 _ => panic!("Expected MarkPriceUpdate"),
1229 }
1230 }
1231
1232 #[rstest]
1233 fn test_parse_instrument_msg_index_price_only() {
1234 let mut msg: BitmexInstrumentMsg =
1235 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1236 msg.mark_price = None;
1237
1238 let mut instruments_cache = AHashMap::new();
1240 let test_instrument = create_test_perpetual_instrument();
1241 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1242
1243 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1244
1245 assert_eq!(updates.len(), 1);
1246 match &updates[0] {
1247 Data::IndexPriceUpdate(update) => {
1248 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1249 assert_eq!(update.value.as_f64(), 95124.3);
1250 }
1251 _ => panic!("Expected IndexPriceUpdate"),
1252 }
1253 }
1254
1255 #[rstest]
1256 fn test_parse_instrument_msg_no_prices() {
1257 let mut msg: BitmexInstrumentMsg =
1258 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1259 msg.mark_price = None;
1260 msg.index_price = None;
1261 msg.last_price = None;
1262
1263 let mut instruments_cache = AHashMap::new();
1265 let test_instrument = create_test_perpetual_instrument();
1266 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1267
1268 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1269 assert_eq!(updates.len(), 0);
1270 }
1271
1272 #[rstest]
1273 fn test_parse_instrument_msg_index_symbol() {
1274 let mut msg: BitmexInstrumentMsg =
1277 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1278 msg.symbol = Ustr::from(".BXBT");
1279 msg.last_price = Some(119163.05);
1280 msg.mark_price = Some(119163.05); msg.index_price = None;
1282
1283 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1285 let instrument = CryptoPerpetual::new(
1286 instrument_id,
1287 Symbol::from(".BXBT"),
1288 Currency::BTC(),
1289 Currency::USD(),
1290 Currency::USD(),
1291 false, 2, 8, Price::from("0.01"),
1295 Quantity::from("0.00000001"),
1296 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1311 let mut instruments_cache = AHashMap::new();
1312 instruments_cache.insert(
1313 Ustr::from(".BXBT"),
1314 InstrumentAny::CryptoPerpetual(instrument),
1315 );
1316
1317 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1318
1319 assert_eq!(updates.len(), 2);
1320
1321 match &updates[0] {
1323 Data::MarkPriceUpdate(update) => {
1324 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1325 assert_eq!(update.value, Price::from("119163.05"));
1326 }
1327 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1328 }
1329
1330 match &updates[1] {
1332 Data::IndexPriceUpdate(update) => {
1333 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1334 assert_eq!(update.value, Price::from("119163.05"));
1335 assert_eq!(update.ts_init, UnixNanos::from(1));
1336 }
1337 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1338 }
1339 }
1340
1341 #[rstest]
1342 fn test_parse_funding_msg() {
1343 let json_data = load_test_json("ws_funding_rate.json");
1344 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1345 let update = parse_funding_msg(msg, UnixNanos::from(1));
1346
1347 assert!(update.is_some());
1348 let update = update.unwrap();
1349
1350 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1351 assert_eq!(update.rate.to_string(), "0.0001");
1352 assert!(update.next_funding_ns.is_none());
1353 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1354 assert_eq!(update.ts_init, UnixNanos::from(1));
1355 }
1356}