1use std::convert::TryFrom;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
24 enums::{
25 AccountType, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
26 PositionSideSpecified, RecordFlag, TimeInForce,
27 },
28 events::account::state::AccountState,
29 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
30 instruments::{Instrument, any::InstrumentAny},
31 reports::{FillReport, OrderStatusReport, PositionStatusReport},
32 types::{AccountBalance, Currency, Money, Price, Quantity},
33};
34use rust_decimal::Decimal;
35
36use super::messages::{
37 BybitWsAccountExecution, BybitWsAccountOrder, BybitWsAccountPosition, BybitWsAccountWallet,
38 BybitWsKline, BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
39 BybitWsTrade,
40};
41use crate::common::{
42 enums::{BybitOrderStatus, BybitOrderType, BybitTimeInForce},
43 parse::{parse_millis_timestamp, parse_price_with_precision, parse_quantity_with_precision},
44};
45
46pub fn parse_topic(topic: &str) -> anyhow::Result<Vec<&str>> {
52 let parts: Vec<&str> = topic.split('.').collect();
53 if parts.is_empty() {
54 anyhow::bail!("Invalid topic format: empty topic");
55 }
56 Ok(parts)
57}
58
59pub fn parse_kline_topic(topic: &str) -> anyhow::Result<(&str, &str)> {
67 let parts = parse_topic(topic)?;
68 if parts.len() != 3 || parts[0] != "kline" {
69 anyhow::bail!(
70 "Invalid kline topic format: expected 'kline.{{interval}}.{{symbol}}', got '{topic}'"
71 );
72 }
73 Ok((parts[1], parts[2]))
74}
75
76pub fn parse_ws_trade_tick(
78 trade: &BybitWsTrade,
79 instrument: &InstrumentAny,
80 ts_init: UnixNanos,
81) -> anyhow::Result<TradeTick> {
82 let price = parse_price_with_precision(&trade.p, instrument.price_precision(), "trade.p")?;
83 let size = parse_quantity_with_precision(&trade.v, instrument.size_precision(), "trade.v")?;
84 let aggressor: AggressorSide = trade.taker_side.into();
85 let trade_id = TradeId::new_checked(trade.i.as_str())
86 .context("invalid trade identifier in Bybit trade message")?;
87 let ts_event = parse_millis_i64(trade.t, "trade.T")?;
88
89 TradeTick::new_checked(
90 instrument.id(),
91 price,
92 size,
93 aggressor,
94 trade_id,
95 ts_event,
96 ts_init,
97 )
98 .context("failed to construct TradeTick from Bybit trade message")
99}
100
101pub fn parse_orderbook_deltas(
103 msg: &BybitWsOrderbookDepthMsg,
104 instrument: &InstrumentAny,
105 ts_init: UnixNanos,
106) -> anyhow::Result<OrderBookDeltas> {
107 let is_snapshot = msg.msg_type.eq_ignore_ascii_case("snapshot");
108 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
109 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
110
111 let depth = &msg.data;
112 let instrument_id = instrument.id();
113 let price_precision = instrument.price_precision();
114 let size_precision = instrument.size_precision();
115 let update_id = u64::try_from(depth.u)
116 .context("received negative update id in Bybit order book message")?;
117 let sequence = u64::try_from(depth.seq)
118 .context("received negative sequence in Bybit order book message")?;
119
120 let mut deltas = Vec::new();
121
122 if is_snapshot {
123 deltas.push(OrderBookDelta::clear(
124 instrument_id,
125 sequence,
126 ts_event,
127 ts_init,
128 ));
129 }
130
131 let total_levels = depth.b.len() + depth.a.len();
132 let mut processed = 0_usize;
133
134 let mut push_level = |values: &[String], side: OrderSide| -> anyhow::Result<()> {
135 let (price, size) = parse_book_level(values, price_precision, size_precision, "orderbook")?;
136 let action = if size.is_zero() {
137 BookAction::Delete
138 } else if is_snapshot {
139 BookAction::Add
140 } else {
141 BookAction::Update
142 };
143
144 processed += 1;
145 let mut flags = RecordFlag::F_MBP as u8;
146 if processed == total_levels {
147 flags |= RecordFlag::F_LAST as u8;
148 }
149
150 let order = BookOrder::new(side, price, size, update_id);
151 let delta = OrderBookDelta::new_checked(
152 instrument_id,
153 action,
154 order,
155 flags,
156 sequence,
157 ts_event,
158 ts_init,
159 )
160 .context("failed to construct OrderBookDelta from Bybit book level")?;
161 deltas.push(delta);
162 Ok(())
163 };
164
165 for level in &depth.b {
166 push_level(level, OrderSide::Buy)?;
167 }
168 for level in &depth.a {
169 push_level(level, OrderSide::Sell)?;
170 }
171
172 if total_levels == 0
173 && let Some(last) = deltas.last_mut()
174 {
175 last.flags |= RecordFlag::F_LAST as u8;
176 }
177
178 OrderBookDeltas::new_checked(instrument_id, deltas)
179 .context("failed to assemble OrderBookDeltas from Bybit message")
180}
181
182pub fn parse_orderbook_quote(
184 msg: &BybitWsOrderbookDepthMsg,
185 instrument: &InstrumentAny,
186 last_quote: Option<&QuoteTick>,
187 ts_init: UnixNanos,
188) -> anyhow::Result<QuoteTick> {
189 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
190 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
191 let price_precision = instrument.price_precision();
192 let size_precision = instrument.size_precision();
193
194 let get_best =
195 |levels: &[Vec<String>], label: &str| -> anyhow::Result<Option<(Price, Quantity)>> {
196 if let Some(values) = levels.first() {
197 parse_book_level(values, price_precision, size_precision, label).map(Some)
198 } else {
199 Ok(None)
200 }
201 };
202
203 let bids = get_best(&msg.data.b, "bid")?;
204 let asks = get_best(&msg.data.a, "ask")?;
205
206 let (bid_price, bid_size) = match (bids, last_quote) {
207 (Some(level), _) => level,
208 (None, Some(prev)) => (prev.bid_price, prev.bid_size),
209 (None, None) => {
210 anyhow::bail!(
211 "Bybit order book update missing bid levels and no previous quote provided"
212 );
213 }
214 };
215
216 let (ask_price, ask_size) = match (asks, last_quote) {
217 (Some(level), _) => level,
218 (None, Some(prev)) => (prev.ask_price, prev.ask_size),
219 (None, None) => {
220 anyhow::bail!(
221 "Bybit order book update missing ask levels and no previous quote provided"
222 );
223 }
224 };
225
226 QuoteTick::new_checked(
227 instrument.id(),
228 bid_price,
229 ask_price,
230 bid_size,
231 ask_size,
232 ts_event,
233 ts_init,
234 )
235 .context("failed to construct QuoteTick from Bybit order book message")
236}
237
238pub fn parse_ticker_linear_quote(
240 msg: &BybitWsTickerLinearMsg,
241 instrument: &InstrumentAny,
242 ts_init: UnixNanos,
243) -> anyhow::Result<QuoteTick> {
244 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
245 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
246 let price_precision = instrument.price_precision();
247 let size_precision = instrument.size_precision();
248
249 let data = &msg.data;
250 let bid_price = data
251 .bid1_price
252 .as_ref()
253 .context("Bybit ticker message missing bid1Price")?
254 .as_str();
255 let ask_price = data
256 .ask1_price
257 .as_ref()
258 .context("Bybit ticker message missing ask1Price")?
259 .as_str();
260
261 let bid_price = parse_price_with_precision(bid_price, price_precision, "ticker.bid1Price")?;
262 let ask_price = parse_price_with_precision(ask_price, price_precision, "ticker.ask1Price")?;
263
264 let bid_size_str = data.bid1_size.as_deref().unwrap_or("0");
265 let ask_size_str = data.ask1_size.as_deref().unwrap_or("0");
266
267 let bid_size = parse_quantity_with_precision(bid_size_str, size_precision, "ticker.bid1Size")?;
268 let ask_size = parse_quantity_with_precision(ask_size_str, size_precision, "ticker.ask1Size")?;
269
270 QuoteTick::new_checked(
271 instrument.id(),
272 bid_price,
273 ask_price,
274 bid_size,
275 ask_size,
276 ts_event,
277 ts_init,
278 )
279 .context("failed to construct QuoteTick from Bybit linear ticker message")
280}
281
282pub fn parse_ticker_option_quote(
284 msg: &BybitWsTickerOptionMsg,
285 instrument: &InstrumentAny,
286 ts_init: UnixNanos,
287) -> anyhow::Result<QuoteTick> {
288 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
289 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
290 let price_precision = instrument.price_precision();
291 let size_precision = instrument.size_precision();
292
293 let data = &msg.data;
294 let bid_price =
295 parse_price_with_precision(&data.bid_price, price_precision, "ticker.bidPrice")?;
296 let ask_price =
297 parse_price_with_precision(&data.ask_price, price_precision, "ticker.askPrice")?;
298 let bid_size = parse_quantity_with_precision(&data.bid_size, size_precision, "ticker.bidSize")?;
299 let ask_size = parse_quantity_with_precision(&data.ask_size, size_precision, "ticker.askSize")?;
300
301 QuoteTick::new_checked(
302 instrument.id(),
303 bid_price,
304 ask_price,
305 bid_size,
306 ask_size,
307 ts_event,
308 ts_init,
309 )
310 .context("failed to construct QuoteTick from Bybit option ticker message")
311}
312
313pub(crate) fn parse_millis_i64(value: i64, field: &str) -> anyhow::Result<UnixNanos> {
314 if value < 0 {
315 Err(anyhow::anyhow!("{field} must be non-negative, was {value}"))
316 } else {
317 parse_millis_timestamp(&value.to_string(), field)
318 }
319}
320
321fn parse_book_level(
322 level: &[String],
323 price_precision: u8,
324 size_precision: u8,
325 label: &str,
326) -> anyhow::Result<(Price, Quantity)> {
327 let price_str = level
328 .first()
329 .ok_or_else(|| anyhow::anyhow!("missing price component in {label} level"))?;
330 let size_str = level
331 .get(1)
332 .ok_or_else(|| anyhow::anyhow!("missing size component in {label} level"))?;
333 let price = parse_price_with_precision(price_str, price_precision, label)?;
334 let size = parse_quantity_with_precision(size_str, size_precision, label)?;
335 Ok((price, size))
336}
337
338pub fn parse_ws_kline_bar(
344 kline: &BybitWsKline,
345 instrument: &InstrumentAny,
346 bar_type: BarType,
347 timestamp_on_close: bool,
348 ts_init: UnixNanos,
349) -> anyhow::Result<Bar> {
350 let price_precision = instrument.price_precision();
351 let size_precision = instrument.size_precision();
352
353 let open = parse_price_with_precision(&kline.open, price_precision, "kline.open")?;
354 let high = parse_price_with_precision(&kline.high, price_precision, "kline.high")?;
355 let low = parse_price_with_precision(&kline.low, price_precision, "kline.low")?;
356 let close = parse_price_with_precision(&kline.close, price_precision, "kline.close")?;
357 let volume = parse_quantity_with_precision(&kline.volume, size_precision, "kline.volume")?;
358
359 let mut ts_event = parse_millis_i64(kline.start, "kline.start")?;
360 if timestamp_on_close {
361 let interval_ns = bar_type
362 .spec()
363 .timedelta()
364 .num_nanoseconds()
365 .context("bar specification produced non-integer interval")?;
366 let interval_ns = u64::try_from(interval_ns)
367 .context("bar interval overflowed the u64 range for nanoseconds")?;
368 let updated = ts_event
369 .as_u64()
370 .checked_add(interval_ns)
371 .context("bar timestamp overflowed when adjusting to close time")?;
372 ts_event = UnixNanos::from(updated);
373 }
374 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
375
376 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
377 .context("failed to construct Bar from Bybit WebSocket kline")
378}
379
380pub fn parse_ws_order_status_report(
386 order: &BybitWsAccountOrder,
387 instrument: &InstrumentAny,
388 account_id: AccountId,
389 ts_init: UnixNanos,
390) -> anyhow::Result<OrderStatusReport> {
391 let instrument_id = instrument.id();
392 let venue_order_id = VenueOrderId::new(order.order_id.as_str());
393 let order_side: OrderSide = order.side.into();
394
395 let order_type: OrderType = match order.order_type {
396 BybitOrderType::Market => OrderType::Market,
397 BybitOrderType::Limit => OrderType::Limit,
398 BybitOrderType::Unknown => OrderType::Limit,
399 };
400
401 let time_in_force: TimeInForce = match order.time_in_force {
402 BybitTimeInForce::Gtc => TimeInForce::Gtc,
403 BybitTimeInForce::Ioc => TimeInForce::Ioc,
404 BybitTimeInForce::Fok => TimeInForce::Fok,
405 BybitTimeInForce::PostOnly => TimeInForce::Gtc,
406 };
407
408 let quantity =
409 parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty")?;
410
411 let filled_qty = parse_quantity_with_precision(
412 &order.cum_exec_qty,
413 instrument.size_precision(),
414 "order.cumExecQty",
415 )?;
416
417 let order_status: OrderStatus = match order.order_status {
423 BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
424 OrderStatus::Accepted
425 }
426 BybitOrderStatus::Rejected => {
427 if filled_qty.is_positive() {
428 OrderStatus::Canceled
429 } else {
430 OrderStatus::Rejected
431 }
432 }
433 BybitOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
434 BybitOrderStatus::Filled => OrderStatus::Filled,
435 BybitOrderStatus::Canceled | BybitOrderStatus::PartiallyFilledCanceled => {
436 OrderStatus::Canceled
437 }
438 BybitOrderStatus::Triggered => OrderStatus::Triggered,
439 BybitOrderStatus::Deactivated => OrderStatus::Canceled,
440 };
441
442 let ts_accepted = parse_millis_timestamp(&order.created_time, "order.createdTime")?;
443 let ts_last = parse_millis_timestamp(&order.updated_time, "order.updatedTime")?;
444
445 let mut report = OrderStatusReport::new(
446 account_id,
447 instrument_id,
448 None,
449 venue_order_id,
450 order_side,
451 order_type,
452 time_in_force,
453 order_status,
454 quantity,
455 filled_qty,
456 ts_accepted,
457 ts_last,
458 ts_init,
459 Some(UUID4::new()),
460 );
461
462 if !order.order_link_id.is_empty() {
463 report = report.with_client_order_id(ClientOrderId::new(order.order_link_id.as_str()));
464 }
465
466 if !order.price.is_empty() && order.price != "0" {
467 let price =
468 parse_price_with_precision(&order.price, instrument.price_precision(), "order.price")?;
469 report = report.with_price(price);
470 }
471
472 if !order.avg_price.is_empty() && order.avg_price != "0" {
473 let avg_px = order
474 .avg_price
475 .parse::<f64>()
476 .with_context(|| format!("Failed to parse avg_price='{}' as f64", order.avg_price))?;
477 report = report.with_avg_px(avg_px);
478 }
479
480 if !order.trigger_price.is_empty() && order.trigger_price != "0" {
481 let trigger_price = parse_price_with_precision(
482 &order.trigger_price,
483 instrument.price_precision(),
484 "order.triggerPrice",
485 )?;
486 report = report.with_trigger_price(trigger_price);
487 }
488
489 if order.reduce_only {
490 report = report.with_reduce_only(true);
491 }
492
493 if order.time_in_force == BybitTimeInForce::PostOnly {
494 report = report.with_post_only(true);
495 }
496
497 if !order.reject_reason.is_empty() {
498 report = report.with_cancel_reason(order.reject_reason.to_string());
499 }
500
501 Ok(report)
502}
503
504pub fn parse_ws_fill_report(
510 execution: &BybitWsAccountExecution,
511 account_id: AccountId,
512 instrument: &InstrumentAny,
513 ts_init: UnixNanos,
514) -> anyhow::Result<FillReport> {
515 let instrument_id = instrument.id();
516 let venue_order_id = VenueOrderId::new(execution.order_id.as_str());
517 let trade_id = TradeId::new_checked(execution.exec_id.as_str())
518 .context("invalid execId in Bybit WebSocket execution payload")?;
519
520 let order_side: OrderSide = execution.side.into();
521 let last_qty = parse_quantity_with_precision(
522 &execution.exec_qty,
523 instrument.size_precision(),
524 "execution.execQty",
525 )?;
526 let last_px = parse_price_with_precision(
527 &execution.exec_price,
528 instrument.price_precision(),
529 "execution.execPrice",
530 )?;
531
532 let liquidity_side = if execution.is_maker {
533 LiquiditySide::Maker
534 } else {
535 LiquiditySide::Taker
536 };
537
538 let commission_str = execution.exec_fee.trim_start_matches('-');
539 let commission_amount = commission_str
540 .parse::<f64>()
541 .with_context(|| format!("Failed to parse execFee='{}' as f64", execution.exec_fee))?
542 .abs();
543
544 let commission_currency = instrument.quote_currency();
546 let commission = Money::new(commission_amount, commission_currency);
547 let ts_event = parse_millis_timestamp(&execution.exec_time, "execution.execTime")?;
548
549 let client_order_id = if !execution.order_link_id.is_empty() {
550 Some(ClientOrderId::new(execution.order_link_id.as_str()))
551 } else {
552 None
553 };
554
555 Ok(FillReport::new(
556 account_id,
557 instrument_id,
558 venue_order_id,
559 trade_id,
560 order_side,
561 last_qty,
562 last_px,
563 commission,
564 liquidity_side,
565 client_order_id,
566 None, ts_event,
568 ts_init,
569 None, ))
571}
572
573pub fn parse_ws_position_status_report(
579 position: &BybitWsAccountPosition,
580 account_id: AccountId,
581 instrument: &InstrumentAny,
582 ts_init: UnixNanos,
583) -> anyhow::Result<PositionStatusReport> {
584 let instrument_id = instrument.id();
585
586 let quantity = parse_quantity_with_precision(
588 &position.size,
589 instrument.size_precision(),
590 "position.size",
591 )?;
592
593 let position_side = if position.side.eq_ignore_ascii_case("buy") {
595 PositionSideSpecified::Long
596 } else if position.side.eq_ignore_ascii_case("sell") {
597 PositionSideSpecified::Short
598 } else {
599 PositionSideSpecified::Flat
600 };
601
602 let avg_px_open = if let Some(ref avg_price) = position.avg_price {
603 if !avg_price.is_empty() && avg_price != "0" {
604 avg_price
605 .parse::<f64>()
606 .with_context(|| format!("Failed to parse avgPrice='{}' as f64", avg_price))?
607 } else {
608 0.0
609 }
610 } else {
611 0.0
612 };
613
614 let _unrealized_pnl = position.unrealised_pnl.parse::<f64>().with_context(|| {
615 format!(
616 "Failed to parse unrealisedPnl='{}' as f64",
617 position.unrealised_pnl
618 )
619 })?;
620
621 let _realized_pnl = position.cum_realised_pnl.parse::<f64>().with_context(|| {
622 format!(
623 "Failed to parse cumRealisedPnl='{}' as f64",
624 position.cum_realised_pnl
625 )
626 })?;
627
628 let ts_last = parse_millis_timestamp(&position.updated_time, "position.updatedTime")?;
629
630 let avg_px_open_decimal = if avg_px_open != 0.0 {
631 Some(Decimal::try_from(avg_px_open).context("Failed to convert avg_px_open to Decimal")?)
632 } else {
633 None
634 };
635
636 Ok(PositionStatusReport::new(
637 account_id,
638 instrument_id,
639 position_side,
640 quantity,
641 ts_last,
642 ts_init,
643 None, None, avg_px_open_decimal,
646 ))
647}
648
649pub fn parse_ws_account_state(
655 wallet: &BybitWsAccountWallet,
656 account_id: AccountId,
657 ts_event: UnixNanos,
658 ts_init: UnixNanos,
659) -> anyhow::Result<AccountState> {
660 let mut balances = Vec::new();
661
662 for coin_data in &wallet.coin {
663 let currency = Currency::from(coin_data.coin.as_str());
664
665 let wallet_balance_amount = coin_data.wallet_balance.parse::<f64>().with_context(|| {
666 format!(
667 "Failed to parse walletBalance='{}' as f64",
668 coin_data.wallet_balance
669 )
670 })?;
671
672 let spot_borrow_amount = if let Some(ref spot_borrow) = coin_data.spot_borrow {
673 if spot_borrow.is_empty() {
674 0.0
675 } else {
676 spot_borrow.parse::<f64>().with_context(|| {
677 format!("Failed to parse spotBorrow='{}' as f64", spot_borrow)
678 })?
679 }
680 } else {
681 0.0
682 };
683
684 let total_amount = wallet_balance_amount - spot_borrow_amount;
685
686 let free_amount = if coin_data.available_to_withdraw.is_empty() {
687 0.0
688 } else {
689 coin_data
690 .available_to_withdraw
691 .parse::<f64>()
692 .with_context(|| {
693 format!(
694 "Failed to parse availableToWithdraw='{}' as f64",
695 coin_data.available_to_withdraw
696 )
697 })?
698 };
699
700 let locked_amount = total_amount - free_amount;
701
702 let total = Money::new(total_amount, currency);
703 let locked = Money::new(locked_amount, currency);
704 let free = Money::new(free_amount, currency);
705
706 let balance = AccountBalance::new_checked(total, locked, free)
707 .context("Failed to create AccountBalance from wallet data")?;
708 balances.push(balance);
709 }
710
711 Ok(AccountState::new(
712 account_id,
713 AccountType::Margin, balances,
715 vec![], true, UUID4::new(),
718 ts_event,
719 ts_init,
720 None, ))
722}
723
// Unit tests for the WebSocket parsers. Payloads are loaded from JSON fixtures via
// `load_test_json`; instruments are built from the HTTP instrument fixtures.
#[cfg(test)]
mod tests {
    use std::num::NonZero;

