1use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TrailingOffsetType,
34 },
35 events::{OrderUpdated, account::state::AccountState},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38 VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::Decimal;
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49 enums::{BitmexAction, BitmexWsTopic},
50 messages::{
51 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54 },
55};
56use crate::{
57 common::{
58 consts::BITMEX_VENUE,
59 enums::{
60 BitmexExecInstruction, BitmexExecType, BitmexOrderType, BitmexPegPriceType, BitmexSide,
61 },
62 parse::{
63 clean_reason, extract_trigger_type, map_bitmex_currency, normalize_trade_bin_prices,
64 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
65 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
66 parse_position_side, parse_signed_contracts_quantity,
67 },
68 },
69 http::parse::get_currency,
70 websocket::messages::BitmexOrderUpdateMsg,
71};
72
/// Bar specification for 1-minute bars built from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1m` by the topic/spec conversion helpers in this module.
const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 5-minute bars built from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin5m` by the topic/spec conversion helpers in this module.
const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 1-hour bars built from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1h` by the topic/spec conversion helpers in this module.
const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// Bar specification for 1-day bars built from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1d` by the topic/spec conversion helpers in this module.
const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};
93
94#[inline]
102#[must_use]
103pub fn is_index_symbol(symbol: &Ustr) -> bool {
104 symbol.starts_with('.')
105}
106
107#[must_use]
109pub fn parse_book_msg_vec(
110 data: Vec<BitmexOrderBookMsg>,
111 action: BitmexAction,
112 instruments: &AHashMap<Ustr, InstrumentAny>,
113 ts_init: UnixNanos,
114) -> Vec<Data> {
115 let mut deltas = Vec::with_capacity(data.len());
116
117 for msg in data {
118 if let Some(instrument) = instruments.get(&msg.symbol) {
119 let instrument_id = instrument.id();
120 let price_precision = instrument.price_precision();
121 deltas.push(Data::Delta(parse_book_msg(
122 &msg,
123 &action,
124 instrument,
125 instrument_id,
126 price_precision,
127 ts_init,
128 )));
129 } else {
130 log::error!(
131 "Instrument cache miss: book delta dropped for symbol={}",
132 msg.symbol
133 );
134 }
135 }
136
137 if let Some(Data::Delta(last_delta)) = deltas.last_mut() {
139 *last_delta = OrderBookDelta::new(
140 last_delta.instrument_id,
141 last_delta.action,
142 last_delta.order,
143 last_delta.flags | RecordFlag::F_LAST as u8,
144 last_delta.sequence,
145 last_delta.ts_event,
146 last_delta.ts_init,
147 );
148 }
149
150 deltas
151}
152
153#[must_use]
155pub fn parse_book10_msg_vec(
156 data: Vec<BitmexOrderBook10Msg>,
157 instruments: &AHashMap<Ustr, InstrumentAny>,
158 ts_init: UnixNanos,
159) -> Vec<Data> {
160 let mut depths = Vec::with_capacity(data.len());
161
162 for msg in data {
163 if let Some(instrument) = instruments.get(&msg.symbol) {
164 let instrument_id = instrument.id();
165 let price_precision = instrument.price_precision();
166 match parse_book10_msg(&msg, instrument, instrument_id, price_precision, ts_init) {
167 Ok(depth) => depths.push(Data::Depth10(Box::new(depth))),
168 Err(e) => {
169 log::error!("Failed to parse orderBook10 for symbol={}: {e}", msg.symbol);
170 }
171 }
172 } else {
173 log::error!(
174 "Instrument cache miss: depth10 message dropped for symbol={}",
175 msg.symbol
176 );
177 }
178 }
179 depths
180}
181
182#[must_use]
184pub fn parse_trade_msg_vec(
185 data: Vec<BitmexTradeMsg>,
186 instruments: &AHashMap<Ustr, InstrumentAny>,
187 ts_init: UnixNanos,
188) -> Vec<Data> {
189 let mut trades = Vec::with_capacity(data.len());
190
191 for msg in data {
192 if let Some(instrument) = instruments.get(&msg.symbol) {
193 let instrument_id = instrument.id();
194 let price_precision = instrument.price_precision();
195 trades.push(Data::Trade(parse_trade_msg(
196 &msg,
197 instrument,
198 instrument_id,
199 price_precision,
200 ts_init,
201 )));
202 } else {
203 log::error!(
204 "Instrument cache miss: trade message dropped for symbol={}",
205 msg.symbol
206 );
207 }
208 }
209 trades
210}
211
212#[must_use]
214pub fn parse_trade_bin_msg_vec(
215 data: Vec<BitmexTradeBinMsg>,
216 topic: BitmexWsTopic,
217 instruments: &AHashMap<Ustr, InstrumentAny>,
218 ts_init: UnixNanos,
219) -> Vec<Data> {
220 let mut trades = Vec::with_capacity(data.len());
221
222 for msg in data {
223 if let Some(instrument) = instruments.get(&msg.symbol) {
224 let instrument_id = instrument.id();
225 let price_precision = instrument.price_precision();
226 trades.push(Data::Bar(parse_trade_bin_msg(
227 &msg,
228 &topic,
229 instrument,
230 instrument_id,
231 price_precision,
232 ts_init,
233 )));
234 } else {
235 log::error!(
236 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
237 msg.symbol
238 );
239 }
240 }
241 trades
242}
243
244#[allow(clippy::too_many_arguments)]
246#[must_use]
247pub fn parse_book_msg(
248 msg: &BitmexOrderBookMsg,
249 action: &BitmexAction,
250 instrument: &InstrumentAny,
251 instrument_id: InstrumentId,
252 price_precision: u8,
253 ts_init: UnixNanos,
254) -> OrderBookDelta {
255 let flags = if action == &BitmexAction::Partial {
256 RecordFlag::F_SNAPSHOT as u8
257 } else {
258 0
259 };
260
261 let action = action.as_book_action();
262 let price = Price::new(msg.price, price_precision);
263 let side = msg.side.as_order_side();
264 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
265 let order_id = msg.id;
266 let order = BookOrder::new(side, price, size, order_id);
267 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
269
270 OrderBookDelta::new(
271 instrument_id,
272 action,
273 order,
274 flags,
275 sequence,
276 ts_event,
277 ts_init,
278 )
279}
280
281#[allow(clippy::too_many_arguments)]
287pub fn parse_book10_msg(
288 msg: &BitmexOrderBook10Msg,
289 instrument: &InstrumentAny,
290 instrument_id: InstrumentId,
291 price_precision: u8,
292 ts_init: UnixNanos,
293) -> anyhow::Result<OrderBookDepth10> {
294 let mut bids = Vec::with_capacity(DEPTH10_LEN);
295 let mut asks = Vec::with_capacity(DEPTH10_LEN);
296
297 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
299 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
300
301 for (i, level) in msg.bids.iter().enumerate() {
302 let bid_order = BookOrder::new(
303 OrderSide::Buy,
304 Price::new(level[0], price_precision),
305 parse_fractional_quantity(level[1], instrument),
306 0,
307 );
308
309 bids.push(bid_order);
310 bid_counts[i] = 1;
311 }
312
313 for (i, level) in msg.asks.iter().enumerate() {
314 let ask_order = BookOrder::new(
315 OrderSide::Sell,
316 Price::new(level[0], price_precision),
317 parse_fractional_quantity(level[1], instrument),
318 0,
319 );
320
321 asks.push(ask_order);
322 ask_counts[i] = 1;
323 }
324
325 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
326 anyhow::anyhow!(
327 "Bids length mismatch: expected {DEPTH10_LEN}, was {}",
328 v.len()
329 )
330 })?;
331 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
332 anyhow::anyhow!(
333 "Asks length mismatch: expected {DEPTH10_LEN}, was {}",
334 v.len()
335 )
336 })?;
337
338 let ts_event = UnixNanos::from(msg.timestamp);
339
340 Ok(OrderBookDepth10::new(
341 instrument_id,
342 bids,
343 asks,
344 bid_counts,
345 ask_counts,
346 RecordFlag::F_SNAPSHOT as u8,
347 0, ts_event,
349 ts_init,
350 ))
351}
352
353#[must_use]
355pub fn parse_quote_msg(
356 msg: &BitmexQuoteMsg,
357 last_quote: &QuoteTick,
358 instrument: &InstrumentAny,
359 instrument_id: InstrumentId,
360 price_precision: u8,
361 ts_init: UnixNanos,
362) -> QuoteTick {
363 let bid_price = match msg.bid_price {
364 Some(price) => Price::new(price, price_precision),
365 None => last_quote.bid_price,
366 };
367
368 let ask_price = match msg.ask_price {
369 Some(price) => Price::new(price, price_precision),
370 None => last_quote.ask_price,
371 };
372
373 let bid_size = match msg.bid_size {
374 Some(size) => parse_contracts_quantity(size, instrument),
375 None => last_quote.bid_size,
376 };
377
378 let ask_size = match msg.ask_size {
379 Some(size) => parse_contracts_quantity(size, instrument),
380 None => last_quote.ask_size,
381 };
382
383 let ts_event = UnixNanos::from(msg.timestamp);
384
385 QuoteTick::new(
386 instrument_id,
387 bid_price,
388 ask_price,
389 bid_size,
390 ask_size,
391 ts_event,
392 ts_init,
393 )
394}
395
396#[must_use]
398pub fn parse_trade_msg(
399 msg: &BitmexTradeMsg,
400 instrument: &InstrumentAny,
401 instrument_id: InstrumentId,
402 price_precision: u8,
403 ts_init: UnixNanos,
404) -> TradeTick {
405 let price = Price::new(msg.price, price_precision);
406 let size = parse_contracts_quantity(msg.size, instrument);
407 let aggressor_side = msg.side.as_aggressor_side();
408 let trade_id = TradeId::new(
409 msg.trd_match_id
410 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
411 );
412 let ts_event = UnixNanos::from(msg.timestamp);
413
414 TradeTick::new(
415 instrument_id,
416 price,
417 size,
418 aggressor_side,
419 trade_id,
420 ts_event,
421 ts_init,
422 )
423}
424
425#[must_use]
427pub fn parse_trade_bin_msg(
428 msg: &BitmexTradeBinMsg,
429 topic: &BitmexWsTopic,
430 instrument: &InstrumentAny,
431 instrument_id: InstrumentId,
432 price_precision: u8,
433 ts_init: UnixNanos,
434) -> Bar {
435 let spec = bar_spec_from_topic(topic);
436 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
437
438 let open = Price::new(msg.open, price_precision);
439 let high = Price::new(msg.high, price_precision);
440 let low = Price::new(msg.low, price_precision);
441 let close = Price::new(msg.close, price_precision);
442
443 let (open, high, low, close) =
444 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
445
446 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
447 let volume = parse_contracts_quantity(volume_contracts, instrument);
448 let ts_event = UnixNanos::from(msg.timestamp);
449
450 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
451}
452
453#[must_use]
457pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
458 match topic {
459 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
460 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
461 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
462 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
463 _ => {
464 log::error!("Bar specification not supported: topic={topic:?}");
465 BAR_SPEC_1_MINUTE
466 }
467 }
468}
469
470#[must_use]
474pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
475 match spec {
476 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
477 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
478 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
479 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
480 _ => {
481 log::error!("Bar specification not supported: spec={spec:?}");
482 BitmexWsTopic::TradeBin1m
483 }
484 }
485}
486
487fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
488 if msg.stop_px.is_some() {
489 if msg.price.is_some() {
490 Some(OrderType::StopLimit)
491 } else {
492 Some(OrderType::StopMarket)
493 }
494 } else if msg.price.is_some() {
495 Some(OrderType::Limit)
496 } else {
497 Some(OrderType::Market)
498 }
499}
500
/// Builds an `OrderStatusReport` from a BitMEX order message.
///
/// The order type is resolved in this order:
/// 1. From `msg.ord_type` when present (a `Pegged` order with a `TrailingStopPeg` peg price
///    type becomes a trailing stop; limit vs market depends on whether `price` is set).
/// 2. Otherwise, when `msg.cl_ord_id` is present, from `order_type_cache`; on a cache miss
///    the type is inferred via `infer_order_type_from_msg` and inserted into the cache.
/// 3. Otherwise inferred via `infer_order_type_from_msg` without touching the cache.
///
/// # Errors
///
/// Returns an error if the time in force cannot be converted or if `with_avg_px` rejects the
/// average price. The two `bail!` branches below only trigger if `infer_order_type_from_msg`
/// returns `None`.
pub fn parse_order_msg(
    msg: &BitmexOrderMsg,
    instrument: &InstrumentAny,
    order_type_cache: &DashMap<ClientOrderId, OrderType>,
) -> anyhow::Result<OrderStatusReport> {
    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
    let instrument_id = parse_instrument_id(msg.symbol);
    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
    // Convert the wire-level side to the shared BitMEX side enum, then to the model enum.
    let common_side: BitmexSide = msg.side.into();
    let order_side: OrderSide = common_side.into();

    let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
        // A pegged order with a trailing-stop peg is reported as a trailing stop order.
        if ord_type == BitmexOrderType::Pegged
            && msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
        {
            if msg.price.is_some() {
                OrderType::TrailingStopLimit
            } else {
                OrderType::TrailingStopMarket
            }
        } else {
            ord_type.into()
        }
    } else if let Some(client_order_id) = msg.cl_ord_id {
        // No `ord_type` on the message: fall back to the cache keyed by client order id.
        let client_order_id = ClientOrderId::new(client_order_id);
        if let Some(entry) = order_type_cache.get(&client_order_id) {
            *entry.value()
        } else if let Some(inferred) = infer_order_type_from_msg(msg) {
            // Remember the inferred type so later messages for this order stay consistent.
            order_type_cache.insert(client_order_id, inferred);
            inferred
        } else {
            anyhow::bail!(
                "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
            );
        }
    } else if let Some(inferred) = infer_order_type_from_msg(msg) {
        inferred
    } else {
        anyhow::bail!("Order missing both ord_type and cl_ord_id");
    };

    // A missing time in force defaults to GTC.
    let time_in_force: TimeInForce = match msg.time_in_force {
        Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
        None => TimeInForce::Gtc,
    };
    let order_status: OrderStatus = msg.ord_status.into();
    let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
    let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
    let report_id = UUID4::new();
    // `ts_accepted` comes from `transact_time`, `ts_last` from `timestamp`.
    let ts_accepted =
        parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
    let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
    let ts_init = get_atomic_clock_realtime().get_time_ns();

    // The third argument is `None` here; the client order id is attached below when present.
    let mut report = OrderStatusReport::new(
        account_id,
        instrument_id,
        None,
        venue_order_id,
        order_side,
        order_type,
        time_in_force,
        order_status,
        quantity,
        filled_qty,
        ts_accepted,
        ts_last,
        ts_init,
        Some(report_id),
    );

    if let Some(cl_ord_id) = &msg.cl_ord_id {
        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
    }

    if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
        report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
    }

    if let Some(price) = msg.price {
        report = report.with_price(Price::new(price, instrument.price_precision()));
    }

    if let Some(avg_px) = msg.avg_px {
        report = report.with_avg_px(avg_px)?;
    }

    // A stop price implies a trigger; the trigger type is derived from the exec instructions.
    if let Some(trigger_price) = msg.stop_px {
        report = report
            .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
            .with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
    }

    // Trailing stops: report the absolute peg offset as a price-based trailing offset.
    if matches!(
        order_type,
        OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
    ) && let Some(peg_offset) = msg.peg_offset_value
    {
        let trailing_offset = Decimal::try_from(peg_offset.abs())
            .unwrap_or_else(|_| Decimal::new(peg_offset.abs() as i64, 0));
        report = report
            .with_trailing_offset(trailing_offset)
            .with_trailing_offset_type(TrailingOffsetType::Price);

        // The trigger type was already set above when `stop_px` is present.
        if msg.stop_px.is_none() {
            report = report.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
        }
    }

    // Map the exec instructions that have a direct report flag; others are ignored here.
    if let Some(exec_insts) = &msg.exec_inst {
        for exec_inst in exec_insts {
            match exec_inst {
                BitmexExecInstruction::ParticipateDoNotInitiate => {
                    report = report.with_post_only(true);
                }
                BitmexExecInstruction::ReduceOnly => {
                    report = report.with_reduce_only(true);
                }
                _ => {}
            }
        }
    }

    // For rejections, prefer `ord_rej_reason` and fall back to `text` as the reason.
    if order_status == OrderStatus::Rejected {
        if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
            log::debug!(
                "Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
                venue_order_id,
                msg.cl_ord_id,
                reason_str,
            );
            report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
        } else {
            log::debug!(
                "Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
                venue_order_id,
                msg.cl_ord_id,
                msg.ord_status,
                msg.ord_rej_reason,
                msg.text,
            );
        }
    }

    // Cancellations use the same reason fields when either is available.
    if order_status == OrderStatus::Canceled
        && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
    {
        report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
    }

    Ok(report)
}
671
672pub fn parse_order_update_msg(
676 msg: &BitmexOrderUpdateMsg,
677 instrument: &InstrumentAny,
678 account_id: AccountId,
679) -> Option<OrderUpdated> {
680 let trader_id = TraderId::external();
683 let strategy_id = StrategyId::external();
684 let instrument_id = parse_instrument_id(msg.symbol);
685 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
686 let client_order_id = msg
687 .cl_ord_id
688 .as_ref()
689 .map_or_else(ClientOrderId::external, ClientOrderId::new);
690
691 let quantity = match (msg.leaves_qty, msg.cum_qty) {
694 (Some(leaves), Some(cum)) => parse_contracts_quantity((leaves + cum) as u64, instrument),
695 _ => Quantity::zero(instrument.size_precision()),
696 };
697 let price = msg
698 .price
699 .map(|p| Price::new(p, instrument.price_precision()));
700
701 let trigger_price = None;
703 let protection_price = None;
705
706 let event_id = UUID4::new();
707 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
708 let ts_init = get_atomic_clock_realtime().get_time_ns();
709
710 Some(OrderUpdated::new(
711 trader_id,
712 strategy_id,
713 instrument_id,
714 client_order_id,
715 quantity,
716 event_id,
717 ts_event,
718 ts_init,
719 false, venue_order_id,
721 Some(account_id),
722 price,
723 trigger_price,
724 protection_price,
725 ))
726}
727
/// Builds a `FillReport` from a BitMEX execution message, or returns `None` when the message
/// does not represent a fill.
///
/// Only `Trade`, `Liquidation` and `Bankruptcy` execution types are turned into fills (the
/// latter with a warning log). Every other execution type is logged and skipped.
///
/// `None` is also returned when `exec_type`, `account`, `symbol`, `order_id`, `trd_match_id`,
/// `last_qty` or `last_px` is missing from the message.
pub fn parse_execution_msg(
    msg: BitmexExecutionMsg,
    instrument: &InstrumentAny,
) -> Option<FillReport> {
    let exec_type = msg.exec_type?;

    match exec_type {
        // Regular fills: fall through to the report construction below.
        BitmexExecType::Trade | BitmexExecType::Liquidation => {}
        // Bankruptcy executions are also processed as fills, but flagged in the log.
        BitmexExecType::Bankruptcy => {
            log::warn!(
                "Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
        }

        BitmexExecType::Settlement => {
            log::debug!(
                "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }
        BitmexExecType::TrialFill => {
            log::warn!(
                "Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }

        BitmexExecType::Funding => {
            log::debug!(
                "Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }
        BitmexExecType::Insurance => {
            log::debug!(
                "Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }
        BitmexExecType::Rebalance => {
            log::debug!(
                "Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }

        // Order state changes are not fills and produce no report here.
        BitmexExecType::New
        | BitmexExecType::Canceled
        | BitmexExecType::CancelReject
        | BitmexExecType::Replaced
        | BitmexExecType::Rejected
        | BitmexExecType::AmendReject
        | BitmexExecType::Suspended
        | BitmexExecType::Released
        | BitmexExecType::TriggeredOrActivatedBySystem => {
            log::debug!(
                "Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
                msg.order_id,
            );
            return None;
        }

        BitmexExecType::Unknown(ref type_str) => {
            log::warn!(
                "Unknown execution type received, skipping: exec_type={type_str}, order_id={:?}, symbol={:?}",
                msg.order_id,
                msg.symbol,
            );
            return None;
        }
    }

    // Each `?` below returns `None` when the corresponding field is missing.
    let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
    let instrument_id = parse_instrument_id(msg.symbol?);
    let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
    let trade_id = TradeId::new(msg.trd_match_id?.to_string());
    // A missing side maps to `NoOrderSide` rather than aborting the report.
    let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
        let side: BitmexSide = s.into();
        side.into()
    });
    let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
    let last_px = Price::new(msg.last_px?, instrument.price_precision());
    // The settlement currency defaults to "XBT" and the commission to zero when absent.
    let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
    let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
    let currency = get_currency(&mapped_currency);
    let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
    let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
    let venue_position_id = None;
    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
    let ts_init = get_atomic_clock_realtime().get_time_ns();

    Some(FillReport::new(
        account_id,
        instrument_id,
        venue_order_id,
        trade_id,
        order_side,
        last_qty,
        last_px,
        commission,
        liquidity_side,
        client_order_id,
        venue_position_id,
        ts_event,
        ts_init,
        None,
    ))
}
871
872#[must_use]
878pub fn parse_position_msg(
879 msg: BitmexPositionMsg,
880 instrument: &InstrumentAny,
881) -> PositionStatusReport {
882 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
883 let instrument_id = parse_instrument_id(msg.symbol);
884 let position_side = parse_position_side(msg.current_qty).as_specified();
885 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
886 let venue_position_id = None; let avg_px_open = msg
888 .avg_entry_price
889 .and_then(|p| Decimal::from_str(&p.to_string()).ok());
890 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
891 let ts_init = get_atomic_clock_realtime().get_time_ns();
892
893 PositionStatusReport::new(
894 account_id,
895 instrument_id,
896 position_side,
897 quantity,
898 ts_last,
899 ts_init,
900 None, venue_position_id, avg_px_open, )
904}
905
906#[must_use]
919pub fn parse_instrument_msg(
920 msg: BitmexInstrumentMsg,
921 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
922 ts_init: UnixNanos,
923) -> Vec<Data> {
924 let mut updates = Vec::new();
925 let is_index = is_index_symbol(&msg.symbol);
926
927 let effective_index_price = if is_index {
930 msg.last_price
931 } else {
932 msg.index_price
933 };
934
935 if msg.mark_price.is_none() && effective_index_price.is_none() {
939 return updates;
940 }
941
942 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
943 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
944
945 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
947 Some(instrument) => instrument.price_precision(),
948 None => {
949 if is_index {
953 log::trace!(
954 "Index instrument {} not in cache, skipping update",
955 msg.symbol
956 );
957 } else {
958 log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
959 }
960 return updates;
961 }
962 };
963
964 if let Some(mark_price) = msg.mark_price {
967 let price = Price::new(mark_price, price_precision);
968 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
969 instrument_id,
970 price,
971 ts_event,
972 ts_init,
973 )));
974 }
975
976 if let Some(index_price) = effective_index_price {
978 let price = Price::new(index_price, price_precision);
979 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
980 instrument_id,
981 price,
982 ts_event,
983 ts_init,
984 )));
985 }
986
987 updates
988}
989
990#[must_use]
996pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> FundingRateUpdate {
997 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol));
998 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
999
1000 FundingRateUpdate::new(
1001 instrument_id,
1002 msg.funding_rate,
1003 None, ts_event,
1005 ts_init,
1006 )
1007}
1008
1009#[must_use]
1018pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
1019 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1020
1021 let currency_str = map_bitmex_currency(msg.currency.as_str());
1023 let currency = get_currency(¤cy_str);
1024
1025 let divisor = if msg.currency == "XBt" {
1027 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1029 1_000_000.0 } else {
1031 1.0
1032 };
1033 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1034
1035 let total = Money::new(amount, currency);
1036 let locked = Money::new(0.0, currency); let free = total - locked;
1038
1039 let balance = AccountBalance::new_checked(total, locked, free)
1040 .expect("Balance calculation should be valid");
1041
1042 AccountState::new(
1043 account_id,
1044 AccountType::Margin,
1045 vec![balance],
1046 vec![], true, UUID4::new(),
1049 ts_init,
1050 ts_init,
1051 None,
1052 )
1053}
1054
1055#[must_use]
1059pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1060 let currency_str = map_bitmex_currency(msg.currency.as_str());
1062 let currency = get_currency(¤cy_str);
1063
1064 let divisor = if msg.currency == "XBt" {
1066 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1068 1_000_000.0 } else {
1070 1.0
1071 };
1072
1073 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1074 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1075 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1076
1077 MarginBalance::new(
1078 Money::new(initial, currency),
1079 Money::new(maintenance, currency),
1080 instrument_id,
1081 )
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086 use chrono::{DateTime, Utc};
1087 use nautilus_model::{
1088 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1089 identifiers::Symbol,
1090 instruments::crypto_perpetual::CryptoPerpetual,
1091 };
1092 use rstest::rstest;
1093 use ustr::Ustr;
1094
1095 use super::*;
1096 use crate::common::{
1097 enums::{BitmexExecType, BitmexOrderStatus},
1098 testing::load_test_json,
1099 };
1100
    /// Creates an `XBTUSD.BITMEX` crypto perpetual test instrument with the given price and
    /// size precisions.
    ///
    /// The price increment is `0.5` and the size increment is `1.0`, both at the supplied
    /// precisions; all optional constructor arguments are left as `None` and both timestamps
    /// use the default value.
    fn create_test_perpetual_instrument_with_precisions(
        price_precision: u8,
        size_precision: u8,
    ) -> InstrumentAny {
        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
            InstrumentId::from("XBTUSD.BITMEX"),
            Symbol::new("XBTUSD"),
            Currency::BTC(),
            Currency::USD(),
            Currency::BTC(),
            true,
            price_precision,
            size_precision,
            Price::new(0.5, price_precision),
            Quantity::new(1.0, size_precision),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        ))
    }
