1use std::convert::TryFrom;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
24 enums::{
25 AccountType, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
26 PositionSideSpecified, RecordFlag, TimeInForce,
27 },
28 events::account::state::AccountState,
29 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
30 instruments::{Instrument, any::InstrumentAny},
31 reports::{FillReport, OrderStatusReport, PositionStatusReport},
32 types::{AccountBalance, Currency, Money, Price, Quantity},
33};
34use rust_decimal::Decimal;
35
36use super::messages::{
37 BybitWsAccountExecution, BybitWsAccountOrder, BybitWsAccountPosition, BybitWsAccountWallet,
38 BybitWsKline, BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
39 BybitWsTrade,
40};
41use crate::common::{
42 enums::{BybitOrderStatus, BybitOrderType, BybitTimeInForce},
43 parse::{parse_millis_timestamp, parse_price_with_precision, parse_quantity_with_precision},
44};
45
46pub fn parse_topic(topic: &str) -> anyhow::Result<Vec<&str>> {
52 let parts: Vec<&str> = topic.split('.').collect();
53 if parts.is_empty() {
54 anyhow::bail!("Invalid topic format: empty topic");
55 }
56 Ok(parts)
57}
58
59pub fn parse_kline_topic(topic: &str) -> anyhow::Result<(&str, &str)> {
67 let parts = parse_topic(topic)?;
68 if parts.len() != 3 || parts[0] != "kline" {
69 anyhow::bail!(
70 "Invalid kline topic format: expected 'kline.{{interval}}.{{symbol}}', got '{topic}'"
71 );
72 }
73 Ok((parts[1], parts[2]))
74}
75
76pub fn parse_ws_trade_tick(
78 trade: &BybitWsTrade,
79 instrument: &InstrumentAny,
80 ts_init: UnixNanos,
81) -> anyhow::Result<TradeTick> {
82 let price = parse_price_with_precision(&trade.p, instrument.price_precision(), "trade.p")?;
83 let size = parse_quantity_with_precision(&trade.v, instrument.size_precision(), "trade.v")?;
84 let aggressor: AggressorSide = trade.taker_side.into();
85 let trade_id = TradeId::new_checked(trade.i.as_str())
86 .context("invalid trade identifier in Bybit trade message")?;
87 let ts_event = parse_millis_i64(trade.t, "trade.T")?;
88
89 TradeTick::new_checked(
90 instrument.id(),
91 price,
92 size,
93 aggressor,
94 trade_id,
95 ts_event,
96 ts_init,
97 )
98 .context("failed to construct TradeTick from Bybit trade message")
99}
100
101pub fn parse_orderbook_deltas(
103 msg: &BybitWsOrderbookDepthMsg,
104 instrument: &InstrumentAny,
105 ts_init: UnixNanos,
106) -> anyhow::Result<OrderBookDeltas> {
107 let is_snapshot = msg.msg_type.eq_ignore_ascii_case("snapshot");
108 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
109 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
110
111 let depth = &msg.data;
112 let instrument_id = instrument.id();
113 let price_precision = instrument.price_precision();
114 let size_precision = instrument.size_precision();
115 let update_id = u64::try_from(depth.u)
116 .context("received negative update id in Bybit order book message")?;
117 let sequence = u64::try_from(depth.seq)
118 .context("received negative sequence in Bybit order book message")?;
119
120 let mut deltas = Vec::new();
121
122 if is_snapshot {
123 deltas.push(OrderBookDelta::clear(
124 instrument_id,
125 sequence,
126 ts_event,
127 ts_init,
128 ));
129 }
130
131 let total_levels = depth.b.len() + depth.a.len();
132 let mut processed = 0_usize;
133
134 let mut push_level = |values: &[String], side: OrderSide| -> anyhow::Result<()> {
135 let (price, size) = parse_book_level(values, price_precision, size_precision, "orderbook")?;
136 let action = if size.is_zero() {
137 BookAction::Delete
138 } else if is_snapshot {
139 BookAction::Add
140 } else {
141 BookAction::Update
142 };
143
144 processed += 1;
145 let mut flags = RecordFlag::F_MBP as u8;
146 if processed == total_levels {
147 flags |= RecordFlag::F_LAST as u8;
148 }
149
150 let order = BookOrder::new(side, price, size, update_id);
151 let delta = OrderBookDelta::new_checked(
152 instrument_id,
153 action,
154 order,
155 flags,
156 sequence,
157 ts_event,
158 ts_init,
159 )
160 .context("failed to construct OrderBookDelta from Bybit book level")?;
161 deltas.push(delta);
162 Ok(())
163 };
164
165 for level in &depth.b {
166 push_level(level, OrderSide::Buy)?;
167 }
168 for level in &depth.a {
169 push_level(level, OrderSide::Sell)?;
170 }
171
172 if total_levels == 0
173 && let Some(last) = deltas.last_mut()
174 {
175 last.flags |= RecordFlag::F_LAST as u8;
176 }
177
178 OrderBookDeltas::new_checked(instrument_id, deltas)
179 .context("failed to assemble OrderBookDeltas from Bybit message")
180}
181
182pub fn parse_orderbook_quote(
184 msg: &BybitWsOrderbookDepthMsg,
185 instrument: &InstrumentAny,
186 last_quote: Option<&QuoteTick>,
187 ts_init: UnixNanos,
188) -> anyhow::Result<QuoteTick> {
189 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
190 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
191 let price_precision = instrument.price_precision();
192 let size_precision = instrument.size_precision();
193
194 let get_best =
195 |levels: &[Vec<String>], label: &str| -> anyhow::Result<Option<(Price, Quantity)>> {
196 if let Some(values) = levels.first() {
197 parse_book_level(values, price_precision, size_precision, label).map(Some)
198 } else {
199 Ok(None)
200 }
201 };
202
203 let bids = get_best(&msg.data.b, "bid")?;
204 let asks = get_best(&msg.data.a, "ask")?;
205
206 let (bid_price, bid_size) = match (bids, last_quote) {
207 (Some(level), _) => level,
208 (None, Some(prev)) => (prev.bid_price, prev.bid_size),
209 (None, None) => {
210 anyhow::bail!(
211 "Bybit order book update missing bid levels and no previous quote provided"
212 );
213 }
214 };
215
216 let (ask_price, ask_size) = match (asks, last_quote) {
217 (Some(level), _) => level,
218 (None, Some(prev)) => (prev.ask_price, prev.ask_size),
219 (None, None) => {
220 anyhow::bail!(
221 "Bybit order book update missing ask levels and no previous quote provided"
222 );
223 }
224 };
225
226 QuoteTick::new_checked(
227 instrument.id(),
228 bid_price,
229 ask_price,
230 bid_size,
231 ask_size,
232 ts_event,
233 ts_init,
234 )
235 .context("failed to construct QuoteTick from Bybit order book message")
236}
237
238pub fn parse_ticker_linear_quote(
240 msg: &BybitWsTickerLinearMsg,
241 instrument: &InstrumentAny,
242 ts_init: UnixNanos,
243) -> anyhow::Result<QuoteTick> {
244 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
245 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
246 let price_precision = instrument.price_precision();
247 let size_precision = instrument.size_precision();
248
249 let data = &msg.data;
250 let bid_price = data
251 .bid1_price
252 .as_ref()
253 .context("Bybit ticker message missing bid1Price")?
254 .as_str();
255 let ask_price = data
256 .ask1_price
257 .as_ref()
258 .context("Bybit ticker message missing ask1Price")?
259 .as_str();
260
261 let bid_price = parse_price_with_precision(bid_price, price_precision, "ticker.bid1Price")?;
262 let ask_price = parse_price_with_precision(ask_price, price_precision, "ticker.ask1Price")?;
263
264 let bid_size_str = data.bid1_size.as_deref().unwrap_or("0");
265 let ask_size_str = data.ask1_size.as_deref().unwrap_or("0");
266
267 let bid_size = parse_quantity_with_precision(bid_size_str, size_precision, "ticker.bid1Size")?;
268 let ask_size = parse_quantity_with_precision(ask_size_str, size_precision, "ticker.ask1Size")?;
269
270 QuoteTick::new_checked(
271 instrument.id(),
272 bid_price,
273 ask_price,
274 bid_size,
275 ask_size,
276 ts_event,
277 ts_init,
278 )
279 .context("failed to construct QuoteTick from Bybit linear ticker message")
280}
281
282pub fn parse_ticker_option_quote(
284 msg: &BybitWsTickerOptionMsg,
285 instrument: &InstrumentAny,
286 ts_init: UnixNanos,
287) -> anyhow::Result<QuoteTick> {
288 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
289 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
290 let price_precision = instrument.price_precision();
291 let size_precision = instrument.size_precision();
292
293 let data = &msg.data;
294 let bid_price =
295 parse_price_with_precision(&data.bid_price, price_precision, "ticker.bidPrice")?;
296 let ask_price =
297 parse_price_with_precision(&data.ask_price, price_precision, "ticker.askPrice")?;
298 let bid_size = parse_quantity_with_precision(&data.bid_size, size_precision, "ticker.bidSize")?;
299 let ask_size = parse_quantity_with_precision(&data.ask_size, size_precision, "ticker.askSize")?;
300
301 QuoteTick::new_checked(
302 instrument.id(),
303 bid_price,
304 ask_price,
305 bid_size,
306 ask_size,
307 ts_event,
308 ts_init,
309 )
310 .context("failed to construct QuoteTick from Bybit option ticker message")
311}
312
313pub(crate) fn parse_millis_i64(value: i64, field: &str) -> anyhow::Result<UnixNanos> {
314 if value < 0 {
315 Err(anyhow::anyhow!("{field} must be non-negative, was {value}"))
316 } else {
317 parse_millis_timestamp(&value.to_string(), field)
318 }
319}
320
321fn parse_book_level(
322 level: &[String],
323 price_precision: u8,
324 size_precision: u8,
325 label: &str,
326) -> anyhow::Result<(Price, Quantity)> {
327 let price_str = level
328 .first()
329 .ok_or_else(|| anyhow::anyhow!("missing price component in {label} level"))?;
330 let size_str = level
331 .get(1)
332 .ok_or_else(|| anyhow::anyhow!("missing size component in {label} level"))?;
333 let price = parse_price_with_precision(price_str, price_precision, label)?;
334 let size = parse_quantity_with_precision(size_str, size_precision, label)?;
335 Ok((price, size))
336}
337
338pub fn parse_ws_kline_bar(
344 kline: &BybitWsKline,
345 instrument: &InstrumentAny,
346 bar_type: BarType,
347 timestamp_on_close: bool,
348 ts_init: UnixNanos,
349) -> anyhow::Result<Bar> {
350 let price_precision = instrument.price_precision();
351 let size_precision = instrument.size_precision();
352
353 let open = parse_price_with_precision(&kline.open, price_precision, "kline.open")?;
354 let high = parse_price_with_precision(&kline.high, price_precision, "kline.high")?;
355 let low = parse_price_with_precision(&kline.low, price_precision, "kline.low")?;
356 let close = parse_price_with_precision(&kline.close, price_precision, "kline.close")?;
357 let volume = parse_quantity_with_precision(&kline.volume, size_precision, "kline.volume")?;
358
359 let mut ts_event = parse_millis_i64(kline.start, "kline.start")?;
360 if timestamp_on_close {
361 let interval_ns = bar_type
362 .spec()
363 .timedelta()
364 .num_nanoseconds()
365 .context("bar specification produced non-integer interval")?;
366 let interval_ns = u64::try_from(interval_ns)
367 .context("bar interval overflowed the u64 range for nanoseconds")?;
368 let updated = ts_event
369 .as_u64()
370 .checked_add(interval_ns)
371 .context("bar timestamp overflowed when adjusting to close time")?;
372 ts_event = UnixNanos::from(updated);
373 }
374 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
375
376 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
377 .context("failed to construct Bar from Bybit WebSocket kline")
378}
379
380pub fn parse_ws_order_status_report(
386 order: &BybitWsAccountOrder,
387 instrument: &InstrumentAny,
388 account_id: AccountId,
389 ts_init: UnixNanos,
390) -> anyhow::Result<OrderStatusReport> {
391 let instrument_id = instrument.id();
392 let venue_order_id = VenueOrderId::new(order.order_id.as_str());
393 let order_side: OrderSide = order.side.into();
394
395 let order_type: OrderType = match order.order_type {
396 BybitOrderType::Market => OrderType::Market,
397 BybitOrderType::Limit => OrderType::Limit,
398 BybitOrderType::Unknown => OrderType::Limit,
399 };
400
401 let time_in_force: TimeInForce = match order.time_in_force {
402 BybitTimeInForce::Gtc => TimeInForce::Gtc,
403 BybitTimeInForce::Ioc => TimeInForce::Ioc,
404 BybitTimeInForce::Fok => TimeInForce::Fok,
405 BybitTimeInForce::PostOnly => TimeInForce::Gtc,
406 };
407
408 let quantity =
409 parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty")?;
410
411 let filled_qty = parse_quantity_with_precision(
412 &order.cum_exec_qty,
413 instrument.size_precision(),
414 "order.cumExecQty",
415 )?;
416
417 let order_status: OrderStatus = match order.order_status {
423 BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
424 OrderStatus::Accepted
425 }
426 BybitOrderStatus::Rejected => {
427 if filled_qty.is_positive() {
428 OrderStatus::Canceled
429 } else {
430 OrderStatus::Rejected
431 }
432 }
433 BybitOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
434 BybitOrderStatus::Filled => OrderStatus::Filled,
435 BybitOrderStatus::Canceled | BybitOrderStatus::PartiallyFilledCanceled => {
436 OrderStatus::Canceled
437 }
438 BybitOrderStatus::Triggered => OrderStatus::Triggered,
439 BybitOrderStatus::Deactivated => OrderStatus::Canceled,
440 };
441
442 let ts_accepted = parse_millis_timestamp(&order.created_time, "order.createdTime")?;
443 let ts_last = parse_millis_timestamp(&order.updated_time, "order.updatedTime")?;
444
445 let mut report = OrderStatusReport::new(
446 account_id,
447 instrument_id,
448 None,
449 venue_order_id,
450 order_side,
451 order_type,
452 time_in_force,
453 order_status,
454 quantity,
455 filled_qty,
456 ts_accepted,
457 ts_last,
458 ts_init,
459 Some(UUID4::new()),
460 );
461
462 if !order.order_link_id.is_empty() {
463 report = report.with_client_order_id(ClientOrderId::new(order.order_link_id.as_str()));
464 }
465
466 if !order.price.is_empty() && order.price != "0" {
467 let price =
468 parse_price_with_precision(&order.price, instrument.price_precision(), "order.price")?;
469 report = report.with_price(price);
470 }
471
472 if !order.avg_price.is_empty() && order.avg_price != "0" {
473 let avg_px = order
474 .avg_price
475 .parse::<f64>()
476 .with_context(|| format!("Failed to parse avg_price='{}' as f64", order.avg_price))?;
477 report = report.with_avg_px(avg_px);
478 }
479
480 if !order.trigger_price.is_empty() && order.trigger_price != "0" {
481 let trigger_price = parse_price_with_precision(
482 &order.trigger_price,
483 instrument.price_precision(),
484 "order.triggerPrice",
485 )?;
486 report = report.with_trigger_price(trigger_price);
487 }
488
489 if order.reduce_only {
490 report = report.with_reduce_only(true);
491 }
492
493 if order.time_in_force == BybitTimeInForce::PostOnly {
494 report = report.with_post_only(true);
495 }
496
497 if !order.reject_reason.is_empty() {
498 report = report.with_cancel_reason(order.reject_reason.to_string());
499 }
500
501 Ok(report)
502}
503
504pub fn parse_ws_fill_report(
510 execution: &BybitWsAccountExecution,
511 account_id: AccountId,
512 instrument: &InstrumentAny,
513 ts_init: UnixNanos,
514) -> anyhow::Result<FillReport> {
515 let instrument_id = instrument.id();
516 let venue_order_id = VenueOrderId::new(execution.order_id.as_str());
517 let trade_id = TradeId::new_checked(execution.exec_id.as_str())
518 .context("invalid execId in Bybit WebSocket execution payload")?;
519
520 let order_side: OrderSide = execution.side.into();
521 let last_qty = parse_quantity_with_precision(
522 &execution.exec_qty,
523 instrument.size_precision(),
524 "execution.execQty",
525 )?;
526 let last_px = parse_price_with_precision(
527 &execution.exec_price,
528 instrument.price_precision(),
529 "execution.execPrice",
530 )?;
531
532 let liquidity_side = if execution.is_maker {
533 LiquiditySide::Maker
534 } else {
535 LiquiditySide::Taker
536 };
537
538 let commission_str = execution.exec_fee.trim_start_matches('-');
539 let commission_amount = commission_str
540 .parse::<f64>()
541 .with_context(|| format!("Failed to parse execFee='{}' as f64", execution.exec_fee))?
542 .abs();
543
544 let commission_currency = instrument.quote_currency();
546 let commission = Money::new(commission_amount, commission_currency);
547 let ts_event = parse_millis_timestamp(&execution.exec_time, "execution.execTime")?;
548
549 let client_order_id = if !execution.order_link_id.is_empty() {
550 Some(ClientOrderId::new(execution.order_link_id.as_str()))
551 } else {
552 None
553 };
554
555 Ok(FillReport::new(
556 account_id,
557 instrument_id,
558 venue_order_id,
559 trade_id,
560 order_side,
561 last_qty,
562 last_px,
563 commission,
564 liquidity_side,
565 client_order_id,
566 None, ts_event,
568 ts_init,
569 None, ))
571}
572
573pub fn parse_ws_position_status_report(
579 position: &BybitWsAccountPosition,
580 account_id: AccountId,
581 instrument: &InstrumentAny,
582 ts_init: UnixNanos,
583) -> anyhow::Result<PositionStatusReport> {
584 let instrument_id = instrument.id();
585
586 let quantity = parse_quantity_with_precision(
588 &position.size,
589 instrument.size_precision(),
590 "position.size",
591 )?;
592
593 let position_side = if position.side.eq_ignore_ascii_case("buy") {
595 PositionSideSpecified::Long
596 } else if position.side.eq_ignore_ascii_case("sell") {
597 PositionSideSpecified::Short
598 } else {
599 PositionSideSpecified::Flat
600 };
601
602 let avg_px_open = if let Some(ref avg_price) = position.avg_price {
603 if !avg_price.is_empty() && avg_price != "0" {
604 avg_price
605 .parse::<f64>()
606 .with_context(|| format!("Failed to parse avgPrice='{}' as f64", avg_price))?
607 } else {
608 0.0
609 }
610 } else {
611 0.0
612 };
613
614 let _unrealized_pnl = position.unrealised_pnl.parse::<f64>().with_context(|| {
615 format!(
616 "Failed to parse unrealisedPnl='{}' as f64",
617 position.unrealised_pnl
618 )
619 })?;
620
621 let _realized_pnl = position.cum_realised_pnl.parse::<f64>().with_context(|| {
622 format!(
623 "Failed to parse cumRealisedPnl='{}' as f64",
624 position.cum_realised_pnl
625 )
626 })?;
627
628 let ts_last = parse_millis_timestamp(&position.updated_time, "position.updatedTime")?;
629
630 let avg_px_open_decimal = if avg_px_open != 0.0 {
631 Some(Decimal::try_from(avg_px_open).context("Failed to convert avg_px_open to Decimal")?)
632 } else {
633 None
634 };
635
636 Ok(PositionStatusReport::new(
637 account_id,
638 instrument_id,
639 position_side,
640 quantity,
641 ts_last,
642 ts_init,
643 None, None, avg_px_open_decimal,
646 ))
647}
648
649pub fn parse_ws_account_state(
655 wallet: &BybitWsAccountWallet,
656 account_id: AccountId,
657 ts_event: UnixNanos,
658 ts_init: UnixNanos,
659) -> anyhow::Result<AccountState> {
660 let mut balances = Vec::new();
661
662 for coin_data in &wallet.coin {
663 let currency = Currency::from(coin_data.coin.as_str());
664
665 let total_amount = coin_data.wallet_balance.parse::<f64>().with_context(|| {
666 format!(
667 "Failed to parse walletBalance='{}' as f64",
668 coin_data.wallet_balance
669 )
670 })?;
671
672 let free_amount = if coin_data.available_to_withdraw.is_empty() {
673 0.0
674 } else {
675 coin_data
676 .available_to_withdraw
677 .parse::<f64>()
678 .with_context(|| {
679 format!(
680 "Failed to parse availableToWithdraw='{}' as f64",
681 coin_data.available_to_withdraw
682 )
683 })?
684 };
685
686 let locked_amount = total_amount - free_amount;
687
688 let total = Money::new(total_amount, currency);
689 let locked = Money::new(locked_amount, currency);
690 let free = Money::new(free_amount, currency);
691
692 let balance = AccountBalance::new_checked(total, locked, free)
693 .context("Failed to create AccountBalance from wallet data")?;
694 balances.push(balance);
695 }
696
697 Ok(AccountState::new(
698 account_id,
699 AccountType::Margin, balances,
701 vec![], true, UUID4::new(),
704 ts_event,
705 ts_init,
706 None, ))
708}
709
#[cfg(test)]
mod tests {
    //! Fixture-driven tests for the Bybit WebSocket parsers.
    //!
    //! Every test loads a JSON payload through `load_test_json`, deserializes it
    //! into the matching message type and checks the parsed domain object.