    use nautilus_model::{
        data::BarSpecification,
        enums::{AggregationSource, BarAggregation, PositionSide, PriceType},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::{
            parse::{parse_linear_instrument, parse_option_instrument},
            testing::load_test_json,
        },
        http::models::{
            BybitFeeRate, BybitInstrumentLinearResponse, BybitInstrumentOptionResponse,
        },
        websocket::messages::{
            BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
            BybitWsTradeMsg,
        },
    };

    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Builds a fee-rate record for instrument parsing in tests.
    fn sample_fee_rate(
        symbol: &str,
        taker: &str,
        maker: &str,
        base_coin: Option<&str>,
    ) -> BybitFeeRate {
        BybitFeeRate {
            symbol: Ustr::from(symbol),
            taker_fee_rate: taker.to_string(),
            maker_fee_rate: maker.to_string(),
            base_coin: base_coin.map(Ustr::from),
        }
    }

    /// First instrument of the linear instruments fixture.
    fn linear_instrument() -> InstrumentAny {
        let json = load_test_json("http_get_instruments_linear.json");
        let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
        let instrument = &response.result.list[0];
        let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
        parse_linear_instrument(instrument, &fee_rate, TS, TS).unwrap()
    }

    /// First instrument of the option instruments fixture.
    fn option_instrument() -> InstrumentAny {
        let json = load_test_json("http_get_instruments_option.json");
        let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
        let instrument = &response.result.list[0];
        parse_option_instrument(instrument, TS, TS).unwrap()
    }