1133
    /// Creates the default test instrument: price precision `1`, size precision `0`.
    fn create_test_perpetual_instrument() -> InstrumentAny {
        create_test_perpetual_instrument_with_precisions(1, 0)
    }
1137
    /// Verifies `parse_book_msg` against the `ws_orderbook_l2.json` fixture for the
    /// `Insert`, `Partial` and `Update` actions.
    #[rstest]
    fn test_orderbook_l2_message() {
        let json_data = load_test_json("ws_orderbook_l2.json");

        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();

        let instrument = create_test_perpetual_instrument();

        // Insert: a plain add with no flags set.
        let delta = parse_book_msg(
            &msg,
            &BitmexAction::Insert,
            &instrument,
            instrument.id(),
            instrument.price_precision(),
            UnixNanos::from(3),
        );
        assert_eq!(delta.instrument_id, instrument_id);
        assert_eq!(delta.order.price, Price::from("98459.9"));
        assert_eq!(delta.order.size, Quantity::from(33000));
        assert_eq!(delta.order.side, OrderSide::Sell);
        assert_eq!(delta.order.order_id, 62400580205);
        assert_eq!(delta.action, BookAction::Add);
        assert_eq!(delta.flags, 0);
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, 1732436782356000000);
        assert_eq!(delta.ts_init, 3);

        // Partial: also an add, but flagged as a snapshot.
        let delta = parse_book_msg(
            &msg,
            &BitmexAction::Partial,
            &instrument,
            instrument.id(),
            instrument.price_precision(),
            UnixNanos::from(3),
        );
        assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
        assert_eq!(delta.action, BookAction::Add);

        // Update: maps to a book update with no flags set.
        let delta = parse_book_msg(
            &msg,
            &BitmexAction::Update,
            &instrument,
            instrument.id(),
            instrument.price_precision(),
            UnixNanos::from(3),
        );
        assert_eq!(delta.flags, 0);
        assert_eq!(delta.action, BookAction::Update);
    }