    use nautilus_model::{
        data::BarSpecification,
        enums::{AggregationSource, BarAggregation, PositionSide, PriceType},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::{
            parse::{parse_linear_instrument, parse_option_instrument},
            testing::load_test_json,
        },
        http::models::{
            BybitFeeRate, BybitInstrumentLinearResponse, BybitInstrumentOptionResponse,
        },
        websocket::messages::{
            BybitWsAccountExecutionMsg, BybitWsAccountOrderMsg, BybitWsAccountPositionMsg,
            BybitWsAccountWalletMsg, BybitWsKlineMsg, BybitWsOrderbookDepthMsg,
            BybitWsTickerLinearMsg, BybitWsTickerOptionMsg, BybitWsTradeMsg,
        },
    };

    /// Initialization timestamp shared by all tests.
    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Builds a fee-rate record for instrument parsing.
    fn sample_fee_rate(
        symbol: &str,
        taker: &str,
        maker: &str,
        base_coin: Option<&str>,
    ) -> BybitFeeRate {
        BybitFeeRate {
            symbol: Ustr::from(symbol),
            taker_fee_rate: taker.to_string(),
            maker_fee_rate: maker.to_string(),
            base_coin: base_coin.map(Ustr::from),
        }
    }

    /// Parses the linear instrument at `index` of the instruments fixture using
    /// the standard test fee rates for `symbol` / `base_coin`.
    fn linear_instrument_at(index: usize, symbol: &str, base_coin: &str) -> InstrumentAny {
        let json = load_test_json("http_get_instruments_linear.json");
        let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
        let definition = &response.result.list[index];
        let fee_rate = sample_fee_rate(symbol, "0.00055", "0.0001", Some(base_coin));
        parse_linear_instrument(definition, &fee_rate, TS, TS).unwrap()
    }