    /// Externally aggregated 5-minute last-price bar type for `instrument`.
    fn five_minute_bar_type(instrument: &InstrumentAny) -> BarType {
        let bar_spec = BarSpecification {
            step: NonZero::new(5).unwrap(),
            aggregation: BarAggregation::Minute,
            price_type: PriceType::Last,
        };
        BarType::new(instrument.id(), bar_spec, AggregationSource::External)
    }

    #[rstest]
    fn parse_topic_splits_on_dots() {
        let parts = parse_topic("orderbook.50.BTCUSDT").unwrap();

        assert_eq!(parts, vec!["orderbook", "50", "BTCUSDT"]);
    }

    #[rstest]
    fn parse_kline_topic_extracts_interval_and_symbol() {
        let (interval, symbol) = parse_kline_topic("kline.5.BTCUSDT").unwrap();

        assert_eq!(interval, "5");
        assert_eq!(symbol, "BTCUSDT");
    }

    #[rstest]
    #[case("")]
    #[case("kline.5")]
    #[case("trade.5.BTCUSDT")]
    #[case("kline.5.BTCUSDT.extra")]
    fn parse_kline_topic_rejects_malformed_topics(#[case] topic: &str) {
        assert!(parse_kline_topic(topic).is_err());
    }

    #[rstest]
    fn parse_millis_i64_converts_non_negative_values() {
        let ts = parse_millis_i64(1_700_000_000_000, "test.ts").unwrap();

        assert_eq!(ts, TS);
    }