1192
    /// Verifies `parse_book10_msg` against the `ws_orderbook_10.json` fixture: top-of-book
    /// levels, per-level counts, snapshot flag, sequence and timestamps.
    #[rstest]
    fn test_orderbook10_message() {
        let json_data = load_test_json("ws_orderbook_10.json");
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
        let instrument = create_test_perpetual_instrument();
        let depth10 = parse_book10_msg(
            &msg,
            &instrument,
            instrument.id(),
            instrument.price_precision(),
            UnixNanos::from(3),
        )
        .unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);

        // Best bid.
        assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
        assert_eq!(depth10.bids[0].size, Quantity::from(22400));
        assert_eq!(depth10.bids[0].side, OrderSide::Buy);

        // Best ask.
        assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
        assert_eq!(depth10.asks[0].size, Quantity::from(17600));
        assert_eq!(depth10.asks[0].side, OrderSide::Sell);

        // Every level is reported with a count of one.
        assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
        assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);

        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
        assert_eq!(depth10.ts_event, 1732436353513000000);
        assert_eq!(depth10.ts_init, 3);
    }
1230
    /// Verifies `parse_quote_msg` against the `ws_quote.json` fixture.
    ///
    /// The helper instrument is created with price precision `2`; the instrument id passed
    /// to the parser is given explicitly as `BCHUSDT.BITMEX`.
    #[rstest]
    fn test_quote_message() {
        let json_data = load_test_json("ws_quote.json");

        let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
        // Previous quote; its values differ from every value asserted below.
        let last_quote = QuoteTick::new(
            instrument_id,
            Price::new(487.50, 2),
            Price::new(488.20, 2),
            Quantity::from(100_000),
            Quantity::from(100_000),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );
        let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
        let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
        let quote = parse_quote_msg(
            &msg,
            &last_quote,
            &instrument,
            instrument_id,
            instrument.price_precision(),
            UnixNanos::from(3),
        );

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("487.55"));
        assert_eq!(quote.ask_price, Price::from("488.25"));
        assert_eq!(quote.bid_size, Quantity::from(103_000));
        assert_eq!(quote.ask_size, Quantity::from(50_000));
        assert_eq!(quote.ts_event, 1732315465085000000);
        assert_eq!(quote.ts_init, 3);
    }
