1use std::num::NonZero;
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TriggerType,
34 },
35 events::{OrderUpdated, account::state::AccountState},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38 VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::{Decimal, prelude::FromPrimitive};
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49 enums::{BitmexAction, BitmexWsTopic},
50 messages::{
51 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54 },
55};
56use crate::{
57 common::{
58 consts::BITMEX_VENUE,
59 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
60 parse::{
61 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
62 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
63 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
64 parse_position_side, parse_signed_contracts_quantity,
65 },
66 },
67 http::parse::get_currency,
68 websocket::messages::BitmexOrderUpdateMsg,
69};
70
71const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
72 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
73 aggregation: BarAggregation::Minute,
74 price_type: PriceType::Last,
75};
76const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
77 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
78 aggregation: BarAggregation::Minute,
79 price_type: PriceType::Last,
80};
81const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
82 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
83 aggregation: BarAggregation::Hour,
84 price_type: PriceType::Last,
85};
86const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
87 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
88 aggregation: BarAggregation::Day,
89 price_type: PriceType::Last,
90};
91
92#[inline]
100#[must_use]
101pub fn is_index_symbol(symbol: &Ustr) -> bool {
102 symbol.starts_with('.')
103}
104
105#[must_use]
107pub fn parse_book_msg_vec(
108 data: Vec<BitmexOrderBookMsg>,
109 action: BitmexAction,
110 instruments: &AHashMap<Ustr, InstrumentAny>,
111 ts_init: UnixNanos,
112) -> Vec<Data> {
113 let mut deltas = Vec::with_capacity(data.len());
114
115 for msg in data {
116 if let Some(instrument) = instruments.get(&msg.symbol) {
117 let instrument_id = instrument.id();
118 let price_precision = instrument.price_precision();
119 deltas.push(Data::Delta(parse_book_msg(
120 &msg,
121 &action,
122 instrument,
123 instrument_id,
124 price_precision,
125 ts_init,
126 )));
127 } else {
128 tracing::error!(
129 "Instrument cache miss: book delta dropped for symbol={}",
130 msg.symbol
131 );
132 }
133 }
134 deltas
135}
136
137#[must_use]
139pub fn parse_book10_msg_vec(
140 data: Vec<BitmexOrderBook10Msg>,
141 instruments: &AHashMap<Ustr, InstrumentAny>,
142 ts_init: UnixNanos,
143) -> Vec<Data> {
144 let mut depths = Vec::with_capacity(data.len());
145
146 for msg in data {
147 if let Some(instrument) = instruments.get(&msg.symbol) {
148 let instrument_id = instrument.id();
149 let price_precision = instrument.price_precision();
150 depths.push(Data::Depth10(Box::new(parse_book10_msg(
151 &msg,
152 instrument,
153 instrument_id,
154 price_precision,
155 ts_init,
156 ))));
157 } else {
158 tracing::error!(
159 "Instrument cache miss: depth10 message dropped for symbol={}",
160 msg.symbol
161 );
162 }
163 }
164 depths
165}
166
167#[must_use]
169pub fn parse_trade_msg_vec(
170 data: Vec<BitmexTradeMsg>,
171 instruments: &AHashMap<Ustr, InstrumentAny>,
172 ts_init: UnixNanos,
173) -> Vec<Data> {
174 let mut trades = Vec::with_capacity(data.len());
175
176 for msg in data {
177 if let Some(instrument) = instruments.get(&msg.symbol) {
178 let instrument_id = instrument.id();
179 let price_precision = instrument.price_precision();
180 trades.push(Data::Trade(parse_trade_msg(
181 &msg,
182 instrument,
183 instrument_id,
184 price_precision,
185 ts_init,
186 )));
187 } else {
188 tracing::error!(
189 "Instrument cache miss: trade message dropped for symbol={}",
190 msg.symbol
191 );
192 }
193 }
194 trades
195}
196
197#[must_use]
199pub fn parse_trade_bin_msg_vec(
200 data: Vec<BitmexTradeBinMsg>,
201 topic: BitmexWsTopic,
202 instruments: &AHashMap<Ustr, InstrumentAny>,
203 ts_init: UnixNanos,
204) -> Vec<Data> {
205 let mut trades = Vec::with_capacity(data.len());
206
207 for msg in data {
208 if let Some(instrument) = instruments.get(&msg.symbol) {
209 let instrument_id = instrument.id();
210 let price_precision = instrument.price_precision();
211 trades.push(Data::Bar(parse_trade_bin_msg(
212 &msg,
213 &topic,
214 instrument,
215 instrument_id,
216 price_precision,
217 ts_init,
218 )));
219 } else {
220 tracing::error!(
221 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
222 msg.symbol
223 );
224 }
225 }
226 trades
227}
228
229#[allow(clippy::too_many_arguments)]
231#[must_use]
232pub fn parse_book_msg(
233 msg: &BitmexOrderBookMsg,
234 action: &BitmexAction,
235 instrument: &InstrumentAny,
236 instrument_id: InstrumentId,
237 price_precision: u8,
238 ts_init: UnixNanos,
239) -> OrderBookDelta {
240 let flags = if action == &BitmexAction::Insert {
241 RecordFlag::F_SNAPSHOT as u8
242 } else {
243 0
244 };
245
246 let action = action.as_book_action();
247 let price = Price::new(msg.price, price_precision);
248 let side = msg.side.as_order_side();
249 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
250 let order_id = msg.id;
251 let order = BookOrder::new(side, price, size, order_id);
252 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
254
255 OrderBookDelta::new(
256 instrument_id,
257 action,
258 order,
259 flags,
260 sequence,
261 ts_event,
262 ts_init,
263 )
264}
265
266#[allow(clippy::too_many_arguments)]
272#[must_use]
273pub fn parse_book10_msg(
274 msg: &BitmexOrderBook10Msg,
275 instrument: &InstrumentAny,
276 instrument_id: InstrumentId,
277 price_precision: u8,
278 ts_init: UnixNanos,
279) -> OrderBookDepth10 {
280 let mut bids = Vec::with_capacity(DEPTH10_LEN);
281 let mut asks = Vec::with_capacity(DEPTH10_LEN);
282
283 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
285 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
286
287 for (i, level) in msg.bids.iter().enumerate() {
288 let bid_order = BookOrder::new(
289 OrderSide::Buy,
290 Price::new(level[0], price_precision),
291 parse_fractional_quantity(level[1], instrument),
292 0,
293 );
294
295 bids.push(bid_order);
296 bid_counts[i] = 1;
297 }
298
299 for (i, level) in msg.asks.iter().enumerate() {
300 let ask_order = BookOrder::new(
301 OrderSide::Sell,
302 Price::new(level[0], price_precision),
303 parse_fractional_quantity(level[1], instrument),
304 0,
305 );
306
307 asks.push(ask_order);
308 ask_counts[i] = 1;
309 }
310
311 let bids: [BookOrder; DEPTH10_LEN] = bids
312 .try_into()
313 .inspect_err(|v: &Vec<BookOrder>| {
314 tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
315 })
316 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
317 let asks: [BookOrder; DEPTH10_LEN] = asks
318 .try_into()
319 .inspect_err(|v: &Vec<BookOrder>| {
320 tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
321 })
322 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
323
324 let ts_event = UnixNanos::from(msg.timestamp);
325
326 OrderBookDepth10::new(
327 instrument_id,
328 bids,
329 asks,
330 bid_counts,
331 ask_counts,
332 RecordFlag::F_SNAPSHOT as u8,
333 0, ts_event,
335 ts_init,
336 )
337}
338
339#[must_use]
341pub fn parse_quote_msg(
342 msg: &BitmexQuoteMsg,
343 last_quote: &QuoteTick,
344 instrument: &InstrumentAny,
345 instrument_id: InstrumentId,
346 price_precision: u8,
347 ts_init: UnixNanos,
348) -> QuoteTick {
349 let bid_price = match msg.bid_price {
350 Some(price) => Price::new(price, price_precision),
351 None => last_quote.bid_price,
352 };
353
354 let ask_price = match msg.ask_price {
355 Some(price) => Price::new(price, price_precision),
356 None => last_quote.ask_price,
357 };
358
359 let bid_size = match msg.bid_size {
360 Some(size) => parse_contracts_quantity(size, instrument),
361 None => last_quote.bid_size,
362 };
363
364 let ask_size = match msg.ask_size {
365 Some(size) => parse_contracts_quantity(size, instrument),
366 None => last_quote.ask_size,
367 };
368
369 let ts_event = UnixNanos::from(msg.timestamp);
370
371 QuoteTick::new(
372 instrument_id,
373 bid_price,
374 ask_price,
375 bid_size,
376 ask_size,
377 ts_event,
378 ts_init,
379 )
380}
381
382#[must_use]
384pub fn parse_trade_msg(
385 msg: &BitmexTradeMsg,
386 instrument: &InstrumentAny,
387 instrument_id: InstrumentId,
388 price_precision: u8,
389 ts_init: UnixNanos,
390) -> TradeTick {
391 let price = Price::new(msg.price, price_precision);
392 let size = parse_contracts_quantity(msg.size, instrument);
393 let aggressor_side = msg.side.as_aggressor_side();
394 let trade_id = TradeId::new(
395 msg.trd_match_id
396 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
397 );
398 let ts_event = UnixNanos::from(msg.timestamp);
399
400 TradeTick::new(
401 instrument_id,
402 price,
403 size,
404 aggressor_side,
405 trade_id,
406 ts_event,
407 ts_init,
408 )
409}
410
411#[must_use]
413pub fn parse_trade_bin_msg(
414 msg: &BitmexTradeBinMsg,
415 topic: &BitmexWsTopic,
416 instrument: &InstrumentAny,
417 instrument_id: InstrumentId,
418 price_precision: u8,
419 ts_init: UnixNanos,
420) -> Bar {
421 let spec = bar_spec_from_topic(topic);
422 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
423
424 let open = Price::new(msg.open, price_precision);
425 let high = Price::new(msg.high, price_precision);
426 let low = Price::new(msg.low, price_precision);
427 let close = Price::new(msg.close, price_precision);
428
429 let (open, high, low, close) =
430 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
431
432 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
433 let volume = parse_contracts_quantity(volume_contracts, instrument);
434 let ts_event = UnixNanos::from(msg.timestamp);
435
436 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
437}
438
439#[must_use]
445pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
446 match topic {
447 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
448 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
449 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
450 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
451 _ => {
452 tracing::error!(topic = ?topic, "Bar specification not supported");
453 BAR_SPEC_1_MINUTE
454 }
455 }
456}
457
458#[must_use]
464pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
465 match spec {
466 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
467 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
468 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
469 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
470 _ => {
471 tracing::error!(spec = ?spec, "Bar specification not supported");
472 BitmexWsTopic::TradeBin1m
473 }
474 }
475}
476
477fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
478 if msg.stop_px.is_some() {
479 if msg.price.is_some() {
480 Some(OrderType::StopLimit)
481 } else {
482 Some(OrderType::StopMarket)
483 }
484 } else if msg.price.is_some() {
485 Some(OrderType::Limit)
486 } else {
487 Some(OrderType::Market)
488 }
489}
490
491pub fn parse_order_msg(
505 msg: &BitmexOrderMsg,
506 instrument: &InstrumentAny,
507 order_type_cache: &DashMap<ClientOrderId, OrderType>,
508) -> anyhow::Result<OrderStatusReport> {
509 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
511 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
512 let common_side: BitmexSide = msg.side.into();
513 let order_side: OrderSide = common_side.into();
514
515 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
516 ord_type.into()
517 } else if let Some(client_order_id) = msg.cl_ord_id {
518 let client_order_id = ClientOrderId::new(client_order_id);
519 if let Some(entry) = order_type_cache.get(&client_order_id) {
520 *entry.value()
521 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
522 order_type_cache.insert(client_order_id, inferred);
523 inferred
524 } else {
525 anyhow::bail!(
526 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
527 );
528 }
529 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
530 inferred
531 } else {
532 anyhow::bail!("Order missing both ord_type and cl_ord_id");
533 };
534
535 let time_in_force: TimeInForce = match msg.time_in_force {
536 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
537 None => TimeInForce::Gtc,
538 };
539 let order_status: OrderStatus = msg.ord_status.into();
540 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
541 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
542 let report_id = UUID4::new();
543 let ts_accepted =
544 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
545 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
546 let ts_init = get_atomic_clock_realtime().get_time_ns();
547
548 let mut report = OrderStatusReport::new(
549 account_id,
550 instrument_id,
551 None, venue_order_id,
553 order_side,
554 order_type,
555 time_in_force,
556 order_status,
557 quantity,
558 filled_qty,
559 ts_accepted,
560 ts_last,
561 ts_init,
562 Some(report_id),
563 );
564
565 if let Some(cl_ord_id) = &msg.cl_ord_id {
566 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
567 }
568
569 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
570 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
571 }
572
573 if let Some(price) = msg.price {
574 report = report.with_price(Price::new(price, instrument.price_precision()));
575 }
576
577 if let Some(avg_px) = msg.avg_px {
578 report = report.with_avg_px(avg_px)?;
579 }
580
581 if let Some(trigger_price) = msg.stop_px {
582 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
583 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
585 TriggerType::MarkPrice
586 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
587 TriggerType::IndexPrice
588 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
589 TriggerType::LastPrice
590 } else {
591 TriggerType::Default
592 }
593 } else {
594 TriggerType::Default };
596
597 report = report
598 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
599 .with_trigger_type(trigger_type);
600 }
601
602 if let Some(exec_insts) = &msg.exec_inst {
603 for exec_inst in exec_insts {
604 match exec_inst {
605 BitmexExecInstruction::ParticipateDoNotInitiate => {
606 report = report.with_post_only(true);
607 }
608 BitmexExecInstruction::ReduceOnly => {
609 report = report.with_reduce_only(true);
610 }
611 _ => {}
612 }
613 }
614 }
615
616 if order_status == OrderStatus::Rejected {
618 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
619 tracing::debug!(
620 order_id = ?venue_order_id,
621 client_order_id = ?msg.cl_ord_id,
622 reason = ?reason_str,
623 "Order rejected with reason"
624 );
625 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
626 } else {
627 tracing::debug!(
628 order_id = ?venue_order_id,
629 client_order_id = ?msg.cl_ord_id,
630 ord_status = ?msg.ord_status,
631 ord_rej_reason = ?msg.ord_rej_reason,
632 text = ?msg.text,
633 "Order rejected without reason from BitMEX"
634 );
635 }
636 }
637
638 if order_status == OrderStatus::Canceled
641 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
642 {
643 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
644 }
645
646 Ok(report)
647}
648
649pub fn parse_order_update_msg(
653 msg: &BitmexOrderUpdateMsg,
654 instrument: &InstrumentAny,
655 account_id: AccountId,
656) -> Option<OrderUpdated> {
657 let trader_id = TraderId::external();
660 let strategy_id = StrategyId::external();
661 let instrument_id = parse_instrument_id(msg.symbol);
662 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
663 let client_order_id = msg
664 .cl_ord_id
665 .map_or_else(ClientOrderId::external, ClientOrderId::new);
666 let quantity = Quantity::zero(instrument.size_precision());
667 let price = msg
668 .price
669 .map(|p| Price::new(p, instrument.price_precision()));
670
671 let trigger_price = None;
673 let protection_price = None;
675
676 let event_id = UUID4::new();
677 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
678 let ts_init = get_atomic_clock_realtime().get_time_ns();
679
680 Some(OrderUpdated::new(
681 trader_id,
682 strategy_id,
683 instrument_id,
684 client_order_id,
685 quantity,
686 event_id,
687 ts_event,
688 ts_init,
689 false, venue_order_id,
691 Some(account_id),
692 price,
693 trigger_price,
694 protection_price,
695 ))
696}
697
698pub fn parse_execution_msg(
717 msg: BitmexExecutionMsg,
718 instrument: &InstrumentAny,
719) -> Option<FillReport> {
720 let exec_type = msg.exec_type?;
721
722 match exec_type {
723 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
725 BitmexExecType::Bankruptcy => {
726 tracing::warn!(
727 exec_type = ?exec_type,
728 order_id = ?msg.order_id,
729 symbol = ?msg.symbol,
730 "Processing bankruptcy execution as fill"
731 );
732 }
733
734 BitmexExecType::Settlement => {
736 tracing::debug!(
737 exec_type = ?exec_type,
738 order_id = ?msg.order_id,
739 symbol = ?msg.symbol,
740 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement"
741 );
742 return None;
743 }
744 BitmexExecType::TrialFill => {
745 tracing::warn!(
746 exec_type = ?exec_type,
747 order_id = ?msg.order_id,
748 symbol = ?msg.symbol,
749 "Trial fill execution received (testnet only), not processed as fill"
750 );
751 return None;
752 }
753
754 BitmexExecType::Funding => {
756 tracing::debug!(
757 exec_type = ?exec_type,
758 order_id = ?msg.order_id,
759 symbol = ?msg.symbol,
760 "Funding execution skipped (not a fill)"
761 );
762 return None;
763 }
764 BitmexExecType::Insurance => {
765 tracing::debug!(
766 exec_type = ?exec_type,
767 order_id = ?msg.order_id,
768 symbol = ?msg.symbol,
769 "Insurance execution skipped (not a fill)"
770 );
771 return None;
772 }
773 BitmexExecType::Rebalance => {
774 tracing::debug!(
775 exec_type = ?exec_type,
776 order_id = ?msg.order_id,
777 symbol = ?msg.symbol,
778 "Rebalance execution skipped (not a fill)"
779 );
780 return None;
781 }
782
783 BitmexExecType::New
785 | BitmexExecType::Canceled
786 | BitmexExecType::CancelReject
787 | BitmexExecType::Replaced
788 | BitmexExecType::Rejected
789 | BitmexExecType::AmendReject
790 | BitmexExecType::Suspended
791 | BitmexExecType::Released
792 | BitmexExecType::TriggeredOrActivatedBySystem => {
793 tracing::debug!(
794 exec_type = ?exec_type,
795 order_id = ?msg.order_id,
796 "Execution message skipped (order state change, not a fill)"
797 );
798 return None;
799 }
800 }
801
802 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
803 let instrument_id = parse_instrument_id(msg.symbol?);
804 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
805 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
806 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
807 let side: BitmexSide = s.into();
808 side.into()
809 });
810 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
811 let last_px = Price::new(msg.last_px?, instrument.price_precision());
812 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
813 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
814 let currency = get_currency(&mapped_currency);
815 let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
816 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
817 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
818 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
820 let ts_init = get_atomic_clock_realtime().get_time_ns();
821
822 Some(FillReport::new(
823 account_id,
824 instrument_id,
825 venue_order_id,
826 trade_id,
827 order_side,
828 last_qty,
829 last_px,
830 commission,
831 liquidity_side,
832 client_order_id,
833 venue_position_id,
834 ts_event,
835 ts_init,
836 None,
837 ))
838}
839
840#[must_use]
846pub fn parse_position_msg(
847 msg: BitmexPositionMsg,
848 instrument: &InstrumentAny,
849) -> PositionStatusReport {
850 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
851 let instrument_id = parse_instrument_id(msg.symbol);
852 let position_side = parse_position_side(msg.current_qty).as_specified();
853 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
854 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
856 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
857 let ts_init = get_atomic_clock_realtime().get_time_ns();
858
859 PositionStatusReport::new(
860 account_id,
861 instrument_id,
862 position_side,
863 quantity,
864 ts_last,
865 ts_init,
866 None, venue_position_id, avg_px_open, )
870}
871
872#[must_use]
885pub fn parse_instrument_msg(
886 msg: BitmexInstrumentMsg,
887 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
888 ts_init: UnixNanos,
889) -> Vec<Data> {
890 let mut updates = Vec::new();
891 let is_index = is_index_symbol(&msg.symbol);
892
893 let effective_index_price = if is_index {
896 msg.last_price
897 } else {
898 msg.index_price
899 };
900
901 if msg.mark_price.is_none() && effective_index_price.is_none() {
905 return updates;
906 }
907
908 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
909 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
910
911 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
913 Some(instrument) => instrument.price_precision(),
914 None => {
915 if is_index {
919 tracing::trace!(
920 "Index instrument {} not in cache, skipping update",
921 msg.symbol
922 );
923 } else {
924 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
925 }
926 return updates;
927 }
928 };
929
930 if let Some(mark_price) = msg.mark_price {
933 let price = Price::new(mark_price, price_precision);
934 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
935 instrument_id,
936 price,
937 ts_event,
938 ts_init,
939 )));
940 }
941
942 if let Some(index_price) = effective_index_price {
944 let price = Price::new(index_price, price_precision);
945 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
946 instrument_id,
947 price,
948 ts_event,
949 ts_init,
950 )));
951 }
952
953 updates
954}
955
956pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
962 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
963 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
964
965 let rate = match Decimal::try_from(msg.funding_rate) {
967 Ok(rate) => rate,
968 Err(e) => {
969 tracing::error!("Failed to parse funding rate: {e}");
970 return None;
971 }
972 };
973
974 Some(FundingRateUpdate::new(
975 instrument_id,
976 rate,
977 None, ts_event,
979 ts_init,
980 ))
981}
982
983#[must_use]
992pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
993 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
994
995 let currency_str = map_bitmex_currency(msg.currency.as_str());
997 let currency = get_currency(¤cy_str);
998
999 let divisor = if msg.currency == "XBt" {
1001 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1003 1_000_000.0 } else {
1005 1.0
1006 };
1007 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1008
1009 let total = Money::new(amount, currency);
1010 let locked = Money::new(0.0, currency); let free = total - locked;
1012
1013 let balance = AccountBalance::new_checked(total, locked, free)
1014 .expect("Balance calculation should be valid");
1015
1016 AccountState::new(
1017 account_id,
1018 AccountType::Margin,
1019 vec![balance],
1020 vec![], true, UUID4::new(),
1023 ts_init,
1024 ts_init,
1025 None,
1026 )
1027}
1028
1029#[must_use]
1033pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1034 let currency_str = map_bitmex_currency(msg.currency.as_str());
1036 let currency = get_currency(¤cy_str);
1037
1038 let divisor = if msg.currency == "XBt" {
1040 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1042 1_000_000.0 } else {
1044 1.0
1045 };
1046
1047 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1048 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1049 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1050
1051 MarginBalance::new(
1052 Money::new(initial, currency),
1053 Money::new(maintenance, currency),
1054 instrument_id,
1055 )
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use chrono::{DateTime, Utc};
1061 use nautilus_model::{
1062 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1063 identifiers::Symbol,
1064 instruments::crypto_perpetual::CryptoPerpetual,
1065 };
1066 use rstest::rstest;
1067 use ustr::Ustr;
1068
1069 use super::*;
1070 use crate::common::{
1071 enums::{BitmexExecType, BitmexOrderStatus},
1072 testing::load_test_json,
1073 };
1074
1075 fn create_test_perpetual_instrument_with_precisions(
1077 price_precision: u8,
1078 size_precision: u8,
1079 ) -> InstrumentAny {
1080 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1081 InstrumentId::from("XBTUSD.BITMEX"),
1082 Symbol::new("XBTUSD"),
1083 Currency::BTC(),
1084 Currency::USD(),
1085 Currency::BTC(),
1086 true, price_precision,
1088 size_precision,
1089 Price::new(0.5, price_precision),
1090 Quantity::new(1.0, size_precision),
1091 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1104 UnixNanos::default(),
1105 ))
1106 }
1107
1108 fn create_test_perpetual_instrument() -> InstrumentAny {
1109 create_test_perpetual_instrument_with_precisions(1, 0)
1110 }
1111
1112 #[rstest]
1113 fn test_orderbook_l2_message() {
1114 let json_data = load_test_json("ws_orderbook_l2.json");
1115
1116 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1117 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1118
1119 let instrument = create_test_perpetual_instrument();
1121 let delta = parse_book_msg(
1122 &msg,
1123 &BitmexAction::Insert,
1124 &instrument,
1125 instrument.id(),
1126 instrument.price_precision(),
1127 UnixNanos::from(3),
1128 );
1129 assert_eq!(delta.instrument_id, instrument_id);
1130 assert_eq!(delta.order.price, Price::from("98459.9"));
1131 assert_eq!(delta.order.size, Quantity::from(33000));
1132 assert_eq!(delta.order.side, OrderSide::Sell);
1133 assert_eq!(delta.order.order_id, 62400580205);
1134 assert_eq!(delta.action, BookAction::Add);
1135 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1136 assert_eq!(delta.sequence, 0);
1137 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1139
1140 let delta = parse_book_msg(
1142 &msg,
1143 &BitmexAction::Update,
1144 &instrument,
1145 instrument.id(),
1146 instrument.price_precision(),
1147 UnixNanos::from(3),
1148 );
1149 assert_eq!(delta.flags, 0);
1150 assert_eq!(delta.action, BookAction::Update);
1151 }
1152
1153 #[rstest]
1154 fn test_orderbook10_message() {
1155 let json_data = load_test_json("ws_orderbook_10.json");
1156 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1157 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1158 let instrument = create_test_perpetual_instrument();
1159 let depth10 = parse_book10_msg(
1160 &msg,
1161 &instrument,
1162 instrument.id(),
1163 instrument.price_precision(),
1164 UnixNanos::from(3),
1165 );
1166
1167 assert_eq!(depth10.instrument_id, instrument_id);
1168
1169 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1171 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1172 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1173
1174 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1176 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1177 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1178
1179 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1181 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1182
1183 assert_eq!(depth10.sequence, 0);
1185 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1186 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1188 }
1189
1190 #[rstest]
1191 fn test_quote_message() {
1192 let json_data = load_test_json("ws_quote.json");
1193
1194 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1195 let last_quote = QuoteTick::new(
1196 instrument_id,
1197 Price::new(487.50, 2),
1198 Price::new(488.20, 2),
1199 Quantity::from(100_000),
1200 Quantity::from(100_000),
1201 UnixNanos::from(1),
1202 UnixNanos::from(2),
1203 );
1204 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1205 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1206 let quote = parse_quote_msg(
1207 &msg,
1208 &last_quote,
1209 &instrument,
1210 instrument_id,
1211 instrument.price_precision(),
1212 UnixNanos::from(3),
1213 );
1214
1215 assert_eq!(quote.instrument_id, instrument_id);
1216 assert_eq!(quote.bid_price, Price::from("487.55"));
1217 assert_eq!(quote.ask_price, Price::from("488.25"));
1218 assert_eq!(quote.bid_size, Quantity::from(103_000));
1219 assert_eq!(quote.ask_size, Quantity::from(50_000));
1220 assert_eq!(quote.ts_event, 1732315465085000000);
1221 assert_eq!(quote.ts_init, 3);
1222 }
1223
1224 #[rstest]
1225 fn test_trade_message() {
1226 let json_data = load_test_json("ws_trade.json");
1227
1228 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1229 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1230 let instrument = create_test_perpetual_instrument();
1231 let trade = parse_trade_msg(
1232 &msg,
1233 &instrument,
1234 instrument.id(),
1235 instrument.price_precision(),
1236 UnixNanos::from(3),
1237 );
1238
1239 assert_eq!(trade.instrument_id, instrument_id);
1240 assert_eq!(trade.price, Price::from("98570.9"));
1241 assert_eq!(trade.size, Quantity::from(100));
1242 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1243 assert_eq!(
1244 trade.trade_id.to_string(),
1245 "00000000-006d-1000-0000-000e8737d536"
1246 );
1247 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1249 }
1250
1251 #[rstest]
1252 fn test_trade_bin_message() {
1253 let json_data = load_test_json("ws_trade_bin_1m.json");
1254
1255 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1256 let topic = BitmexWsTopic::TradeBin1m;
1257
1258 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1259 let instrument = create_test_perpetual_instrument();
1260 let bar = parse_trade_bin_msg(
1261 &msg,
1262 &topic,
1263 &instrument,
1264 instrument.id(),
1265 instrument.price_precision(),
1266 UnixNanos::from(3),
1267 );
1268
1269 assert_eq!(bar.instrument_id(), instrument_id);
1270 assert_eq!(
1271 bar.bar_type.spec(),
1272 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1273 );
1274 assert_eq!(bar.open, Price::from("97550.0"));
1275 assert_eq!(bar.high, Price::from("97584.4"));
1276 assert_eq!(bar.low, Price::from("97550.0"));
1277 assert_eq!(bar.close, Price::from("97570.1"));
1278 assert_eq!(bar.volume, Quantity::from(84_000));
1279 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1281 }
1282
1283 #[rstest]
1284 fn test_trade_bin_message_extreme_adjustment() {
1285 let topic = BitmexWsTopic::TradeBin1m;
1286 let instrument = create_test_perpetual_instrument();
1287
1288 let msg = BitmexTradeBinMsg {
1289 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1290 .unwrap()
1291 .with_timezone(&Utc),
1292 symbol: Ustr::from("XBTUSD"),
1293 open: 50_000.0,
1294 high: 49_990.0,
1295 low: 50_010.0,
1296 close: 50_005.0,
1297 trades: 10,
1298 volume: 1_000,
1299 vwap: 0.0,
1300 last_size: 0,
1301 turnover: 0,
1302 home_notional: 0.0,
1303 foreign_notional: 0.0,
1304 };
1305
1306 let bar = parse_trade_bin_msg(
1307 &msg,
1308 &topic,
1309 &instrument,
1310 instrument.id(),
1311 instrument.price_precision(),
1312 UnixNanos::from(3),
1313 );
1314
1315 assert_eq!(bar.high, Price::from("50010.0"));
1316 assert_eq!(bar.low, Price::from("49990.0"));
1317 assert_eq!(bar.open, Price::from("50000.0"));
1318 assert_eq!(bar.close, Price::from("50005.0"));
1319 assert_eq!(bar.volume, Quantity::from(1_000));
1320 }
1321
1322 #[rstest]
1323 fn test_parse_order_msg() {
1324 let json_data = load_test_json("ws_order.json");
1325 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1326 let cache = DashMap::new();
1327 let instrument = create_test_perpetual_instrument();
1328 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1329
1330 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1331 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1332 assert_eq!(
1333 report.venue_order_id.to_string(),
1334 "550e8400-e29b-41d4-a716-446655440001"
1335 );
1336 assert_eq!(
1337 report.client_order_id.unwrap().to_string(),
1338 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1339 );
1340 assert_eq!(report.order_side, OrderSide::Buy);
1341 assert_eq!(report.order_type, OrderType::Limit);
1342 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1343 assert_eq!(report.order_status, OrderStatus::Accepted);
1344 assert_eq!(report.quantity, Quantity::from(100));
1345 assert_eq!(report.filled_qty, Quantity::from(0));
1346 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1347 assert_eq!(report.ts_accepted, 1732530600000000000); }
1349
1350 #[rstest]
1351 fn test_parse_order_msg_infers_type_when_missing() {
1352 let json_data = load_test_json("ws_order.json");
1353 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1354 msg.ord_type = None;
1355 msg.cl_ord_id = None;
1356 msg.price = Some(98_000.0);
1357 msg.stop_px = None;
1358
1359 let cache = DashMap::new();
1360 let instrument = create_test_perpetual_instrument();
1361
1362 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1363
1364 assert_eq!(report.order_type, OrderType::Limit);
1365 }
1366
1367 #[rstest]
1368 fn test_parse_order_msg_rejected_with_reason() {
1369 let mut msg: BitmexOrderMsg =
1370 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1371 msg.ord_status = BitmexOrderStatus::Rejected;
1372 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1373 msg.text = None;
1374 msg.cum_qty = 0;
1375
1376 let cache = DashMap::new();
1377 let instrument = create_test_perpetual_instrument();
1378 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1379
1380 assert_eq!(report.order_status, OrderStatus::Rejected);
1381 assert_eq!(
1382 report.cancel_reason,
1383 Some("Insufficient available balance".to_string())
1384 );
1385 }
1386
1387 #[rstest]
1388 fn test_parse_order_msg_rejected_with_text_fallback() {
1389 let mut msg: BitmexOrderMsg =
1390 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1391 msg.ord_status = BitmexOrderStatus::Rejected;
1392 msg.ord_rej_reason = None;
1393 msg.text = Some(Ustr::from("Order would execute immediately"));
1394 msg.cum_qty = 0;
1395
1396 let cache = DashMap::new();
1397 let instrument = create_test_perpetual_instrument();
1398 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1399
1400 assert_eq!(report.order_status, OrderStatus::Rejected);
1401 assert_eq!(
1402 report.cancel_reason,
1403 Some("Order would execute immediately".to_string())
1404 );
1405 }
1406
1407 #[rstest]
1408 fn test_parse_order_msg_rejected_without_reason() {
1409 let mut msg: BitmexOrderMsg =
1410 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1411 msg.ord_status = BitmexOrderStatus::Rejected;
1412 msg.ord_rej_reason = None;
1413 msg.text = None;
1414 msg.cum_qty = 0;
1415
1416 let cache = DashMap::new();
1417 let instrument = create_test_perpetual_instrument();
1418 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1419
1420 assert_eq!(report.order_status, OrderStatus::Rejected);
1421 assert_eq!(report.cancel_reason, None);
1422 }
1423
1424 #[rstest]
1425 fn test_parse_execution_msg() {
1426 let json_data = load_test_json("ws_execution.json");
1427 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1428 let instrument = create_test_perpetual_instrument();
1429 let fill = parse_execution_msg(msg, &instrument).unwrap();
1430
1431 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1432 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1433 assert_eq!(
1434 fill.venue_order_id.to_string(),
1435 "550e8400-e29b-41d4-a716-446655440002"
1436 );
1437 assert_eq!(
1438 fill.trade_id.to_string(),
1439 "00000000-006d-1000-0000-000e8737d540"
1440 );
1441 assert_eq!(
1442 fill.client_order_id.unwrap().to_string(),
1443 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1444 );
1445 assert_eq!(fill.order_side, OrderSide::Sell);
1446 assert_eq!(fill.last_qty, Quantity::from(100));
1447 assert_eq!(fill.last_px, Price::from("98950.0"));
1448 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1449 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1450 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1451 assert_eq!(fill.ts_event, 1732530900789000000); }
1453
1454 #[rstest]
1455 fn test_parse_execution_msg_non_trade() {
1456 let mut msg: BitmexExecutionMsg =
1458 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1459 msg.exec_type = Some(BitmexExecType::Settlement);
1460
1461 let instrument = create_test_perpetual_instrument();
1462 let result = parse_execution_msg(msg, &instrument);
1463 assert!(result.is_none());
1464 }
1465
1466 #[rstest]
1467 fn test_parse_cancel_reject_execution() {
1468 let json = load_test_json("ws_execution_cancel_reject.json");
1470
1471 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1472 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1473 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1474 assert_eq!(msg.symbol, None);
1475
1476 let instrument = create_test_perpetual_instrument();
1478 let result = parse_execution_msg(msg, &instrument);
1479 assert!(result.is_none());
1480 }
1481
1482 #[rstest]
1483 fn test_parse_execution_msg_liquidation() {
1484 let mut msg: BitmexExecutionMsg =
1486 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1487 msg.exec_type = Some(BitmexExecType::Liquidation);
1488
1489 let instrument = create_test_perpetual_instrument();
1490 let fill = parse_execution_msg(msg, &instrument).unwrap();
1491
1492 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1493 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1494 assert_eq!(fill.order_side, OrderSide::Sell);
1495 assert_eq!(fill.last_qty, Quantity::from(100));
1496 assert_eq!(fill.last_px, Price::from("98950.0"));
1497 }
1498
1499 #[rstest]
1500 fn test_parse_execution_msg_bankruptcy() {
1501 let mut msg: BitmexExecutionMsg =
1502 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1503 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1504
1505 let instrument = create_test_perpetual_instrument();
1506 let fill = parse_execution_msg(msg, &instrument).unwrap();
1507
1508 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1509 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1510 assert_eq!(fill.order_side, OrderSide::Sell);
1511 assert_eq!(fill.last_qty, Quantity::from(100));
1512 }
1513
1514 #[rstest]
1515 fn test_parse_execution_msg_settlement() {
1516 let mut msg: BitmexExecutionMsg =
1517 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1518 msg.exec_type = Some(BitmexExecType::Settlement);
1519
1520 let instrument = create_test_perpetual_instrument();
1521 let result = parse_execution_msg(msg, &instrument);
1522 assert!(result.is_none());
1523 }
1524
1525 #[rstest]
1526 fn test_parse_execution_msg_trial_fill() {
1527 let mut msg: BitmexExecutionMsg =
1528 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1529 msg.exec_type = Some(BitmexExecType::TrialFill);
1530
1531 let instrument = create_test_perpetual_instrument();
1532 let result = parse_execution_msg(msg, &instrument);
1533 assert!(result.is_none());
1534 }
1535
1536 #[rstest]
1537 fn test_parse_execution_msg_funding() {
1538 let mut msg: BitmexExecutionMsg =
1539 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1540 msg.exec_type = Some(BitmexExecType::Funding);
1541
1542 let instrument = create_test_perpetual_instrument();
1543 let result = parse_execution_msg(msg, &instrument);
1544 assert!(result.is_none());
1545 }
1546
1547 #[rstest]
1548 fn test_parse_execution_msg_insurance() {
1549 let mut msg: BitmexExecutionMsg =
1550 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1551 msg.exec_type = Some(BitmexExecType::Insurance);
1552
1553 let instrument = create_test_perpetual_instrument();
1554 let result = parse_execution_msg(msg, &instrument);
1555 assert!(result.is_none());
1556 }
1557
1558 #[rstest]
1559 fn test_parse_execution_msg_rebalance() {
1560 let mut msg: BitmexExecutionMsg =
1561 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1562 msg.exec_type = Some(BitmexExecType::Rebalance);
1563
1564 let instrument = create_test_perpetual_instrument();
1565 let result = parse_execution_msg(msg, &instrument);
1566 assert!(result.is_none());
1567 }
1568
1569 #[rstest]
1570 fn test_parse_execution_msg_order_state_changes() {
1571 let instrument = create_test_perpetual_instrument();
1572
1573 let order_state_types = vec![
1574 BitmexExecType::New,
1575 BitmexExecType::Canceled,
1576 BitmexExecType::CancelReject,
1577 BitmexExecType::Replaced,
1578 BitmexExecType::Rejected,
1579 BitmexExecType::AmendReject,
1580 BitmexExecType::Suspended,
1581 BitmexExecType::Released,
1582 BitmexExecType::TriggeredOrActivatedBySystem,
1583 ];
1584
1585 for exec_type in order_state_types {
1586 let mut msg: BitmexExecutionMsg =
1587 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1588 msg.exec_type = Some(exec_type);
1589
1590 let result = parse_execution_msg(msg, &instrument);
1591 assert!(
1592 result.is_none(),
1593 "Expected None for exec_type {exec_type:?}"
1594 );
1595 }
1596 }
1597
1598 #[rstest]
1599 fn test_parse_position_msg() {
1600 let json_data = load_test_json("ws_position.json");
1601 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1602 let instrument = create_test_perpetual_instrument();
1603 let report = parse_position_msg(msg, &instrument);
1604
1605 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1606 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1607 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1608 assert_eq!(report.quantity, Quantity::from(1000));
1609 assert!(report.venue_position_id.is_none());
1610 assert_eq!(report.ts_last, 1732530900789000000); }
1612
1613 #[rstest]
1614 fn test_parse_position_msg_short() {
1615 let mut msg: BitmexPositionMsg =
1616 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1617 msg.current_qty = Some(-500);
1618
1619 let instrument = create_test_perpetual_instrument();
1620 let report = parse_position_msg(msg, &instrument);
1621 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1622 assert_eq!(report.quantity, Quantity::from(500));
1623 }
1624
1625 #[rstest]
1626 fn test_parse_position_msg_flat() {
1627 let mut msg: BitmexPositionMsg =
1628 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1629 msg.current_qty = Some(0);
1630
1631 let instrument = create_test_perpetual_instrument();
1632 let report = parse_position_msg(msg, &instrument);
1633 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1634 assert_eq!(report.quantity, Quantity::from(0));
1635 }
1636
1637 #[rstest]
1638 fn test_parse_wallet_msg() {
1639 let json_data = load_test_json("ws_wallet.json");
1640 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1641 let ts_init = UnixNanos::from(1);
1642 let account_state = parse_wallet_msg(msg, ts_init);
1643
1644 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1645 assert!(!account_state.balances.is_empty());
1646 let balance = &account_state.balances[0];
1647 assert_eq!(balance.currency.code.to_string(), "XBT");
1648 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1650 }
1651
1652 #[rstest]
1653 fn test_parse_wallet_msg_no_amount() {
1654 let mut msg: BitmexWalletMsg =
1655 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1656 msg.amount = None;
1657
1658 let ts_init = UnixNanos::from(1);
1659 let account_state = parse_wallet_msg(msg, ts_init);
1660 let balance = &account_state.balances[0];
1661 assert_eq!(balance.total.as_f64(), 0.0);
1662 }
1663
1664 #[rstest]
1665 fn test_parse_margin_msg() {
1666 let json_data = load_test_json("ws_margin.json");
1667 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1668 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1669 let margin_balance = parse_margin_msg(msg, instrument_id);
1670
1671 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1672 assert_eq!(margin_balance.instrument_id, instrument_id);
1673 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1676 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1678 }
1679
1680 #[rstest]
1681 fn test_parse_margin_msg_no_available() {
1682 let mut msg: BitmexMarginMsg =
1683 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1684 msg.available_margin = None;
1685
1686 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1687 let margin_balance = parse_margin_msg(msg, instrument_id);
1688 assert!(margin_balance.initial.as_f64() >= 0.0);
1690 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1691 }
1692
1693 #[rstest]
1694 fn test_parse_instrument_msg_both_prices() {
1695 let json_data = load_test_json("ws_instrument.json");
1696 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1697
1698 let mut instruments_cache = AHashMap::new();
1700 let test_instrument = create_test_perpetual_instrument();
1701 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1702
1703 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1704
1705 assert_eq!(updates.len(), 2);
1707
1708 match &updates[0] {
1710 Data::MarkPriceUpdate(update) => {
1711 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1712 assert_eq!(update.value.as_f64(), 95125.7);
1713 }
1714 _ => panic!("Expected MarkPriceUpdate at index 0"),
1715 }
1716
1717 match &updates[1] {
1719 Data::IndexPriceUpdate(update) => {
1720 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1721 assert_eq!(update.value.as_f64(), 95124.3);
1722 }
1723 _ => panic!("Expected IndexPriceUpdate at index 1"),
1724 }
1725 }
1726
1727 #[rstest]
1728 fn test_parse_instrument_msg_mark_price_only() {
1729 let mut msg: BitmexInstrumentMsg =
1730 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1731 msg.index_price = None;
1732
1733 let mut instruments_cache = AHashMap::new();
1735 let test_instrument = create_test_perpetual_instrument();
1736 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1737
1738 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1739
1740 assert_eq!(updates.len(), 1);
1741 match &updates[0] {
1742 Data::MarkPriceUpdate(update) => {
1743 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1744 assert_eq!(update.value.as_f64(), 95125.7);
1745 }
1746 _ => panic!("Expected MarkPriceUpdate"),
1747 }
1748 }
1749
1750 #[rstest]
1751 fn test_parse_instrument_msg_index_price_only() {
1752 let mut msg: BitmexInstrumentMsg =
1753 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1754 msg.mark_price = None;
1755
1756 let mut instruments_cache = AHashMap::new();
1758 let test_instrument = create_test_perpetual_instrument();
1759 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1760
1761 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1762
1763 assert_eq!(updates.len(), 1);
1764 match &updates[0] {
1765 Data::IndexPriceUpdate(update) => {
1766 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1767 assert_eq!(update.value.as_f64(), 95124.3);
1768 }
1769 _ => panic!("Expected IndexPriceUpdate"),
1770 }
1771 }
1772
1773 #[rstest]
1774 fn test_parse_instrument_msg_no_prices() {
1775 let mut msg: BitmexInstrumentMsg =
1776 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1777 msg.mark_price = None;
1778 msg.index_price = None;
1779 msg.last_price = None;
1780
1781 let mut instruments_cache = AHashMap::new();
1783 let test_instrument = create_test_perpetual_instrument();
1784 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1785
1786 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1787 assert_eq!(updates.len(), 0);
1788 }
1789
1790 #[rstest]
1791 fn test_parse_instrument_msg_index_symbol() {
1792 let mut msg: BitmexInstrumentMsg =
1795 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1796 msg.symbol = Ustr::from(".BXBT");
1797 msg.last_price = Some(119163.05);
1798 msg.mark_price = Some(119163.05); msg.index_price = None;
1800
1801 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1803 let instrument = CryptoPerpetual::new(
1804 instrument_id,
1805 Symbol::from(".BXBT"),
1806 Currency::BTC(),
1807 Currency::USD(),
1808 Currency::USD(),
1809 false, 2, 8, Price::from("0.01"),
1813 Quantity::from("0.00000001"),
1814 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1829 let mut instruments_cache = AHashMap::new();
1830 instruments_cache.insert(
1831 Ustr::from(".BXBT"),
1832 InstrumentAny::CryptoPerpetual(instrument),
1833 );
1834
1835 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1836
1837 assert_eq!(updates.len(), 2);
1838
1839 match &updates[0] {
1841 Data::MarkPriceUpdate(update) => {
1842 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1843 assert_eq!(update.value, Price::from("119163.05"));
1844 }
1845 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1846 }
1847
1848 match &updates[1] {
1850 Data::IndexPriceUpdate(update) => {
1851 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1852 assert_eq!(update.value, Price::from("119163.05"));
1853 assert_eq!(update.ts_init, UnixNanos::from(1));
1854 }
1855 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1856 }
1857 }
1858
1859 #[rstest]
1860 fn test_parse_funding_msg() {
1861 let json_data = load_test_json("ws_funding_rate.json");
1862 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1863 let update = parse_funding_msg(msg, UnixNanos::from(1));
1864
1865 assert!(update.is_some());
1866 let update = update.unwrap();
1867
1868 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1869 assert_eq!(update.rate.to_string(), "0.0001");
1870 assert!(update.next_funding_ns.is_none());
1871 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1872 assert_eq!(update.ts_init, UnixNanos::from(1));
1873 }
1874}