    /// First linear instrument of the fixture (BTCUSDT).
    fn linear_instrument() -> InstrumentAny {
        linear_instrument_at(0, "BTCUSDT", "BTC")
    }

    /// First option instrument of the option instruments fixture.
    fn option_instrument() -> InstrumentAny {
        let json = load_test_json("http_get_instruments_option.json");
        let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
        let instrument = &response.result.list[0];
        parse_option_instrument(instrument, TS, TS).unwrap()
    }

    #[rstest]
    fn parse_ws_trade_into_trade_tick() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_public_trade.json");
        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
        let trade = &msg.data[0];

        let tick = parse_ws_trade_tick(trade, &instrument, TS).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.price, instrument.make_price(27451.00));
        assert_eq!(tick.size, instrument.make_qty(0.010, None));
        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
        assert_eq!(
            tick.trade_id.to_string(),
            "9dc75fca-4bdd-4773-9f78-6f5d7ab2a110"
        );
        assert_eq!(tick.ts_event, UnixNanos::new(1_709_891_679_000_000_000));
    }

    #[rstest]
    fn parse_orderbook_snapshot_into_deltas() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_snapshot.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 5);
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
        assert_eq!(
            deltas.deltas[1].order.price,
            instrument.make_price(27450.00)
        );
        assert_eq!(
            deltas.deltas[1].order.size,
            instrument.make_qty(0.500, None)
        );
        let last = deltas.deltas.last().unwrap();
        assert_eq!(last.order.side, OrderSide::Sell);
        assert_eq!(last.order.price, instrument.make_price(27451.50));
        assert_eq!(
            last.flags & RecordFlag::F_LAST as u8,
            RecordFlag::F_LAST as u8
        );
    }

    #[rstest]
    fn parse_orderbook_delta_marks_actions() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_delta.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();

        assert_eq!(deltas.deltas.len(), 2);
        let bid = &deltas.deltas[0];
        assert_eq!(bid.action, BookAction::Update);
        assert_eq!(bid.order.side, OrderSide::Buy);
        assert_eq!(bid.order.size, instrument.make_qty(0.400, None));

        let ask = &deltas.deltas[1];
        assert_eq!(ask.action, BookAction::Delete);
        assert_eq!(ask.order.side, OrderSide::Sell);
        assert_eq!(ask.order.size, instrument.make_qty(0.0, None));
        assert_eq!(
            ask.flags & RecordFlag::F_LAST as u8,
            RecordFlag::F_LAST as u8
        );
    }

    #[rstest]
    fn parse_orderbook_quote_produces_top_of_book() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_orderbook_snapshot.json");
        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_orderbook_quote(&msg, &instrument, None, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(27450.00));
        assert_eq!(quote.bid_size, instrument.make_qty(0.500, None));
        assert_eq!(quote.ask_price, instrument.make_price(27451.00));
        assert_eq!(quote.ask_size, instrument.make_qty(0.750, None));
    }

    #[rstest]
    fn parse_orderbook_quote_with_delta_updates_sizes() {
        let instrument = linear_instrument();
        let snapshot: BybitWsOrderbookDepthMsg =
            serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json")).unwrap();
        let base_quote = parse_orderbook_quote(&snapshot, &instrument, None, TS).unwrap();

        let delta: BybitWsOrderbookDepthMsg =
            serde_json::from_str(&load_test_json("ws_orderbook_delta.json")).unwrap();
        let updated = parse_orderbook_quote(&delta, &instrument, Some(&base_quote), TS).unwrap();

        assert_eq!(updated.bid_price, instrument.make_price(27450.00));
        assert_eq!(updated.bid_size, instrument.make_qty(0.400, None));
        assert_eq!(updated.ask_price, instrument.make_price(27451.00));
        assert_eq!(updated.ask_size, instrument.make_qty(0.0, None));
    }

    #[rstest]
    fn parse_linear_ticker_quote_to_quote_tick() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_ticker_linear.json");
        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_ticker_linear_quote(&msg, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(17215.50));
        assert_eq!(quote.ask_price, instrument.make_price(17216.00));
        assert_eq!(quote.bid_size, instrument.make_qty(84.489, None));
        assert_eq!(quote.ask_size, instrument.make_qty(83.020, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_673_272_861_686_000_000));
        assert_eq!(quote.ts_init, TS);
    }

    #[rstest]
    fn parse_option_ticker_quote_to_quote_tick() {
        let instrument = option_instrument();
        let json = load_test_json("ws_ticker_option.json");
        let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();

        let quote = parse_ticker_option_quote(&msg, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(0.0));
        assert_eq!(quote.ask_price, instrument.make_price(10.0));
        assert_eq!(quote.bid_size, instrument.make_qty(0.0, None));
        assert_eq!(quote.ask_size, instrument.make_qty(5.1, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_672_917_511_074_000_000));
        assert_eq!(quote.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_kline_into_bar() {
        use std::num::NonZero;

        let instrument = linear_instrument();
        let json = load_test_json("ws_kline.json");
        let msg: BybitWsKlineMsg = serde_json::from_str(&json).unwrap();
        let kline = &msg.data[0];

        let bar_spec = BarSpecification {
            step: NonZero::new(5).unwrap(),
            aggregation: BarAggregation::Minute,
            price_type: PriceType::Last,
        };
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::External);

        let bar = parse_ws_kline_bar(kline, &instrument, bar_type, false, TS).unwrap();

        assert_eq!(bar.bar_type, bar_type);
        assert_eq!(bar.open, instrument.make_price(16649.5));
        assert_eq!(bar.high, instrument.make_price(16677.0));
        assert_eq!(bar.low, instrument.make_price(16608.0));
        assert_eq!(bar.close, instrument.make_price(16677.0));
        assert_eq!(bar.volume, instrument.make_qty(2.081, None));
        assert_eq!(bar.ts_event, UnixNanos::new(1_672_324_800_000_000_000));
        assert_eq!(bar.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_order_into_order_status_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_order_filled.json");
        let msg: BybitWsAccountOrderMsg = serde_json::from_str(&json).unwrap();
        let order = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.time_in_force, TimeInForce::Gtc);
        assert_eq!(report.order_status, OrderStatus::Filled);
        assert_eq!(report.quantity, instrument.make_qty(0.100, None));
        assert_eq!(report.filled_qty, instrument.make_qty(0.100, None));
        assert_eq!(report.price, Some(instrument.make_price(30000.50)));
        assert_eq!(report.avg_px, Some(30000.50));
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "test-client-order-001"
        );
        assert_eq!(
            report.ts_accepted,
            UnixNanos::new(1_672_364_262_444_000_000)
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_672_364_262_457_000_000));
    }

    #[rstest]
    fn parse_ws_order_partially_filled_rejected_maps_to_canceled() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_order_partially_filled_rejected.json");
        let msg: BybitWsAccountOrderMsg = serde_json::from_str(&json).unwrap();
        let order = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();

        // A rejected order that already has fills is reported as canceled.
        assert_eq!(report.order_status, OrderStatus::Canceled);
        assert_eq!(report.filled_qty, instrument.make_qty(50.0, None));
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "O-20251001-164609-APEX-000-49"
        );
        assert_eq!(report.cancel_reason, Some("UNKNOWN".to_string()));
    }

    #[rstest]
    fn parse_ws_execution_into_fill_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_execution.json");
        let msg: BybitWsAccountExecutionMsg = serde_json::from_str(&json).unwrap();
        let execution = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(
            report.venue_order_id.to_string(),
            "9aac161b-8ed6-450d-9cab-c5cc67c21784"
        );
        assert_eq!(
            report.trade_id.to_string(),
            "0ab1bdf7-4219-438b-b30a-32ec863018f7"
        );
        assert_eq!(report.order_side, OrderSide::Sell);
        assert_eq!(report.last_qty, instrument.make_qty(0.5, None));
        assert_eq!(report.last_px, instrument.make_price(95900.1));
        assert_eq!(report.commission.as_f64(), 26.3725275);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
        assert_eq!(
            report.client_order_id.as_ref().unwrap().to_string(),
            "test-order-link-001"
        );
        assert_eq!(report.ts_event, UnixNanos::new(1_746_270_400_353_000_000));
    }

    #[rstest]
    fn parse_ws_position_into_position_status_report() {
        let instrument = linear_instrument();
        let json = load_test_json("ws_account_position.json");
        let msg: BybitWsAccountPositionMsg = serde_json::from_str(&json).unwrap();
        let position = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report =
            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
        assert_eq!(report.quantity, instrument.make_qty(0.15, None));
        assert_eq!(
            report.avg_px_open,
            Some(Decimal::try_from(28500.50).unwrap())
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_317_038_000_000));
        assert_eq!(report.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_position_short_into_position_status_report() {
        // The short-position fixture refers to the second instrument (ETHUSDT).
        let instrument = linear_instrument_at(1, "ETHUSDT", "ETH");

        let json = load_test_json("ws_account_position_short.json");
        let msg: BybitWsAccountPositionMsg = serde_json::from_str(&json).unwrap();
        let position = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");

        let report =
            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id.symbol.as_str(), "ETHUSDT-LINEAR");
        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
        assert_eq!(report.quantity, instrument.make_qty(2.5, None));
        assert_eq!(
            report.avg_px_open,
            Some(Decimal::try_from(2450.75).unwrap())
        );
        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_417_038_000_000));
        assert_eq!(report.ts_init, TS);
    }

    #[rstest]
    fn parse_ws_wallet_into_account_state() {
        let json = load_test_json("ws_account_wallet.json");
        let msg: BybitWsAccountWalletMsg = serde_json::from_str(&json).unwrap();
        let wallet = &msg.data[0];
        let account_id = AccountId::new("BYBIT-001");
        let ts_event = UnixNanos::new(1_700_034_722_104_000_000);

        let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();

        assert_eq!(state.account_id, account_id);
        assert_eq!(state.account_type, AccountType::Margin);
        assert_eq!(state.balances.len(), 2);
        assert!(state.is_reported);

        let btc_balance = &state.balances[0];
        assert_eq!(btc_balance.currency.code.as_str(), "BTC");
        assert!((btc_balance.total.as_f64() - 0.00102964).abs() < 1e-8);
        assert!((btc_balance.free.as_f64() - 0.00092964).abs() < 1e-8);
        assert!((btc_balance.locked.as_f64() - 0.0001).abs() < 1e-8);

        let usdt_balance = &state.balances[1];
        assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
        assert!((usdt_balance.total.as_f64() - 9647.75537647).abs() < 1e-6);
        assert!((usdt_balance.free.as_f64() - 9519.89806037).abs() < 1e-6);
        assert!((usdt_balance.locked.as_f64() - 127.8573161).abs() < 1e-6);

        assert_eq!(state.ts_event, ts_event);
        assert_eq!(state.ts_init, TS);
    }
}