1264
1265 #[rstest]
1266 fn test_trade_message() {
1267 let json_data = load_test_json("ws_trade.json");
1268
1269 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1270 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1271 let instrument = create_test_perpetual_instrument();
1272 let trade = parse_trade_msg(
1273 &msg,
1274 &instrument,
1275 instrument.id(),
1276 instrument.price_precision(),
1277 UnixNanos::from(3),
1278 );
1279
1280 assert_eq!(trade.instrument_id, instrument_id);
1281 assert_eq!(trade.price, Price::from("98570.9"));
1282 assert_eq!(trade.size, Quantity::from(100));
1283 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1284 assert_eq!(
1285 trade.trade_id.to_string(),
1286 "00000000-006d-1000-0000-000e8737d536"
1287 );
1288 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1290 }
1291
1292 #[rstest]
1293 fn test_trade_bin_message() {
1294 let json_data = load_test_json("ws_trade_bin_1m.json");
1295
1296 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1297 let topic = BitmexWsTopic::TradeBin1m;
1298
1299 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1300 let instrument = create_test_perpetual_instrument();
1301 let bar = parse_trade_bin_msg(
1302 &msg,
1303 &topic,
1304 &instrument,
1305 instrument.id(),
1306 instrument.price_precision(),
1307 UnixNanos::from(3),
1308 );
1309
1310 assert_eq!(bar.instrument_id(), instrument_id);
1311 assert_eq!(
1312 bar.bar_type.spec(),
1313 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1314 );
1315 assert_eq!(bar.open, Price::from("97550.0"));
1316 assert_eq!(bar.high, Price::from("97584.4"));
1317 assert_eq!(bar.low, Price::from("97550.0"));
1318 assert_eq!(bar.close, Price::from("97570.1"));
1319 assert_eq!(bar.volume, Quantity::from(84_000));
1320 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1322 }
1323
1324 #[rstest]
1325 fn test_trade_bin_message_extreme_adjustment() {
1326 let topic = BitmexWsTopic::TradeBin1m;
1327 let instrument = create_test_perpetual_instrument();
1328
1329 let msg = BitmexTradeBinMsg {
1330 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1331 .unwrap()
1332 .with_timezone(&Utc),
1333 symbol: Ustr::from("XBTUSD"),
1334 open: 50_000.0,
1335 high: 49_990.0,
1336 low: 50_010.0,
1337 close: 50_005.0,
1338 trades: 10,
1339 volume: 1_000,
1340 vwap: Some(0.0),
1341 last_size: Some(0),
1342 turnover: 0,
1343 home_notional: 0.0,
1344 foreign_notional: 0.0,
1345 pool: None,
1346 };
1347
1348 let bar = parse_trade_bin_msg(
1349 &msg,
1350 &topic,
1351 &instrument,
1352 instrument.id(),
1353 instrument.price_precision(),
1354 UnixNanos::from(3),
1355 );
1356
1357 assert_eq!(bar.high, Price::from("50010.0"));
1358 assert_eq!(bar.low, Price::from("49990.0"));
1359 assert_eq!(bar.open, Price::from("50000.0"));
1360 assert_eq!(bar.close, Price::from("50005.0"));
1361 assert_eq!(bar.volume, Quantity::from(1_000));
1362 }
1363
1364 #[rstest]
1365 fn test_parse_order_msg() {
1366 let json_data = load_test_json("ws_order.json");
1367 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1368 let cache = DashMap::new();
1369 let instrument = create_test_perpetual_instrument();
1370 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1371
1372 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1373 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1374 assert_eq!(
1375 report.venue_order_id.to_string(),
1376 "550e8400-e29b-41d4-a716-446655440001"
1377 );
1378 assert_eq!(
1379 report.client_order_id.unwrap().to_string(),
1380 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1381 );
1382 assert_eq!(report.order_side, OrderSide::Buy);
1383 assert_eq!(report.order_type, OrderType::Limit);
1384 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1385 assert_eq!(report.order_status, OrderStatus::Accepted);
1386 assert_eq!(report.quantity, Quantity::from(100));
1387 assert_eq!(report.filled_qty, Quantity::from(0));
1388 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1389 assert_eq!(report.ts_accepted, 1732530600000000000); }
1391
1392 #[rstest]
1393 fn test_parse_order_msg_infers_type_when_missing() {
1394 let json_data = load_test_json("ws_order.json");
1395 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1396 msg.ord_type = None;
1397 msg.cl_ord_id = None;
1398 msg.price = Some(98_000.0);
1399 msg.stop_px = None;
1400
1401 let cache = DashMap::new();
1402 let instrument = create_test_perpetual_instrument();
1403
1404 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1405
1406 assert_eq!(report.order_type, OrderType::Limit);
1407 }
1408
1409 #[rstest]
1410 fn test_parse_order_msg_rejected_with_reason() {
1411 let mut msg: BitmexOrderMsg =
1412 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1413 msg.ord_status = BitmexOrderStatus::Rejected;
1414 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1415 msg.text = None;
1416 msg.cum_qty = 0;
1417
1418 let cache = DashMap::new();
1419 let instrument = create_test_perpetual_instrument();
1420 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1421
1422 assert_eq!(report.order_status, OrderStatus::Rejected);
1423 assert_eq!(
1424 report.cancel_reason,
1425 Some("Insufficient available balance".to_string())
1426 );
1427 }
1428
1429 #[rstest]
1430 fn test_parse_order_msg_rejected_with_text_fallback() {
1431 let mut msg: BitmexOrderMsg =
1432 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1433 msg.ord_status = BitmexOrderStatus::Rejected;
1434 msg.ord_rej_reason = None;
1435 msg.text = Some(Ustr::from("Order would execute immediately"));
1436 msg.cum_qty = 0;
1437
1438 let cache = DashMap::new();
1439 let instrument = create_test_perpetual_instrument();
1440 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1441
1442 assert_eq!(report.order_status, OrderStatus::Rejected);
1443 assert_eq!(
1444 report.cancel_reason,
1445 Some("Order would execute immediately".to_string())
1446 );
1447 }
1448
1449 #[rstest]
1450 fn test_parse_order_msg_rejected_without_reason() {
1451 let mut msg: BitmexOrderMsg =
1452 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1453 msg.ord_status = BitmexOrderStatus::Rejected;
1454 msg.ord_rej_reason = None;
1455 msg.text = None;
1456 msg.cum_qty = 0;
1457
1458 let cache = DashMap::new();
1459 let instrument = create_test_perpetual_instrument();
1460 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1461
1462 assert_eq!(report.order_status, OrderStatus::Rejected);
1463 assert_eq!(report.cancel_reason, None);
1464 }
1465
1466 #[rstest]
1467 fn test_parse_execution_msg() {
1468 let json_data = load_test_json("ws_execution.json");
1469 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1470 let instrument = create_test_perpetual_instrument();
1471 let fill = parse_execution_msg(msg, &instrument).unwrap();
1472
1473 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1474 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1475 assert_eq!(
1476 fill.venue_order_id.to_string(),
1477 "550e8400-e29b-41d4-a716-446655440002"
1478 );
1479 assert_eq!(
1480 fill.trade_id.to_string(),
1481 "00000000-006d-1000-0000-000e8737d540"
1482 );
1483 assert_eq!(
1484 fill.client_order_id.unwrap().to_string(),
1485 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1486 );
1487 assert_eq!(fill.order_side, OrderSide::Sell);
1488 assert_eq!(fill.last_qty, Quantity::from(100));
1489 assert_eq!(fill.last_px, Price::from("98950.0"));
1490 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1491 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1492 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1493 assert_eq!(fill.ts_event, 1732530900789000000); }
1495
1496 #[rstest]
1497 fn test_parse_execution_msg_non_trade() {
1498 let mut msg: BitmexExecutionMsg =
1500 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1501 msg.exec_type = Some(BitmexExecType::Settlement);
1502
1503 let instrument = create_test_perpetual_instrument();
1504 let result = parse_execution_msg(msg, &instrument);
1505 assert!(result.is_none());
1506 }
1507
1508 #[rstest]
1509 fn test_parse_cancel_reject_execution() {
1510 let json = load_test_json("ws_execution_cancel_reject.json");
1512
1513 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1514 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1515 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1516 assert_eq!(msg.symbol, None);
1517
1518 let instrument = create_test_perpetual_instrument();
1520 let result = parse_execution_msg(msg, &instrument);
1521 assert!(result.is_none());
1522 }
1523
1524 #[rstest]
1525 fn test_parse_execution_msg_liquidation() {
1526 let mut msg: BitmexExecutionMsg =
1528 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1529 msg.exec_type = Some(BitmexExecType::Liquidation);
1530
1531 let instrument = create_test_perpetual_instrument();
1532 let fill = parse_execution_msg(msg, &instrument).unwrap();
1533
1534 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1535 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1536 assert_eq!(fill.order_side, OrderSide::Sell);
1537 assert_eq!(fill.last_qty, Quantity::from(100));
1538 assert_eq!(fill.last_px, Price::from("98950.0"));
1539 }
1540
1541 #[rstest]
1542 fn test_parse_execution_msg_bankruptcy() {
1543 let mut msg: BitmexExecutionMsg =
1544 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1545 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1546
1547 let instrument = create_test_perpetual_instrument();
1548 let fill = parse_execution_msg(msg, &instrument).unwrap();
1549
1550 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1551 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1552 assert_eq!(fill.order_side, OrderSide::Sell);
1553 assert_eq!(fill.last_qty, Quantity::from(100));
1554 }
1555
1556 #[rstest]
1557 fn test_parse_execution_msg_settlement() {
1558 let mut msg: BitmexExecutionMsg =
1559 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1560 msg.exec_type = Some(BitmexExecType::Settlement);
1561
1562 let instrument = create_test_perpetual_instrument();
1563 let result = parse_execution_msg(msg, &instrument);
1564 assert!(result.is_none());
1565 }
1566
1567 #[rstest]
1568 fn test_parse_execution_msg_trial_fill() {
1569 let mut msg: BitmexExecutionMsg =
1570 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1571 msg.exec_type = Some(BitmexExecType::TrialFill);
1572
1573 let instrument = create_test_perpetual_instrument();
1574 let result = parse_execution_msg(msg, &instrument);
1575 assert!(result.is_none());
1576 }
1577
1578 #[rstest]
1579 fn test_parse_execution_msg_funding() {
1580 let mut msg: BitmexExecutionMsg =
1581 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1582 msg.exec_type = Some(BitmexExecType::Funding);
1583
1584 let instrument = create_test_perpetual_instrument();
1585 let result = parse_execution_msg(msg, &instrument);
1586 assert!(result.is_none());
1587 }
1588
1589 #[rstest]
1590 fn test_parse_execution_msg_insurance() {
1591 let mut msg: BitmexExecutionMsg =
1592 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1593 msg.exec_type = Some(BitmexExecType::Insurance);
1594
1595 let instrument = create_test_perpetual_instrument();
1596 let result = parse_execution_msg(msg, &instrument);
1597 assert!(result.is_none());
1598 }
1599
1600 #[rstest]
1601 fn test_parse_execution_msg_rebalance() {
1602 let mut msg: BitmexExecutionMsg =
1603 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1604 msg.exec_type = Some(BitmexExecType::Rebalance);
1605
1606 let instrument = create_test_perpetual_instrument();
1607 let result = parse_execution_msg(msg, &instrument);
1608 assert!(result.is_none());
1609 }
1610
1611 #[rstest]
1612 fn test_parse_execution_msg_order_state_changes() {
1613 let instrument = create_test_perpetual_instrument();
1614
1615 let order_state_types = vec![
1616 BitmexExecType::New,
1617 BitmexExecType::Canceled,
1618 BitmexExecType::CancelReject,
1619 BitmexExecType::Replaced,
1620 BitmexExecType::Rejected,
1621 BitmexExecType::AmendReject,
1622 BitmexExecType::Suspended,
1623 BitmexExecType::Released,
1624 BitmexExecType::TriggeredOrActivatedBySystem,
1625 ];
1626
1627 for exec_type in order_state_types {
1628 let mut msg: BitmexExecutionMsg =
1629 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1630 msg.exec_type = Some(exec_type.clone());
1631
1632 let result = parse_execution_msg(msg, &instrument);
1633 assert!(
1634 result.is_none(),
1635 "Expected None for exec_type {exec_type:?}"
1636 );
1637 }
1638 }
1639
1640 #[rstest]
1641 fn test_parse_position_msg() {
1642 let json_data = load_test_json("ws_position.json");
1643 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1644 let instrument = create_test_perpetual_instrument();
1645 let report = parse_position_msg(msg, &instrument);
1646
1647 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1648 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1649 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1650 assert_eq!(report.quantity, Quantity::from(1000));
1651 assert!(report.venue_position_id.is_none());
1652 assert_eq!(report.ts_last, 1732530900789000000); }
1654
1655 #[rstest]
1656 fn test_parse_position_msg_short() {
1657 let mut msg: BitmexPositionMsg =
1658 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1659 msg.current_qty = Some(-500);
1660
1661 let instrument = create_test_perpetual_instrument();
1662 let report = parse_position_msg(msg, &instrument);
1663 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1664 assert_eq!(report.quantity, Quantity::from(500));
1665 }
1666
1667 #[rstest]
1668 fn test_parse_position_msg_flat() {
1669 let mut msg: BitmexPositionMsg =
1670 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1671 msg.current_qty = Some(0);
1672
1673 let instrument = create_test_perpetual_instrument();
1674 let report = parse_position_msg(msg, &instrument);
1675 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1676 assert_eq!(report.quantity, Quantity::from(0));
1677 }
1678
1679 #[rstest]
1680 fn test_parse_wallet_msg() {
1681 let json_data = load_test_json("ws_wallet.json");
1682 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1683 let ts_init = UnixNanos::from(1);
1684 let account_state = parse_wallet_msg(msg, ts_init);
1685
1686 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1687 assert!(!account_state.balances.is_empty());
1688 let balance = &account_state.balances[0];
1689 assert_eq!(balance.currency.code.to_string(), "XBT");
1690 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1692 }
1693
1694 #[rstest]
1695 fn test_parse_wallet_msg_no_amount() {
1696 let mut msg: BitmexWalletMsg =
1697 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1698 msg.amount = None;
1699
1700 let ts_init = UnixNanos::from(1);
1701 let account_state = parse_wallet_msg(msg, ts_init);
1702 let balance = &account_state.balances[0];
1703 assert_eq!(balance.total.as_f64(), 0.0);
1704 }
1705
1706 #[rstest]
1707 fn test_parse_margin_msg() {
1708 let json_data = load_test_json("ws_margin.json");
1709 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1710 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1711 let margin_balance = parse_margin_msg(msg, instrument_id);
1712
1713 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1714 assert_eq!(margin_balance.instrument_id, instrument_id);
1715 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1718 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1720 }
1721
1722 #[rstest]
1723 fn test_parse_margin_msg_no_available() {
1724 let mut msg: BitmexMarginMsg =
1725 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1726 msg.available_margin = None;
1727
1728 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1729 let margin_balance = parse_margin_msg(msg, instrument_id);
1730 assert!(margin_balance.initial.as_f64() >= 0.0);
1732 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1733 }
1734
1735 #[rstest]
1736 fn test_parse_instrument_msg_both_prices() {
1737 let json_data = load_test_json("ws_instrument.json");
1738 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1739
1740 let mut instruments_cache = AHashMap::new();
1742 let test_instrument = create_test_perpetual_instrument();
1743 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1744
1745 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1746
1747 assert_eq!(updates.len(), 2);
1749
1750 match &updates[0] {
1752 Data::MarkPriceUpdate(update) => {
1753 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1754 assert_eq!(update.value.as_f64(), 95125.7);
1755 }
1756 _ => panic!("Expected MarkPriceUpdate at index 0"),
1757 }
1758
1759 match &updates[1] {
1761 Data::IndexPriceUpdate(update) => {
1762 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1763 assert_eq!(update.value.as_f64(), 95124.3);
1764 }
1765 _ => panic!("Expected IndexPriceUpdate at index 1"),
1766 }
1767 }
1768
1769 #[rstest]
1770 fn test_parse_instrument_msg_mark_price_only() {
1771 let mut msg: BitmexInstrumentMsg =
1772 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1773 msg.index_price = None;
1774
1775 let mut instruments_cache = AHashMap::new();
1777 let test_instrument = create_test_perpetual_instrument();
1778 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1779
1780 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1781
1782 assert_eq!(updates.len(), 1);
1783 match &updates[0] {
1784 Data::MarkPriceUpdate(update) => {
1785 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1786 assert_eq!(update.value.as_f64(), 95125.7);
1787 }
1788 _ => panic!("Expected MarkPriceUpdate"),
1789 }
1790 }
1791
1792 #[rstest]
1793 fn test_parse_instrument_msg_index_price_only() {
1794 let mut msg: BitmexInstrumentMsg =
1795 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1796 msg.mark_price = None;
1797
1798 let mut instruments_cache = AHashMap::new();
1800 let test_instrument = create_test_perpetual_instrument();
1801 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1802
1803 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1804
1805 assert_eq!(updates.len(), 1);
1806 match &updates[0] {
1807 Data::IndexPriceUpdate(update) => {
1808 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1809 assert_eq!(update.value.as_f64(), 95124.3);
1810 }
1811 _ => panic!("Expected IndexPriceUpdate"),
1812 }
1813 }
1814
1815 #[rstest]
1816 fn test_parse_instrument_msg_no_prices() {
1817 let mut msg: BitmexInstrumentMsg =
1818 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1819 msg.mark_price = None;
1820 msg.index_price = None;
1821 msg.last_price = None;
1822
1823 let mut instruments_cache = AHashMap::new();
1825 let test_instrument = create_test_perpetual_instrument();
1826 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1827
1828 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1829 assert_eq!(updates.len(), 0);
1830 }
1831
1832 #[rstest]
1833 fn test_parse_instrument_msg_index_symbol() {
1834 let mut msg: BitmexInstrumentMsg =
1837 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1838 msg.symbol = Ustr::from(".BXBT");
1839 msg.last_price = Some(119163.05);
1840 msg.mark_price = Some(119163.05); msg.index_price = None;
1842
1843 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1845 let instrument = CryptoPerpetual::new(
1846 instrument_id,
1847 Symbol::from(".BXBT"),
1848 Currency::BTC(),
1849 Currency::USD(),
1850 Currency::USD(),
1851 false, 2, 8, Price::from("0.01"),
1855 Quantity::from("0.00000001"),
1856 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1871 let mut instruments_cache = AHashMap::new();
1872 instruments_cache.insert(
1873 Ustr::from(".BXBT"),
1874 InstrumentAny::CryptoPerpetual(instrument),
1875 );
1876
1877 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1878
1879 assert_eq!(updates.len(), 2);
1880
1881 match &updates[0] {
1883 Data::MarkPriceUpdate(update) => {
1884 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1885 assert_eq!(update.value, Price::from("119163.05"));
1886 }
1887 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1888 }
1889
1890 match &updates[1] {
1892 Data::IndexPriceUpdate(update) => {
1893 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1894 assert_eq!(update.value, Price::from("119163.05"));
1895 assert_eq!(update.ts_init, UnixNanos::from(1));
1896 }
1897 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1898 }
1899 }
1900
1901 #[rstest]
1902 fn test_parse_funding_msg() {
1903 let json_data = load_test_json("ws_funding_rate.json");
1904 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1905 let update = parse_funding_msg(msg, UnixNanos::from(1));
1906
1907 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1908 assert_eq!(update.rate.to_string(), "0.0001");
1909 assert!(update.next_funding_ns.is_none());
1910 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1911 assert_eq!(update.ts_init, UnixNanos::from(1));
1912 }
1913}