1use std::num::NonZero;
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TriggerType,
34 },
35 events::{OrderUpdated, account::state::AccountState},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38 VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::{Decimal, prelude::FromPrimitive};
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49 enums::{BitmexAction, BitmexWsTopic},
50 messages::{
51 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54 },
55};
56use crate::{
57 common::{
58 consts::BITMEX_VENUE,
59 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
60 parse::{
61 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
62 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
63 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
64 parse_position_side, parse_signed_contracts_quantity,
65 },
66 },
67 http::parse::get_currency,
68 websocket::messages::BitmexOrderUpdateMsg,
69};
70
71const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
72 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
73 aggregation: BarAggregation::Minute,
74 price_type: PriceType::Last,
75};
76const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
77 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
78 aggregation: BarAggregation::Minute,
79 price_type: PriceType::Last,
80};
81const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
82 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
83 aggregation: BarAggregation::Hour,
84 price_type: PriceType::Last,
85};
86const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
87 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
88 aggregation: BarAggregation::Day,
89 price_type: PriceType::Last,
90};
91
92#[inline]
100#[must_use]
101pub fn is_index_symbol(symbol: &Ustr) -> bool {
102 symbol.starts_with('.')
103}
104
105#[must_use]
107pub fn parse_book_msg_vec(
108 data: Vec<BitmexOrderBookMsg>,
109 action: BitmexAction,
110 instruments: &AHashMap<Ustr, InstrumentAny>,
111 ts_init: UnixNanos,
112) -> Vec<Data> {
113 let mut deltas = Vec::with_capacity(data.len());
114
115 for msg in data {
116 if let Some(instrument) = instruments.get(&msg.symbol) {
117 let instrument_id = instrument.id();
118 let price_precision = instrument.price_precision();
119 deltas.push(Data::Delta(parse_book_msg(
120 &msg,
121 &action,
122 instrument,
123 instrument_id,
124 price_precision,
125 ts_init,
126 )));
127 } else {
128 log::error!(
129 "Instrument cache miss: book delta dropped for symbol={}",
130 msg.symbol
131 );
132 }
133 }
134 deltas
135}
136
137#[must_use]
139pub fn parse_book10_msg_vec(
140 data: Vec<BitmexOrderBook10Msg>,
141 instruments: &AHashMap<Ustr, InstrumentAny>,
142 ts_init: UnixNanos,
143) -> Vec<Data> {
144 let mut depths = Vec::with_capacity(data.len());
145
146 for msg in data {
147 if let Some(instrument) = instruments.get(&msg.symbol) {
148 let instrument_id = instrument.id();
149 let price_precision = instrument.price_precision();
150 depths.push(Data::Depth10(Box::new(parse_book10_msg(
151 &msg,
152 instrument,
153 instrument_id,
154 price_precision,
155 ts_init,
156 ))));
157 } else {
158 log::error!(
159 "Instrument cache miss: depth10 message dropped for symbol={}",
160 msg.symbol
161 );
162 }
163 }
164 depths
165}
166
167#[must_use]
169pub fn parse_trade_msg_vec(
170 data: Vec<BitmexTradeMsg>,
171 instruments: &AHashMap<Ustr, InstrumentAny>,
172 ts_init: UnixNanos,
173) -> Vec<Data> {
174 let mut trades = Vec::with_capacity(data.len());
175
176 for msg in data {
177 if let Some(instrument) = instruments.get(&msg.symbol) {
178 let instrument_id = instrument.id();
179 let price_precision = instrument.price_precision();
180 trades.push(Data::Trade(parse_trade_msg(
181 &msg,
182 instrument,
183 instrument_id,
184 price_precision,
185 ts_init,
186 )));
187 } else {
188 log::error!(
189 "Instrument cache miss: trade message dropped for symbol={}",
190 msg.symbol
191 );
192 }
193 }
194 trades
195}
196
197#[must_use]
199pub fn parse_trade_bin_msg_vec(
200 data: Vec<BitmexTradeBinMsg>,
201 topic: BitmexWsTopic,
202 instruments: &AHashMap<Ustr, InstrumentAny>,
203 ts_init: UnixNanos,
204) -> Vec<Data> {
205 let mut trades = Vec::with_capacity(data.len());
206
207 for msg in data {
208 if let Some(instrument) = instruments.get(&msg.symbol) {
209 let instrument_id = instrument.id();
210 let price_precision = instrument.price_precision();
211 trades.push(Data::Bar(parse_trade_bin_msg(
212 &msg,
213 &topic,
214 instrument,
215 instrument_id,
216 price_precision,
217 ts_init,
218 )));
219 } else {
220 log::error!(
221 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
222 msg.symbol
223 );
224 }
225 }
226 trades
227}
228
229#[allow(clippy::too_many_arguments)]
231#[must_use]
232pub fn parse_book_msg(
233 msg: &BitmexOrderBookMsg,
234 action: &BitmexAction,
235 instrument: &InstrumentAny,
236 instrument_id: InstrumentId,
237 price_precision: u8,
238 ts_init: UnixNanos,
239) -> OrderBookDelta {
240 let flags = if action == &BitmexAction::Insert {
241 RecordFlag::F_SNAPSHOT as u8
242 } else {
243 0
244 };
245
246 let action = action.as_book_action();
247 let price = Price::new(msg.price, price_precision);
248 let side = msg.side.as_order_side();
249 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
250 let order_id = msg.id;
251 let order = BookOrder::new(side, price, size, order_id);
252 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
254
255 OrderBookDelta::new(
256 instrument_id,
257 action,
258 order,
259 flags,
260 sequence,
261 ts_event,
262 ts_init,
263 )
264}
265
266#[allow(clippy::too_many_arguments)]
272#[must_use]
273pub fn parse_book10_msg(
274 msg: &BitmexOrderBook10Msg,
275 instrument: &InstrumentAny,
276 instrument_id: InstrumentId,
277 price_precision: u8,
278 ts_init: UnixNanos,
279) -> OrderBookDepth10 {
280 let mut bids = Vec::with_capacity(DEPTH10_LEN);
281 let mut asks = Vec::with_capacity(DEPTH10_LEN);
282
283 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
285 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
286
287 for (i, level) in msg.bids.iter().enumerate() {
288 let bid_order = BookOrder::new(
289 OrderSide::Buy,
290 Price::new(level[0], price_precision),
291 parse_fractional_quantity(level[1], instrument),
292 0,
293 );
294
295 bids.push(bid_order);
296 bid_counts[i] = 1;
297 }
298
299 for (i, level) in msg.asks.iter().enumerate() {
300 let ask_order = BookOrder::new(
301 OrderSide::Sell,
302 Price::new(level[0], price_precision),
303 parse_fractional_quantity(level[1], instrument),
304 0,
305 );
306
307 asks.push(ask_order);
308 ask_counts[i] = 1;
309 }
310
311 let bids: [BookOrder; DEPTH10_LEN] = bids
312 .try_into()
313 .inspect_err(|v: &Vec<BookOrder>| {
314 log::error!("Bids length mismatch: expected 10, was {}", v.len());
315 })
316 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
317 let asks: [BookOrder; DEPTH10_LEN] = asks
318 .try_into()
319 .inspect_err(|v: &Vec<BookOrder>| {
320 log::error!("Asks length mismatch: expected 10, was {}", v.len());
321 })
322 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
323
324 let ts_event = UnixNanos::from(msg.timestamp);
325
326 OrderBookDepth10::new(
327 instrument_id,
328 bids,
329 asks,
330 bid_counts,
331 ask_counts,
332 RecordFlag::F_SNAPSHOT as u8,
333 0, ts_event,
335 ts_init,
336 )
337}
338
339#[must_use]
341pub fn parse_quote_msg(
342 msg: &BitmexQuoteMsg,
343 last_quote: &QuoteTick,
344 instrument: &InstrumentAny,
345 instrument_id: InstrumentId,
346 price_precision: u8,
347 ts_init: UnixNanos,
348) -> QuoteTick {
349 let bid_price = match msg.bid_price {
350 Some(price) => Price::new(price, price_precision),
351 None => last_quote.bid_price,
352 };
353
354 let ask_price = match msg.ask_price {
355 Some(price) => Price::new(price, price_precision),
356 None => last_quote.ask_price,
357 };
358
359 let bid_size = match msg.bid_size {
360 Some(size) => parse_contracts_quantity(size, instrument),
361 None => last_quote.bid_size,
362 };
363
364 let ask_size = match msg.ask_size {
365 Some(size) => parse_contracts_quantity(size, instrument),
366 None => last_quote.ask_size,
367 };
368
369 let ts_event = UnixNanos::from(msg.timestamp);
370
371 QuoteTick::new(
372 instrument_id,
373 bid_price,
374 ask_price,
375 bid_size,
376 ask_size,
377 ts_event,
378 ts_init,
379 )
380}
381
382#[must_use]
384pub fn parse_trade_msg(
385 msg: &BitmexTradeMsg,
386 instrument: &InstrumentAny,
387 instrument_id: InstrumentId,
388 price_precision: u8,
389 ts_init: UnixNanos,
390) -> TradeTick {
391 let price = Price::new(msg.price, price_precision);
392 let size = parse_contracts_quantity(msg.size, instrument);
393 let aggressor_side = msg.side.as_aggressor_side();
394 let trade_id = TradeId::new(
395 msg.trd_match_id
396 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
397 );
398 let ts_event = UnixNanos::from(msg.timestamp);
399
400 TradeTick::new(
401 instrument_id,
402 price,
403 size,
404 aggressor_side,
405 trade_id,
406 ts_event,
407 ts_init,
408 )
409}
410
411#[must_use]
413pub fn parse_trade_bin_msg(
414 msg: &BitmexTradeBinMsg,
415 topic: &BitmexWsTopic,
416 instrument: &InstrumentAny,
417 instrument_id: InstrumentId,
418 price_precision: u8,
419 ts_init: UnixNanos,
420) -> Bar {
421 let spec = bar_spec_from_topic(topic);
422 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
423
424 let open = Price::new(msg.open, price_precision);
425 let high = Price::new(msg.high, price_precision);
426 let low = Price::new(msg.low, price_precision);
427 let close = Price::new(msg.close, price_precision);
428
429 let (open, high, low, close) =
430 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
431
432 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
433 let volume = parse_contracts_quantity(volume_contracts, instrument);
434 let ts_event = UnixNanos::from(msg.timestamp);
435
436 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
437}
438
439#[must_use]
445pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
446 match topic {
447 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
448 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
449 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
450 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
451 _ => {
452 log::error!("Bar specification not supported: topic={topic:?}");
453 BAR_SPEC_1_MINUTE
454 }
455 }
456}
457
458#[must_use]
464pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
465 match spec {
466 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
467 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
468 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
469 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
470 _ => {
471 log::error!("Bar specification not supported: spec={spec:?}");
472 BitmexWsTopic::TradeBin1m
473 }
474 }
475}
476
477fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
478 if msg.stop_px.is_some() {
479 if msg.price.is_some() {
480 Some(OrderType::StopLimit)
481 } else {
482 Some(OrderType::StopMarket)
483 }
484 } else if msg.price.is_some() {
485 Some(OrderType::Limit)
486 } else {
487 Some(OrderType::Market)
488 }
489}
490
491pub fn parse_order_msg(
505 msg: &BitmexOrderMsg,
506 instrument: &InstrumentAny,
507 order_type_cache: &DashMap<ClientOrderId, OrderType>,
508) -> anyhow::Result<OrderStatusReport> {
509 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
511 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
512 let common_side: BitmexSide = msg.side.into();
513 let order_side: OrderSide = common_side.into();
514
515 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
516 ord_type.into()
517 } else if let Some(client_order_id) = msg.cl_ord_id {
518 let client_order_id = ClientOrderId::new(client_order_id);
519 if let Some(entry) = order_type_cache.get(&client_order_id) {
520 *entry.value()
521 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
522 order_type_cache.insert(client_order_id, inferred);
523 inferred
524 } else {
525 anyhow::bail!(
526 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
527 );
528 }
529 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
530 inferred
531 } else {
532 anyhow::bail!("Order missing both ord_type and cl_ord_id");
533 };
534
535 let time_in_force: TimeInForce = match msg.time_in_force {
536 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
537 None => TimeInForce::Gtc,
538 };
539 let order_status: OrderStatus = msg.ord_status.into();
540 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
541 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
542 let report_id = UUID4::new();
543 let ts_accepted =
544 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
545 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
546 let ts_init = get_atomic_clock_realtime().get_time_ns();
547
548 let mut report = OrderStatusReport::new(
549 account_id,
550 instrument_id,
551 None, venue_order_id,
553 order_side,
554 order_type,
555 time_in_force,
556 order_status,
557 quantity,
558 filled_qty,
559 ts_accepted,
560 ts_last,
561 ts_init,
562 Some(report_id),
563 );
564
565 if let Some(cl_ord_id) = &msg.cl_ord_id {
566 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
567 }
568
569 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
570 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
571 }
572
573 if let Some(price) = msg.price {
574 report = report.with_price(Price::new(price, instrument.price_precision()));
575 }
576
577 if let Some(avg_px) = msg.avg_px {
578 report = report.with_avg_px(avg_px)?;
579 }
580
581 if let Some(trigger_price) = msg.stop_px {
582 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
583 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
585 TriggerType::MarkPrice
586 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
587 TriggerType::IndexPrice
588 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
589 TriggerType::LastPrice
590 } else {
591 TriggerType::Default
592 }
593 } else {
594 TriggerType::Default };
596
597 report = report
598 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
599 .with_trigger_type(trigger_type);
600 }
601
602 if let Some(exec_insts) = &msg.exec_inst {
603 for exec_inst in exec_insts {
604 match exec_inst {
605 BitmexExecInstruction::ParticipateDoNotInitiate => {
606 report = report.with_post_only(true);
607 }
608 BitmexExecInstruction::ReduceOnly => {
609 report = report.with_reduce_only(true);
610 }
611 _ => {}
612 }
613 }
614 }
615
616 if order_status == OrderStatus::Rejected {
618 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
619 log::debug!(
620 "Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
621 venue_order_id,
622 msg.cl_ord_id,
623 reason_str,
624 );
625 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
626 } else {
627 log::debug!(
628 "Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
629 venue_order_id,
630 msg.cl_ord_id,
631 msg.ord_status,
632 msg.ord_rej_reason,
633 msg.text,
634 );
635 }
636 }
637
638 if order_status == OrderStatus::Canceled
641 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
642 {
643 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
644 }
645
646 Ok(report)
647}
648
649pub fn parse_order_update_msg(
653 msg: &BitmexOrderUpdateMsg,
654 instrument: &InstrumentAny,
655 account_id: AccountId,
656) -> Option<OrderUpdated> {
657 let trader_id = TraderId::external();
660 let strategy_id = StrategyId::external();
661 let instrument_id = parse_instrument_id(msg.symbol);
662 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
663 let client_order_id = msg
664 .cl_ord_id
665 .map_or_else(ClientOrderId::external, ClientOrderId::new);
666 let quantity = Quantity::zero(instrument.size_precision());
667 let price = msg
668 .price
669 .map(|p| Price::new(p, instrument.price_precision()));
670
671 let trigger_price = None;
673 let protection_price = None;
675
676 let event_id = UUID4::new();
677 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
678 let ts_init = get_atomic_clock_realtime().get_time_ns();
679
680 Some(OrderUpdated::new(
681 trader_id,
682 strategy_id,
683 instrument_id,
684 client_order_id,
685 quantity,
686 event_id,
687 ts_event,
688 ts_init,
689 false, venue_order_id,
691 Some(account_id),
692 price,
693 trigger_price,
694 protection_price,
695 ))
696}
697
698pub fn parse_execution_msg(
717 msg: BitmexExecutionMsg,
718 instrument: &InstrumentAny,
719) -> Option<FillReport> {
720 let exec_type = msg.exec_type?;
721
722 match exec_type {
723 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
725 BitmexExecType::Bankruptcy => {
726 log::warn!(
727 "Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
728 msg.order_id,
729 msg.symbol,
730 );
731 }
732
733 BitmexExecType::Settlement => {
735 log::debug!(
736 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
737 msg.order_id,
738 msg.symbol,
739 );
740 return None;
741 }
742 BitmexExecType::TrialFill => {
743 log::warn!(
744 "Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
745 msg.order_id,
746 msg.symbol,
747 );
748 return None;
749 }
750
751 BitmexExecType::Funding => {
753 log::debug!(
754 "Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
755 msg.order_id,
756 msg.symbol,
757 );
758 return None;
759 }
760 BitmexExecType::Insurance => {
761 log::debug!(
762 "Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
763 msg.order_id,
764 msg.symbol,
765 );
766 return None;
767 }
768 BitmexExecType::Rebalance => {
769 log::debug!(
770 "Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
771 msg.order_id,
772 msg.symbol,
773 );
774 return None;
775 }
776
777 BitmexExecType::New
779 | BitmexExecType::Canceled
780 | BitmexExecType::CancelReject
781 | BitmexExecType::Replaced
782 | BitmexExecType::Rejected
783 | BitmexExecType::AmendReject
784 | BitmexExecType::Suspended
785 | BitmexExecType::Released
786 | BitmexExecType::TriggeredOrActivatedBySystem => {
787 log::debug!(
788 "Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
789 msg.order_id,
790 );
791 return None;
792 }
793 }
794
795 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
796 let instrument_id = parse_instrument_id(msg.symbol?);
797 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
798 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
799 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
800 let side: BitmexSide = s.into();
801 side.into()
802 });
803 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
804 let last_px = Price::new(msg.last_px?, instrument.price_precision());
805 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
806 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
807 let currency = get_currency(&mapped_currency);
808 let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
809 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
810 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
811 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
813 let ts_init = get_atomic_clock_realtime().get_time_ns();
814
815 Some(FillReport::new(
816 account_id,
817 instrument_id,
818 venue_order_id,
819 trade_id,
820 order_side,
821 last_qty,
822 last_px,
823 commission,
824 liquidity_side,
825 client_order_id,
826 venue_position_id,
827 ts_event,
828 ts_init,
829 None,
830 ))
831}
832
833#[must_use]
839pub fn parse_position_msg(
840 msg: BitmexPositionMsg,
841 instrument: &InstrumentAny,
842) -> PositionStatusReport {
843 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
844 let instrument_id = parse_instrument_id(msg.symbol);
845 let position_side = parse_position_side(msg.current_qty).as_specified();
846 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
847 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
849 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
850 let ts_init = get_atomic_clock_realtime().get_time_ns();
851
852 PositionStatusReport::new(
853 account_id,
854 instrument_id,
855 position_side,
856 quantity,
857 ts_last,
858 ts_init,
859 None, venue_position_id, avg_px_open, )
863}
864
865#[must_use]
878pub fn parse_instrument_msg(
879 msg: BitmexInstrumentMsg,
880 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
881 ts_init: UnixNanos,
882) -> Vec<Data> {
883 let mut updates = Vec::new();
884 let is_index = is_index_symbol(&msg.symbol);
885
886 let effective_index_price = if is_index {
889 msg.last_price
890 } else {
891 msg.index_price
892 };
893
894 if msg.mark_price.is_none() && effective_index_price.is_none() {
898 return updates;
899 }
900
901 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
902 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
903
904 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
906 Some(instrument) => instrument.price_precision(),
907 None => {
908 if is_index {
912 log::trace!(
913 "Index instrument {} not in cache, skipping update",
914 msg.symbol
915 );
916 } else {
917 log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
918 }
919 return updates;
920 }
921 };
922
923 if let Some(mark_price) = msg.mark_price {
926 let price = Price::new(mark_price, price_precision);
927 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
928 instrument_id,
929 price,
930 ts_event,
931 ts_init,
932 )));
933 }
934
935 if let Some(index_price) = effective_index_price {
937 let price = Price::new(index_price, price_precision);
938 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
939 instrument_id,
940 price,
941 ts_event,
942 ts_init,
943 )));
944 }
945
946 updates
947}
948
949pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
955 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
956 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
957
958 let rate = match Decimal::try_from(msg.funding_rate) {
960 Ok(rate) => rate,
961 Err(e) => {
962 log::error!("Failed to parse funding rate: {e}");
963 return None;
964 }
965 };
966
967 Some(FundingRateUpdate::new(
968 instrument_id,
969 rate,
970 None, ts_event,
972 ts_init,
973 ))
974}
975
976#[must_use]
985pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
986 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
987
988 let currency_str = map_bitmex_currency(msg.currency.as_str());
990 let currency = get_currency(¤cy_str);
991
992 let divisor = if msg.currency == "XBt" {
994 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
996 1_000_000.0 } else {
998 1.0
999 };
1000 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1001
1002 let total = Money::new(amount, currency);
1003 let locked = Money::new(0.0, currency); let free = total - locked;
1005
1006 let balance = AccountBalance::new_checked(total, locked, free)
1007 .expect("Balance calculation should be valid");
1008
1009 AccountState::new(
1010 account_id,
1011 AccountType::Margin,
1012 vec![balance],
1013 vec![], true, UUID4::new(),
1016 ts_init,
1017 ts_init,
1018 None,
1019 )
1020}
1021
1022#[must_use]
1026pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1027 let currency_str = map_bitmex_currency(msg.currency.as_str());
1029 let currency = get_currency(¤cy_str);
1030
1031 let divisor = if msg.currency == "XBt" {
1033 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1035 1_000_000.0 } else {
1037 1.0
1038 };
1039
1040 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1041 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1042 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1043
1044 MarginBalance::new(
1045 Money::new(initial, currency),
1046 Money::new(maintenance, currency),
1047 instrument_id,
1048 )
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use chrono::{DateTime, Utc};
1054 use nautilus_model::{
1055 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1056 identifiers::Symbol,
1057 instruments::crypto_perpetual::CryptoPerpetual,
1058 };
1059 use rstest::rstest;
1060 use ustr::Ustr;
1061
1062 use super::*;
1063 use crate::common::{
1064 enums::{BitmexExecType, BitmexOrderStatus},
1065 testing::load_test_json,
1066 };
1067
1068 fn create_test_perpetual_instrument_with_precisions(
1070 price_precision: u8,
1071 size_precision: u8,
1072 ) -> InstrumentAny {
1073 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1074 InstrumentId::from("XBTUSD.BITMEX"),
1075 Symbol::new("XBTUSD"),
1076 Currency::BTC(),
1077 Currency::USD(),
1078 Currency::BTC(),
1079 true, price_precision,
1081 size_precision,
1082 Price::new(0.5, price_precision),
1083 Quantity::new(1.0, size_precision),
1084 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1097 UnixNanos::default(),
1098 ))
1099 }
1100
1101 fn create_test_perpetual_instrument() -> InstrumentAny {
1102 create_test_perpetual_instrument_with_precisions(1, 0)
1103 }
1104
1105 #[rstest]
1106 fn test_orderbook_l2_message() {
1107 let json_data = load_test_json("ws_orderbook_l2.json");
1108
1109 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1110 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1111
1112 let instrument = create_test_perpetual_instrument();
1114 let delta = parse_book_msg(
1115 &msg,
1116 &BitmexAction::Insert,
1117 &instrument,
1118 instrument.id(),
1119 instrument.price_precision(),
1120 UnixNanos::from(3),
1121 );
1122 assert_eq!(delta.instrument_id, instrument_id);
1123 assert_eq!(delta.order.price, Price::from("98459.9"));
1124 assert_eq!(delta.order.size, Quantity::from(33000));
1125 assert_eq!(delta.order.side, OrderSide::Sell);
1126 assert_eq!(delta.order.order_id, 62400580205);
1127 assert_eq!(delta.action, BookAction::Add);
1128 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1129 assert_eq!(delta.sequence, 0);
1130 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1132
1133 let delta = parse_book_msg(
1135 &msg,
1136 &BitmexAction::Update,
1137 &instrument,
1138 instrument.id(),
1139 instrument.price_precision(),
1140 UnixNanos::from(3),
1141 );
1142 assert_eq!(delta.flags, 0);
1143 assert_eq!(delta.action, BookAction::Update);
1144 }
1145
1146 #[rstest]
1147 fn test_orderbook10_message() {
1148 let json_data = load_test_json("ws_orderbook_10.json");
1149 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1150 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1151 let instrument = create_test_perpetual_instrument();
1152 let depth10 = parse_book10_msg(
1153 &msg,
1154 &instrument,
1155 instrument.id(),
1156 instrument.price_precision(),
1157 UnixNanos::from(3),
1158 );
1159
1160 assert_eq!(depth10.instrument_id, instrument_id);
1161
1162 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1164 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1165 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1166
1167 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1169 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1170 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1171
1172 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1174 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1175
1176 assert_eq!(depth10.sequence, 0);
1178 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1179 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1181 }
1182
1183 #[rstest]
1184 fn test_quote_message() {
1185 let json_data = load_test_json("ws_quote.json");
1186
1187 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1188 let last_quote = QuoteTick::new(
1189 instrument_id,
1190 Price::new(487.50, 2),
1191 Price::new(488.20, 2),
1192 Quantity::from(100_000),
1193 Quantity::from(100_000),
1194 UnixNanos::from(1),
1195 UnixNanos::from(2),
1196 );
1197 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1198 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1199 let quote = parse_quote_msg(
1200 &msg,
1201 &last_quote,
1202 &instrument,
1203 instrument_id,
1204 instrument.price_precision(),
1205 UnixNanos::from(3),
1206 );
1207
1208 assert_eq!(quote.instrument_id, instrument_id);
1209 assert_eq!(quote.bid_price, Price::from("487.55"));
1210 assert_eq!(quote.ask_price, Price::from("488.25"));
1211 assert_eq!(quote.bid_size, Quantity::from(103_000));
1212 assert_eq!(quote.ask_size, Quantity::from(50_000));
1213 assert_eq!(quote.ts_event, 1732315465085000000);
1214 assert_eq!(quote.ts_init, 3);
1215 }
1216
1217 #[rstest]
1218 fn test_trade_message() {
1219 let json_data = load_test_json("ws_trade.json");
1220
1221 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1222 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1223 let instrument = create_test_perpetual_instrument();
1224 let trade = parse_trade_msg(
1225 &msg,
1226 &instrument,
1227 instrument.id(),
1228 instrument.price_precision(),
1229 UnixNanos::from(3),
1230 );
1231
1232 assert_eq!(trade.instrument_id, instrument_id);
1233 assert_eq!(trade.price, Price::from("98570.9"));
1234 assert_eq!(trade.size, Quantity::from(100));
1235 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1236 assert_eq!(
1237 trade.trade_id.to_string(),
1238 "00000000-006d-1000-0000-000e8737d536"
1239 );
1240 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1242 }
1243
1244 #[rstest]
1245 fn test_trade_bin_message() {
1246 let json_data = load_test_json("ws_trade_bin_1m.json");
1247
1248 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1249 let topic = BitmexWsTopic::TradeBin1m;
1250
1251 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1252 let instrument = create_test_perpetual_instrument();
1253 let bar = parse_trade_bin_msg(
1254 &msg,
1255 &topic,
1256 &instrument,
1257 instrument.id(),
1258 instrument.price_precision(),
1259 UnixNanos::from(3),
1260 );
1261
1262 assert_eq!(bar.instrument_id(), instrument_id);
1263 assert_eq!(
1264 bar.bar_type.spec(),
1265 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1266 );
1267 assert_eq!(bar.open, Price::from("97550.0"));
1268 assert_eq!(bar.high, Price::from("97584.4"));
1269 assert_eq!(bar.low, Price::from("97550.0"));
1270 assert_eq!(bar.close, Price::from("97570.1"));
1271 assert_eq!(bar.volume, Quantity::from(84_000));
1272 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1274 }
1275
1276 #[rstest]
1277 fn test_trade_bin_message_extreme_adjustment() {
1278 let topic = BitmexWsTopic::TradeBin1m;
1279 let instrument = create_test_perpetual_instrument();
1280
1281 let msg = BitmexTradeBinMsg {
1282 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1283 .unwrap()
1284 .with_timezone(&Utc),
1285 symbol: Ustr::from("XBTUSD"),
1286 open: 50_000.0,
1287 high: 49_990.0,
1288 low: 50_010.0,
1289 close: 50_005.0,
1290 trades: 10,
1291 volume: 1_000,
1292 vwap: 0.0,
1293 last_size: 0,
1294 turnover: 0,
1295 home_notional: 0.0,
1296 foreign_notional: 0.0,
1297 };
1298
1299 let bar = parse_trade_bin_msg(
1300 &msg,
1301 &topic,
1302 &instrument,
1303 instrument.id(),
1304 instrument.price_precision(),
1305 UnixNanos::from(3),
1306 );
1307
1308 assert_eq!(bar.high, Price::from("50010.0"));
1309 assert_eq!(bar.low, Price::from("49990.0"));
1310 assert_eq!(bar.open, Price::from("50000.0"));
1311 assert_eq!(bar.close, Price::from("50005.0"));
1312 assert_eq!(bar.volume, Quantity::from(1_000));
1313 }
1314
1315 #[rstest]
1316 fn test_parse_order_msg() {
1317 let json_data = load_test_json("ws_order.json");
1318 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1319 let cache = DashMap::new();
1320 let instrument = create_test_perpetual_instrument();
1321 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1322
1323 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1324 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1325 assert_eq!(
1326 report.venue_order_id.to_string(),
1327 "550e8400-e29b-41d4-a716-446655440001"
1328 );
1329 assert_eq!(
1330 report.client_order_id.unwrap().to_string(),
1331 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1332 );
1333 assert_eq!(report.order_side, OrderSide::Buy);
1334 assert_eq!(report.order_type, OrderType::Limit);
1335 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1336 assert_eq!(report.order_status, OrderStatus::Accepted);
1337 assert_eq!(report.quantity, Quantity::from(100));
1338 assert_eq!(report.filled_qty, Quantity::from(0));
1339 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1340 assert_eq!(report.ts_accepted, 1732530600000000000); }
1342
1343 #[rstest]
1344 fn test_parse_order_msg_infers_type_when_missing() {
1345 let json_data = load_test_json("ws_order.json");
1346 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1347 msg.ord_type = None;
1348 msg.cl_ord_id = None;
1349 msg.price = Some(98_000.0);
1350 msg.stop_px = None;
1351
1352 let cache = DashMap::new();
1353 let instrument = create_test_perpetual_instrument();
1354
1355 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1356
1357 assert_eq!(report.order_type, OrderType::Limit);
1358 }
1359
1360 #[rstest]
1361 fn test_parse_order_msg_rejected_with_reason() {
1362 let mut msg: BitmexOrderMsg =
1363 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1364 msg.ord_status = BitmexOrderStatus::Rejected;
1365 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1366 msg.text = None;
1367 msg.cum_qty = 0;
1368
1369 let cache = DashMap::new();
1370 let instrument = create_test_perpetual_instrument();
1371 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1372
1373 assert_eq!(report.order_status, OrderStatus::Rejected);
1374 assert_eq!(
1375 report.cancel_reason,
1376 Some("Insufficient available balance".to_string())
1377 );
1378 }
1379
1380 #[rstest]
1381 fn test_parse_order_msg_rejected_with_text_fallback() {
1382 let mut msg: BitmexOrderMsg =
1383 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1384 msg.ord_status = BitmexOrderStatus::Rejected;
1385 msg.ord_rej_reason = None;
1386 msg.text = Some(Ustr::from("Order would execute immediately"));
1387 msg.cum_qty = 0;
1388
1389 let cache = DashMap::new();
1390 let instrument = create_test_perpetual_instrument();
1391 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1392
1393 assert_eq!(report.order_status, OrderStatus::Rejected);
1394 assert_eq!(
1395 report.cancel_reason,
1396 Some("Order would execute immediately".to_string())
1397 );
1398 }
1399
1400 #[rstest]
1401 fn test_parse_order_msg_rejected_without_reason() {
1402 let mut msg: BitmexOrderMsg =
1403 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1404 msg.ord_status = BitmexOrderStatus::Rejected;
1405 msg.ord_rej_reason = None;
1406 msg.text = None;
1407 msg.cum_qty = 0;
1408
1409 let cache = DashMap::new();
1410 let instrument = create_test_perpetual_instrument();
1411 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1412
1413 assert_eq!(report.order_status, OrderStatus::Rejected);
1414 assert_eq!(report.cancel_reason, None);
1415 }
1416
1417 #[rstest]
1418 fn test_parse_execution_msg() {
1419 let json_data = load_test_json("ws_execution.json");
1420 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1421 let instrument = create_test_perpetual_instrument();
1422 let fill = parse_execution_msg(msg, &instrument).unwrap();
1423
1424 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1425 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1426 assert_eq!(
1427 fill.venue_order_id.to_string(),
1428 "550e8400-e29b-41d4-a716-446655440002"
1429 );
1430 assert_eq!(
1431 fill.trade_id.to_string(),
1432 "00000000-006d-1000-0000-000e8737d540"
1433 );
1434 assert_eq!(
1435 fill.client_order_id.unwrap().to_string(),
1436 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1437 );
1438 assert_eq!(fill.order_side, OrderSide::Sell);
1439 assert_eq!(fill.last_qty, Quantity::from(100));
1440 assert_eq!(fill.last_px, Price::from("98950.0"));
1441 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1442 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1443 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1444 assert_eq!(fill.ts_event, 1732530900789000000); }
1446
1447 #[rstest]
1448 fn test_parse_execution_msg_non_trade() {
1449 let mut msg: BitmexExecutionMsg =
1451 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1452 msg.exec_type = Some(BitmexExecType::Settlement);
1453
1454 let instrument = create_test_perpetual_instrument();
1455 let result = parse_execution_msg(msg, &instrument);
1456 assert!(result.is_none());
1457 }
1458
1459 #[rstest]
1460 fn test_parse_cancel_reject_execution() {
1461 let json = load_test_json("ws_execution_cancel_reject.json");
1463
1464 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1465 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1466 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1467 assert_eq!(msg.symbol, None);
1468
1469 let instrument = create_test_perpetual_instrument();
1471 let result = parse_execution_msg(msg, &instrument);
1472 assert!(result.is_none());
1473 }
1474
1475 #[rstest]
1476 fn test_parse_execution_msg_liquidation() {
1477 let mut msg: BitmexExecutionMsg =
1479 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1480 msg.exec_type = Some(BitmexExecType::Liquidation);
1481
1482 let instrument = create_test_perpetual_instrument();
1483 let fill = parse_execution_msg(msg, &instrument).unwrap();
1484
1485 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1486 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1487 assert_eq!(fill.order_side, OrderSide::Sell);
1488 assert_eq!(fill.last_qty, Quantity::from(100));
1489 assert_eq!(fill.last_px, Price::from("98950.0"));
1490 }
1491
1492 #[rstest]
1493 fn test_parse_execution_msg_bankruptcy() {
1494 let mut msg: BitmexExecutionMsg =
1495 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1496 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1497
1498 let instrument = create_test_perpetual_instrument();
1499 let fill = parse_execution_msg(msg, &instrument).unwrap();
1500
1501 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1502 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1503 assert_eq!(fill.order_side, OrderSide::Sell);
1504 assert_eq!(fill.last_qty, Quantity::from(100));
1505 }
1506
1507 #[rstest]
1508 fn test_parse_execution_msg_settlement() {
1509 let mut msg: BitmexExecutionMsg =
1510 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1511 msg.exec_type = Some(BitmexExecType::Settlement);
1512
1513 let instrument = create_test_perpetual_instrument();
1514 let result = parse_execution_msg(msg, &instrument);
1515 assert!(result.is_none());
1516 }
1517
1518 #[rstest]
1519 fn test_parse_execution_msg_trial_fill() {
1520 let mut msg: BitmexExecutionMsg =
1521 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1522 msg.exec_type = Some(BitmexExecType::TrialFill);
1523
1524 let instrument = create_test_perpetual_instrument();
1525 let result = parse_execution_msg(msg, &instrument);
1526 assert!(result.is_none());
1527 }
1528
1529 #[rstest]
1530 fn test_parse_execution_msg_funding() {
1531 let mut msg: BitmexExecutionMsg =
1532 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1533 msg.exec_type = Some(BitmexExecType::Funding);
1534
1535 let instrument = create_test_perpetual_instrument();
1536 let result = parse_execution_msg(msg, &instrument);
1537 assert!(result.is_none());
1538 }
1539
1540 #[rstest]
1541 fn test_parse_execution_msg_insurance() {
1542 let mut msg: BitmexExecutionMsg =
1543 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1544 msg.exec_type = Some(BitmexExecType::Insurance);
1545
1546 let instrument = create_test_perpetual_instrument();
1547 let result = parse_execution_msg(msg, &instrument);
1548 assert!(result.is_none());
1549 }
1550
1551 #[rstest]
1552 fn test_parse_execution_msg_rebalance() {
1553 let mut msg: BitmexExecutionMsg =
1554 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1555 msg.exec_type = Some(BitmexExecType::Rebalance);
1556
1557 let instrument = create_test_perpetual_instrument();
1558 let result = parse_execution_msg(msg, &instrument);
1559 assert!(result.is_none());
1560 }
1561
1562 #[rstest]
1563 fn test_parse_execution_msg_order_state_changes() {
1564 let instrument = create_test_perpetual_instrument();
1565
1566 let order_state_types = vec![
1567 BitmexExecType::New,
1568 BitmexExecType::Canceled,
1569 BitmexExecType::CancelReject,
1570 BitmexExecType::Replaced,
1571 BitmexExecType::Rejected,
1572 BitmexExecType::AmendReject,
1573 BitmexExecType::Suspended,
1574 BitmexExecType::Released,
1575 BitmexExecType::TriggeredOrActivatedBySystem,
1576 ];
1577
1578 for exec_type in order_state_types {
1579 let mut msg: BitmexExecutionMsg =
1580 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1581 msg.exec_type = Some(exec_type);
1582
1583 let result = parse_execution_msg(msg, &instrument);
1584 assert!(
1585 result.is_none(),
1586 "Expected None for exec_type {exec_type:?}"
1587 );
1588 }
1589 }
1590
1591 #[rstest]
1592 fn test_parse_position_msg() {
1593 let json_data = load_test_json("ws_position.json");
1594 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1595 let instrument = create_test_perpetual_instrument();
1596 let report = parse_position_msg(msg, &instrument);
1597
1598 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1599 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1600 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1601 assert_eq!(report.quantity, Quantity::from(1000));
1602 assert!(report.venue_position_id.is_none());
1603 assert_eq!(report.ts_last, 1732530900789000000); }
1605
1606 #[rstest]
1607 fn test_parse_position_msg_short() {
1608 let mut msg: BitmexPositionMsg =
1609 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1610 msg.current_qty = Some(-500);
1611
1612 let instrument = create_test_perpetual_instrument();
1613 let report = parse_position_msg(msg, &instrument);
1614 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1615 assert_eq!(report.quantity, Quantity::from(500));
1616 }
1617
1618 #[rstest]
1619 fn test_parse_position_msg_flat() {
1620 let mut msg: BitmexPositionMsg =
1621 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1622 msg.current_qty = Some(0);
1623
1624 let instrument = create_test_perpetual_instrument();
1625 let report = parse_position_msg(msg, &instrument);
1626 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1627 assert_eq!(report.quantity, Quantity::from(0));
1628 }
1629
1630 #[rstest]
1631 fn test_parse_wallet_msg() {
1632 let json_data = load_test_json("ws_wallet.json");
1633 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1634 let ts_init = UnixNanos::from(1);
1635 let account_state = parse_wallet_msg(msg, ts_init);
1636
1637 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1638 assert!(!account_state.balances.is_empty());
1639 let balance = &account_state.balances[0];
1640 assert_eq!(balance.currency.code.to_string(), "XBT");
1641 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1643 }
1644
1645 #[rstest]
1646 fn test_parse_wallet_msg_no_amount() {
1647 let mut msg: BitmexWalletMsg =
1648 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1649 msg.amount = None;
1650
1651 let ts_init = UnixNanos::from(1);
1652 let account_state = parse_wallet_msg(msg, ts_init);
1653 let balance = &account_state.balances[0];
1654 assert_eq!(balance.total.as_f64(), 0.0);
1655 }
1656
1657 #[rstest]
1658 fn test_parse_margin_msg() {
1659 let json_data = load_test_json("ws_margin.json");
1660 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1661 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1662 let margin_balance = parse_margin_msg(msg, instrument_id);
1663
1664 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1665 assert_eq!(margin_balance.instrument_id, instrument_id);
1666 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1669 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1671 }
1672
1673 #[rstest]
1674 fn test_parse_margin_msg_no_available() {
1675 let mut msg: BitmexMarginMsg =
1676 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1677 msg.available_margin = None;
1678
1679 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1680 let margin_balance = parse_margin_msg(msg, instrument_id);
1681 assert!(margin_balance.initial.as_f64() >= 0.0);
1683 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1684 }
1685
1686 #[rstest]
1687 fn test_parse_instrument_msg_both_prices() {
1688 let json_data = load_test_json("ws_instrument.json");
1689 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1690
1691 let mut instruments_cache = AHashMap::new();
1693 let test_instrument = create_test_perpetual_instrument();
1694 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1695
1696 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1697
1698 assert_eq!(updates.len(), 2);
1700
1701 match &updates[0] {
1703 Data::MarkPriceUpdate(update) => {
1704 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1705 assert_eq!(update.value.as_f64(), 95125.7);
1706 }
1707 _ => panic!("Expected MarkPriceUpdate at index 0"),
1708 }
1709
1710 match &updates[1] {
1712 Data::IndexPriceUpdate(update) => {
1713 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1714 assert_eq!(update.value.as_f64(), 95124.3);
1715 }
1716 _ => panic!("Expected IndexPriceUpdate at index 1"),
1717 }
1718 }
1719
1720 #[rstest]
1721 fn test_parse_instrument_msg_mark_price_only() {
1722 let mut msg: BitmexInstrumentMsg =
1723 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1724 msg.index_price = None;
1725
1726 let mut instruments_cache = AHashMap::new();
1728 let test_instrument = create_test_perpetual_instrument();
1729 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1730
1731 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1732
1733 assert_eq!(updates.len(), 1);
1734 match &updates[0] {
1735 Data::MarkPriceUpdate(update) => {
1736 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1737 assert_eq!(update.value.as_f64(), 95125.7);
1738 }
1739 _ => panic!("Expected MarkPriceUpdate"),
1740 }
1741 }
1742
1743 #[rstest]
1744 fn test_parse_instrument_msg_index_price_only() {
1745 let mut msg: BitmexInstrumentMsg =
1746 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1747 msg.mark_price = None;
1748
1749 let mut instruments_cache = AHashMap::new();
1751 let test_instrument = create_test_perpetual_instrument();
1752 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1753
1754 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1755
1756 assert_eq!(updates.len(), 1);
1757 match &updates[0] {
1758 Data::IndexPriceUpdate(update) => {
1759 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1760 assert_eq!(update.value.as_f64(), 95124.3);
1761 }
1762 _ => panic!("Expected IndexPriceUpdate"),
1763 }
1764 }
1765
1766 #[rstest]
1767 fn test_parse_instrument_msg_no_prices() {
1768 let mut msg: BitmexInstrumentMsg =
1769 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1770 msg.mark_price = None;
1771 msg.index_price = None;
1772 msg.last_price = None;
1773
1774 let mut instruments_cache = AHashMap::new();
1776 let test_instrument = create_test_perpetual_instrument();
1777 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1778
1779 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1780 assert_eq!(updates.len(), 0);
1781 }
1782
1783 #[rstest]
1784 fn test_parse_instrument_msg_index_symbol() {
1785 let mut msg: BitmexInstrumentMsg =
1788 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1789 msg.symbol = Ustr::from(".BXBT");
1790 msg.last_price = Some(119163.05);
1791 msg.mark_price = Some(119163.05); msg.index_price = None;
1793
1794 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1796 let instrument = CryptoPerpetual::new(
1797 instrument_id,
1798 Symbol::from(".BXBT"),
1799 Currency::BTC(),
1800 Currency::USD(),
1801 Currency::USD(),
1802 false, 2, 8, Price::from("0.01"),
1806 Quantity::from("0.00000001"),
1807 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1822 let mut instruments_cache = AHashMap::new();
1823 instruments_cache.insert(
1824 Ustr::from(".BXBT"),
1825 InstrumentAny::CryptoPerpetual(instrument),
1826 );
1827
1828 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1829
1830 assert_eq!(updates.len(), 2);
1831
1832 match &updates[0] {
1834 Data::MarkPriceUpdate(update) => {
1835 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1836 assert_eq!(update.value, Price::from("119163.05"));
1837 }
1838 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1839 }
1840
1841 match &updates[1] {
1843 Data::IndexPriceUpdate(update) => {
1844 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1845 assert_eq!(update.value, Price::from("119163.05"));
1846 assert_eq!(update.ts_init, UnixNanos::from(1));
1847 }
1848 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1849 }
1850 }
1851
1852 #[rstest]
1853 fn test_parse_funding_msg() {
1854 let json_data = load_test_json("ws_funding_rate.json");
1855 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1856 let update = parse_funding_msg(msg, UnixNanos::from(1));
1857
1858 assert!(update.is_some());
1859 let update = update.unwrap();
1860
1861 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1862 assert_eq!(update.rate.to_string(), "0.0001");
1863 assert!(update.next_funding_ns.is_none());
1864 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1865 assert_eq!(update.ts_init, UnixNanos::from(1));
1866 }
1867}