1use std::num::NonZero;
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TriggerType,
34 },
35 events::{OrderUpdated, account::state::AccountState},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38 VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::{Decimal, prelude::FromPrimitive};
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49 enums::{BitmexAction, BitmexWsTopic},
50 messages::{
51 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54 },
55};
56use crate::{
57 common::{
58 consts::BITMEX_VENUE,
59 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
60 parse::{
61 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
62 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
63 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
64 parse_position_side, parse_signed_contracts_quantity,
65 },
66 },
67 http::parse::get_currency,
68 websocket::messages::BitmexOrderUpdateMsg,
69};
70
71const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
72 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
73 aggregation: BarAggregation::Minute,
74 price_type: PriceType::Last,
75};
76const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
77 step: NonZero::new(5).expect("5 is a valid non-zero usize"),
78 aggregation: BarAggregation::Minute,
79 price_type: PriceType::Last,
80};
81const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
82 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
83 aggregation: BarAggregation::Hour,
84 price_type: PriceType::Last,
85};
86const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
87 step: NonZero::new(1).expect("1 is a valid non-zero usize"),
88 aggregation: BarAggregation::Day,
89 price_type: PriceType::Last,
90};
91
92#[inline]
100#[must_use]
101pub fn is_index_symbol(symbol: &Ustr) -> bool {
102 symbol.starts_with('.')
103}
104
105#[must_use]
107pub fn parse_book_msg_vec(
108 data: Vec<BitmexOrderBookMsg>,
109 action: BitmexAction,
110 instruments: &AHashMap<Ustr, InstrumentAny>,
111 ts_init: UnixNanos,
112) -> Vec<Data> {
113 let mut deltas = Vec::with_capacity(data.len());
114
115 for msg in data {
116 if let Some(instrument) = instruments.get(&msg.symbol) {
117 let instrument_id = instrument.id();
118 let price_precision = instrument.price_precision();
119 deltas.push(Data::Delta(parse_book_msg(
120 &msg,
121 &action,
122 instrument,
123 instrument_id,
124 price_precision,
125 ts_init,
126 )));
127 } else {
128 tracing::error!(
129 "Instrument cache miss: book delta dropped for symbol={}",
130 msg.symbol
131 );
132 }
133 }
134 deltas
135}
136
137#[must_use]
139pub fn parse_book10_msg_vec(
140 data: Vec<BitmexOrderBook10Msg>,
141 instruments: &AHashMap<Ustr, InstrumentAny>,
142 ts_init: UnixNanos,
143) -> Vec<Data> {
144 let mut depths = Vec::with_capacity(data.len());
145
146 for msg in data {
147 if let Some(instrument) = instruments.get(&msg.symbol) {
148 let instrument_id = instrument.id();
149 let price_precision = instrument.price_precision();
150 depths.push(Data::Depth10(Box::new(parse_book10_msg(
151 &msg,
152 instrument,
153 instrument_id,
154 price_precision,
155 ts_init,
156 ))));
157 } else {
158 tracing::error!(
159 "Instrument cache miss: depth10 message dropped for symbol={}",
160 msg.symbol
161 );
162 }
163 }
164 depths
165}
166
167#[must_use]
169pub fn parse_trade_msg_vec(
170 data: Vec<BitmexTradeMsg>,
171 instruments: &AHashMap<Ustr, InstrumentAny>,
172 ts_init: UnixNanos,
173) -> Vec<Data> {
174 let mut trades = Vec::with_capacity(data.len());
175
176 for msg in data {
177 if let Some(instrument) = instruments.get(&msg.symbol) {
178 let instrument_id = instrument.id();
179 let price_precision = instrument.price_precision();
180 trades.push(Data::Trade(parse_trade_msg(
181 &msg,
182 instrument,
183 instrument_id,
184 price_precision,
185 ts_init,
186 )));
187 } else {
188 tracing::error!(
189 "Instrument cache miss: trade message dropped for symbol={}",
190 msg.symbol
191 );
192 }
193 }
194 trades
195}
196
197#[must_use]
199pub fn parse_trade_bin_msg_vec(
200 data: Vec<BitmexTradeBinMsg>,
201 topic: BitmexWsTopic,
202 instruments: &AHashMap<Ustr, InstrumentAny>,
203 ts_init: UnixNanos,
204) -> Vec<Data> {
205 let mut trades = Vec::with_capacity(data.len());
206
207 for msg in data {
208 if let Some(instrument) = instruments.get(&msg.symbol) {
209 let instrument_id = instrument.id();
210 let price_precision = instrument.price_precision();
211 trades.push(Data::Bar(parse_trade_bin_msg(
212 &msg,
213 &topic,
214 instrument,
215 instrument_id,
216 price_precision,
217 ts_init,
218 )));
219 } else {
220 tracing::error!(
221 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
222 msg.symbol
223 );
224 }
225 }
226 trades
227}
228
229#[allow(clippy::too_many_arguments)]
231#[must_use]
232pub fn parse_book_msg(
233 msg: &BitmexOrderBookMsg,
234 action: &BitmexAction,
235 instrument: &InstrumentAny,
236 instrument_id: InstrumentId,
237 price_precision: u8,
238 ts_init: UnixNanos,
239) -> OrderBookDelta {
240 let flags = if action == &BitmexAction::Insert {
241 RecordFlag::F_SNAPSHOT as u8
242 } else {
243 0
244 };
245
246 let action = action.as_book_action();
247 let price = Price::new(msg.price, price_precision);
248 let side = msg.side.as_order_side();
249 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
250 let order_id = msg.id;
251 let order = BookOrder::new(side, price, size, order_id);
252 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
254
255 OrderBookDelta::new(
256 instrument_id,
257 action,
258 order,
259 flags,
260 sequence,
261 ts_event,
262 ts_init,
263 )
264}
265
266#[allow(clippy::too_many_arguments)]
272#[must_use]
273pub fn parse_book10_msg(
274 msg: &BitmexOrderBook10Msg,
275 instrument: &InstrumentAny,
276 instrument_id: InstrumentId,
277 price_precision: u8,
278 ts_init: UnixNanos,
279) -> OrderBookDepth10 {
280 let mut bids = Vec::with_capacity(DEPTH10_LEN);
281 let mut asks = Vec::with_capacity(DEPTH10_LEN);
282
283 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
285 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
286
287 for (i, level) in msg.bids.iter().enumerate() {
288 let bid_order = BookOrder::new(
289 OrderSide::Buy,
290 Price::new(level[0], price_precision),
291 parse_fractional_quantity(level[1], instrument),
292 0,
293 );
294
295 bids.push(bid_order);
296 bid_counts[i] = 1;
297 }
298
299 for (i, level) in msg.asks.iter().enumerate() {
300 let ask_order = BookOrder::new(
301 OrderSide::Sell,
302 Price::new(level[0], price_precision),
303 parse_fractional_quantity(level[1], instrument),
304 0,
305 );
306
307 asks.push(ask_order);
308 ask_counts[i] = 1;
309 }
310
311 let bids: [BookOrder; DEPTH10_LEN] = bids
312 .try_into()
313 .inspect_err(|v: &Vec<BookOrder>| {
314 tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
315 })
316 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
317 let asks: [BookOrder; DEPTH10_LEN] = asks
318 .try_into()
319 .inspect_err(|v: &Vec<BookOrder>| {
320 tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
321 })
322 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
323
324 let ts_event = UnixNanos::from(msg.timestamp);
325
326 OrderBookDepth10::new(
327 instrument_id,
328 bids,
329 asks,
330 bid_counts,
331 ask_counts,
332 RecordFlag::F_SNAPSHOT as u8,
333 0, ts_event,
335 ts_init,
336 )
337}
338
339#[must_use]
341pub fn parse_quote_msg(
342 msg: &BitmexQuoteMsg,
343 last_quote: &QuoteTick,
344 instrument: &InstrumentAny,
345 instrument_id: InstrumentId,
346 price_precision: u8,
347 ts_init: UnixNanos,
348) -> QuoteTick {
349 let bid_price = match msg.bid_price {
350 Some(price) => Price::new(price, price_precision),
351 None => last_quote.bid_price,
352 };
353
354 let ask_price = match msg.ask_price {
355 Some(price) => Price::new(price, price_precision),
356 None => last_quote.ask_price,
357 };
358
359 let bid_size = match msg.bid_size {
360 Some(size) => parse_contracts_quantity(size, instrument),
361 None => last_quote.bid_size,
362 };
363
364 let ask_size = match msg.ask_size {
365 Some(size) => parse_contracts_quantity(size, instrument),
366 None => last_quote.ask_size,
367 };
368
369 let ts_event = UnixNanos::from(msg.timestamp);
370
371 QuoteTick::new(
372 instrument_id,
373 bid_price,
374 ask_price,
375 bid_size,
376 ask_size,
377 ts_event,
378 ts_init,
379 )
380}
381
382#[must_use]
384pub fn parse_trade_msg(
385 msg: &BitmexTradeMsg,
386 instrument: &InstrumentAny,
387 instrument_id: InstrumentId,
388 price_precision: u8,
389 ts_init: UnixNanos,
390) -> TradeTick {
391 let price = Price::new(msg.price, price_precision);
392 let size = parse_contracts_quantity(msg.size, instrument);
393 let aggressor_side = msg.side.as_aggressor_side();
394 let trade_id = TradeId::new(
395 msg.trd_match_id
396 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
397 );
398 let ts_event = UnixNanos::from(msg.timestamp);
399
400 TradeTick::new(
401 instrument_id,
402 price,
403 size,
404 aggressor_side,
405 trade_id,
406 ts_event,
407 ts_init,
408 )
409}
410
411#[must_use]
413pub fn parse_trade_bin_msg(
414 msg: &BitmexTradeBinMsg,
415 topic: &BitmexWsTopic,
416 instrument: &InstrumentAny,
417 instrument_id: InstrumentId,
418 price_precision: u8,
419 ts_init: UnixNanos,
420) -> Bar {
421 let spec = bar_spec_from_topic(topic);
422 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
423
424 let open = Price::new(msg.open, price_precision);
425 let high = Price::new(msg.high, price_precision);
426 let low = Price::new(msg.low, price_precision);
427 let close = Price::new(msg.close, price_precision);
428
429 let (open, high, low, close) =
430 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
431
432 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
433 let volume = parse_contracts_quantity(volume_contracts, instrument);
434 let ts_event = UnixNanos::from(msg.timestamp);
435
436 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
437}
438
439#[must_use]
445pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
446 match topic {
447 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
448 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
449 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
450 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
451 _ => {
452 tracing::error!(topic = ?topic, "Bar specification not supported");
453 BAR_SPEC_1_MINUTE
454 }
455 }
456}
457
458#[must_use]
464pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
465 match spec {
466 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
467 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
468 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
469 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
470 _ => {
471 tracing::error!(spec = ?spec, "Bar specification not supported");
472 BitmexWsTopic::TradeBin1m
473 }
474 }
475}
476
477fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
478 if msg.stop_px.is_some() {
479 if msg.price.is_some() {
480 Some(OrderType::StopLimit)
481 } else {
482 Some(OrderType::StopMarket)
483 }
484 } else if msg.price.is_some() {
485 Some(OrderType::Limit)
486 } else {
487 Some(OrderType::Market)
488 }
489}
490
491pub fn parse_order_msg(
505 msg: &BitmexOrderMsg,
506 instrument: &InstrumentAny,
507 order_type_cache: &DashMap<ClientOrderId, OrderType>,
508) -> anyhow::Result<OrderStatusReport> {
509 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
511 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
512 let common_side: BitmexSide = msg.side.into();
513 let order_side: OrderSide = common_side.into();
514
515 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
516 ord_type.into()
517 } else if let Some(client_order_id) = msg.cl_ord_id {
518 let client_order_id = ClientOrderId::new(client_order_id);
519 if let Some(entry) = order_type_cache.get(&client_order_id) {
520 *entry.value()
521 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
522 order_type_cache.insert(client_order_id, inferred);
523 inferred
524 } else {
525 anyhow::bail!(
526 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
527 );
528 }
529 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
530 inferred
531 } else {
532 anyhow::bail!("Order missing both ord_type and cl_ord_id");
533 };
534
535 let time_in_force: TimeInForce = match msg.time_in_force {
536 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
537 None => TimeInForce::Gtc,
538 };
539 let order_status: OrderStatus = msg.ord_status.into();
540 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
541 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
542 let report_id = UUID4::new();
543 let ts_accepted =
544 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
545 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
546 let ts_init = get_atomic_clock_realtime().get_time_ns();
547
548 let mut report = OrderStatusReport::new(
549 account_id,
550 instrument_id,
551 None, venue_order_id,
553 order_side,
554 order_type,
555 time_in_force,
556 order_status,
557 quantity,
558 filled_qty,
559 ts_accepted,
560 ts_last,
561 ts_init,
562 Some(report_id),
563 );
564
565 if let Some(cl_ord_id) = &msg.cl_ord_id {
566 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
567 }
568
569 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
570 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
571 }
572
573 if let Some(price) = msg.price {
574 report = report.with_price(Price::new(price, instrument.price_precision()));
575 }
576
577 if let Some(avg_px) = msg.avg_px {
578 report = report.with_avg_px(avg_px)?;
579 }
580
581 if let Some(trigger_price) = msg.stop_px {
582 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
583 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
585 TriggerType::MarkPrice
586 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
587 TriggerType::IndexPrice
588 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
589 TriggerType::LastPrice
590 } else {
591 TriggerType::Default
592 }
593 } else {
594 TriggerType::Default };
596
597 report = report
598 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
599 .with_trigger_type(trigger_type);
600 }
601
602 if let Some(exec_insts) = &msg.exec_inst {
603 for exec_inst in exec_insts {
604 match exec_inst {
605 BitmexExecInstruction::ParticipateDoNotInitiate => {
606 report = report.with_post_only(true);
607 }
608 BitmexExecInstruction::ReduceOnly => {
609 report = report.with_reduce_only(true);
610 }
611 _ => {}
612 }
613 }
614 }
615
616 if order_status == OrderStatus::Rejected {
618 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
619 tracing::debug!(
620 order_id = ?venue_order_id,
621 client_order_id = ?msg.cl_ord_id,
622 reason = ?reason_str,
623 "Order rejected with reason"
624 );
625 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
626 } else {
627 tracing::debug!(
628 order_id = ?venue_order_id,
629 client_order_id = ?msg.cl_ord_id,
630 ord_status = ?msg.ord_status,
631 ord_rej_reason = ?msg.ord_rej_reason,
632 text = ?msg.text,
633 "Order rejected without reason from BitMEX"
634 );
635 }
636 }
637
638 if order_status == OrderStatus::Canceled
641 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
642 {
643 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
644 }
645
646 Ok(report)
647}
648
649pub fn parse_order_update_msg(
653 msg: &BitmexOrderUpdateMsg,
654 instrument: &InstrumentAny,
655 account_id: AccountId,
656) -> Option<OrderUpdated> {
657 let trader_id = TraderId::default();
660 let strategy_id = StrategyId::default();
661 let instrument_id = parse_instrument_id(msg.symbol);
662 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
663 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
664 let quantity = Quantity::zero(instrument.size_precision());
665 let price = msg
666 .price
667 .map(|p| Price::new(p, instrument.price_precision()));
668
669 let trigger_price = None;
671 let protection_price = None;
673
674 let event_id = UUID4::new();
675 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
676 let ts_init = get_atomic_clock_realtime().get_time_ns();
677
678 Some(OrderUpdated::new(
679 trader_id,
680 strategy_id,
681 instrument_id,
682 client_order_id,
683 quantity,
684 event_id,
685 ts_event,
686 ts_init,
687 false, venue_order_id,
689 Some(account_id),
690 price,
691 trigger_price,
692 protection_price,
693 ))
694}
695
696pub fn parse_execution_msg(
715 msg: BitmexExecutionMsg,
716 instrument: &InstrumentAny,
717) -> Option<FillReport> {
718 let exec_type = msg.exec_type?;
719
720 match exec_type {
721 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
723 BitmexExecType::Bankruptcy => {
724 tracing::warn!(
725 exec_type = ?exec_type,
726 order_id = ?msg.order_id,
727 symbol = ?msg.symbol,
728 "Processing bankruptcy execution as fill"
729 );
730 }
731
732 BitmexExecType::Settlement => {
734 tracing::debug!(
735 exec_type = ?exec_type,
736 order_id = ?msg.order_id,
737 symbol = ?msg.symbol,
738 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement"
739 );
740 return None;
741 }
742 BitmexExecType::TrialFill => {
743 tracing::warn!(
744 exec_type = ?exec_type,
745 order_id = ?msg.order_id,
746 symbol = ?msg.symbol,
747 "Trial fill execution received (testnet only), not processed as fill"
748 );
749 return None;
750 }
751
752 BitmexExecType::Funding => {
754 tracing::debug!(
755 exec_type = ?exec_type,
756 order_id = ?msg.order_id,
757 symbol = ?msg.symbol,
758 "Funding execution skipped (not a fill)"
759 );
760 return None;
761 }
762 BitmexExecType::Insurance => {
763 tracing::debug!(
764 exec_type = ?exec_type,
765 order_id = ?msg.order_id,
766 symbol = ?msg.symbol,
767 "Insurance execution skipped (not a fill)"
768 );
769 return None;
770 }
771 BitmexExecType::Rebalance => {
772 tracing::debug!(
773 exec_type = ?exec_type,
774 order_id = ?msg.order_id,
775 symbol = ?msg.symbol,
776 "Rebalance execution skipped (not a fill)"
777 );
778 return None;
779 }
780
781 BitmexExecType::New
783 | BitmexExecType::Canceled
784 | BitmexExecType::CancelReject
785 | BitmexExecType::Replaced
786 | BitmexExecType::Rejected
787 | BitmexExecType::AmendReject
788 | BitmexExecType::Suspended
789 | BitmexExecType::Released
790 | BitmexExecType::TriggeredOrActivatedBySystem => {
791 tracing::debug!(
792 exec_type = ?exec_type,
793 order_id = ?msg.order_id,
794 "Execution message skipped (order state change, not a fill)"
795 );
796 return None;
797 }
798 }
799
800 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
801 let instrument_id = parse_instrument_id(msg.symbol?);
802 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
803 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
804 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
805 let side: BitmexSide = s.into();
806 side.into()
807 });
808 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
809 let last_px = Price::new(msg.last_px?, instrument.price_precision());
810 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
811 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
812 let currency = get_currency(&mapped_currency);
813 let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
814 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
815 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
816 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
818 let ts_init = get_atomic_clock_realtime().get_time_ns();
819
820 Some(FillReport::new(
821 account_id,
822 instrument_id,
823 venue_order_id,
824 trade_id,
825 order_side,
826 last_qty,
827 last_px,
828 commission,
829 liquidity_side,
830 client_order_id,
831 venue_position_id,
832 ts_event,
833 ts_init,
834 None,
835 ))
836}
837
838#[must_use]
844pub fn parse_position_msg(
845 msg: BitmexPositionMsg,
846 instrument: &InstrumentAny,
847) -> PositionStatusReport {
848 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
849 let instrument_id = parse_instrument_id(msg.symbol);
850 let position_side = parse_position_side(msg.current_qty).as_specified();
851 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
852 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
854 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
855 let ts_init = get_atomic_clock_realtime().get_time_ns();
856
857 PositionStatusReport::new(
858 account_id,
859 instrument_id,
860 position_side,
861 quantity,
862 ts_last,
863 ts_init,
864 None, venue_position_id, avg_px_open, )
868}
869
870#[must_use]
883pub fn parse_instrument_msg(
884 msg: BitmexInstrumentMsg,
885 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
886 ts_init: UnixNanos,
887) -> Vec<Data> {
888 let mut updates = Vec::new();
889 let is_index = is_index_symbol(&msg.symbol);
890
891 let effective_index_price = if is_index {
894 msg.last_price
895 } else {
896 msg.index_price
897 };
898
899 if msg.mark_price.is_none() && effective_index_price.is_none() {
903 return updates;
904 }
905
906 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
907 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
908
909 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
911 Some(instrument) => instrument.price_precision(),
912 None => {
913 if is_index {
917 tracing::trace!(
918 "Index instrument {} not in cache, skipping update",
919 msg.symbol
920 );
921 } else {
922 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
923 }
924 return updates;
925 }
926 };
927
928 if let Some(mark_price) = msg.mark_price {
931 let price = Price::new(mark_price, price_precision);
932 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
933 instrument_id,
934 price,
935 ts_event,
936 ts_init,
937 )));
938 }
939
940 if let Some(index_price) = effective_index_price {
942 let price = Price::new(index_price, price_precision);
943 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
944 instrument_id,
945 price,
946 ts_event,
947 ts_init,
948 )));
949 }
950
951 updates
952}
953
954pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
960 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
961 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
962
963 let rate = match Decimal::try_from(msg.funding_rate) {
965 Ok(rate) => rate,
966 Err(e) => {
967 tracing::error!("Failed to parse funding rate: {e}");
968 return None;
969 }
970 };
971
972 Some(FundingRateUpdate::new(
973 instrument_id,
974 rate,
975 None, ts_event,
977 ts_init,
978 ))
979}
980
981#[must_use]
990pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
991 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
992
993 let currency_str = map_bitmex_currency(msg.currency.as_str());
995 let currency = get_currency(¤cy_str);
996
997 let divisor = if msg.currency == "XBt" {
999 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1001 1_000_000.0 } else {
1003 1.0
1004 };
1005 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1006
1007 let total = Money::new(amount, currency);
1008 let locked = Money::new(0.0, currency); let free = total - locked;
1010
1011 let balance = AccountBalance::new_checked(total, locked, free)
1012 .expect("Balance calculation should be valid");
1013
1014 AccountState::new(
1015 account_id,
1016 AccountType::Margin,
1017 vec![balance],
1018 vec![], true, UUID4::new(),
1021 ts_init,
1022 ts_init,
1023 None,
1024 )
1025}
1026
1027#[must_use]
1031pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1032 let currency_str = map_bitmex_currency(msg.currency.as_str());
1034 let currency = get_currency(¤cy_str);
1035
1036 let divisor = if msg.currency == "XBt" {
1038 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1040 1_000_000.0 } else {
1042 1.0
1043 };
1044
1045 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1046 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1047 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1048
1049 MarginBalance::new(
1050 Money::new(initial, currency),
1051 Money::new(maintenance, currency),
1052 instrument_id,
1053 )
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use chrono::{DateTime, Utc};
1059 use nautilus_model::{
1060 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1061 identifiers::Symbol,
1062 instruments::crypto_perpetual::CryptoPerpetual,
1063 };
1064 use rstest::rstest;
1065 use ustr::Ustr;
1066
1067 use super::*;
1068 use crate::common::{
1069 enums::{BitmexExecType, BitmexOrderStatus},
1070 testing::load_test_json,
1071 };
1072
1073 fn create_test_perpetual_instrument_with_precisions(
1075 price_precision: u8,
1076 size_precision: u8,
1077 ) -> InstrumentAny {
1078 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1079 InstrumentId::from("XBTUSD.BITMEX"),
1080 Symbol::new("XBTUSD"),
1081 Currency::BTC(),
1082 Currency::USD(),
1083 Currency::BTC(),
1084 true, price_precision,
1086 size_precision,
1087 Price::new(0.5, price_precision),
1088 Quantity::new(1.0, size_precision),
1089 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1102 UnixNanos::default(),
1103 ))
1104 }
1105
1106 fn create_test_perpetual_instrument() -> InstrumentAny {
1107 create_test_perpetual_instrument_with_precisions(1, 0)
1108 }
1109
1110 #[rstest]
1111 fn test_orderbook_l2_message() {
1112 let json_data = load_test_json("ws_orderbook_l2.json");
1113
1114 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1115 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1116
1117 let instrument = create_test_perpetual_instrument();
1119 let delta = parse_book_msg(
1120 &msg,
1121 &BitmexAction::Insert,
1122 &instrument,
1123 instrument.id(),
1124 instrument.price_precision(),
1125 UnixNanos::from(3),
1126 );
1127 assert_eq!(delta.instrument_id, instrument_id);
1128 assert_eq!(delta.order.price, Price::from("98459.9"));
1129 assert_eq!(delta.order.size, Quantity::from(33000));
1130 assert_eq!(delta.order.side, OrderSide::Sell);
1131 assert_eq!(delta.order.order_id, 62400580205);
1132 assert_eq!(delta.action, BookAction::Add);
1133 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1134 assert_eq!(delta.sequence, 0);
1135 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1137
1138 let delta = parse_book_msg(
1140 &msg,
1141 &BitmexAction::Update,
1142 &instrument,
1143 instrument.id(),
1144 instrument.price_precision(),
1145 UnixNanos::from(3),
1146 );
1147 assert_eq!(delta.flags, 0);
1148 assert_eq!(delta.action, BookAction::Update);
1149 }
1150
1151 #[rstest]
1152 fn test_orderbook10_message() {
1153 let json_data = load_test_json("ws_orderbook_10.json");
1154 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1155 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1156 let instrument = create_test_perpetual_instrument();
1157 let depth10 = parse_book10_msg(
1158 &msg,
1159 &instrument,
1160 instrument.id(),
1161 instrument.price_precision(),
1162 UnixNanos::from(3),
1163 );
1164
1165 assert_eq!(depth10.instrument_id, instrument_id);
1166
1167 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1169 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1170 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1171
1172 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1174 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1175 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1176
1177 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1179 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1180
1181 assert_eq!(depth10.sequence, 0);
1183 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1184 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1186 }
1187
1188 #[rstest]
1189 fn test_quote_message() {
1190 let json_data = load_test_json("ws_quote.json");
1191
1192 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1193 let last_quote = QuoteTick::new(
1194 instrument_id,
1195 Price::new(487.50, 2),
1196 Price::new(488.20, 2),
1197 Quantity::from(100_000),
1198 Quantity::from(100_000),
1199 UnixNanos::from(1),
1200 UnixNanos::from(2),
1201 );
1202 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1203 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1204 let quote = parse_quote_msg(
1205 &msg,
1206 &last_quote,
1207 &instrument,
1208 instrument_id,
1209 instrument.price_precision(),
1210 UnixNanos::from(3),
1211 );
1212
1213 assert_eq!(quote.instrument_id, instrument_id);
1214 assert_eq!(quote.bid_price, Price::from("487.55"));
1215 assert_eq!(quote.ask_price, Price::from("488.25"));
1216 assert_eq!(quote.bid_size, Quantity::from(103_000));
1217 assert_eq!(quote.ask_size, Quantity::from(50_000));
1218 assert_eq!(quote.ts_event, 1732315465085000000);
1219 assert_eq!(quote.ts_init, 3);
1220 }
1221
1222 #[rstest]
1223 fn test_trade_message() {
1224 let json_data = load_test_json("ws_trade.json");
1225
1226 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1227 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1228 let instrument = create_test_perpetual_instrument();
1229 let trade = parse_trade_msg(
1230 &msg,
1231 &instrument,
1232 instrument.id(),
1233 instrument.price_precision(),
1234 UnixNanos::from(3),
1235 );
1236
1237 assert_eq!(trade.instrument_id, instrument_id);
1238 assert_eq!(trade.price, Price::from("98570.9"));
1239 assert_eq!(trade.size, Quantity::from(100));
1240 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1241 assert_eq!(
1242 trade.trade_id.to_string(),
1243 "00000000-006d-1000-0000-000e8737d536"
1244 );
1245 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1247 }
1248
1249 #[rstest]
1250 fn test_trade_bin_message() {
1251 let json_data = load_test_json("ws_trade_bin_1m.json");
1252
1253 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1254 let topic = BitmexWsTopic::TradeBin1m;
1255
1256 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1257 let instrument = create_test_perpetual_instrument();
1258 let bar = parse_trade_bin_msg(
1259 &msg,
1260 &topic,
1261 &instrument,
1262 instrument.id(),
1263 instrument.price_precision(),
1264 UnixNanos::from(3),
1265 );
1266
1267 assert_eq!(bar.instrument_id(), instrument_id);
1268 assert_eq!(
1269 bar.bar_type.spec(),
1270 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1271 );
1272 assert_eq!(bar.open, Price::from("97550.0"));
1273 assert_eq!(bar.high, Price::from("97584.4"));
1274 assert_eq!(bar.low, Price::from("97550.0"));
1275 assert_eq!(bar.close, Price::from("97570.1"));
1276 assert_eq!(bar.volume, Quantity::from(84_000));
1277 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1279 }
1280
1281 #[rstest]
1282 fn test_trade_bin_message_extreme_adjustment() {
1283 let topic = BitmexWsTopic::TradeBin1m;
1284 let instrument = create_test_perpetual_instrument();
1285
1286 let msg = BitmexTradeBinMsg {
1287 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1288 .unwrap()
1289 .with_timezone(&Utc),
1290 symbol: Ustr::from("XBTUSD"),
1291 open: 50_000.0,
1292 high: 49_990.0,
1293 low: 50_010.0,
1294 close: 50_005.0,
1295 trades: 10,
1296 volume: 1_000,
1297 vwap: 0.0,
1298 last_size: 0,
1299 turnover: 0,
1300 home_notional: 0.0,
1301 foreign_notional: 0.0,
1302 };
1303
1304 let bar = parse_trade_bin_msg(
1305 &msg,
1306 &topic,
1307 &instrument,
1308 instrument.id(),
1309 instrument.price_precision(),
1310 UnixNanos::from(3),
1311 );
1312
1313 assert_eq!(bar.high, Price::from("50010.0"));
1314 assert_eq!(bar.low, Price::from("49990.0"));
1315 assert_eq!(bar.open, Price::from("50000.0"));
1316 assert_eq!(bar.close, Price::from("50005.0"));
1317 assert_eq!(bar.volume, Quantity::from(1_000));
1318 }
1319
1320 #[rstest]
1321 fn test_parse_order_msg() {
1322 let json_data = load_test_json("ws_order.json");
1323 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1324 let cache = DashMap::new();
1325 let instrument = create_test_perpetual_instrument();
1326 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1327
1328 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1329 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1330 assert_eq!(
1331 report.venue_order_id.to_string(),
1332 "550e8400-e29b-41d4-a716-446655440001"
1333 );
1334 assert_eq!(
1335 report.client_order_id.unwrap().to_string(),
1336 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1337 );
1338 assert_eq!(report.order_side, OrderSide::Buy);
1339 assert_eq!(report.order_type, OrderType::Limit);
1340 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1341 assert_eq!(report.order_status, OrderStatus::Accepted);
1342 assert_eq!(report.quantity, Quantity::from(100));
1343 assert_eq!(report.filled_qty, Quantity::from(0));
1344 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1345 assert_eq!(report.ts_accepted, 1732530600000000000); }
1347
1348 #[rstest]
1349 fn test_parse_order_msg_infers_type_when_missing() {
1350 let json_data = load_test_json("ws_order.json");
1351 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1352 msg.ord_type = None;
1353 msg.cl_ord_id = None;
1354 msg.price = Some(98_000.0);
1355 msg.stop_px = None;
1356
1357 let cache = DashMap::new();
1358 let instrument = create_test_perpetual_instrument();
1359
1360 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1361
1362 assert_eq!(report.order_type, OrderType::Limit);
1363 }
1364
1365 #[rstest]
1366 fn test_parse_order_msg_rejected_with_reason() {
1367 let mut msg: BitmexOrderMsg =
1368 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1369 msg.ord_status = BitmexOrderStatus::Rejected;
1370 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1371 msg.text = None;
1372 msg.cum_qty = 0;
1373
1374 let cache = DashMap::new();
1375 let instrument = create_test_perpetual_instrument();
1376 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1377
1378 assert_eq!(report.order_status, OrderStatus::Rejected);
1379 assert_eq!(
1380 report.cancel_reason,
1381 Some("Insufficient available balance".to_string())
1382 );
1383 }
1384
1385 #[rstest]
1386 fn test_parse_order_msg_rejected_with_text_fallback() {
1387 let mut msg: BitmexOrderMsg =
1388 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1389 msg.ord_status = BitmexOrderStatus::Rejected;
1390 msg.ord_rej_reason = None;
1391 msg.text = Some(Ustr::from("Order would execute immediately"));
1392 msg.cum_qty = 0;
1393
1394 let cache = DashMap::new();
1395 let instrument = create_test_perpetual_instrument();
1396 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1397
1398 assert_eq!(report.order_status, OrderStatus::Rejected);
1399 assert_eq!(
1400 report.cancel_reason,
1401 Some("Order would execute immediately".to_string())
1402 );
1403 }
1404
1405 #[rstest]
1406 fn test_parse_order_msg_rejected_without_reason() {
1407 let mut msg: BitmexOrderMsg =
1408 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1409 msg.ord_status = BitmexOrderStatus::Rejected;
1410 msg.ord_rej_reason = None;
1411 msg.text = None;
1412 msg.cum_qty = 0;
1413
1414 let cache = DashMap::new();
1415 let instrument = create_test_perpetual_instrument();
1416 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1417
1418 assert_eq!(report.order_status, OrderStatus::Rejected);
1419 assert_eq!(report.cancel_reason, None);
1420 }
1421
1422 #[rstest]
1423 fn test_parse_execution_msg() {
1424 let json_data = load_test_json("ws_execution.json");
1425 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1426 let instrument = create_test_perpetual_instrument();
1427 let fill = parse_execution_msg(msg, &instrument).unwrap();
1428
1429 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1430 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1431 assert_eq!(
1432 fill.venue_order_id.to_string(),
1433 "550e8400-e29b-41d4-a716-446655440002"
1434 );
1435 assert_eq!(
1436 fill.trade_id.to_string(),
1437 "00000000-006d-1000-0000-000e8737d540"
1438 );
1439 assert_eq!(
1440 fill.client_order_id.unwrap().to_string(),
1441 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1442 );
1443 assert_eq!(fill.order_side, OrderSide::Sell);
1444 assert_eq!(fill.last_qty, Quantity::from(100));
1445 assert_eq!(fill.last_px, Price::from("98950.0"));
1446 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1447 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1448 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1449 assert_eq!(fill.ts_event, 1732530900789000000); }
1451
1452 #[rstest]
1453 fn test_parse_execution_msg_non_trade() {
1454 let mut msg: BitmexExecutionMsg =
1456 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1457 msg.exec_type = Some(BitmexExecType::Settlement);
1458
1459 let instrument = create_test_perpetual_instrument();
1460 let result = parse_execution_msg(msg, &instrument);
1461 assert!(result.is_none());
1462 }
1463
1464 #[rstest]
1465 fn test_parse_cancel_reject_execution() {
1466 let json = load_test_json("ws_execution_cancel_reject.json");
1468
1469 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1470 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1471 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1472 assert_eq!(msg.symbol, None);
1473
1474 let instrument = create_test_perpetual_instrument();
1476 let result = parse_execution_msg(msg, &instrument);
1477 assert!(result.is_none());
1478 }
1479
1480 #[rstest]
1481 fn test_parse_execution_msg_liquidation() {
1482 let mut msg: BitmexExecutionMsg =
1484 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1485 msg.exec_type = Some(BitmexExecType::Liquidation);
1486
1487 let instrument = create_test_perpetual_instrument();
1488 let fill = parse_execution_msg(msg, &instrument).unwrap();
1489
1490 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1491 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1492 assert_eq!(fill.order_side, OrderSide::Sell);
1493 assert_eq!(fill.last_qty, Quantity::from(100));
1494 assert_eq!(fill.last_px, Price::from("98950.0"));
1495 }
1496
1497 #[rstest]
1498 fn test_parse_execution_msg_bankruptcy() {
1499 let mut msg: BitmexExecutionMsg =
1500 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1501 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1502
1503 let instrument = create_test_perpetual_instrument();
1504 let fill = parse_execution_msg(msg, &instrument).unwrap();
1505
1506 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1507 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1508 assert_eq!(fill.order_side, OrderSide::Sell);
1509 assert_eq!(fill.last_qty, Quantity::from(100));
1510 }
1511
1512 #[rstest]
1513 fn test_parse_execution_msg_settlement() {
1514 let mut msg: BitmexExecutionMsg =
1515 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1516 msg.exec_type = Some(BitmexExecType::Settlement);
1517
1518 let instrument = create_test_perpetual_instrument();
1519 let result = parse_execution_msg(msg, &instrument);
1520 assert!(result.is_none());
1521 }
1522
1523 #[rstest]
1524 fn test_parse_execution_msg_trial_fill() {
1525 let mut msg: BitmexExecutionMsg =
1526 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1527 msg.exec_type = Some(BitmexExecType::TrialFill);
1528
1529 let instrument = create_test_perpetual_instrument();
1530 let result = parse_execution_msg(msg, &instrument);
1531 assert!(result.is_none());
1532 }
1533
1534 #[rstest]
1535 fn test_parse_execution_msg_funding() {
1536 let mut msg: BitmexExecutionMsg =
1537 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1538 msg.exec_type = Some(BitmexExecType::Funding);
1539
1540 let instrument = create_test_perpetual_instrument();
1541 let result = parse_execution_msg(msg, &instrument);
1542 assert!(result.is_none());
1543 }
1544
1545 #[rstest]
1546 fn test_parse_execution_msg_insurance() {
1547 let mut msg: BitmexExecutionMsg =
1548 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1549 msg.exec_type = Some(BitmexExecType::Insurance);
1550
1551 let instrument = create_test_perpetual_instrument();
1552 let result = parse_execution_msg(msg, &instrument);
1553 assert!(result.is_none());
1554 }
1555
1556 #[rstest]
1557 fn test_parse_execution_msg_rebalance() {
1558 let mut msg: BitmexExecutionMsg =
1559 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1560 msg.exec_type = Some(BitmexExecType::Rebalance);
1561
1562 let instrument = create_test_perpetual_instrument();
1563 let result = parse_execution_msg(msg, &instrument);
1564 assert!(result.is_none());
1565 }
1566
1567 #[rstest]
1568 fn test_parse_execution_msg_order_state_changes() {
1569 let instrument = create_test_perpetual_instrument();
1570
1571 let order_state_types = vec![
1572 BitmexExecType::New,
1573 BitmexExecType::Canceled,
1574 BitmexExecType::CancelReject,
1575 BitmexExecType::Replaced,
1576 BitmexExecType::Rejected,
1577 BitmexExecType::AmendReject,
1578 BitmexExecType::Suspended,
1579 BitmexExecType::Released,
1580 BitmexExecType::TriggeredOrActivatedBySystem,
1581 ];
1582
1583 for exec_type in order_state_types {
1584 let mut msg: BitmexExecutionMsg =
1585 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1586 msg.exec_type = Some(exec_type);
1587
1588 let result = parse_execution_msg(msg, &instrument);
1589 assert!(
1590 result.is_none(),
1591 "Expected None for exec_type {exec_type:?}"
1592 );
1593 }
1594 }
1595
1596 #[rstest]
1597 fn test_parse_position_msg() {
1598 let json_data = load_test_json("ws_position.json");
1599 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1600 let instrument = create_test_perpetual_instrument();
1601 let report = parse_position_msg(msg, &instrument);
1602
1603 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1604 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1605 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1606 assert_eq!(report.quantity, Quantity::from(1000));
1607 assert!(report.venue_position_id.is_none());
1608 assert_eq!(report.ts_last, 1732530900789000000); }
1610
1611 #[rstest]
1612 fn test_parse_position_msg_short() {
1613 let mut msg: BitmexPositionMsg =
1614 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1615 msg.current_qty = Some(-500);
1616
1617 let instrument = create_test_perpetual_instrument();
1618 let report = parse_position_msg(msg, &instrument);
1619 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1620 assert_eq!(report.quantity, Quantity::from(500));
1621 }
1622
1623 #[rstest]
1624 fn test_parse_position_msg_flat() {
1625 let mut msg: BitmexPositionMsg =
1626 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1627 msg.current_qty = Some(0);
1628
1629 let instrument = create_test_perpetual_instrument();
1630 let report = parse_position_msg(msg, &instrument);
1631 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1632 assert_eq!(report.quantity, Quantity::from(0));
1633 }
1634
1635 #[rstest]
1636 fn test_parse_wallet_msg() {
1637 let json_data = load_test_json("ws_wallet.json");
1638 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1639 let ts_init = UnixNanos::from(1);
1640 let account_state = parse_wallet_msg(msg, ts_init);
1641
1642 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1643 assert!(!account_state.balances.is_empty());
1644 let balance = &account_state.balances[0];
1645 assert_eq!(balance.currency.code.to_string(), "XBT");
1646 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1648 }
1649
1650 #[rstest]
1651 fn test_parse_wallet_msg_no_amount() {
1652 let mut msg: BitmexWalletMsg =
1653 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1654 msg.amount = None;
1655
1656 let ts_init = UnixNanos::from(1);
1657 let account_state = parse_wallet_msg(msg, ts_init);
1658 let balance = &account_state.balances[0];
1659 assert_eq!(balance.total.as_f64(), 0.0);
1660 }
1661
1662 #[rstest]
1663 fn test_parse_margin_msg() {
1664 let json_data = load_test_json("ws_margin.json");
1665 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1666 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1667 let margin_balance = parse_margin_msg(msg, instrument_id);
1668
1669 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1670 assert_eq!(margin_balance.instrument_id, instrument_id);
1671 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1674 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1676 }
1677
1678 #[rstest]
1679 fn test_parse_margin_msg_no_available() {
1680 let mut msg: BitmexMarginMsg =
1681 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1682 msg.available_margin = None;
1683
1684 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1685 let margin_balance = parse_margin_msg(msg, instrument_id);
1686 assert!(margin_balance.initial.as_f64() >= 0.0);
1688 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1689 }
1690
1691 #[rstest]
1692 fn test_parse_instrument_msg_both_prices() {
1693 let json_data = load_test_json("ws_instrument.json");
1694 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1695
1696 let mut instruments_cache = AHashMap::new();
1698 let test_instrument = create_test_perpetual_instrument();
1699 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1700
1701 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1702
1703 assert_eq!(updates.len(), 2);
1705
1706 match &updates[0] {
1708 Data::MarkPriceUpdate(update) => {
1709 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1710 assert_eq!(update.value.as_f64(), 95125.7);
1711 }
1712 _ => panic!("Expected MarkPriceUpdate at index 0"),
1713 }
1714
1715 match &updates[1] {
1717 Data::IndexPriceUpdate(update) => {
1718 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1719 assert_eq!(update.value.as_f64(), 95124.3);
1720 }
1721 _ => panic!("Expected IndexPriceUpdate at index 1"),
1722 }
1723 }
1724
1725 #[rstest]
1726 fn test_parse_instrument_msg_mark_price_only() {
1727 let mut msg: BitmexInstrumentMsg =
1728 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1729 msg.index_price = None;
1730
1731 let mut instruments_cache = AHashMap::new();
1733 let test_instrument = create_test_perpetual_instrument();
1734 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1735
1736 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1737
1738 assert_eq!(updates.len(), 1);
1739 match &updates[0] {
1740 Data::MarkPriceUpdate(update) => {
1741 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1742 assert_eq!(update.value.as_f64(), 95125.7);
1743 }
1744 _ => panic!("Expected MarkPriceUpdate"),
1745 }
1746 }
1747
1748 #[rstest]
1749 fn test_parse_instrument_msg_index_price_only() {
1750 let mut msg: BitmexInstrumentMsg =
1751 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1752 msg.mark_price = None;
1753
1754 let mut instruments_cache = AHashMap::new();
1756 let test_instrument = create_test_perpetual_instrument();
1757 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1758
1759 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1760
1761 assert_eq!(updates.len(), 1);
1762 match &updates[0] {
1763 Data::IndexPriceUpdate(update) => {
1764 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1765 assert_eq!(update.value.as_f64(), 95124.3);
1766 }
1767 _ => panic!("Expected IndexPriceUpdate"),
1768 }
1769 }
1770
1771 #[rstest]
1772 fn test_parse_instrument_msg_no_prices() {
1773 let mut msg: BitmexInstrumentMsg =
1774 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1775 msg.mark_price = None;
1776 msg.index_price = None;
1777 msg.last_price = None;
1778
1779 let mut instruments_cache = AHashMap::new();
1781 let test_instrument = create_test_perpetual_instrument();
1782 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1783
1784 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1785 assert_eq!(updates.len(), 0);
1786 }
1787
1788 #[rstest]
1789 fn test_parse_instrument_msg_index_symbol() {
1790 let mut msg: BitmexInstrumentMsg =
1793 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1794 msg.symbol = Ustr::from(".BXBT");
1795 msg.last_price = Some(119163.05);
1796 msg.mark_price = Some(119163.05); msg.index_price = None;
1798
1799 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1801 let instrument = CryptoPerpetual::new(
1802 instrument_id,
1803 Symbol::from(".BXBT"),
1804 Currency::BTC(),
1805 Currency::USD(),
1806 Currency::USD(),
1807 false, 2, 8, Price::from("0.01"),
1811 Quantity::from("0.00000001"),
1812 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1827 let mut instruments_cache = AHashMap::new();
1828 instruments_cache.insert(
1829 Ustr::from(".BXBT"),
1830 InstrumentAny::CryptoPerpetual(instrument),
1831 );
1832
1833 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1834
1835 assert_eq!(updates.len(), 2);
1836
1837 match &updates[0] {
1839 Data::MarkPriceUpdate(update) => {
1840 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1841 assert_eq!(update.value, Price::from("119163.05"));
1842 }
1843 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1844 }
1845
1846 match &updates[1] {
1848 Data::IndexPriceUpdate(update) => {
1849 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1850 assert_eq!(update.value, Price::from("119163.05"));
1851 assert_eq!(update.ts_init, UnixNanos::from(1));
1852 }
1853 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1854 }
1855 }
1856
1857 #[rstest]
1858 fn test_parse_funding_msg() {
1859 let json_data = load_test_json("ws_funding_rate.json");
1860 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1861 let update = parse_funding_msg(msg, UnixNanos::from(1));
1862
1863 assert!(update.is_some());
1864 let update = update.unwrap();
1865
1866 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1867 assert_eq!(update.rate.to_string(), "0.0001");
1868 assert!(update.next_funding_ns.is_none());
1869 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1870 assert_eq!(update.ts_init, UnixNanos::from(1));
1871 }
1872}