1use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TriggerType,
34 },
35 events::{OrderUpdated, account::state::AccountState},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
38 VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
43};
44use rust_decimal::{Decimal, prelude::FromPrimitive};
45use ustr::Ustr;
46use uuid::Uuid;
47
48use super::{
49 enums::{BitmexAction, BitmexWsTopic},
50 messages::{
51 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
52 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
53 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
54 },
55};
56use crate::{
57 common::{
58 consts::BITMEX_VENUE,
59 enums::{BitmexExecInstruction, BitmexExecType, BitmexSide},
60 parse::{
61 clean_reason, map_bitmex_currency, normalize_trade_bin_prices,
62 normalize_trade_bin_volume, parse_contracts_quantity, parse_fractional_quantity,
63 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
64 parse_position_side, parse_signed_contracts_quantity,
65 },
66 },
67 http::parse::get_currency,
68 websocket::messages::BitmexOrderUpdateMsg,
69};
70
/// Bar specification for 1-minute bars aggregated from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1m` by the topic/spec mapping functions in this module,
/// and also used there as the fallback for unsupported inputs.
const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 5-minute bars aggregated from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin5m` by the topic/spec mapping functions in this module.
const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for 1-hour bars aggregated from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1h` by the topic/spec mapping functions in this module.
const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// Bar specification for 1-day bars aggregated from last prices.
///
/// Paired with `BitmexWsTopic::TradeBin1d` by the topic/spec mapping functions in this module.
const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};
91
92#[inline]
100#[must_use]
101pub fn is_index_symbol(symbol: &Ustr) -> bool {
102 symbol.starts_with('.')
103}
104
105#[must_use]
107pub fn parse_book_msg_vec(
108 data: Vec<BitmexOrderBookMsg>,
109 action: BitmexAction,
110 instruments: &AHashMap<Ustr, InstrumentAny>,
111 ts_init: UnixNanos,
112) -> Vec<Data> {
113 let mut deltas = Vec::with_capacity(data.len());
114
115 for msg in data {
116 if let Some(instrument) = instruments.get(&msg.symbol) {
117 let instrument_id = instrument.id();
118 let price_precision = instrument.price_precision();
119 deltas.push(Data::Delta(parse_book_msg(
120 &msg,
121 &action,
122 instrument,
123 instrument_id,
124 price_precision,
125 ts_init,
126 )));
127 } else {
128 tracing::error!(
129 "Instrument cache miss: book delta dropped for symbol={}",
130 msg.symbol
131 );
132 }
133 }
134 deltas
135}
136
137#[must_use]
139pub fn parse_book10_msg_vec(
140 data: Vec<BitmexOrderBook10Msg>,
141 instruments: &AHashMap<Ustr, InstrumentAny>,
142 ts_init: UnixNanos,
143) -> Vec<Data> {
144 let mut depths = Vec::with_capacity(data.len());
145
146 for msg in data {
147 if let Some(instrument) = instruments.get(&msg.symbol) {
148 let instrument_id = instrument.id();
149 let price_precision = instrument.price_precision();
150 depths.push(Data::Depth10(Box::new(parse_book10_msg(
151 &msg,
152 instrument,
153 instrument_id,
154 price_precision,
155 ts_init,
156 ))));
157 } else {
158 tracing::error!(
159 "Instrument cache miss: depth10 message dropped for symbol={}",
160 msg.symbol
161 );
162 }
163 }
164 depths
165}
166
167#[must_use]
169pub fn parse_trade_msg_vec(
170 data: Vec<BitmexTradeMsg>,
171 instruments: &AHashMap<Ustr, InstrumentAny>,
172 ts_init: UnixNanos,
173) -> Vec<Data> {
174 let mut trades = Vec::with_capacity(data.len());
175
176 for msg in data {
177 if let Some(instrument) = instruments.get(&msg.symbol) {
178 let instrument_id = instrument.id();
179 let price_precision = instrument.price_precision();
180 trades.push(Data::Trade(parse_trade_msg(
181 &msg,
182 instrument,
183 instrument_id,
184 price_precision,
185 ts_init,
186 )));
187 } else {
188 tracing::error!(
189 "Instrument cache miss: trade message dropped for symbol={}",
190 msg.symbol
191 );
192 }
193 }
194 trades
195}
196
197#[must_use]
199pub fn parse_trade_bin_msg_vec(
200 data: Vec<BitmexTradeBinMsg>,
201 topic: BitmexWsTopic,
202 instruments: &AHashMap<Ustr, InstrumentAny>,
203 ts_init: UnixNanos,
204) -> Vec<Data> {
205 let mut trades = Vec::with_capacity(data.len());
206
207 for msg in data {
208 if let Some(instrument) = instruments.get(&msg.symbol) {
209 let instrument_id = instrument.id();
210 let price_precision = instrument.price_precision();
211 trades.push(Data::Bar(parse_trade_bin_msg(
212 &msg,
213 &topic,
214 instrument,
215 instrument_id,
216 price_precision,
217 ts_init,
218 )));
219 } else {
220 tracing::error!(
221 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
222 msg.symbol
223 );
224 }
225 }
226 trades
227}
228
229#[allow(clippy::too_many_arguments)]
231#[must_use]
232pub fn parse_book_msg(
233 msg: &BitmexOrderBookMsg,
234 action: &BitmexAction,
235 instrument: &InstrumentAny,
236 instrument_id: InstrumentId,
237 price_precision: u8,
238 ts_init: UnixNanos,
239) -> OrderBookDelta {
240 let flags = if action == &BitmexAction::Insert {
241 RecordFlag::F_SNAPSHOT as u8
242 } else {
243 0
244 };
245
246 let action = action.as_book_action();
247 let price = Price::new(msg.price, price_precision);
248 let side = msg.side.as_order_side();
249 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
250 let order_id = msg.id;
251 let order = BookOrder::new(side, price, size, order_id);
252 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
254
255 OrderBookDelta::new(
256 instrument_id,
257 action,
258 order,
259 flags,
260 sequence,
261 ts_event,
262 ts_init,
263 )
264}
265
266#[allow(clippy::too_many_arguments)]
272#[must_use]
273pub fn parse_book10_msg(
274 msg: &BitmexOrderBook10Msg,
275 instrument: &InstrumentAny,
276 instrument_id: InstrumentId,
277 price_precision: u8,
278 ts_init: UnixNanos,
279) -> OrderBookDepth10 {
280 let mut bids = Vec::with_capacity(DEPTH10_LEN);
281 let mut asks = Vec::with_capacity(DEPTH10_LEN);
282
283 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
285 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
286
287 for (i, level) in msg.bids.iter().enumerate() {
288 let bid_order = BookOrder::new(
289 OrderSide::Buy,
290 Price::new(level[0], price_precision),
291 parse_fractional_quantity(level[1], instrument),
292 0,
293 );
294
295 bids.push(bid_order);
296 bid_counts[i] = 1;
297 }
298
299 for (i, level) in msg.asks.iter().enumerate() {
300 let ask_order = BookOrder::new(
301 OrderSide::Sell,
302 Price::new(level[0], price_precision),
303 parse_fractional_quantity(level[1], instrument),
304 0,
305 );
306
307 asks.push(ask_order);
308 ask_counts[i] = 1;
309 }
310
311 let bids: [BookOrder; DEPTH10_LEN] = bids
312 .try_into()
313 .inspect_err(|v: &Vec<BookOrder>| {
314 tracing::error!("Bids length mismatch: expected 10, was {}", v.len());
315 })
316 .expect("BitMEX orderBook10 should always have exactly 10 bid levels");
317 let asks: [BookOrder; DEPTH10_LEN] = asks
318 .try_into()
319 .inspect_err(|v: &Vec<BookOrder>| {
320 tracing::error!("Asks length mismatch: expected 10, was {}", v.len());
321 })
322 .expect("BitMEX orderBook10 should always have exactly 10 ask levels");
323
324 let ts_event = UnixNanos::from(msg.timestamp);
325
326 OrderBookDepth10::new(
327 instrument_id,
328 bids,
329 asks,
330 bid_counts,
331 ask_counts,
332 RecordFlag::F_SNAPSHOT as u8,
333 0, ts_event,
335 ts_init,
336 )
337}
338
339#[must_use]
341pub fn parse_quote_msg(
342 msg: &BitmexQuoteMsg,
343 last_quote: &QuoteTick,
344 instrument: &InstrumentAny,
345 instrument_id: InstrumentId,
346 price_precision: u8,
347 ts_init: UnixNanos,
348) -> QuoteTick {
349 let bid_price = match msg.bid_price {
350 Some(price) => Price::new(price, price_precision),
351 None => last_quote.bid_price,
352 };
353
354 let ask_price = match msg.ask_price {
355 Some(price) => Price::new(price, price_precision),
356 None => last_quote.ask_price,
357 };
358
359 let bid_size = match msg.bid_size {
360 Some(size) => parse_contracts_quantity(size, instrument),
361 None => last_quote.bid_size,
362 };
363
364 let ask_size = match msg.ask_size {
365 Some(size) => parse_contracts_quantity(size, instrument),
366 None => last_quote.ask_size,
367 };
368
369 let ts_event = UnixNanos::from(msg.timestamp);
370
371 QuoteTick::new(
372 instrument_id,
373 bid_price,
374 ask_price,
375 bid_size,
376 ask_size,
377 ts_event,
378 ts_init,
379 )
380}
381
382#[must_use]
384pub fn parse_trade_msg(
385 msg: &BitmexTradeMsg,
386 instrument: &InstrumentAny,
387 instrument_id: InstrumentId,
388 price_precision: u8,
389 ts_init: UnixNanos,
390) -> TradeTick {
391 let price = Price::new(msg.price, price_precision);
392 let size = parse_contracts_quantity(msg.size, instrument);
393 let aggressor_side = msg.side.as_aggressor_side();
394 let trade_id = TradeId::new(
395 msg.trd_match_id
396 .map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
397 );
398 let ts_event = UnixNanos::from(msg.timestamp);
399
400 TradeTick::new(
401 instrument_id,
402 price,
403 size,
404 aggressor_side,
405 trade_id,
406 ts_event,
407 ts_init,
408 )
409}
410
411#[must_use]
413pub fn parse_trade_bin_msg(
414 msg: &BitmexTradeBinMsg,
415 topic: &BitmexWsTopic,
416 instrument: &InstrumentAny,
417 instrument_id: InstrumentId,
418 price_precision: u8,
419 ts_init: UnixNanos,
420) -> Bar {
421 let spec = bar_spec_from_topic(topic);
422 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
423
424 let open = Price::new(msg.open, price_precision);
425 let high = Price::new(msg.high, price_precision);
426 let low = Price::new(msg.low, price_precision);
427 let close = Price::new(msg.close, price_precision);
428
429 let (open, high, low, close) =
430 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
431
432 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
433 let volume = parse_contracts_quantity(volume_contracts, instrument);
434 let ts_event = UnixNanos::from(msg.timestamp);
435
436 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
437}
438
439#[must_use]
445pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
446 match topic {
447 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
448 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
449 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
450 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
451 _ => {
452 tracing::error!(topic = ?topic, "Bar specification not supported");
453 BAR_SPEC_1_MINUTE
454 }
455 }
456}
457
458#[must_use]
464pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
465 match spec {
466 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
467 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
468 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
469 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
470 _ => {
471 tracing::error!(spec = ?spec, "Bar specification not supported");
472 BitmexWsTopic::TradeBin1m
473 }
474 }
475}
476
477fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> Option<OrderType> {
478 if msg.stop_px.is_some() {
479 if msg.price.is_some() {
480 Some(OrderType::StopLimit)
481 } else {
482 Some(OrderType::StopMarket)
483 }
484 } else if msg.price.is_some() {
485 Some(OrderType::Limit)
486 } else {
487 Some(OrderType::Market)
488 }
489}
490
491pub fn parse_order_msg(
505 msg: &BitmexOrderMsg,
506 instrument: &InstrumentAny,
507 order_type_cache: &DashMap<ClientOrderId, OrderType>,
508) -> anyhow::Result<OrderStatusReport> {
509 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
511 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
512 let common_side: BitmexSide = msg.side.into();
513 let order_side: OrderSide = common_side.into();
514
515 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
516 ord_type.into()
517 } else if let Some(client_order_id) = msg.cl_ord_id {
518 let client_order_id = ClientOrderId::new(client_order_id);
519 if let Some(entry) = order_type_cache.get(&client_order_id) {
520 *entry.value()
521 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
522 order_type_cache.insert(client_order_id, inferred);
523 inferred
524 } else {
525 anyhow::bail!(
526 "Order type not found in cache for client_order_id: {client_order_id} (order missing ord_type field)"
527 );
528 }
529 } else if let Some(inferred) = infer_order_type_from_msg(msg) {
530 inferred
531 } else {
532 anyhow::bail!("Order missing both ord_type and cl_ord_id");
533 };
534
535 let time_in_force: TimeInForce = match msg.time_in_force {
536 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
537 None => TimeInForce::Gtc,
538 };
539 let order_status: OrderStatus = msg.ord_status.into();
540 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
541 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
542 let report_id = UUID4::new();
543 let ts_accepted =
544 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
545 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
546 let ts_init = get_atomic_clock_realtime().get_time_ns();
547
548 let mut report = OrderStatusReport::new(
549 account_id,
550 instrument_id,
551 None, venue_order_id,
553 order_side,
554 order_type,
555 time_in_force,
556 order_status,
557 quantity,
558 filled_qty,
559 ts_accepted,
560 ts_last,
561 ts_init,
562 Some(report_id),
563 );
564
565 if let Some(cl_ord_id) = &msg.cl_ord_id {
566 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
567 }
568
569 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
570 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
571 }
572
573 if let Some(price) = msg.price {
574 report = report.with_price(Price::new(price, instrument.price_precision()));
575 }
576
577 if let Some(avg_px) = msg.avg_px {
578 report = report.with_avg_px(avg_px)?;
579 }
580
581 if let Some(trigger_price) = msg.stop_px {
582 let trigger_type = if let Some(exec_insts) = &msg.exec_inst {
583 if exec_insts.contains(&BitmexExecInstruction::MarkPrice) {
585 TriggerType::MarkPrice
586 } else if exec_insts.contains(&BitmexExecInstruction::IndexPrice) {
587 TriggerType::IndexPrice
588 } else if exec_insts.contains(&BitmexExecInstruction::LastPrice) {
589 TriggerType::LastPrice
590 } else {
591 TriggerType::Default
592 }
593 } else {
594 TriggerType::Default };
596
597 report = report
598 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
599 .with_trigger_type(trigger_type);
600 }
601
602 if let Some(exec_insts) = &msg.exec_inst {
603 for exec_inst in exec_insts {
604 match exec_inst {
605 BitmexExecInstruction::ParticipateDoNotInitiate => {
606 report = report.with_post_only(true);
607 }
608 BitmexExecInstruction::ReduceOnly => {
609 report = report.with_reduce_only(true);
610 }
611 _ => {}
612 }
613 }
614 }
615
616 if order_status == OrderStatus::Rejected {
618 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
619 tracing::debug!(
620 order_id = ?venue_order_id,
621 client_order_id = ?msg.cl_ord_id,
622 reason = ?reason_str,
623 "Order rejected with reason"
624 );
625 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
626 } else {
627 tracing::debug!(
628 order_id = ?venue_order_id,
629 client_order_id = ?msg.cl_ord_id,
630 ord_status = ?msg.ord_status,
631 ord_rej_reason = ?msg.ord_rej_reason,
632 text = ?msg.text,
633 "Order rejected without reason from BitMEX"
634 );
635 }
636 }
637
638 if order_status == OrderStatus::Canceled
641 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
642 {
643 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
644 }
645
646 Ok(report)
647}
648
649pub fn parse_order_update_msg(
653 msg: &BitmexOrderUpdateMsg,
654 instrument: &InstrumentAny,
655 account_id: AccountId,
656) -> Option<OrderUpdated> {
657 let trader_id = TraderId::default();
660 let strategy_id = StrategyId::default();
661 let instrument_id = parse_instrument_id(msg.symbol);
662 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
663 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new).unwrap_or_default();
664 let quantity = Quantity::zero(instrument.size_precision());
665 let price = msg
666 .price
667 .map(|p| Price::new(p, instrument.price_precision()));
668
669 let trigger_price = None;
671 let protection_price = None;
673
674 let event_id = UUID4::new();
675 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
676 let ts_init = get_atomic_clock_realtime().get_time_ns();
677
678 Some(nautilus_model::events::OrderUpdated::new(
679 trader_id,
680 strategy_id,
681 instrument_id,
682 client_order_id,
683 quantity,
684 event_id,
685 ts_event,
686 ts_init,
687 false, venue_order_id,
689 Some(account_id),
690 price,
691 trigger_price,
692 protection_price,
693 ))
694}
695
696pub fn parse_execution_msg(
715 msg: BitmexExecutionMsg,
716 instrument: &InstrumentAny,
717) -> Option<FillReport> {
718 let exec_type = msg.exec_type?;
719
720 match exec_type {
721 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
723 BitmexExecType::Bankruptcy => {
724 tracing::warn!(
725 exec_type = ?exec_type,
726 order_id = ?msg.order_id,
727 symbol = ?msg.symbol,
728 "Processing bankruptcy execution as fill"
729 );
730 }
731
732 BitmexExecType::Settlement => {
734 tracing::debug!(
735 exec_type = ?exec_type,
736 order_id = ?msg.order_id,
737 symbol = ?msg.symbol,
738 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement"
739 );
740 return None;
741 }
742 BitmexExecType::TrialFill => {
743 tracing::warn!(
744 exec_type = ?exec_type,
745 order_id = ?msg.order_id,
746 symbol = ?msg.symbol,
747 "Trial fill execution received (testnet only), not processed as fill"
748 );
749 return None;
750 }
751
752 BitmexExecType::Funding => {
754 tracing::debug!(
755 exec_type = ?exec_type,
756 order_id = ?msg.order_id,
757 symbol = ?msg.symbol,
758 "Funding execution skipped (not a fill)"
759 );
760 return None;
761 }
762 BitmexExecType::Insurance => {
763 tracing::debug!(
764 exec_type = ?exec_type,
765 order_id = ?msg.order_id,
766 symbol = ?msg.symbol,
767 "Insurance execution skipped (not a fill)"
768 );
769 return None;
770 }
771 BitmexExecType::Rebalance => {
772 tracing::debug!(
773 exec_type = ?exec_type,
774 order_id = ?msg.order_id,
775 symbol = ?msg.symbol,
776 "Rebalance execution skipped (not a fill)"
777 );
778 return None;
779 }
780
781 BitmexExecType::New
783 | BitmexExecType::Canceled
784 | BitmexExecType::CancelReject
785 | BitmexExecType::Replaced
786 | BitmexExecType::Rejected
787 | BitmexExecType::AmendReject
788 | BitmexExecType::Suspended
789 | BitmexExecType::Released
790 | BitmexExecType::TriggeredOrActivatedBySystem => {
791 tracing::debug!(
792 exec_type = ?exec_type,
793 order_id = ?msg.order_id,
794 "Execution message skipped (order state change, not a fill)"
795 );
796 return None;
797 }
798 }
799
800 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
801 let instrument_id = parse_instrument_id(msg.symbol?);
802 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
803 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
804 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
805 let side: BitmexSide = s.into();
806 side.into()
807 });
808 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
809 let last_px = Price::new(msg.last_px?, instrument.price_precision());
810 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
811 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
812 let currency = get_currency(&mapped_currency);
813 let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
814 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
815 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
816 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
818 let ts_init = get_atomic_clock_realtime().get_time_ns();
819
820 Some(FillReport::new(
821 account_id,
822 instrument_id,
823 venue_order_id,
824 trade_id,
825 order_side,
826 last_qty,
827 last_px,
828 commission,
829 liquidity_side,
830 client_order_id,
831 venue_position_id,
832 ts_event,
833 ts_init,
834 None,
835 ))
836}
837
838#[must_use]
844pub fn parse_position_msg(
845 msg: BitmexPositionMsg,
846 instrument: &InstrumentAny,
847) -> PositionStatusReport {
848 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
849 let instrument_id = parse_instrument_id(msg.symbol);
850 let position_side = parse_position_side(msg.current_qty).as_specified();
851 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
852 let venue_position_id = None; let avg_px_open = msg.avg_entry_price.and_then(Decimal::from_f64);
854 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
855 let ts_init = get_atomic_clock_realtime().get_time_ns();
856
857 PositionStatusReport::new(
858 account_id,
859 instrument_id,
860 position_side,
861 quantity,
862 ts_last,
863 ts_init,
864 None, venue_position_id, avg_px_open, )
868}
869
870#[must_use]
883pub fn parse_instrument_msg(
884 msg: BitmexInstrumentMsg,
885 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
886 ts_init: UnixNanos,
887) -> Vec<Data> {
888 let mut updates = Vec::new();
889 let is_index = is_index_symbol(&msg.symbol);
890
891 let effective_index_price = if is_index {
894 msg.last_price
895 } else {
896 msg.index_price
897 };
898
899 if msg.mark_price.is_none() && effective_index_price.is_none() {
903 return updates;
904 }
905
906 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
907 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
908
909 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
911 Some(instrument) => instrument.price_precision(),
912 None => {
913 if is_index {
917 tracing::trace!(
918 "Index instrument {} not in cache, skipping update",
919 msg.symbol
920 );
921 } else {
922 tracing::debug!("Instrument {} not in cache, skipping update", msg.symbol);
923 }
924 return updates;
925 }
926 };
927
928 if let Some(mark_price) = msg.mark_price {
931 let price = Price::new(mark_price, price_precision);
932 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
933 instrument_id,
934 price,
935 ts_event,
936 ts_init,
937 )));
938 }
939
940 if let Some(index_price) = effective_index_price {
942 let price = Price::new(index_price, price_precision);
943 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
944 instrument_id,
945 price,
946 ts_event,
947 ts_init,
948 )));
949 }
950
951 updates
952}
953
954pub fn parse_funding_msg(msg: BitmexFundingMsg, ts_init: UnixNanos) -> Option<FundingRateUpdate> {
960 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol).as_str());
961 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
962
963 let rate = match Decimal::from_str(&msg.funding_rate.to_string()) {
965 Ok(rate) => rate,
966 Err(e) => {
967 tracing::error!("Failed to parse funding rate: {e}");
968 return None;
969 }
970 };
971
972 Some(FundingRateUpdate::new(
973 instrument_id,
974 rate,
975 None, ts_event,
977 ts_init,
978 ))
979}
980
981#[must_use]
990pub fn parse_wallet_msg(msg: BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
991 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
992
993 let currency_str = map_bitmex_currency(msg.currency.as_str());
995 let currency = get_currency(¤cy_str);
996
997 let divisor = if msg.currency == "XBt" {
999 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1001 1_000_000.0 } else {
1003 1.0
1004 };
1005 let amount = msg.amount.unwrap_or(0) as f64 / divisor;
1006
1007 let total = Money::new(amount, currency);
1008 let locked = Money::new(0.0, currency); let free = total - locked;
1010
1011 let balance = AccountBalance::new_checked(total, locked, free)
1012 .expect("Balance calculation should be valid");
1013
1014 AccountState::new(
1015 account_id,
1016 AccountType::Margin,
1017 vec![balance],
1018 vec![], true, UUID4::new(),
1021 ts_init,
1022 ts_init,
1023 None,
1024 )
1025}
1026
1027#[must_use]
1031pub fn parse_margin_msg(msg: BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
1032 let currency_str = map_bitmex_currency(msg.currency.as_str());
1034 let currency = get_currency(¤cy_str);
1035
1036 let divisor = if msg.currency == "XBt" {
1038 100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1040 1_000_000.0 } else {
1042 1.0
1043 };
1044
1045 let initial = (msg.init_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1046 let maintenance = (msg.maint_margin.unwrap_or(0) as f64 / divisor).max(0.0);
1047 let _unrealized = msg.unrealised_pnl.unwrap_or(0) as f64 / divisor;
1048
1049 MarginBalance::new(
1050 Money::new(initial, currency),
1051 Money::new(maintenance, currency),
1052 instrument_id,
1053 )
1054}
1055
1056#[cfg(test)]
1061mod tests {
1062 use chrono::{DateTime, Utc};
1063 use nautilus_model::{
1064 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1065 identifiers::Symbol,
1066 instruments::crypto_perpetual::CryptoPerpetual,
1067 };
1068 use rstest::rstest;
1069 use ustr::Ustr;
1070
1071 use super::*;
1072 use crate::common::{
1073 enums::{BitmexExecType, BitmexOrderStatus},
1074 testing::load_test_json,
1075 };
1076
1077 fn create_test_perpetual_instrument_with_precisions(
1079 price_precision: u8,
1080 size_precision: u8,
1081 ) -> InstrumentAny {
1082 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1083 InstrumentId::from("XBTUSD.BITMEX"),
1084 Symbol::new("XBTUSD"),
1085 Currency::BTC(),
1086 Currency::USD(),
1087 Currency::BTC(),
1088 true, price_precision,
1090 size_precision,
1091 Price::new(0.5, price_precision),
1092 Quantity::new(1.0, size_precision),
1093 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1106 UnixNanos::default(),
1107 ))
1108 }
1109
1110 fn create_test_perpetual_instrument() -> InstrumentAny {
1111 create_test_perpetual_instrument_with_precisions(1, 0)
1112 }
1113
1114 #[rstest]
1115 fn test_orderbook_l2_message() {
1116 let json_data = load_test_json("ws_orderbook_l2.json");
1117
1118 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1119 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1120
1121 let instrument = create_test_perpetual_instrument();
1123 let delta = parse_book_msg(
1124 &msg,
1125 &BitmexAction::Insert,
1126 &instrument,
1127 instrument.id(),
1128 instrument.price_precision(),
1129 UnixNanos::from(3),
1130 );
1131 assert_eq!(delta.instrument_id, instrument_id);
1132 assert_eq!(delta.order.price, Price::from("98459.9"));
1133 assert_eq!(delta.order.size, Quantity::from(33000));
1134 assert_eq!(delta.order.side, OrderSide::Sell);
1135 assert_eq!(delta.order.order_id, 62400580205);
1136 assert_eq!(delta.action, BookAction::Add);
1137 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1138 assert_eq!(delta.sequence, 0);
1139 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1141
1142 let delta = parse_book_msg(
1144 &msg,
1145 &BitmexAction::Update,
1146 &instrument,
1147 instrument.id(),
1148 instrument.price_precision(),
1149 UnixNanos::from(3),
1150 );
1151 assert_eq!(delta.flags, 0);
1152 assert_eq!(delta.action, BookAction::Update);
1153 }
1154
1155 #[rstest]
1156 fn test_orderbook10_message() {
1157 let json_data = load_test_json("ws_orderbook_10.json");
1158 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1159 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1160 let instrument = create_test_perpetual_instrument();
1161 let depth10 = parse_book10_msg(
1162 &msg,
1163 &instrument,
1164 instrument.id(),
1165 instrument.price_precision(),
1166 UnixNanos::from(3),
1167 );
1168
1169 assert_eq!(depth10.instrument_id, instrument_id);
1170
1171 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1173 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1174 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1175
1176 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1178 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1179 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1180
1181 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1183 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1184
1185 assert_eq!(depth10.sequence, 0);
1187 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1188 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1190 }
1191
1192 #[rstest]
1193 fn test_quote_message() {
1194 let json_data = load_test_json("ws_quote.json");
1195
1196 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1197 let last_quote = QuoteTick::new(
1198 instrument_id,
1199 Price::new(487.50, 2),
1200 Price::new(488.20, 2),
1201 Quantity::from(100_000),
1202 Quantity::from(100_000),
1203 UnixNanos::from(1),
1204 UnixNanos::from(2),
1205 );
1206 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1207 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1208 let quote = parse_quote_msg(
1209 &msg,
1210 &last_quote,
1211 &instrument,
1212 instrument_id,
1213 instrument.price_precision(),
1214 UnixNanos::from(3),
1215 );
1216
1217 assert_eq!(quote.instrument_id, instrument_id);
1218 assert_eq!(quote.bid_price, Price::from("487.55"));
1219 assert_eq!(quote.ask_price, Price::from("488.25"));
1220 assert_eq!(quote.bid_size, Quantity::from(103_000));
1221 assert_eq!(quote.ask_size, Quantity::from(50_000));
1222 assert_eq!(quote.ts_event, 1732315465085000000);
1223 assert_eq!(quote.ts_init, 3);
1224 }
1225
1226 #[rstest]
1227 fn test_trade_message() {
1228 let json_data = load_test_json("ws_trade.json");
1229
1230 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1231 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1232 let instrument = create_test_perpetual_instrument();
1233 let trade = parse_trade_msg(
1234 &msg,
1235 &instrument,
1236 instrument.id(),
1237 instrument.price_precision(),
1238 UnixNanos::from(3),
1239 );
1240
1241 assert_eq!(trade.instrument_id, instrument_id);
1242 assert_eq!(trade.price, Price::from("98570.9"));
1243 assert_eq!(trade.size, Quantity::from(100));
1244 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1245 assert_eq!(
1246 trade.trade_id.to_string(),
1247 "00000000-006d-1000-0000-000e8737d536"
1248 );
1249 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1251 }
1252
1253 #[rstest]
1254 fn test_trade_bin_message() {
1255 let json_data = load_test_json("ws_trade_bin_1m.json");
1256
1257 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1258 let topic = BitmexWsTopic::TradeBin1m;
1259
1260 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1261 let instrument = create_test_perpetual_instrument();
1262 let bar = parse_trade_bin_msg(
1263 &msg,
1264 &topic,
1265 &instrument,
1266 instrument.id(),
1267 instrument.price_precision(),
1268 UnixNanos::from(3),
1269 );
1270
1271 assert_eq!(bar.instrument_id(), instrument_id);
1272 assert_eq!(
1273 bar.bar_type.spec(),
1274 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1275 );
1276 assert_eq!(bar.open, Price::from("97550.0"));
1277 assert_eq!(bar.high, Price::from("97584.4"));
1278 assert_eq!(bar.low, Price::from("97550.0"));
1279 assert_eq!(bar.close, Price::from("97570.1"));
1280 assert_eq!(bar.volume, Quantity::from(84_000));
1281 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1283 }
1284
1285 #[rstest]
1286 fn test_trade_bin_message_extreme_adjustment() {
1287 let topic = BitmexWsTopic::TradeBin1m;
1288 let instrument = create_test_perpetual_instrument();
1289
1290 let msg = BitmexTradeBinMsg {
1291 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1292 .unwrap()
1293 .with_timezone(&Utc),
1294 symbol: Ustr::from("XBTUSD"),
1295 open: 50_000.0,
1296 high: 49_990.0,
1297 low: 50_010.0,
1298 close: 50_005.0,
1299 trades: 10,
1300 volume: 1_000,
1301 vwap: 0.0,
1302 last_size: 0,
1303 turnover: 0,
1304 home_notional: 0.0,
1305 foreign_notional: 0.0,
1306 };
1307
1308 let bar = parse_trade_bin_msg(
1309 &msg,
1310 &topic,
1311 &instrument,
1312 instrument.id(),
1313 instrument.price_precision(),
1314 UnixNanos::from(3),
1315 );
1316
1317 assert_eq!(bar.high, Price::from("50010.0"));
1318 assert_eq!(bar.low, Price::from("49990.0"));
1319 assert_eq!(bar.open, Price::from("50000.0"));
1320 assert_eq!(bar.close, Price::from("50005.0"));
1321 assert_eq!(bar.volume, Quantity::from(1_000));
1322 }
1323
1324 #[rstest]
1325 fn test_parse_order_msg() {
1326 let json_data = load_test_json("ws_order.json");
1327 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1328 let cache = DashMap::new();
1329 let instrument = create_test_perpetual_instrument();
1330 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1331
1332 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1333 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1334 assert_eq!(
1335 report.venue_order_id.to_string(),
1336 "550e8400-e29b-41d4-a716-446655440001"
1337 );
1338 assert_eq!(
1339 report.client_order_id.unwrap().to_string(),
1340 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1341 );
1342 assert_eq!(report.order_side, OrderSide::Buy);
1343 assert_eq!(report.order_type, OrderType::Limit);
1344 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1345 assert_eq!(report.order_status, OrderStatus::Accepted);
1346 assert_eq!(report.quantity, Quantity::from(100));
1347 assert_eq!(report.filled_qty, Quantity::from(0));
1348 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1349 assert_eq!(report.ts_accepted, 1732530600000000000); }
1351
1352 #[rstest]
1353 fn test_parse_order_msg_infers_type_when_missing() {
1354 let json_data = load_test_json("ws_order.json");
1355 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1356 msg.ord_type = None;
1357 msg.cl_ord_id = None;
1358 msg.price = Some(98_000.0);
1359 msg.stop_px = None;
1360
1361 let cache = DashMap::new();
1362 let instrument = create_test_perpetual_instrument();
1363
1364 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1365
1366 assert_eq!(report.order_type, OrderType::Limit);
1367 }
1368
1369 #[rstest]
1370 fn test_parse_order_msg_rejected_with_reason() {
1371 let mut msg: BitmexOrderMsg =
1372 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1373 msg.ord_status = BitmexOrderStatus::Rejected;
1374 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1375 msg.text = None;
1376 msg.cum_qty = 0;
1377
1378 let cache = DashMap::new();
1379 let instrument = create_test_perpetual_instrument();
1380 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1381
1382 assert_eq!(report.order_status, OrderStatus::Rejected);
1383 assert_eq!(
1384 report.cancel_reason,
1385 Some("Insufficient available balance".to_string())
1386 );
1387 }
1388
1389 #[rstest]
1390 fn test_parse_order_msg_rejected_with_text_fallback() {
1391 let mut msg: BitmexOrderMsg =
1392 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1393 msg.ord_status = BitmexOrderStatus::Rejected;
1394 msg.ord_rej_reason = None;
1395 msg.text = Some(Ustr::from("Order would execute immediately"));
1396 msg.cum_qty = 0;
1397
1398 let cache = DashMap::new();
1399 let instrument = create_test_perpetual_instrument();
1400 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1401
1402 assert_eq!(report.order_status, OrderStatus::Rejected);
1403 assert_eq!(
1404 report.cancel_reason,
1405 Some("Order would execute immediately".to_string())
1406 );
1407 }
1408
1409 #[rstest]
1410 fn test_parse_order_msg_rejected_without_reason() {
1411 let mut msg: BitmexOrderMsg =
1412 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1413 msg.ord_status = BitmexOrderStatus::Rejected;
1414 msg.ord_rej_reason = None;
1415 msg.text = None;
1416 msg.cum_qty = 0;
1417
1418 let cache = DashMap::new();
1419 let instrument = create_test_perpetual_instrument();
1420 let report = parse_order_msg(&msg, &instrument, &cache).unwrap();
1421
1422 assert_eq!(report.order_status, OrderStatus::Rejected);
1423 assert_eq!(report.cancel_reason, None);
1424 }
1425
1426 #[rstest]
1427 fn test_parse_execution_msg() {
1428 let json_data = load_test_json("ws_execution.json");
1429 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1430 let instrument = create_test_perpetual_instrument();
1431 let fill = parse_execution_msg(msg, &instrument).unwrap();
1432
1433 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1434 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1435 assert_eq!(
1436 fill.venue_order_id.to_string(),
1437 "550e8400-e29b-41d4-a716-446655440002"
1438 );
1439 assert_eq!(
1440 fill.trade_id.to_string(),
1441 "00000000-006d-1000-0000-000e8737d540"
1442 );
1443 assert_eq!(
1444 fill.client_order_id.unwrap().to_string(),
1445 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1446 );
1447 assert_eq!(fill.order_side, OrderSide::Sell);
1448 assert_eq!(fill.last_qty, Quantity::from(100));
1449 assert_eq!(fill.last_px, Price::from("98950.0"));
1450 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1451 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1452 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1453 assert_eq!(fill.ts_event, 1732530900789000000); }
1455
1456 #[rstest]
1457 fn test_parse_execution_msg_non_trade() {
1458 let mut msg: BitmexExecutionMsg =
1460 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1461 msg.exec_type = Some(BitmexExecType::Settlement);
1462
1463 let instrument = create_test_perpetual_instrument();
1464 let result = parse_execution_msg(msg, &instrument);
1465 assert!(result.is_none());
1466 }
1467
1468 #[rstest]
1469 fn test_parse_cancel_reject_execution() {
1470 let json = load_test_json("ws_execution_cancel_reject.json");
1472
1473 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1474 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1475 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1476 assert_eq!(msg.symbol, None);
1477
1478 let instrument = create_test_perpetual_instrument();
1480 let result = parse_execution_msg(msg, &instrument);
1481 assert!(result.is_none());
1482 }
1483
1484 #[rstest]
1485 fn test_parse_execution_msg_liquidation() {
1486 let mut msg: BitmexExecutionMsg =
1488 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1489 msg.exec_type = Some(BitmexExecType::Liquidation);
1490
1491 let instrument = create_test_perpetual_instrument();
1492 let fill = parse_execution_msg(msg, &instrument).unwrap();
1493
1494 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1495 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1496 assert_eq!(fill.order_side, OrderSide::Sell);
1497 assert_eq!(fill.last_qty, Quantity::from(100));
1498 assert_eq!(fill.last_px, Price::from("98950.0"));
1499 }
1500
1501 #[rstest]
1502 fn test_parse_execution_msg_bankruptcy() {
1503 let mut msg: BitmexExecutionMsg =
1504 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1505 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1506
1507 let instrument = create_test_perpetual_instrument();
1508 let fill = parse_execution_msg(msg, &instrument).unwrap();
1509
1510 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1511 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1512 assert_eq!(fill.order_side, OrderSide::Sell);
1513 assert_eq!(fill.last_qty, Quantity::from(100));
1514 }
1515
1516 #[rstest]
1517 fn test_parse_execution_msg_settlement() {
1518 let mut msg: BitmexExecutionMsg =
1519 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1520 msg.exec_type = Some(BitmexExecType::Settlement);
1521
1522 let instrument = create_test_perpetual_instrument();
1523 let result = parse_execution_msg(msg, &instrument);
1524 assert!(result.is_none());
1525 }
1526
1527 #[rstest]
1528 fn test_parse_execution_msg_trial_fill() {
1529 let mut msg: BitmexExecutionMsg =
1530 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1531 msg.exec_type = Some(BitmexExecType::TrialFill);
1532
1533 let instrument = create_test_perpetual_instrument();
1534 let result = parse_execution_msg(msg, &instrument);
1535 assert!(result.is_none());
1536 }
1537
1538 #[rstest]
1539 fn test_parse_execution_msg_funding() {
1540 let mut msg: BitmexExecutionMsg =
1541 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1542 msg.exec_type = Some(BitmexExecType::Funding);
1543
1544 let instrument = create_test_perpetual_instrument();
1545 let result = parse_execution_msg(msg, &instrument);
1546 assert!(result.is_none());
1547 }
1548
1549 #[rstest]
1550 fn test_parse_execution_msg_insurance() {
1551 let mut msg: BitmexExecutionMsg =
1552 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1553 msg.exec_type = Some(BitmexExecType::Insurance);
1554
1555 let instrument = create_test_perpetual_instrument();
1556 let result = parse_execution_msg(msg, &instrument);
1557 assert!(result.is_none());
1558 }
1559
1560 #[rstest]
1561 fn test_parse_execution_msg_rebalance() {
1562 let mut msg: BitmexExecutionMsg =
1563 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1564 msg.exec_type = Some(BitmexExecType::Rebalance);
1565
1566 let instrument = create_test_perpetual_instrument();
1567 let result = parse_execution_msg(msg, &instrument);
1568 assert!(result.is_none());
1569 }
1570
1571 #[rstest]
1572 fn test_parse_execution_msg_order_state_changes() {
1573 let instrument = create_test_perpetual_instrument();
1574
1575 let order_state_types = vec![
1576 BitmexExecType::New,
1577 BitmexExecType::Canceled,
1578 BitmexExecType::CancelReject,
1579 BitmexExecType::Replaced,
1580 BitmexExecType::Rejected,
1581 BitmexExecType::AmendReject,
1582 BitmexExecType::Suspended,
1583 BitmexExecType::Released,
1584 BitmexExecType::TriggeredOrActivatedBySystem,
1585 ];
1586
1587 for exec_type in order_state_types {
1588 let mut msg: BitmexExecutionMsg =
1589 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1590 msg.exec_type = Some(exec_type);
1591
1592 let result = parse_execution_msg(msg, &instrument);
1593 assert!(
1594 result.is_none(),
1595 "Expected None for exec_type {:?}",
1596 exec_type
1597 );
1598 }
1599 }
1600
1601 #[rstest]
1602 fn test_parse_position_msg() {
1603 let json_data = load_test_json("ws_position.json");
1604 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1605 let instrument = create_test_perpetual_instrument();
1606 let report = parse_position_msg(msg, &instrument);
1607
1608 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1609 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1610 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1611 assert_eq!(report.quantity, Quantity::from(1000));
1612 assert!(report.venue_position_id.is_none());
1613 assert_eq!(report.ts_last, 1732530900789000000); }
1615
1616 #[rstest]
1617 fn test_parse_position_msg_short() {
1618 let mut msg: BitmexPositionMsg =
1619 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1620 msg.current_qty = Some(-500);
1621
1622 let instrument = create_test_perpetual_instrument();
1623 let report = parse_position_msg(msg, &instrument);
1624 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1625 assert_eq!(report.quantity, Quantity::from(500));
1626 }
1627
1628 #[rstest]
1629 fn test_parse_position_msg_flat() {
1630 let mut msg: BitmexPositionMsg =
1631 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1632 msg.current_qty = Some(0);
1633
1634 let instrument = create_test_perpetual_instrument();
1635 let report = parse_position_msg(msg, &instrument);
1636 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1637 assert_eq!(report.quantity, Quantity::from(0));
1638 }
1639
1640 #[rstest]
1641 fn test_parse_wallet_msg() {
1642 let json_data = load_test_json("ws_wallet.json");
1643 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1644 let ts_init = UnixNanos::from(1);
1645 let account_state = parse_wallet_msg(msg, ts_init);
1646
1647 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1648 assert!(!account_state.balances.is_empty());
1649 let balance = &account_state.balances[0];
1650 assert_eq!(balance.currency.code.to_string(), "XBT");
1651 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1653 }
1654
1655 #[rstest]
1656 fn test_parse_wallet_msg_no_amount() {
1657 let mut msg: BitmexWalletMsg =
1658 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1659 msg.amount = None;
1660
1661 let ts_init = UnixNanos::from(1);
1662 let account_state = parse_wallet_msg(msg, ts_init);
1663 let balance = &account_state.balances[0];
1664 assert_eq!(balance.total.as_f64(), 0.0);
1665 }
1666
1667 #[rstest]
1668 fn test_parse_margin_msg() {
1669 let json_data = load_test_json("ws_margin.json");
1670 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1671 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1672 let margin_balance = parse_margin_msg(msg, instrument_id);
1673
1674 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1675 assert_eq!(margin_balance.instrument_id, instrument_id);
1676 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1679 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1681 }
1682
1683 #[rstest]
1684 fn test_parse_margin_msg_no_available() {
1685 let mut msg: BitmexMarginMsg =
1686 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1687 msg.available_margin = None;
1688
1689 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1690 let margin_balance = parse_margin_msg(msg, instrument_id);
1691 assert!(margin_balance.initial.as_f64() >= 0.0);
1693 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1694 }
1695
1696 #[rstest]
1697 fn test_parse_instrument_msg_both_prices() {
1698 let json_data = load_test_json("ws_instrument.json");
1699 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1700
1701 let mut instruments_cache = AHashMap::new();
1703 let test_instrument = create_test_perpetual_instrument();
1704 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1705
1706 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1707
1708 assert_eq!(updates.len(), 2);
1710
1711 match &updates[0] {
1713 Data::MarkPriceUpdate(update) => {
1714 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1715 assert_eq!(update.value.as_f64(), 95125.7);
1716 }
1717 _ => panic!("Expected MarkPriceUpdate at index 0"),
1718 }
1719
1720 match &updates[1] {
1722 Data::IndexPriceUpdate(update) => {
1723 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1724 assert_eq!(update.value.as_f64(), 95124.3);
1725 }
1726 _ => panic!("Expected IndexPriceUpdate at index 1"),
1727 }
1728 }
1729
1730 #[rstest]
1731 fn test_parse_instrument_msg_mark_price_only() {
1732 let mut msg: BitmexInstrumentMsg =
1733 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1734 msg.index_price = None;
1735
1736 let mut instruments_cache = AHashMap::new();
1738 let test_instrument = create_test_perpetual_instrument();
1739 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1740
1741 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1742
1743 assert_eq!(updates.len(), 1);
1744 match &updates[0] {
1745 Data::MarkPriceUpdate(update) => {
1746 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1747 assert_eq!(update.value.as_f64(), 95125.7);
1748 }
1749 _ => panic!("Expected MarkPriceUpdate"),
1750 }
1751 }
1752
1753 #[rstest]
1754 fn test_parse_instrument_msg_index_price_only() {
1755 let mut msg: BitmexInstrumentMsg =
1756 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1757 msg.mark_price = None;
1758
1759 let mut instruments_cache = AHashMap::new();
1761 let test_instrument = create_test_perpetual_instrument();
1762 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1763
1764 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1765
1766 assert_eq!(updates.len(), 1);
1767 match &updates[0] {
1768 Data::IndexPriceUpdate(update) => {
1769 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1770 assert_eq!(update.value.as_f64(), 95124.3);
1771 }
1772 _ => panic!("Expected IndexPriceUpdate"),
1773 }
1774 }
1775
1776 #[rstest]
1777 fn test_parse_instrument_msg_no_prices() {
1778 let mut msg: BitmexInstrumentMsg =
1779 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1780 msg.mark_price = None;
1781 msg.index_price = None;
1782 msg.last_price = None;
1783
1784 let mut instruments_cache = AHashMap::new();
1786 let test_instrument = create_test_perpetual_instrument();
1787 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1788
1789 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1790 assert_eq!(updates.len(), 0);
1791 }
1792
1793 #[rstest]
1794 fn test_parse_instrument_msg_index_symbol() {
1795 let mut msg: BitmexInstrumentMsg =
1798 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
1799 msg.symbol = Ustr::from(".BXBT");
1800 msg.last_price = Some(119163.05);
1801 msg.mark_price = Some(119163.05); msg.index_price = None;
1803
1804 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
1806 let instrument = CryptoPerpetual::new(
1807 instrument_id,
1808 Symbol::from(".BXBT"),
1809 Currency::BTC(),
1810 Currency::USD(),
1811 Currency::USD(),
1812 false, 2, 8, Price::from("0.01"),
1816 Quantity::from("0.00000001"),
1817 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
1832 let mut instruments_cache = AHashMap::new();
1833 instruments_cache.insert(
1834 Ustr::from(".BXBT"),
1835 InstrumentAny::CryptoPerpetual(instrument),
1836 );
1837
1838 let updates = parse_instrument_msg(msg, &instruments_cache, UnixNanos::from(1));
1839
1840 assert_eq!(updates.len(), 2);
1841
1842 match &updates[0] {
1844 Data::MarkPriceUpdate(update) => {
1845 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1846 assert_eq!(update.value, Price::from("119163.05"));
1847 }
1848 _ => panic!("Expected MarkPriceUpdate for index symbol"),
1849 }
1850
1851 match &updates[1] {
1853 Data::IndexPriceUpdate(update) => {
1854 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
1855 assert_eq!(update.value, Price::from("119163.05"));
1856 assert_eq!(update.ts_init, UnixNanos::from(1));
1857 }
1858 _ => panic!("Expected IndexPriceUpdate for index symbol"),
1859 }
1860 }
1861
1862 #[rstest]
1863 fn test_parse_funding_msg() {
1864 let json_data = load_test_json("ws_funding_rate.json");
1865 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
1866 let update = parse_funding_msg(msg, UnixNanos::from(1));
1867
1868 assert!(update.is_some());
1869 let update = update.unwrap();
1870
1871 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
1872 assert_eq!(update.rate.to_string(), "0.0001");
1873 assert!(update.next_funding_ns.is_none());
1874 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
1875 assert_eq!(update.ts_init, UnixNanos::from(1));
1876 }
1877}