1use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23use nautilus_model::{
24 data::{
25 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
26 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
27 depth::DEPTH10_LEN,
28 },
29 enums::{
30 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
31 PriceType, RecordFlag, TimeInForce, TriggerType,
32 },
33 events::{OrderUpdated, account::state::AccountState},
34 identifiers::{
35 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
36 VenueOrderId,
37 },
38 instruments::{Instrument, InstrumentAny},
39 reports::{FillReport, OrderStatusReport, PositionStatusReport},
40 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
41};
42use rust_decimal::{Decimal, prelude::FromPrimitive};
43use ustr::Ustr;
44use uuid::Uuid;
45
46use super::{
47 enums::{BitmexAction, BitmexWsTopic},
48 messages::{
49 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
50 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
51 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
52 },
53};
54use crate::{
55 common::{
56 consts::BITMEX_VENUE,
57 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
58 parse::{
59 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
60 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
61 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
62 parse_position_side, parse_signed_contracts_quantity,
63 },
64 },
65 websocket::messages::BitmexOrderUpdateMsg,
66};
67
68const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
69 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
70 aggregation: BarAggregation::Minute,
71 price_type: PriceType::Last,
72};
73const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
74 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
75 aggregation: BarAggregation::Minute,
76 price_type: PriceType::Last,
77};
78const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
79 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
80 aggregation: BarAggregation::Hour,
81 price_type: PriceType::Last,
82};
83const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
84 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
85 aggregation: BarAggregation::Day,
86 price_type: PriceType::Last,
87};
88
89#[inline]
97#[must_use]
98pub fn is_index_symbol(symbol: &Ustr) -> bool {
99 symbol.starts_with('.')
100}
101
102#[must_use]
104pub fn parse_book_msg_vec(
105 data: Vec<BitmexOrderBookMsg>,
106 action: BitmexAction,
107 instruments: &AHashMap<Ustr, InstrumentAny>,
108 ts_init: UnixNanos,
109) -> Vec<Data> {
110 let mut deltas = Vec::with_capacity(data.len());
111
112 for msg in data {
113 if let Some(instrument) = instruments.get(&msg.symbol) {
114 let instrument_id = instrument.id();
115 let price_precision = instrument.price_precision();
116 deltas.push(Data::Delta(parse_book_msg(
117 &msg,
118 &action,
119 instrument,
120 instrument_id,
121 price_precision,
122 ts_init,
123 )));
124 } else {
125 tracing::error!(
126 "Instrument cache miss: book delta dropped for symbol={}",
127 msg.symbol
128 );
129 }
130 }
131 deltas
132}
133
134#[must_use]
136pub fn parse_book10_msg_vec(
137 data: Vec<BitmexOrderBook10Msg>,
138 instruments: &AHashMap<Ustr, InstrumentAny>,
139 ts_init: UnixNanos,
140) -> Vec<Data> {
141 let mut depths = Vec::with_capacity(data.len());
142
143 for msg in data {
144 if let Some(instrument) = instruments.get(&msg.symbol) {
145 let instrument_id = instrument.id();
146 let price_precision = instrument.price_precision();
147 depths.push(Data::Depth10(Box::new(parse_book10_msg(
148 &msg,
149 instrument,
150 instrument_id,
151 price_precision,
152 ts_init,
153 ))));
154 } else {
155 tracing::error!(
156 "Instrument cache miss: depth10 message dropped for symbol={}",
157 msg.symbol
158 );
159 }
160 }
161 depths
162}
163
164#[must_use]
166pub fn parse_trade_msg_vec(
167 data: Vec<BitmexTradeMsg>,
168 instruments: &AHashMap<Ustr, InstrumentAny>,
169 ts_init: UnixNanos,
170) -> Vec<Data> {
171 let mut trades = Vec::with_capacity(data.len());
172
173 for msg in data {
174 if let Some(instrument) = instruments.get(&msg.symbol) {
175 let instrument_id = instrument.id();
176 let price_precision = instrument.price_precision();
177 trades.push(Data::Trade(parse_trade_msg(
178 &msg,
179 instrument,
180 instrument_id,
181 price_precision,
182 ts_init,
183 )));
184 } else {
185 tracing::error!(
186 "Instrument cache miss: trade message dropped for symbol={}",
187 msg.symbol
188 );
189 }
190 }
191 trades
192}
193
194#[must_use]
196pub fn parse_trade_bin_msg_vec(
197 data: Vec<BitmexTradeBinMsg>,
198 topic: BitmexWsTopic,
199 instruments: &AHashMap<Ustr, InstrumentAny>,
200 ts_init: UnixNanos,
201) -> Vec<Data> {
202 let mut trades = Vec::with_capacity(data.len());
203
204 for msg in data {
205 if let Some(instrument) = instruments.get(&msg.symbol) {
206 let instrument_id = instrument.id();
207 let price_precision = instrument.price_precision();
208 trades.push(Data::Bar(parse_trade_bin_msg(
209 &msg,
210 &topic,
211 instrument,
212 instrument_id,
213 price_precision,
214 ts_init,
215 )));
216 } else {
217 tracing::error!(
218 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
219 msg.symbol
220 );
221 }
222 }
223 trades
224}
225
226#[allow(clippy::too_many_arguments)]
228#[must_use]
229pub fn parse_book_msg(
230 msg: &BitmexOrderBookMsg,
231 action: &BitmexAction,
232 instrument: &InstrumentAny,
233 instrument_id: InstrumentId,
234 price_precision: u8,
235 ts_init: UnixNanos,
236) -> OrderBookDelta {
237 let flags = if action == &BitmexAction::Insert {
238 RecordFlag::F_SNAPSHOT as u8
239 } else {
240 0
241 };
242
243 let action = action.as_book_action();
244 let price = Price::new(msg.price, price_precision);
245 let side = msg.side.as_order_side();
246 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
247 let order_id = msg.id;
248 let order = BookOrder::new(side, price, size, order_id);
249 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
251
252 OrderBookDelta::new(
253 instrument_id,
254 action,
255 order,
256 flags,
257 sequence,
258 ts_event,
259 ts_init,
260 )
261}
262
263#[allow(clippy::too_many_arguments)]
269#[must_use]
270pub fn parse_book10_msg(
271 msg: &BitmexOrderBook10Msg,
272 instrument: &InstrumentAny,
273 instrument_id: InstrumentId,
274 price_precision: u8,
275 ts_init: UnixNanos,
276) -> OrderBookDepth10 {
277 let mut bids = Vec::with_capacity(DEPTH10_LEN);
278 let mut asks = Vec::with_capacity(DEPTH10_LEN);
279
280 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
282 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
283
284 for (i, level) in msg.bids.iter().enumerate() {
285 let bid_order = BookOrder::new(
286 OrderSide::Buy,
287 Price::new(level[0], price_precision),
288 parse_fractional_quantity(level[1], instrument),
289 0,
290 );
291
292 bids.push(bid_order);
293 bid_counts[i] = 1;
294 }
295
296 for (i, level) in msg.asks.iter().enumerate() {
297 let ask_order = BookOrder::new(
298 OrderSide::Sell,
299 Price::new(level[0], price_precision),
300 parse_fractional_quantity(level[1], instrument),
301 0,
302 );
303
304 asks.push(ask_order);
305 ask_counts[i] = 1;
306 }
307
308 let bids: [BookOrder; DEPTH10_LEN] = bids
309 .try_into()
310 .inspect_err(|v: &Vec<BookOrder>| {
311 tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
312 })
313 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
314 let asks: [BookOrder; DEPTH10_LEN] = asks
315 .try_into()
316 .inspect_err(|v: &Vec<BookOrder>| {
317 tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
318 })
319 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
320
321 let ts_event = UnixNanos::from(msg.timestamp);
322
323 OrderBookDepth10::new(
324 instrument_id,
325 bids,
326 asks,
327 bid_counts,
328 ask_counts,
329 RecordFlag::F_SNAPSHOT as u8,
330 0, ts_event,
332 ts_init,
333 )
334}
335
336#[must_use]
338pub fn parse_quote_msg(
339 msg: &BitmexQuoteMsg,
340 last_quote: &QuoteTick,
341 instrument: &InstrumentAny,
342 instrument_id: InstrumentId,
343 price_precision: u8,
344 ts_init: UnixNanos,
345) -> QuoteTick {
346 let bid_price = match msg.bid_price {
347 Some(price) => Price::new(price, price_precision),
348 None => last_quote.bid_price,
349 };
350
351 let ask_price = match msg.ask_price {
352 Some(price) => Price::new(price, price_precision),
353 None => last_quote.ask_price,
354 };
355
356 let bid_size = match msg.bid_size {
357 Some(size) => parse_contracts_quantity(size, instrument),
358 None => last_quote.bid_size,
359 };
360
361 let ask_size = match msg.ask_size {
362 Some(size) => parse_contracts_quantity(size, instrument),
363 None => last_quote.ask_size,
364 };
365
366 let ts_event = UnixNanos::from(msg.timestamp);
367
368 QuoteTick::new(
369 instrument_id,
370 bid_price,
371 ask_price,
372 bid_size,
373 ask_size,
374 ts_event,
375 ts_init,
376 )
377}
378
379#[must_use]
381pub fn parse_trade_msg(
382 msg: &BitmexTradeMsg,
383 instrument: &InstrumentAny,
384 instrument_id: InstrumentId,
385 price_precision: u8,
386 ts_init: UnixNanos,
387) -> TradeTick {
388 let price = Price::new(msg.price, price_precision);
389 let size = parse_contracts_quantity(msg.size, instrument);
390 let aggressor_side = msg.side.as_aggressor_side();
391 let trade_id = TradeId::new(
392 msg.trd_match_id
393 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
394 );
395 let ts_event = UnixNanos::from(msg.timestamp);
396
397 TradeTick::new(
398 instrument_id,
399 price,
400 size,
401 aggressor_side,
402 trade_id,
403 ts_event,
404 ts_init,
405 )
406}
407
408#[must_use]
410pub fn parse_trade_bin_msg(
411 msg: &BitmexTradeBinMsg,
412 topic: &BitmexWsTopic,
413 instrument: &InstrumentAny,
414 instrument_id: InstrumentId,
415 price_precision: u8,
416 ts_init: UnixNanos,
417) -> Bar {
418 let spec = bar_spec_from_topic(topic);
419 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
420
421 let open = Price::new(msg.open, price_precision);
422 let high = Price::new(msg.high, price_precision);
423 let low = Price::new(msg.low, price_precision);
424 let close = Price::new(msg.close, price_precision);
425
426 let (open, high, low, close) =
427 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
428
429 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
430 let volume = parse_contracts_quantity(volume_contracts, instrument);
431 let ts_event = UnixNanos::from(msg.timestamp);
432
433 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
434}
435
436#[must_use]
442pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
443 match topic {
444 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
445 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
446 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
447 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
448 _ => {
449 tracing::error!(topic = ?topic, "Bar specification not supported");
450 BAR_SPEC_1_MINUTE
451 }
452 }
453}
454
455#[must_use]
461pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
462 match spec {
463 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
464 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
465 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
466 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
467 _ => {
468 tracing::error!(spec = ?spec, "Bar specification not supported");
469 BitmexWsTopic::TradeBin1m
470 }
471 }
472}
473
474fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
475 if msg.stop_px.is_some() {
476 if msg.price.is_some() {
477 Some(OrderType::StopLimit)
478 } else {
479 Some(OrderType::StopMarket)
480 }
481 } else if msg.price.is_some() {
482 Some(OrderType::Limit)
483 } else {
484 Some(OrderType::Market)
485 }
486}
487
488pub fn parse_order_msg(
502 msg: &BitmexOrderMsg,
503 instrument: &InstrumentAny,
504 order_type_cache: &DashMap<ClientOrderId, OrderType>,
505) -> anyhow::Result<OrderStatusReport> {
506 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
508 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
509 let common_side: BitmexSide = msg.side.into();
510 let order_side: OrderSide = common_side.into();
511
512 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
513 ord_type.into()
514 } else if let Some(client_order_id) = msg.cl_ord_id {
515 let client_order_id = ClientOrderId::new(client_order_id);
516 if let Some(entry) = order_type_cache.get(&client_order_id) {
517 *entry.value()
518 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
519 order_type_cache.insert(client_order_id, inferred);
520 inferred
521 } else {
522 anyhow::bail!(
523 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
524 );
525 }
526 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
527 inferred
528 } else {
529 anyhow::bail!("Order missing both ord_type and cl_ord_id");
530 };
531
532 let time_in_force: TimeInForce = match msg.time_in_force {
533 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
534 None => TimeInForce::Gtc,
535 };
536 let order_status: OrderStatus = msg.ord_status.into();
537 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
538 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
539 let report_id = UUID4::new();
540 let ts_accepted =
541 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
542 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
543 let ts_init = get_atomic_clock_realtime().get_time_ns();
544
545 let mut report = OrderStatusReport::new(
546 account_id,
547 instrument_id,
548 None, venue_order_id,
550 order_side,
551 order_type,
552 time_in_force,
553 order_status,
554 quantity,
555 filled_qty,
556 ts_accepted,
557 ts_last,
558 ts_init,
559 Some(report_id),
560 );
561
562 if let Some(cl_ord_id) = &msg.cl_ord_id {
563 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
564 }
565
566 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
567 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
568 }
569
570 if let Some(price) = msg.price {
571 report = report.with_price(Price::new(price, instrument.price_precision()));
572 }
573
574 if let Some(avg_px) = msg.avg_px {
575 report = report.with_avg_px(avg_px);
576 }
577
578 if let Some(trigger_price) = msg.stop_px {
579 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
580 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
582 TriggerType::MarkPrice
583 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
584 TriggerType::IndexPrice
585 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
586 TriggerType::LastPrice
587 } else {
588 TriggerType::Default
589 }
590 } else {
591 TriggerType::Default };
593
594 report = report
595 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
596 .with_trigger_type(trigger_type);
597 }
598
599 if let Some(exec_insts) = &msg.exec_inst {
600 for exec_inst in exec_insts {
601 match exec_inst {
602 BitmexExecInstruction::ParticipateDoNotInitiate => {
603 report = report.with_post_only(true);
604 }
605 BitmexExecInstruction::ReduceOnly => {
606 report = report.with_reduce_only(true);
607 }
608 _ => {}
609 }
610 }
611 }
612
613 if order_status == OrderStatus::Rejected {
615 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
616 tracing::debug!(
617 order_id = ?venue_order_id,
618 client_order_id = ?msg.cl_ord_id,
619 reason = ?reason_str,
620 "Order rejected with reason"
621 );
622 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
623 } else {
624 tracing::debug!(
625 order_id = ?venue_order_id,
626 client_order_id = ?msg.cl_ord_id,
627 ord_status = ?msg.ord_status,
628 ord_rej_reason = ?msg.ord_rej_reason,
629 text = ?msg.text,
630 "Order rejected without reason from BitMEX"
631 );
632 }
633 }
634
635 if order_status == OrderStatus::Canceled
638 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
639 {
640 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
641 }
642
643 Ok(report)
644}
645
646pub fn parse_order_update_msg(
650 msg: &BitmexOrderUpdateMsg,
651 instrument: &InstrumentAny,
652 account_id: AccountId,
653) -> Option<OrderUpdated> {
654 let trader_id = TraderId::default();
657 let strategy_id = StrategyId::default();
658 let instrument_id = parse_instrument_id(msg.symbol);
659 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
660 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
661 let quantity = Quantity::zero(instrument.size_precision());
662 let price = msg
663 .price
664 .map(|p| Price::new(p, instrument.price_precision()));
665
666 let trigger_price = None;
668
669 let event_id = UUID4::new();
670 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
671 let ts_init = get_atomic_clock_realtime().get_time_ns();
672
673 Some(nautilus_model::events::OrderUpdated::new(
674 trader_id,
675 strategy_id,
676 instrument_id,
677 client_order_id,
678 quantity,
679 event_id,
680 ts_event,
681 ts_init,
682 false, venue_order_id,
684 Some(account_id),
685 price,
686 trigger_price,
687 ))
688}
689
690pub fn parse_execution_msg(
709 msg: BitmexExecutionMsg,
710 instrument: &InstrumentAny,
711) -> Option<FillReport> {
712 let exec_type = msg.exec_type?;
713
714 match exec_type {
715 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
717 BitmexExecType::Bankruptcy => {
718 tracing::warn!(
719 exec_type = ?exec_type,
720 order_id = ?msg.order_id,
721 symbol = ?msg.symbol,
722 "Processing bankruptcy execution as fill"
723 );
724 }
725
726 BitmexExecType::Settlement => {
728 tracing::debug!(
729 exec_type = ?exec_type,
730 order_id = ?msg.order_id,
731 symbol = ?msg.symbol,
732 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement"
733 );
734 return None;
735 }
736 BitmexExecType::TrialFill => {
737 tracing::warn!(
738 exec_type = ?exec_type,
739 order_id = ?msg.order_id,
740 symbol = ?msg.symbol,
741 "Trial fill execution received (testnet only), not processed as fill"
742 );
743 return None;
744 }
745
746 BitmexExecType::Funding => {
748 tracing::debug!(
749 exec_type = ?exec_type,
750 order_id = ?msg.order_id,
751 symbol = ?msg.symbol,
752 "Funding execution skipped (not a fill)"
753 );
754 return None;
755 }
756 BitmexExecType::Insurance => {
757 tracing::debug!(
758 exec_type = ?exec_type,
759 order_id = ?msg.order_id,
760 symbol = ?msg.symbol,
761 "Insurance execution skipped (not a fill)"
762 );
763 return None;
764 }
765 BitmexExecType::Rebalance => {
766 tracing::debug!(
767 exec_type = ?exec_type,
768 order_id = ?msg.order_id,
769 symbol = ?msg.symbol,
770 "Rebalance execution skipped (not a fill)"
771 );
772 return None;
773 }
774
775 BitmexExecType::New
777 | BitmexExecType::Canceled
778 | BitmexExecType::CancelReject
779 | BitmexExecType::Replaced
780 | BitmexExecType::Rejected
781 | BitmexExecType::AmendReject
782 | BitmexExecType::Suspended
783 | BitmexExecType::Released
784 | BitmexExecType::TriggeredOrActivatedBySystem => {
785 tracing::debug!(
786 exec_type = ?exec_type,
787 order_id = ?msg.order_id,
788 "Execution message skipped (order state change, not a fill)"
789 );
790 return None;
791 }
792 }
793
794 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
795 let instrument_id = parse_instrument_id(msg.symbol?);
796 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
797 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
798 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
799 let side: BitmexSide = s.into();
800 side.into()
801 });
802 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
803 let last_px = Price::new(msg.last_px?, instrument.price_precision());
804 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
805 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
806 let commission = Money::new(
807 msg.commission.unwrap_or(0.0),
808 Currency::from(mapped_currency.as_str()),
809 );
810 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
811 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
812 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
814 let ts_init = get_atomic_clock_realtime().get_time_ns();
815
816 Some(FillReport::new(
817 account_id,
818 instrument_id,
819 venue_order_id,
820 trade_id,
821 order_side,
822 last_qty,
823 last_px,
824 commission,
825 liquidity_side,
826 client_order_id,
827 venue_position_id,
828 ts_event,
829 ts_init,
830 None,
831 ))
832}
833
834#[must_use]
840pub fn parse_position_msg(
841 msg: BitmexPositionMsg,
842 instrument: &InstrumentAny,
843) -> PositionStatusReport {
844 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
845 let instrument_id = parse_instrument_id(msg.symbol);
846 let position_side = parse_position_side(msg.current_qty).as_specified();
847 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
848 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
850 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
851 let ts_init = get_atomic_clock_realtime().get_time_ns();
852
853 PositionStatusReport::new(
854 account_id,
855 instrument_id,
856 position_side,
857 quantity,
858 ts_last,
859 ts_init,
860 None, venue_position_id, avg_px_open, )
864}
865
866#[must_use]
879pub fn parse_instrument_msg(
880 msg: BitmexInstrumentMsg,
881 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
882 ts_init: UnixNanos,
883) -> Vec<Data> {
884 let mut updates = Vec::new();
885 let is_index = is_index_symbol(&msg.symbol);
886
887 let effective_index_price = if is_index {
890 msg.last_price
891 } else {
892 msg.index_price
893 };
894
895 if msg.mark_price.is_none() && effective_index_price.is_none() {
899 return updates;
900 }
901
902 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
903 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
904
905 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
907 Some(instrument) => instrument.price_precision(),
908 None => {
909 if is_index {
913 tracing::trace!(
914 "Index instrument {} not in cache, skipping update",
915 msg.symbol
916 );
917 } else {
918 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
919 }
920 return updates;
921 }
922 };
923
924 if let Some(mark_price) = msg.mark_price {
927 let price = Price::new(mark_price, price_precision);
928 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
929 instrument_id,
930 price,
931 ts_event,
932 ts_init,
933 )));
934 }
935
936 if let Some(index_price) = effective_index_price {
938 let price = Price::new(index_price, price_precision);
939 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
940 instrument_id,
941 price,
942 ts_event,
943 ts_init,
944 )));
945 }
946
947 updates
948}
949
950pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
956 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
957 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
958
959 let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
961 Ok(rate) => rate,
962 Err(e) => {
963 tracing::error!("Failed to parse funding rate: {e}");
964 return None;
965 }
966 };
967
968 Some(FundingRateUpdate::new(
969 instrument_id,
970 rate,
971 None, ts_event,
973 ts_init,
974 ))
975}
976
977#[must_use]
986pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
987 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
988
989 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
991 let currency = Currency::from(currency_str.as_str());
992
993 let divisor = if msg.currency == "XBt" {
995 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
997 1_000_000.0 } else {
999 1.0
1000 };
1001 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1002
1003 let total = Money::new(amount, currency);
1004 let locked = Money::new(0.0, currency); let free = total - locked;
1006
1007 let balance = AccountBalance::new_checked(total, locked, free)
1008 .expect("Balance calculation should be valid");
1009
1010 AccountState::new(
1011 account_id,
1012 AccountType::Margin,
1013 vec![balance],
1014 vec![], true, UUID4::new(),
1017 ts_init,
1018 ts_init,
1019 None,
1020 )
1021}
1022
1023#[must_use]
1027pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1028 let currency_str = crate::common::parse::map_bitmex_currency(msg.currency.as_str());
1030 let currency = Currency::from(currency_str.as_str());
1031
1032 let divisor = if msg.currency == "XBt" {
1034 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1036 1_000_000.0 } else {
1038 1.0
1039 };
1040
1041 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1042 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1043 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1044
1045 MarginBalance::new(
1046 Money::new(initial, currency),
1047 Money::new(maintenance, currency),
1048 instrument_id,
1049 )
1050}
1051
1052#[cfg(test)]
1057mod tests {
1058 use chrono::{DateTime, Utc};
1059 use nautilus_model::{
1060 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1061 identifiers::Symbol,
1062 instruments::crypto_perpetual::CryptoPerpetual,
1063 };
1064 use rstest::rstest;
1065 use ustr::Ustr;
1066
1067 use super::*;
1068 use crate::common::{
1069 enums::{BitmexExecType, BitmexOrderStatus},
1070 testing::load_test_json,
1071 };
1072
1073 fn create_test_perpetual_instrument_with_precisions(
1075 price_precision: u8,
1076 size_precision: u8,
1077 ) -> InstrumentAny {
1078 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1079 InstrumentId::from("XBTUSD.BITMEX"),
1080 Symbol::new("XBTUSD"),
1081 Currency::BTC(),
1082 Currency::USD(),
1083 Currency::BTC(),
1084 true, price_precision,
1086 size_precision,
1087 Price::new(0.5, price_precision),
1088 Quantity::new(1.0, size_precision),
1089 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1102 UnixNanos::default(),
1103 ))
1104 }
1105
1106 fn create_test_perpetual_instrument() -> InstrumentAny {
1107 create_test_perpetual_instrument_with_precisions(1, 0)
1108 }
1109
1110 #[rstest]
1111 fn test_orderbook_l2_message() {
1112 let json_data = load_test_json("ws_orderbook_l2.json");
1113
1114 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1115 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1116
1117 let instrument = create_test_perpetual_instrument();
1119 let delta = parse_book_msg(
1120 &msg,
1121 &BitmexAction::Insert,
1122 &instrument,
1123 instrument.id(),
1124 instrument.price_precision(),
1125 UnixNanos::from(3),
1126 );
1127 assert_eq!(delta.instrument_id, instrument_id);
1128 assert_eq!(delta.order.price, Price::from("98459.9"));
1129 assert_eq!(delta.order.size, Quantity::from(33000));
1130 assert_eq!(delta.order.side, OrderSide::Sell);
1131 assert_eq!(delta.order.order_id, 62400580205);
1132 assert_eq!(delta.action, BookAction::Add);
1133 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1134 assert_eq!(delta.sequence, 0);
1135 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1137
1138 let delta = parse_book_msg(
1140 &msg,
1141 &BitmexAction::Update,
1142 &instrument,
1143 instrument.id(),
1144 instrument.price_precision(),
1145 UnixNanos::from(3),
1146 );
1147 assert_eq!(delta.flags, 0);
1148 assert_eq!(delta.action, BookAction::Update);
1149 }
1150
1151 #[rstest]
1152 fn test_orderbook10_message() {
1153 let json_data = load_test_json("ws_orderbook_10.json");
1154 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1155 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1156 let instrument = create_test_perpetual_instrument();
1157 let depth10 = parse_book10_msg(
1158 &msg,
1159 &instrument,
1160 instrument.id(),
1161 instrument.price_precision(),
1162 UnixNanos::from(3),
1163 );
1164
1165 assert_eq!(depth10.instrument_id, instrument_id);
1166
1167 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1169 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1170 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1171
1172 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1174 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1175 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1176
1177 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1179 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1180
1181 assert_eq!(depth10.sequence, 0);
1183 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1184 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1186 }
1187
1188 #[rstest]
1189 fn test_quote_message() {
1190 let json_data = load_test_json("ws_quote.json");
1191
1192 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1193 let last_quote = QuoteTick::new(
1194 instrument_id,
1195 Price::new(487.50, 2),
1196 Price::new(488.20, 2),
1197 Quantity::from(100_000),
1198 Quantity::from(100_000),
1199 UnixNanos::from(1),
1200 UnixNanos::from(2),
1201 );
1202 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1203 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1204 let quote = parse_quote_msg(
1205 &msg,
1206 &last_quote,
1207 &instrument,
1208 instrument_id,
1209 instrument.price_precision(),
1210 UnixNanos::from(3),
1211 );
1212
1213 assert_eq!(quote.instrument_id, instrument_id);
1214 assert_eq!(quote.bid_price, Price::from("487.55"));
1215 assert_eq!(quote.ask_price, Price::from("488.25"));
1216 assert_eq!(quote.bid_size, Quantity::from(103_000));
1217 assert_eq!(quote.ask_size, Quantity::from(50_000));
1218 assert_eq!(quote.ts_event, 1732315465085000000);
1219 assert_eq!(quote.ts_init, 3);
1220 }
1221
1222 #[rstest]
1223 fn test_trade_message() {
1224 let json_data = load_test_json("ws_trade.json");
1225
1226 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1227 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1228 let instrument = create_test_perpetual_instrument();
1229 let trade = parse_trade_msg(
1230 &msg,
1231 &instrument,
1232 instrument.id(),
1233 instrument.price_precision(),
1234 UnixNanos::from(3),
1235 );
1236
1237 assert_eq!(trade.instrument_id, instrument_id);
1238 assert_eq!(trade.price, Price::from("98570.9"));
1239 assert_eq!(trade.size, Quantity::from(100));
1240 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1241 assert_eq!(
1242 trade.trade_id.to_string(),
1243 "00000000-006d-1000-0000-000e8737d536"
1244 );
1245 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1247 }
1248
1249 #[rstest]
1250 fn test_trade_bin_message() {
1251 let json_data = load_test_json("ws_trade_bin_1m.json");
1252
1253 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1254 let topic = BitmexWsTopic::TradeBin1m;
1255
1256 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1257 let instrument = create_test_perpetual_instrument();
1258 let bar = parse_trade_bin_msg(
1259 &msg,
1260 &topic,
1261 &instrument,
1262 instrument.id(),
1263 instrument.price_precision(),
1264 UnixNanos::from(3),
1265 );
1266
1267 assert_eq!(bar.instrument_id(), instrument_id);
1268 assert_eq!(
1269 bar.bar_type.spec(),
1270 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1271 );
1272 assert_eq!(bar.open, Price::from("97550.0"));
1273 assert_eq!(bar.high, Price::from("97584.4"));
1274 assert_eq!(bar.low, Price::from("97550.0"));
1275 assert_eq!(bar.close, Price::from("97570.1"));
1276 assert_eq!(bar.volume, Quantity::from(84_000));
1277 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1279 }
1280
1281 #[rstest]
1282 fn test_trade_bin_message_extreme_adjustment() {
1283 let topic = BitmexWsTopic::TradeBin1m;
1284 let instrument = create_test_perpetual_instrument();
1285
1286 let msg = BitmexTradeBinMsg {
1287 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1288 .unwrap()
1289 .with_timezone(&Utc),
1290 symbol: Ustr::from("XBTUSD"),
1291 open: 50_000.0,
1292 high: 49_990.0,
1293 low: 50_010.0,
1294 close: 50_005.0,
1295 trades: 10,
1296 volume: 1_000,
1297 vwap: 0.0,
1298 last_size: 0,
1299 turnover: 0,
1300 home_notional: 0.0,
1301 foreign_notional: 0.0,
1302 };
1303
1304 let bar = parse_trade_bin_msg(
1305 &msg,
1306 &topic,
1307 &instrument,
1308 instrument.id(),
1309 instrument.price_precision(),
1310 UnixNanos::from(3),
1311 );
1312
1313 assert_eq!(bar.high, Price::from("50010.0"));
1314 assert_eq!(bar.low, Price::from("49990.0"));
1315 assert_eq!(bar.open, Price::from("50000.0"));
1316 assert_eq!(bar.close, Price::from("50005.0"));
1317 assert_eq!(bar.volume, Quantity::from(1_000));
1318 }
1319
1320 #[rstest]
1321 fn test_parse_order_msg() {
1322 let json_data = load_test_json("ws_order.json");
1323 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1324 let cache = dashmap::DashMap::new();
1325 let instrument = create_test_perpetual_instrument();
1326 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1327
1328 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1329 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1330 assert_eq!(
1331 report.venue_order_id.to_string(),
1332 "550e8400-e29b-41d4-a716-446655440001"
1333 );
1334 assert_eq!(
1335 report.client_order_id.unwrap().to_string(),
1336 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1337 );
1338 assert_eq!(report.order_side, OrderSide::Buy);
1339 assert_eq!(report.order_type, OrderType::Limit);
1340 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1341 assert_eq!(report.order_status, OrderStatus::Accepted);
1342 assert_eq!(report.quantity, Quantity::from(100));
1343 assert_eq!(report.filled_qty, Quantity::from(0));
1344 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1345 assert_eq!(report.ts_accepted, 1732530600000000000); }
1347
1348 #[rstest]
1349 fn test_parse_order_msg_infers_type_when_missing() {
1350 let json_data = load_test_json("ws_order.json");
1351 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1352 msg.ord_type = None;
1353 msg.cl_ord_id = None;
1354 msg.price = Some(98_000.0);
1355 msg.stop_px = None;
1356
1357 let cache = dashmap::DashMap::new();
1358 let instrument = create_test_perpetual_instrument();
1359
1360 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1361
1362 assert_eq!(report.order_type, OrderType::Limit);
1363 }
1364
1365 #[rstest]
1366 fn test_parse_order_msg_rejected_with_reason() {
1367 let mut msg: BitmexOrderMsg =
1368 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1369 msg.ord_status = BitmexOrderStatus::Rejected;
1370 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1371 msg.text = None;
1372 msg.cum_qty = 0;
1373
1374 let cache = dashmap::DashMap::new();
1375 let instrument = create_test_perpetual_instrument();
1376 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1377
1378 assert_eq!(report.order_status, OrderStatus::Rejected);
1379 assert_eq!(
1380 report.cancel_reason,
1381 Some("Insufficient available balance".to_string())
1382 );
1383 }
1384
1385 #[rstest]
1386 fn test_parse_order_msg_rejected_with_text_fallback() {
1387 let mut msg: BitmexOrderMsg =
1388 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1389 msg.ord_status = BitmexOrderStatus::Rejected;
1390 msg.ord_rej_reason = None;
1391 msg.text = Some(Ustr::from("Order would execute immediately"));
1392 msg.cum_qty = 0;
1393
1394 let cache = dashmap::DashMap::new();
1395 let instrument = create_test_perpetual_instrument();
1396 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1397
1398 assert_eq!(report.order_status, OrderStatus::Rejected);
1399 assert_eq!(
1400 report.cancel_reason,
1401 Some("Order would execute immediately".to_string())
1402 );
1403 }
1404
1405 #[rstest]
1406 fn test_parse_order_msg_rejected_without_reason() {
1407 let mut msg: BitmexOrderMsg =
1408 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1409 msg.ord_status = BitmexOrderStatus::Rejected;
1410 msg.ord_rej_reason = None;
1411 msg.text = None;
1412 msg.cum_qty = 0;
1413
1414 let cache = dashmap::DashMap::new();
1415 let instrument = create_test_perpetual_instrument();
1416 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1417
1418 assert_eq!(report.order_status, OrderStatus::Rejected);
1419 assert_eq!(report.cancel_reason, None);
1420 }
1421
1422 #[rstest]
1423 fn test_parse_execution_msg() {
1424 let json_data = load_test_json("ws_execution.json");
1425 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1426 let instrument = create_test_perpetual_instrument();
1427 let fill = parse_execution_msg(msg, &instrument).unwrap();
1428
1429 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1430 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1431 assert_eq!(
1432 fill.venue_order_id.to_string(),
1433 "550e8400-e29b-41d4-a716-446655440002"
1434 );
1435 assert_eq!(
1436 fill.trade_id.to_string(),
1437 "00000000-006d-1000-0000-000e8737d540"
1438 );
1439 assert_eq!(
1440 fill.client_order_id.unwrap().to_string(),
1441 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1442 );
1443 assert_eq!(fill.order_side, OrderSide::Sell);
1444 assert_eq!(fill.last_qty, Quantity::from(100));
1445 assert_eq!(fill.last_px, Price::from("98950.0"));
1446 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1447 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1448 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1449 assert_eq!(fill.ts_event, 1732530900789000000); }
1451
1452 #[rstest]
1453 fn test_parse_execution_msg_non_trade() {
1454 let mut msg: BitmexExecutionMsg =
1456 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1457 msg.exec_type = Some(BitmexExecType::Settlement);
1458
1459 let instrument = create_test_perpetual_instrument();
1460 let result = parse_execution_msg(msg, &instrument);
1461 assert!(result.is_none());
1462 }
1463
1464 #[rstest]
1465 fn test_parse_cancel_reject_execution() {
1466 let json = load_test_json("ws_execution_cancel_reject.json");
1468
1469 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1470 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1471 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1472 assert_eq!(msg.symbol, None);
1473
1474 let instrument = create_test_perpetual_instrument();
1476 let result = parse_execution_msg(msg, &instrument);
1477 assert!(result.is_none());
1478 }
1479
1480 #[rstest]
1481 fn test_parse_execution_msg_liquidation() {
1482 let mut msg: BitmexExecutionMsg =
1484 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1485 msg.exec_type = Some(BitmexExecType::Liquidation);
1486
1487 let instrument = create_test_perpetual_instrument();
1488 let fill = parse_execution_msg(msg, &instrument).unwrap();
1489
1490 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1491 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1492 assert_eq!(fill.order_side, OrderSide::Sell);
1493 assert_eq!(fill.last_qty, Quantity::from(100));
1494 assert_eq!(fill.last_px, Price::from("98950.0"));
1495 }
1496
1497 #[rstest]
1498 fn test_parse_execution_msg_bankruptcy() {
1499 let mut msg: BitmexExecutionMsg =
1500 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1501 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1502
1503 let instrument = create_test_perpetual_instrument();
1504 let fill = parse_execution_msg(msg, &instrument).unwrap();
1505
1506 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1507 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1508 assert_eq!(fill.order_side, OrderSide::Sell);
1509 assert_eq!(fill.last_qty, Quantity::from(100));
1510 }
1511
1512 #[rstest]
1513 fn test_parse_execution_msg_settlement() {
1514 let mut msg: BitmexExecutionMsg =
1515 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1516 msg.exec_type = Some(BitmexExecType::Settlement);
1517
1518 let instrument = create_test_perpetual_instrument();
1519 let result = parse_execution_msg(msg, &instrument);
1520 assert!(result.is_none());
1521 }
1522
1523 #[rstest]
1524 fn test_parse_execution_msg_trial_fill() {
1525 let mut msg: BitmexExecutionMsg =
1526 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1527 msg.exec_type = Some(BitmexExecType::TrialFill);
1528
1529 let instrument = create_test_perpetual_instrument();
1530 let result = parse_execution_msg(msg, &instrument);
1531 assert!(result.is_none());
1532 }
1533
1534 #[rstest]
1535 fn test_parse_execution_msg_funding() {
1536 let mut msg: BitmexExecutionMsg =
1537 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1538 msg.exec_type = Some(BitmexExecType::Funding);
1539
1540 let instrument = create_test_perpetual_instrument();
1541 let result = parse_execution_msg(msg, &instrument);
1542 assert!(result.is_none());
1543 }
1544
1545 #[rstest]
1546 fn test_parse_execution_msg_insurance() {
1547 let mut msg: BitmexExecutionMsg =
1548 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1549 msg.exec_type = Some(BitmexExecType::Insurance);
1550
1551 let instrument = create_test_perpetual_instrument();
1552 let result = parse_execution_msg(msg, &instrument);
1553 assert!(result.is_none());
1554 }
1555
1556 #[rstest]
1557 fn test_parse_execution_msg_rebalance() {
1558 let mut msg: BitmexExecutionMsg =
1559 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1560 msg.exec_type = Some(BitmexExecType::Rebalance);
1561
1562 let instrument = create_test_perpetual_instrument();
1563 let result = parse_execution_msg(msg, &instrument);
1564 assert!(result.is_none());
1565 }
1566
1567 #[rstest]
1568 fn test_parse_execution_msg_order_state_changes() {
1569 let instrument = create_test_perpetual_instrument();
1570
1571 let order_state_types = vec![
1572 BitmexExecType::New,
1573 BitmexExecType::Canceled,
1574 BitmexExecType::CancelReject,
1575 BitmexExecType::Replaced,
1576 BitmexExecType::Rejected,
1577 BitmexExecType::AmendReject,
1578 BitmexExecType::Suspended,
1579 BitmexExecType::Released,
1580 BitmexExecType::TriggeredOrActivatedBySystem,
1581 ];
1582
1583 for exec_type in order_state_types {
1584 let mut msg: BitmexExecutionMsg =
1585 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1586 msg.exec_type = Some(exec_type);
1587
1588 let result = parse_execution_msg(msg, &instrument);
1589 assert!(
1590 result.is_none(),
1591 "Expected None for exec_type {:?}",
1592 exec_type
1593 );
1594 }
1595 }
1596
1597 #[rstest]
1598 fn test_parse_position_msg() {
1599 let json_data = load_test_json("ws_position.json");
1600 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1601 let instrument = create_test_perpetual_instrument();
1602 let report = parse_position_msg(msg, &instrument);
1603
1604 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1605 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1606 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1607 assert_eq!(report.quantity, Quantity::from(1000));
1608 assert!(report.venue_position_id.is_none());
1609 assert_eq!(report.ts_last, 1732530900789000000); }
1611
1612 #[rstest]
1613 fn test_parse_position_msg_short() {
1614 let mut msg: BitmexPositionMsg =
1615 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1616 msg.current_qty = Some(-500);
1617
1618 let instrument = create_test_perpetual_instrument();
1619 let report = parse_position_msg(msg, &instrument);
1620 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1621 assert_eq!(report.quantity, Quantity::from(500));
1622 }
1623
1624 #[rstest]
1625 fn test_parse_position_msg_flat() {
1626 let mut msg: BitmexPositionMsg =
1627 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1628 msg.current_qty = Some(0);
1629
1630 let instrument = create_test_perpetual_instrument();
1631 let report = parse_position_msg(msg, &instrument);
1632 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1633 assert_eq!(report.quantity, Quantity::from(0));
1634 }
1635
1636 #[rstest]
1637 fn test_parse_wallet_msg() {
1638 let json_data = load_test_json("ws_wallet.json");
1639 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1640 let ts_init = UnixNanos::from(1);
1641 let account_state = parse_wallet_msg(msg, ts_init);
1642
1643 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1644 assert!(!account_state.balances.is_empty());
1645 let balance = &account_state.balances[0];
1646 assert_eq!(balance.currency.code.to_string(), "XBT");
1647 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1649 }
1650
1651 #[rstest]
1652 fn test_parse_wallet_msg_no_amount() {
1653 let mut msg: BitmexWalletMsg =
1654 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1655 msg.amount = None;
1656
1657 let ts_init = UnixNanos::from(1);
1658 let account_state = parse_wallet_msg(msg, ts_init);
1659 let balance = &account_state.balances[0];
1660 assert_eq!(balance.total.as_f64(), 0.0);
1661 }
1662
1663 #[rstest]
1664 fn test_parse_margin_msg() {
1665 let json_data = load_test_json("ws_margin.json");
1666 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1667 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1668 let margin_balance = parse_margin_msg(msg, instrument_id);
1669
1670 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1671 assert_eq!(margin_balance.instrument_id, instrument_id);
1672 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1675 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1677 }
1678
1679 #[rstest]
1680 fn test_parse_margin_msg_no_available() {
1681 let mut msg: BitmexMarginMsg =
1682 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1683 msg.available_margin = None;
1684
1685 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1686 let margin_balance = parse_margin_msg(msg, instrument_id);
1687 assert!(margin_balance.initial.as_f64() >= 0.0);
1689 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1690 }
1691
1692 #[rstest]
1693 fn test_parse_instrument_msg_both_prices() {
1694 let json_data = load_test_json("ws_instrument.json");
1695 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1696
1697 let mut instruments_cache = AHashMap::new();
1699 let test_instrument = create_test_perpetual_instrument();
1700 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1701
1702 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1703
1704 assert_eq!(updates.len(), 2);
1706
1707 match &updates[0] {
1709 Data::MarkPriceUpdate(update) => {
1710 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1711 assert_eq!(update.value.as_f64(), 95125.7);
1712 }
1713 _ => panic!("Expected MarkPriceUpdate at index 0"),
1714 }
1715
1716 match &updates[1] {
1718 Data::IndexPriceUpdate(update) => {
1719 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1720 assert_eq!(update.value.as_f64(), 95124.3);
1721 }
1722 _ => panic!("Expected IndexPriceUpdate at index 1"),
1723 }
1724 }
1725
1726 #[rstest]
1727 fn test_parse_instrument_msg_mark_price_only() {
1728 let mut msg: BitmexInstrumentMsg =
1729 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1730 msg.index_price = None;
1731
1732 let mut instruments_cache = AHashMap::new();
1734 let test_instrument = create_test_perpetual_instrument();
1735 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1736
1737 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1738
1739 assert_eq!(updates.len(), 1);
1740 match &updates[0] {
1741 Data::MarkPriceUpdate(update) => {
1742 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1743 assert_eq!(update.value.as_f64(), 95125.7);
1744 }
1745 _ => panic!("Expected MarkPriceUpdate"),
1746 }
1747 }
1748
1749 #[rstest]
1750 fn test_parse_instrument_msg_index_price_only() {
1751 let mut msg: BitmexInstrumentMsg =
1752 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1753 msg.mark_price = None;
1754
1755 let mut instruments_cache = AHashMap::new();
1757 let test_instrument = create_test_perpetual_instrument();
1758 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1759
1760 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1761
1762 assert_eq!(updates.len(), 1);
1763 match &updates[0] {
1764 Data::IndexPriceUpdate(update) => {
1765 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1766 assert_eq!(update.value.as_f64(), 95124.3);
1767 }
1768 _ => panic!("Expected IndexPriceUpdate"),
1769 }
1770 }
1771
1772 #[rstest]
1773 fn test_parse_instrument_msg_no_prices() {
1774 let mut msg: BitmexInstrumentMsg =
1775 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1776 msg.mark_price = None;
1777 msg.index_price = None;
1778 msg.last_price = None;
1779
1780 let mut instruments_cache = AHashMap::new();
1782 let test_instrument = create_test_perpetual_instrument();
1783 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1784
1785 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1786 assert_eq!(updates.len(), 0);
1787 }
1788
1789 #[rstest]
1790 fn test_parse_instrument_msg_index_symbol() {
1791 let mut msg: BitmexInstrumentMsg =
1794 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1795 msg.symbol = Ustr::from(".BXBT");
1796 msg.last_price = Some(119163.05);
1797 msg.mark_price = Some(119163.05); msg.index_price = None;
1799
1800 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1802 let instrument = CryptoPerpetual::new(
1803 instrument_id,
1804 Symbol::from(".BXBT"),
1805 Currency::BTC(),
1806 Currency::USD(),
1807 Currency::USD(),
1808 false, 2, 8, Price::from("0.01"),
1812 Quantity::from("0.00000001"),
1813 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1828 let mut instruments_cache = AHashMap::new();
1829 instruments_cache.insert(
1830 Ustr::from(".BXBT"),
1831 InstrumentAny::CryptoPerpetual(instrument),
1832 );
1833
1834 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1835
1836 assert_eq!(updates.len(), 2);
1837
1838 match &updates[0] {
1840 Data::MarkPriceUpdate(update) => {
1841 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1842 assert_eq!(update.value, Price::from("119163.05"));
1843 }
1844 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1845 }
1846
1847 match &updates[1] {
1849 Data::IndexPriceUpdate(update) => {
1850 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1851 assert_eq!(update.value, Price::from("119163.05"));
1852 assert_eq!(update.ts_init, UnixNanos::from(1));
1853 }
1854 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1855 }
1856 }
1857
1858 #[rstest]
1859 fn test_parse_funding_msg() {
1860 let json_data = load_test_json("ws_funding_rate.json");
1861 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1862 let update = parse_funding_msg(msg, UnixNanos::from(1));
1863
1864 assert!(update.is_some());
1865 let update = update.unwrap();
1866
1867 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1868 assert_eq!(update.rate.to_string(), "0.0001");
1869 assert!(update.next_funding_ns.is_none());
1870 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1871 assert_eq!(update.ts_init, UnixNanos::from(1));
1872 }
1873}