    #[rstest]
    fn parse_millis_i64_rejects_negative_values() {
        assert!(parse_millis_i64(-1, "test.ts").is_err());
    }

    #[rstest]
    fn parse_ws_trade_into_trade_tick() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_public_trade.json");
        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
        let trade = &msg.data[0];

        let tick = parse_ws_trade_tick(trade, &instrument, TS).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.price, instrument.make_price(27451.00));
        assert_eq!(tick.size, instrument.make_qty(0.010, None));
        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
        assert_eq!(
            tick.trade_id.to_string(),
            "9dc75fca-4bdd-4773-9f78-6f5d7ab2a110"
        );
        assert_eq!(tick.ts_event, UnixNanos::new(1_709_891_679_000_000_000));
    }

    #[rstest]
    fn parse_orderbook_snapshot_into_deltas() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_snapshot.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 5);
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
        assert_eq!(
            deltas.deltas[1].order.price,
            instrument.make_price(27450.00)
        );
        assert_eq!(
            deltas.deltas[1].order.size,
            instrument.make_qty(0.500, None)
        );
        let last = deltas.deltas.last().unwrap();
        assert_eq!(last.order.side, OrderSide::Sell);
        assert_eq!(last.order.price, instrument.make_price(27451.50));
        assert_eq!(
            last.flags & RecordFlag::F_LAST as u8,
            RecordFlag::F_LAST as u8
        );
    }

    #[rstest]
    fn parse_orderbook_delta_marks_actions() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_delta.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();

        assert_eq!(deltas.deltas.len(), 2);
        let bid = &deltas.deltas[0];
        assert_eq!(bid.action, BookAction::Update);
        assert_eq!(bid.order.side, OrderSide::Buy);
        assert_eq!(bid.order.size, instrument.make_qty(0.400, None));

        let ask = &deltas.deltas[1];
        assert_eq!(ask.action, BookAction::Delete);
        assert_eq!(ask.order.side, OrderSide::Sell);
        assert_eq!(ask.order.size, instrument.make_qty(0.0, None));
        assert_eq!(
            ask.flags & RecordFlag::F_LAST as u8,
            RecordFlag::F_LAST as u8
        );
    }

    #[rstest]
    fn parse_orderbook_quote_produces_top_of_book() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_snapshot.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_orderbook_quote(&msg, &instrument, None, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(27450.00));
        assert_eq!(quote.bid_size, instrument.make_qty(0.500, None));
        assert_eq!(quote.ask_price, instrument.make_price(27451.00));
        assert_eq!(quote.ask_size, instrument.make_qty(0.750, None));
    }

    #[rstest]
    fn parse_orderbook_quote_with_delta_updates_sizes() {
        let instrument = linear_instrument();
        let snapshot: BybitWsOrderbookDepthMsg =
            serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json")).unwrap();
        let base_quote = parse_orderbook_quote(&snapshot, &instrument, None, TS).unwrap();

        let delta: BybitWsOrderbookDepthMsg =
            serde_json::from_str(&load_test_json("ws_orderbook_delta.json")).unwrap();
        let updated = parse_orderbook_quote(&delta, &instrument, Some(&base_quote), TS).unwrap();

        assert_eq!(updated.bid_price, instrument.make_price(27450.00));
        assert_eq!(updated.bid_size, instrument.make_qty(0.400, None));
        assert_eq!(updated.ask_price, instrument.make_price(27451.00));
        assert_eq!(updated.ask_size, instrument.make_qty(0.0, None));
    }

    #[rstest]
    fn parse_linear_ticker_quote_to_quote_tick() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_ticker_linear.json");
        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_ticker_linear_quote(&msg, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(17215.50));
        assert_eq!(quote.ask_price, instrument.make_price(17216.00));
        assert_eq!(quote.bid_size, instrument.make_qty(84.489, None));
        assert_eq!(quote.ask_size, instrument.make_qty(83.020, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_673_272_861_686_000_000));
        assert_eq!(quote.ts_init, TS);
    }

    #[rstest]
    fn parse_option_ticker_quote_to_quote_tick() {
        let instrument = option_instrument();
        let json = load_test_json("ws_ticker_option.json");
        let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_ticker_option_quote(&msg, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(0.0));
        assert_eq!(quote.ask_price, instrument.make_price(10.0));
        assert_eq!(quote.bid_size, instrument.make_qty(0.0, None));
        assert_eq!(quote.ask_size, instrument.make_qty(5.1, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_672_917_511_074_000_000));
        assert_eq!(quote.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_kline_into_bar() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_kline.json");
        let msg: crate::websocket::messages::BybitWsKlineMsg = serde_json::from_str(&json).unwrap();
        let kline = &msg.data[0];
        let bar_type = five_minute_bar_type(&instrument);

        let bar = parse_ws_kline_bar(kline, &instrument, bar_type, false, TS).unwrap();

        assert_eq!(bar.bar_type, bar_type);
        assert_eq!(bar.open, instrument.make_price(16649.5));
        assert_eq!(bar.high, instrument.make_price(16677.0));
        assert_eq!(bar.low, instrument.make_price(16608.0));
        assert_eq!(bar.close, instrument.make_price(16677.0));
        assert_eq!(bar.volume, instrument.make_qty(2.081, None));
        assert_eq!(bar.ts_event, UnixNanos::new(1_672_324_800_000_000_000));
        assert_eq!(bar.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_kline_into_bar_timestamped_on_close() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_kline.json");
        let msg: crate::websocket::messages::BybitWsKlineMsg = serde_json::from_str(&json).unwrap();
        let kline = &msg.data[0];
        let bar_type = five_minute_bar_type(&instrument);

        let bar = parse_ws_kline_bar(kline, &instrument, bar_type, true, TS).unwrap();

        // Start time shifted forward by the 5-minute bar interval.
        assert_eq!(bar.ts_event, UnixNanos::new(1_672_325_100_000_000_000));
        assert_eq!(bar.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_order_into_order_status_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_order_filled.json");
        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
            serde_json::from_str(&json).unwrap();
        let order = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.time_in_force, TimeInForce::Gtc);
        assert_eq!(report.order_status, OrderStatus::Filled);
        assert_eq!(report.quantity, instrument.make_qty(0.100, None));
        assert_eq!(report.filled_qty, instrument.make_qty(0.100, None));
        assert_eq!(report.price, Some(instrument.make_price(30000.50)));
        assert_eq!(report.avg_px, Some(30000.50));
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "test-client-order-001"
        );
        assert_eq!(
            report.ts_accepted,
            UnixNanos::new(1_672_364_262_444_000_000)
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_672_364_262_457_000_000));
    }

    #[rstest]
    fn parse_ws_order_partially_filled_rejected_maps_to_canceled() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_order_partially_filled_rejected.json");
        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
            serde_json::from_str(&json).unwrap();
        let order = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();

        assert_eq!(report.order_status, OrderStatus::Canceled);
        assert_eq!(report.filled_qty, instrument.make_qty(50.0, None));
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "O-20251001-164609-APEX-000-49"
        );
        assert_eq!(report.cancel_reason, Some("UNKNOWN".to_string()));
    }

    #[rstest]
    fn parse_ws_execution_into_fill_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_execution.json");
        let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
            serde_json::from_str(&json).unwrap();
        let execution = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(
            report.venue_order_id.to_string(),
            "9aac161b-8ed6-450d-9cab-c5cc67c21784"
        );
        assert_eq!(
            report.trade_id.to_string(),
            "0ab1bdf7-4219-438b-b30a-32ec863018f7"
        );
        assert_eq!(report.order_side, OrderSide::Sell);
        assert_eq!(report.last_qty, instrument.make_qty(0.5, None));
        assert_eq!(report.last_px, instrument.make_price(95900.1));
        assert_eq!(report.commission.as_f64(), 26.3725275);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "test-order-link-001"
        );
        assert_eq!(report.ts_event, UnixNanos::new(1_746_270_400_353_000_000));
    }

    #[rstest]
    fn parse_ws_position_into_position_status_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_position.json");
        let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
            serde_json::from_str(&json).unwrap();
        let position = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report =
            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
        assert_eq!(report.quantity, instrument.make_qty(0.15, None));
        assert_eq!(
            report.avg_px_open,
            Some(Decimal::try_from(28500.50).unwrap())
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_317_038_000_000));
        assert_eq!(report.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_position_short_into_position_status_report() {
        // The short-position fixture is checked against the second instrument of the linear
        // instruments fixture.
        let instruments_json = load_test_json("http_get_instruments_linear.json");
        let instruments_response: BybitInstrumentLinearResponse =
            serde_json::from_str(&instruments_json).unwrap();
        let eth_def = &instruments_response.result.list[1];
        let fee_rate = sample_fee_rate("ETHUSDT", "0.00055", "0.0001", Some("ETH"));
        let instrument = parse_linear_instrument(eth_def, &fee_rate, TS, TS).unwrap();

        let json = load_test_json("ws_account_position_short.json");
        let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
            serde_json::from_str(&json).unwrap();
        let position = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report =
            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id.symbol.as_str(), "ETHUSDT-LINEAR");
        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
        assert_eq!(report.quantity, instrument.make_qty(2.5, None));
        assert_eq!(
            report.avg_px_open,
            Some(Decimal::try_from(2450.75).unwrap())
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_417_038_000_000));
        assert_eq!(report.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_wallet_into_account_state() {
        let json = load_test_json("ws_account_wallet.json");
        let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
            serde_json::from_str(&json).unwrap();
        let wallet = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");
        let ts_event = UnixNanos::new(1_700_034_722_104_000_000);

        let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();

        assert_eq!(state.account_id, account_id);
        assert_eq!(state.account_type, AccountType::Margin);
        assert_eq!(state.balances.len(), 2);
        assert!(state.is_reported);

        let btc_balance = &state.balances[0];
        assert_eq!(btc_balance.currency.code.as_str(), "BTC");
        assert!((btc_balance.total.as_f64() - 0.00102964).abs() < 1e-8);
        assert!((btc_balance.free.as_f64() - 0.00092964).abs() < 1e-8);
        assert!((btc_balance.locked.as_f64() - 0.0001).abs() < 1e-8);

        let usdt_balance = &state.balances[1];
        assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
        assert!((usdt_balance.total.as_f64() - 9647.75537647).abs() < 1e-6);
        assert!((usdt_balance.free.as_f64() - 9519.89806037).abs() < 1e-6);
        assert!((usdt_balance.locked.as_f64() - 127.8573161).abs() < 1e-6);

        assert_eq!(state.ts_event, ts_event);
        assert_eq!(state.ts_init, TS);
    }
}