nautilus_bybit/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing helpers for Bybit WebSocket payloads.
17
18use std::convert::TryFrom;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23    data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
24    enums::{
25        AccountType, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
26        PositionSideSpecified, RecordFlag, TimeInForce,
27    },
28    events::account::state::AccountState,
29    identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
30    instruments::{Instrument, any::InstrumentAny},
31    reports::{FillReport, OrderStatusReport, PositionStatusReport},
32    types::{AccountBalance, Currency, Money, Price, Quantity},
33};
34use rust_decimal::Decimal;
35
36use super::messages::{
37    BybitWsAccountExecution, BybitWsAccountOrder, BybitWsAccountPosition, BybitWsAccountWallet,
38    BybitWsKline, BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
39    BybitWsTrade,
40};
41use crate::common::{
42    enums::{BybitOrderStatus, BybitOrderType, BybitTimeInForce},
43    parse::{parse_millis_timestamp, parse_price_with_precision, parse_quantity_with_precision},
44};
45
46/// Parses a Bybit WebSocket topic string into its components.
47///
48/// # Errors
49///
50/// Returns an error if the topic format is invalid.
51pub fn parse_topic(topic: &str) -> anyhow::Result<Vec<&str>> {
52    let parts: Vec<&str> = topic.split('.').collect();
53    if parts.is_empty() {
54        anyhow::bail!("Invalid topic format: empty topic");
55    }
56    Ok(parts)
57}
58
59/// Parses a Bybit kline topic into (interval, symbol).
60///
61/// Topic format: "kline.{interval}.{symbol}" (e.g., "kline.5.BTCUSDT")
62///
63/// # Errors
64///
65/// Returns an error if the topic format is invalid.
66pub fn parse_kline_topic(topic: &str) -> anyhow::Result<(&str, &str)> {
67    let parts = parse_topic(topic)?;
68    if parts.len() != 3 || parts[0] != "kline" {
69        anyhow::bail!(
70            "Invalid kline topic format: expected 'kline.{{interval}}.{{symbol}}', got '{topic}'"
71        );
72    }
73    Ok((parts[1], parts[2]))
74}
75
76/// Parses a WebSocket trade frame into a [`TradeTick`].
77pub fn parse_ws_trade_tick(
78    trade: &BybitWsTrade,
79    instrument: &InstrumentAny,
80    ts_init: UnixNanos,
81) -> anyhow::Result<TradeTick> {
82    let price = parse_price_with_precision(&trade.p, instrument.price_precision(), "trade.p")?;
83    let size = parse_quantity_with_precision(&trade.v, instrument.size_precision(), "trade.v")?;
84    let aggressor: AggressorSide = trade.taker_side.into();
85    let trade_id = TradeId::new_checked(trade.i.as_str())
86        .context("invalid trade identifier in Bybit trade message")?;
87    let ts_event = parse_millis_i64(trade.t, "trade.T")?;
88
89    TradeTick::new_checked(
90        instrument.id(),
91        price,
92        size,
93        aggressor,
94        trade_id,
95        ts_event,
96        ts_init,
97    )
98    .context("failed to construct TradeTick from Bybit trade message")
99}
100
101/// Parses an order book depth message into [`OrderBookDeltas`].
102pub fn parse_orderbook_deltas(
103    msg: &BybitWsOrderbookDepthMsg,
104    instrument: &InstrumentAny,
105    ts_init: UnixNanos,
106) -> anyhow::Result<OrderBookDeltas> {
107    let is_snapshot = msg.msg_type.eq_ignore_ascii_case("snapshot");
108    let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
109    let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
110
111    let depth = &msg.data;
112    let instrument_id = instrument.id();
113    let price_precision = instrument.price_precision();
114    let size_precision = instrument.size_precision();
115    let update_id = u64::try_from(depth.u)
116        .context("received negative update id in Bybit order book message")?;
117    let sequence = u64::try_from(depth.seq)
118        .context("received negative sequence in Bybit order book message")?;
119
120    let mut deltas = Vec::new();
121
122    if is_snapshot {
123        deltas.push(OrderBookDelta::clear(
124            instrument_id,
125            sequence,
126            ts_event,
127            ts_init,
128        ));
129    }
130
131    let total_levels = depth.b.len() + depth.a.len();
132    let mut processed = 0_usize;
133
134    let mut push_level = |values: &[String], side: OrderSide| -> anyhow::Result<()> {
135        let (price, size) = parse_book_level(values, price_precision, size_precision, "orderbook")?;
136        let action = if size.is_zero() {
137            BookAction::Delete
138        } else if is_snapshot {
139            BookAction::Add
140        } else {
141            BookAction::Update
142        };
143
144        processed += 1;
145        let mut flags = RecordFlag::F_MBP as u8;
146        if processed == total_levels {
147            flags |= RecordFlag::F_LAST as u8;
148        }
149
150        let order = BookOrder::new(side, price, size, update_id);
151        let delta = OrderBookDelta::new_checked(
152            instrument_id,
153            action,
154            order,
155            flags,
156            sequence,
157            ts_event,
158            ts_init,
159        )
160        .context("failed to construct OrderBookDelta from Bybit book level")?;
161        deltas.push(delta);
162        Ok(())
163    };
164
165    for level in &depth.b {
166        push_level(level, OrderSide::Buy)?;
167    }
168    for level in &depth.a {
169        push_level(level, OrderSide::Sell)?;
170    }
171
172    if total_levels == 0
173        && let Some(last) = deltas.last_mut()
174    {
175        last.flags |= RecordFlag::F_LAST as u8;
176    }
177
178    OrderBookDeltas::new_checked(instrument_id, deltas)
179        .context("failed to assemble OrderBookDeltas from Bybit message")
180}
181
182/// Parses an order book snapshot or delta into a [`QuoteTick`].
183pub fn parse_orderbook_quote(
184    msg: &BybitWsOrderbookDepthMsg,
185    instrument: &InstrumentAny,
186    last_quote: Option<&QuoteTick>,
187    ts_init: UnixNanos,
188) -> anyhow::Result<QuoteTick> {
189    let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
190    let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
191    let price_precision = instrument.price_precision();
192    let size_precision = instrument.size_precision();
193
194    let get_best =
195        |levels: &[Vec<String>], label: &str| -> anyhow::Result<Option<(Price, Quantity)>> {
196            if let Some(values) = levels.first() {
197                parse_book_level(values, price_precision, size_precision, label).map(Some)
198            } else {
199                Ok(None)
200            }
201        };
202
203    let bids = get_best(&msg.data.b, "bid")?;
204    let asks = get_best(&msg.data.a, "ask")?;
205
206    let (bid_price, bid_size) = match (bids, last_quote) {
207        (Some(level), _) => level,
208        (None, Some(prev)) => (prev.bid_price, prev.bid_size),
209        (None, None) => {
210            anyhow::bail!(
211                "Bybit order book update missing bid levels and no previous quote provided"
212            );
213        }
214    };
215
216    let (ask_price, ask_size) = match (asks, last_quote) {
217        (Some(level), _) => level,
218        (None, Some(prev)) => (prev.ask_price, prev.ask_size),
219        (None, None) => {
220            anyhow::bail!(
221                "Bybit order book update missing ask levels and no previous quote provided"
222            );
223        }
224    };
225
226    QuoteTick::new_checked(
227        instrument.id(),
228        bid_price,
229        ask_price,
230        bid_size,
231        ask_size,
232        ts_event,
233        ts_init,
234    )
235    .context("failed to construct QuoteTick from Bybit order book message")
236}
237
238/// Parses a linear or inverse ticker payload into a [`QuoteTick`].
239pub fn parse_ticker_linear_quote(
240    msg: &BybitWsTickerLinearMsg,
241    instrument: &InstrumentAny,
242    ts_init: UnixNanos,
243) -> anyhow::Result<QuoteTick> {
244    let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
245    let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
246    let price_precision = instrument.price_precision();
247    let size_precision = instrument.size_precision();
248
249    let data = &msg.data;
250    let bid_price = data
251        .bid1_price
252        .as_ref()
253        .context("Bybit ticker message missing bid1Price")?
254        .as_str();
255    let ask_price = data
256        .ask1_price
257        .as_ref()
258        .context("Bybit ticker message missing ask1Price")?
259        .as_str();
260
261    let bid_price = parse_price_with_precision(bid_price, price_precision, "ticker.bid1Price")?;
262    let ask_price = parse_price_with_precision(ask_price, price_precision, "ticker.ask1Price")?;
263
264    let bid_size_str = data.bid1_size.as_deref().unwrap_or("0");
265    let ask_size_str = data.ask1_size.as_deref().unwrap_or("0");
266
267    let bid_size = parse_quantity_with_precision(bid_size_str, size_precision, "ticker.bid1Size")?;
268    let ask_size = parse_quantity_with_precision(ask_size_str, size_precision, "ticker.ask1Size")?;
269
270    QuoteTick::new_checked(
271        instrument.id(),
272        bid_price,
273        ask_price,
274        bid_size,
275        ask_size,
276        ts_event,
277        ts_init,
278    )
279    .context("failed to construct QuoteTick from Bybit linear ticker message")
280}
281
282/// Parses an option ticker payload into a [`QuoteTick`].
283pub fn parse_ticker_option_quote(
284    msg: &BybitWsTickerOptionMsg,
285    instrument: &InstrumentAny,
286    ts_init: UnixNanos,
287) -> anyhow::Result<QuoteTick> {
288    let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
289    let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
290    let price_precision = instrument.price_precision();
291    let size_precision = instrument.size_precision();
292
293    let data = &msg.data;
294    let bid_price =
295        parse_price_with_precision(&data.bid_price, price_precision, "ticker.bidPrice")?;
296    let ask_price =
297        parse_price_with_precision(&data.ask_price, price_precision, "ticker.askPrice")?;
298    let bid_size = parse_quantity_with_precision(&data.bid_size, size_precision, "ticker.bidSize")?;
299    let ask_size = parse_quantity_with_precision(&data.ask_size, size_precision, "ticker.askSize")?;
300
301    QuoteTick::new_checked(
302        instrument.id(),
303        bid_price,
304        ask_price,
305        bid_size,
306        ask_size,
307        ts_event,
308        ts_init,
309    )
310    .context("failed to construct QuoteTick from Bybit option ticker message")
311}
312
313pub(crate) fn parse_millis_i64(value: i64, field: &str) -> anyhow::Result<UnixNanos> {
314    if value < 0 {
315        Err(anyhow::anyhow!("{field} must be non-negative, was {value}"))
316    } else {
317        parse_millis_timestamp(&value.to_string(), field)
318    }
319}
320
321fn parse_book_level(
322    level: &[String],
323    price_precision: u8,
324    size_precision: u8,
325    label: &str,
326) -> anyhow::Result<(Price, Quantity)> {
327    let price_str = level
328        .first()
329        .ok_or_else(|| anyhow::anyhow!("missing price component in {label} level"))?;
330    let size_str = level
331        .get(1)
332        .ok_or_else(|| anyhow::anyhow!("missing size component in {label} level"))?;
333    let price = parse_price_with_precision(price_str, price_precision, label)?;
334    let size = parse_quantity_with_precision(size_str, size_precision, label)?;
335    Ok((price, size))
336}
337
338/// Parses a WebSocket kline payload into a [`Bar`].
339///
340/// # Errors
341///
342/// Returns an error if price or volume fields cannot be parsed or if the bar cannot be constructed.
343pub fn parse_ws_kline_bar(
344    kline: &BybitWsKline,
345    instrument: &InstrumentAny,
346    bar_type: BarType,
347    timestamp_on_close: bool,
348    ts_init: UnixNanos,
349) -> anyhow::Result<Bar> {
350    let price_precision = instrument.price_precision();
351    let size_precision = instrument.size_precision();
352
353    let open = parse_price_with_precision(&kline.open, price_precision, "kline.open")?;
354    let high = parse_price_with_precision(&kline.high, price_precision, "kline.high")?;
355    let low = parse_price_with_precision(&kline.low, price_precision, "kline.low")?;
356    let close = parse_price_with_precision(&kline.close, price_precision, "kline.close")?;
357    let volume = parse_quantity_with_precision(&kline.volume, size_precision, "kline.volume")?;
358
359    let mut ts_event = parse_millis_i64(kline.start, "kline.start")?;
360    if timestamp_on_close {
361        let interval_ns = bar_type
362            .spec()
363            .timedelta()
364            .num_nanoseconds()
365            .context("bar specification produced non-integer interval")?;
366        let interval_ns = u64::try_from(interval_ns)
367            .context("bar interval overflowed the u64 range for nanoseconds")?;
368        let updated = ts_event
369            .as_u64()
370            .checked_add(interval_ns)
371            .context("bar timestamp overflowed when adjusting to close time")?;
372        ts_event = UnixNanos::from(updated);
373    }
374    let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
375
376    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
377        .context("failed to construct Bar from Bybit WebSocket kline")
378}
379
380/// Parses a WebSocket account order payload into an [`OrderStatusReport`].
381///
382/// # Errors
383///
384/// Returns an error if price or quantity fields cannot be parsed or timestamps are invalid.
385pub fn parse_ws_order_status_report(
386    order: &BybitWsAccountOrder,
387    instrument: &InstrumentAny,
388    account_id: AccountId,
389    ts_init: UnixNanos,
390) -> anyhow::Result<OrderStatusReport> {
391    let instrument_id = instrument.id();
392    let venue_order_id = VenueOrderId::new(order.order_id.as_str());
393    let order_side: OrderSide = order.side.into();
394
395    let order_type: OrderType = match order.order_type {
396        BybitOrderType::Market => OrderType::Market,
397        BybitOrderType::Limit => OrderType::Limit,
398        BybitOrderType::Unknown => OrderType::Limit,
399    };
400
401    let time_in_force: TimeInForce = match order.time_in_force {
402        BybitTimeInForce::Gtc => TimeInForce::Gtc,
403        BybitTimeInForce::Ioc => TimeInForce::Ioc,
404        BybitTimeInForce::Fok => TimeInForce::Fok,
405        BybitTimeInForce::PostOnly => TimeInForce::Gtc,
406    };
407
408    let quantity =
409        parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty")?;
410
411    let filled_qty = parse_quantity_with_precision(
412        &order.cum_exec_qty,
413        instrument.size_precision(),
414        "order.cumExecQty",
415    )?;
416
417    // Map Bybit order status to Nautilus order status
418    // Special case: if Bybit reports "Rejected" but the order has fills, treat it as Canceled.
419    // This handles the case where the exchange partially fills an order then rejects the
420    // remaining quantity (e.g., due to margin, risk limits, or liquidity constraints).
421    // The state machine does not allow PARTIALLY_FILLED -> REJECTED transitions.
422    let order_status: OrderStatus = match order.order_status {
423        BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
424            OrderStatus::Accepted
425        }
426        BybitOrderStatus::Rejected => {
427            if filled_qty.is_positive() {
428                OrderStatus::Canceled
429            } else {
430                OrderStatus::Rejected
431            }
432        }
433        BybitOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
434        BybitOrderStatus::Filled => OrderStatus::Filled,
435        BybitOrderStatus::Canceled | BybitOrderStatus::PartiallyFilledCanceled => {
436            OrderStatus::Canceled
437        }
438        BybitOrderStatus::Triggered => OrderStatus::Triggered,
439        BybitOrderStatus::Deactivated => OrderStatus::Canceled,
440    };
441
442    let ts_accepted = parse_millis_timestamp(&order.created_time, "order.createdTime")?;
443    let ts_last = parse_millis_timestamp(&order.updated_time, "order.updatedTime")?;
444
445    let mut report = OrderStatusReport::new(
446        account_id,
447        instrument_id,
448        None,
449        venue_order_id,
450        order_side,
451        order_type,
452        time_in_force,
453        order_status,
454        quantity,
455        filled_qty,
456        ts_accepted,
457        ts_last,
458        ts_init,
459        Some(UUID4::new()),
460    );
461
462    if !order.order_link_id.is_empty() {
463        report = report.with_client_order_id(ClientOrderId::new(order.order_link_id.as_str()));
464    }
465
466    if !order.price.is_empty() && order.price != "0" {
467        let price =
468            parse_price_with_precision(&order.price, instrument.price_precision(), "order.price")?;
469        report = report.with_price(price);
470    }
471
472    if !order.avg_price.is_empty() && order.avg_price != "0" {
473        let avg_px = order
474            .avg_price
475            .parse::<f64>()
476            .with_context(|| format!("Failed to parse avg_price='{}' as f64", order.avg_price))?;
477        report = report.with_avg_px(avg_px);
478    }
479
480    if !order.trigger_price.is_empty() && order.trigger_price != "0" {
481        let trigger_price = parse_price_with_precision(
482            &order.trigger_price,
483            instrument.price_precision(),
484            "order.triggerPrice",
485        )?;
486        report = report.with_trigger_price(trigger_price);
487    }
488
489    if order.reduce_only {
490        report = report.with_reduce_only(true);
491    }
492
493    if order.time_in_force == BybitTimeInForce::PostOnly {
494        report = report.with_post_only(true);
495    }
496
497    if !order.reject_reason.is_empty() {
498        report = report.with_cancel_reason(order.reject_reason.to_string());
499    }
500
501    Ok(report)
502}
503
504/// Parses a WebSocket account execution payload into a [`FillReport`].
505///
506/// # Errors
507///
508/// Returns an error if price or quantity fields cannot be parsed or timestamps are invalid.
509pub fn parse_ws_fill_report(
510    execution: &BybitWsAccountExecution,
511    account_id: AccountId,
512    instrument: &InstrumentAny,
513    ts_init: UnixNanos,
514) -> anyhow::Result<FillReport> {
515    let instrument_id = instrument.id();
516    let venue_order_id = VenueOrderId::new(execution.order_id.as_str());
517    let trade_id = TradeId::new_checked(execution.exec_id.as_str())
518        .context("invalid execId in Bybit WebSocket execution payload")?;
519
520    let order_side: OrderSide = execution.side.into();
521    let last_qty = parse_quantity_with_precision(
522        &execution.exec_qty,
523        instrument.size_precision(),
524        "execution.execQty",
525    )?;
526    let last_px = parse_price_with_precision(
527        &execution.exec_price,
528        instrument.price_precision(),
529        "execution.execPrice",
530    )?;
531
532    let liquidity_side = if execution.is_maker {
533        LiquiditySide::Maker
534    } else {
535        LiquiditySide::Taker
536    };
537
538    let commission_str = execution.exec_fee.trim_start_matches('-');
539    let commission_amount = commission_str
540        .parse::<f64>()
541        .with_context(|| format!("Failed to parse execFee='{}' as f64", execution.exec_fee))?
542        .abs();
543
544    // Use instrument quote currency for commission
545    let commission_currency = instrument.quote_currency();
546    let commission = Money::new(commission_amount, commission_currency);
547    let ts_event = parse_millis_timestamp(&execution.exec_time, "execution.execTime")?;
548
549    let client_order_id = if !execution.order_link_id.is_empty() {
550        Some(ClientOrderId::new(execution.order_link_id.as_str()))
551    } else {
552        None
553    };
554
555    Ok(FillReport::new(
556        account_id,
557        instrument_id,
558        venue_order_id,
559        trade_id,
560        order_side,
561        last_qty,
562        last_px,
563        commission,
564        liquidity_side,
565        client_order_id,
566        None, // venue_position_id
567        ts_event,
568        ts_init,
569        None, // report_id
570    ))
571}
572
573/// Parses a WebSocket account position payload into a [`PositionStatusReport`].
574///
575/// # Errors
576///
577/// Returns an error if position size or prices cannot be parsed.
578pub fn parse_ws_position_status_report(
579    position: &BybitWsAccountPosition,
580    account_id: AccountId,
581    instrument: &InstrumentAny,
582    ts_init: UnixNanos,
583) -> anyhow::Result<PositionStatusReport> {
584    let instrument_id = instrument.id();
585
586    // Parse absolute size as unsigned Quantity
587    let quantity = parse_quantity_with_precision(
588        &position.size,
589        instrument.size_precision(),
590        "position.size",
591    )?;
592
593    // Derive position side from the side field
594    let position_side = if position.side.eq_ignore_ascii_case("buy") {
595        PositionSideSpecified::Long
596    } else if position.side.eq_ignore_ascii_case("sell") {
597        PositionSideSpecified::Short
598    } else {
599        PositionSideSpecified::Flat
600    };
601
602    let avg_px_open = if let Some(ref avg_price) = position.avg_price {
603        if !avg_price.is_empty() && avg_price != "0" {
604            avg_price
605                .parse::<f64>()
606                .with_context(|| format!("Failed to parse avgPrice='{}' as f64", avg_price))?
607        } else {
608            0.0
609        }
610    } else {
611        0.0
612    };
613
614    let _unrealized_pnl = position.unrealised_pnl.parse::<f64>().with_context(|| {
615        format!(
616            "Failed to parse unrealisedPnl='{}' as f64",
617            position.unrealised_pnl
618        )
619    })?;
620
621    let _realized_pnl = position.cum_realised_pnl.parse::<f64>().with_context(|| {
622        format!(
623            "Failed to parse cumRealisedPnl='{}' as f64",
624            position.cum_realised_pnl
625        )
626    })?;
627
628    let ts_last = parse_millis_timestamp(&position.updated_time, "position.updatedTime")?;
629
630    let avg_px_open_decimal = if avg_px_open != 0.0 {
631        Some(Decimal::try_from(avg_px_open).context("Failed to convert avg_px_open to Decimal")?)
632    } else {
633        None
634    };
635
636    Ok(PositionStatusReport::new(
637        account_id,
638        instrument_id,
639        position_side,
640        quantity,
641        ts_last,
642        ts_init,
643        None, // report_id
644        None, // venue_position_id
645        avg_px_open_decimal,
646    ))
647}
648
649/// Parses a WebSocket account wallet payload into an [`AccountState`].
650///
651/// # Errors
652///
653/// Returns an error if balance fields cannot be parsed.
654pub fn parse_ws_account_state(
655    wallet: &BybitWsAccountWallet,
656    account_id: AccountId,
657    ts_event: UnixNanos,
658    ts_init: UnixNanos,
659) -> anyhow::Result<AccountState> {
660    let mut balances = Vec::new();
661
662    for coin_data in &wallet.coin {
663        let currency = Currency::from(coin_data.coin.as_str());
664
665        let total_amount = coin_data.wallet_balance.parse::<f64>().with_context(|| {
666            format!(
667                "Failed to parse walletBalance='{}' as f64",
668                coin_data.wallet_balance
669            )
670        })?;
671
672        let free_amount = if coin_data.available_to_withdraw.is_empty() {
673            0.0
674        } else {
675            coin_data
676                .available_to_withdraw
677                .parse::<f64>()
678                .with_context(|| {
679                    format!(
680                        "Failed to parse availableToWithdraw='{}' as f64",
681                        coin_data.available_to_withdraw
682                    )
683                })?
684        };
685
686        let locked_amount = total_amount - free_amount;
687
688        let total = Money::new(total_amount, currency);
689        let locked = Money::new(locked_amount, currency);
690        let free = Money::new(free_amount, currency);
691
692        let balance = AccountBalance::new_checked(total, locked, free)
693            .context("Failed to create AccountBalance from wallet data")?;
694        balances.push(balance);
695    }
696
697    Ok(AccountState::new(
698        account_id,
699        AccountType::Margin, // Bybit unified account
700        balances,
701        vec![], // margins - Bybit doesn't provide per-instrument margin in wallet updates
702        true,   // is_reported
703        UUID4::new(),
704        ts_event,
705        ts_init,
706        None, // base_currency
707    ))
708}
709
710////////////////////////////////////////////////////////////////////////////////
711// Tests
712////////////////////////////////////////////////////////////////////////////////
713
714#[cfg(test)]
715mod tests {
716    use nautilus_model::{
717        data::BarSpecification,
718        enums::{AggregationSource, BarAggregation, PositionSide, PriceType},
719    };
720    use rstest::rstest;
721
722    use super::*;
723    use crate::{
724        common::{
725            parse::{parse_linear_instrument, parse_option_instrument},
726            testing::load_test_json,
727        },
728        http::models::{BybitInstrumentLinearResponse, BybitInstrumentOptionResponse},
729        websocket::messages::{
730            BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
731            BybitWsTradeMsg,
732        },
733    };
734
735    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
736
737    use ustr::Ustr;
738
739    use crate::http::models::BybitFeeRate;
740
741    fn sample_fee_rate(
742        symbol: &str,
743        taker: &str,
744        maker: &str,
745        base_coin: Option<&str>,
746    ) -> BybitFeeRate {
747        BybitFeeRate {
748            symbol: Ustr::from(symbol),
749            taker_fee_rate: taker.to_string(),
750            maker_fee_rate: maker.to_string(),
751            base_coin: base_coin.map(Ustr::from),
752        }
753    }
754
755    fn linear_instrument() -> InstrumentAny {
756        let json = load_test_json("http_get_instruments_linear.json");
757        let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
758        let instrument = &response.result.list[0];
759        let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
760        parse_linear_instrument(instrument, &fee_rate, TS, TS).unwrap()
761    }
762
763    fn option_instrument() -> InstrumentAny {
764        let json = load_test_json("http_get_instruments_option.json");
765        let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
766        let instrument = &response.result.list[0];
767        parse_option_instrument(instrument, TS, TS).unwrap()
768    }
769
770    #[rstest]
771    fn parse_ws_trade_into_trade_tick() {
772        let instrument = linear_instrument();
773        let json = load_test_json("ws_public_trade.json");
774        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
775        let trade = &msg.data[0];
776
777        let tick = parse_ws_trade_tick(trade, &instrument, TS).unwrap();
778
779        assert_eq!(tick.instrument_id, instrument.id());
780        assert_eq!(tick.price, instrument.make_price(27451.00));
781        assert_eq!(tick.size, instrument.make_qty(0.010, None));
782        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
783        assert_eq!(
784            tick.trade_id.to_string(),
785            "9dc75fca-4bdd-4773-9f78-6f5d7ab2a110"
786        );
787        assert_eq!(tick.ts_event, UnixNanos::new(1_709_891_679_000_000_000));
788    }
789
790    #[rstest]
791    fn parse_orderbook_snapshot_into_deltas() {
792        let instrument = linear_instrument();
793        let json = load_test_json("ws_orderbook_snapshot.json");
794        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
795
796        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();
797
798        assert_eq!(deltas.instrument_id, instrument.id());
799        assert_eq!(deltas.deltas.len(), 5);
800        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
801        assert_eq!(
802            deltas.deltas[1].order.price,
803            instrument.make_price(27450.00)
804        );
805        assert_eq!(
806            deltas.deltas[1].order.size,
807            instrument.make_qty(0.500, None)
808        );
809        let last = deltas.deltas.last().unwrap();
810        assert_eq!(last.order.side, OrderSide::Sell);
811        assert_eq!(last.order.price, instrument.make_price(27451.50));
812        assert_eq!(
813            last.flags & RecordFlag::F_LAST as u8,
814            RecordFlag::F_LAST as u8
815        );
816    }
817
818    #[rstest]
819    fn parse_orderbook_delta_marks_actions() {
820        let instrument = linear_instrument();
821        let json = load_test_json("ws_orderbook_delta.json");
822        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
823
824        let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();
825
826        assert_eq!(deltas.deltas.len(), 2);
827        let bid = &deltas.deltas[0];
828        assert_eq!(bid.action, BookAction::Update);
829        assert_eq!(bid.order.side, OrderSide::Buy);
830        assert_eq!(bid.order.size, instrument.make_qty(0.400, None));
831
832        let ask = &deltas.deltas[1];
833        assert_eq!(ask.action, BookAction::Delete);
834        assert_eq!(ask.order.side, OrderSide::Sell);
835        assert_eq!(ask.order.size, instrument.make_qty(0.0, None));
836        assert_eq!(
837            ask.flags & RecordFlag::F_LAST as u8,
838            RecordFlag::F_LAST as u8
839        );
840    }
841
842    #[rstest]
843    fn parse_orderbook_quote_produces_top_of_book() {
844        let instrument = linear_instrument();
845        let json = load_test_json("ws_orderbook_snapshot.json");
846        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
847
848        let quote = parse_orderbook_quote(&msg, &instrument, None, TS).unwrap();
849
850        assert_eq!(quote.instrument_id, instrument.id());
851        assert_eq!(quote.bid_price, instrument.make_price(27450.00));
852        assert_eq!(quote.bid_size, instrument.make_qty(0.500, None));
853        assert_eq!(quote.ask_price, instrument.make_price(27451.00));
854        assert_eq!(quote.ask_size, instrument.make_qty(0.750, None));
855    }
856
857    #[rstest]
858    fn parse_orderbook_quote_with_delta_updates_sizes() {
859        let instrument = linear_instrument();
860        let snapshot: BybitWsOrderbookDepthMsg =
861            serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json")).unwrap();
862        let base_quote = parse_orderbook_quote(&snapshot, &instrument, None, TS).unwrap();
863
864        let delta: BybitWsOrderbookDepthMsg =
865            serde_json::from_str(&load_test_json("ws_orderbook_delta.json")).unwrap();
866        let updated = parse_orderbook_quote(&delta, &instrument, Some(&base_quote), TS).unwrap();
867
868        assert_eq!(updated.bid_price, instrument.make_price(27450.00));
869        assert_eq!(updated.bid_size, instrument.make_qty(0.400, None));
870        assert_eq!(updated.ask_price, instrument.make_price(27451.00));
871        assert_eq!(updated.ask_size, instrument.make_qty(0.0, None));
872    }
873
874    #[rstest]
875    fn parse_linear_ticker_quote_to_quote_tick() {
876        let instrument = linear_instrument();
877        let json = load_test_json("ws_ticker_linear.json");
878        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
879
880        let quote = parse_ticker_linear_quote(&msg, &instrument, TS).unwrap();
881
882        assert_eq!(quote.instrument_id, instrument.id());
883        assert_eq!(quote.bid_price, instrument.make_price(17215.50));
884        assert_eq!(quote.ask_price, instrument.make_price(17216.00));
885        assert_eq!(quote.bid_size, instrument.make_qty(84.489, None));
886        assert_eq!(quote.ask_size, instrument.make_qty(83.020, None));
887        assert_eq!(quote.ts_event, UnixNanos::new(1_673_272_861_686_000_000));
888        assert_eq!(quote.ts_init, TS);
889    }
890
891    #[rstest]
892    fn parse_option_ticker_quote_to_quote_tick() {
893        let instrument = option_instrument();
894        let json = load_test_json("ws_ticker_option.json");
895        let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
896
897        let quote = parse_ticker_option_quote(&msg, &instrument, TS).unwrap();
898
899        assert_eq!(quote.instrument_id, instrument.id());
900        assert_eq!(quote.bid_price, instrument.make_price(0.0));
901        assert_eq!(quote.ask_price, instrument.make_price(10.0));
902        assert_eq!(quote.bid_size, instrument.make_qty(0.0, None));
903        assert_eq!(quote.ask_size, instrument.make_qty(5.1, None));
904        assert_eq!(quote.ts_event, UnixNanos::new(1_672_917_511_074_000_000));
905        assert_eq!(quote.ts_init, TS);
906    }
907
908    #[rstest]
909    fn parse_ws_kline_into_bar() {
910        use std::num::NonZero;
911
912        let instrument = linear_instrument();
913        let json = load_test_json("ws_kline.json");
914        let msg: crate::websocket::messages::BybitWsKlineMsg = serde_json::from_str(&json).unwrap();
915        let kline = &msg.data[0];
916
917        let bar_spec = BarSpecification {
918            step: NonZero::new(5).unwrap(),
919            aggregation: BarAggregation::Minute,
920            price_type: PriceType::Last,
921        };
922        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::External);
923
924        let bar = parse_ws_kline_bar(kline, &instrument, bar_type, false, TS).unwrap();
925
926        assert_eq!(bar.bar_type, bar_type);
927        assert_eq!(bar.open, instrument.make_price(16649.5));
928        assert_eq!(bar.high, instrument.make_price(16677.0));
929        assert_eq!(bar.low, instrument.make_price(16608.0));
930        assert_eq!(bar.close, instrument.make_price(16677.0));
931        assert_eq!(bar.volume, instrument.make_qty(2.081, None));
932        assert_eq!(bar.ts_event, UnixNanos::new(1_672_324_800_000_000_000));
933        assert_eq!(bar.ts_init, TS);
934    }
935
936    #[rstest]
937    fn parse_ws_order_into_order_status_report() {
938        let instrument = linear_instrument();
939        let json = load_test_json("ws_account_order_filled.json");
940        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
941            serde_json::from_str(&json).unwrap();
942        let order = &msg.data[0];
943        let account_id = AccountId::new("BYBIT-001");
944
945        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
946
947        assert_eq!(report.account_id, account_id);
948        assert_eq!(report.instrument_id, instrument.id());
949        assert_eq!(report.order_side, OrderSide::Buy);
950        assert_eq!(report.order_type, OrderType::Limit);
951        assert_eq!(report.time_in_force, TimeInForce::Gtc);
952        assert_eq!(report.order_status, OrderStatus::Filled);
953        assert_eq!(report.quantity, instrument.make_qty(0.100, None));
954        assert_eq!(report.filled_qty, instrument.make_qty(0.100, None));
955        assert_eq!(report.price, Some(instrument.make_price(30000.50)));
956        assert_eq!(report.avg_px, Some(30000.50));
957        assert_eq!(
958            report.client_order_id.as_ref().unwrap().to_string(),
959            "test-client-order-001"
960        );
961        assert_eq!(
962            report.ts_accepted,
963            UnixNanos::new(1_672_364_262_444_000_000)
964        );
965        assert_eq!(report.ts_last, UnixNanos::new(1_672_364_262_457_000_000));
966    }
967
968    #[rstest]
969    fn parse_ws_order_partially_filled_rejected_maps_to_canceled() {
970        let instrument = linear_instrument();
971        let json = load_test_json("ws_account_order_partially_filled_rejected.json");
972        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
973            serde_json::from_str(&json).unwrap();
974        let order = &msg.data[0];
975        let account_id = AccountId::new("BYBIT-001");
976
977        let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
978
979        // Verify that Bybit "Rejected" status with fills is mapped to Canceled, not Rejected
980        assert_eq!(report.order_status, OrderStatus::Canceled);
981        assert_eq!(report.filled_qty, instrument.make_qty(50.0, None));
982        assert_eq!(
983            report.client_order_id.as_ref().unwrap().to_string(),
984            "O-20251001-164609-APEX-000-49"
985        );
986        assert_eq!(report.cancel_reason, Some("UNKNOWN".to_string()));
987    }
988
989    #[rstest]
990    fn parse_ws_execution_into_fill_report() {
991        let instrument = linear_instrument();
992        let json = load_test_json("ws_account_execution.json");
993        let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
994            serde_json::from_str(&json).unwrap();
995        let execution = &msg.data[0];
996        let account_id = AccountId::new("BYBIT-001");
997
998        let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();
999
1000        assert_eq!(report.account_id, account_id);
1001        assert_eq!(report.instrument_id, instrument.id());
1002        assert_eq!(
1003            report.venue_order_id.to_string(),
1004            "9aac161b-8ed6-450d-9cab-c5cc67c21784"
1005        );
1006        assert_eq!(
1007            report.trade_id.to_string(),
1008            "0ab1bdf7-4219-438b-b30a-32ec863018f7"
1009        );
1010        assert_eq!(report.order_side, OrderSide::Sell);
1011        assert_eq!(report.last_qty, instrument.make_qty(0.5, None));
1012        assert_eq!(report.last_px, instrument.make_price(95900.1));
1013        assert_eq!(report.commission.as_f64(), 26.3725275);
1014        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
1015        assert_eq!(
1016            report.client_order_id.as_ref().unwrap().to_string(),
1017            "test-order-link-001"
1018        );
1019        assert_eq!(report.ts_event, UnixNanos::new(1_746_270_400_353_000_000));
1020    }
1021
1022    #[rstest]
1023    fn parse_ws_position_into_position_status_report() {
1024        let instrument = linear_instrument();
1025        let json = load_test_json("ws_account_position.json");
1026        let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
1027            serde_json::from_str(&json).unwrap();
1028        let position = &msg.data[0];
1029        let account_id = AccountId::new("BYBIT-001");
1030
1031        let report =
1032            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();
1033
1034        assert_eq!(report.account_id, account_id);
1035        assert_eq!(report.instrument_id, instrument.id());
1036        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1037        assert_eq!(report.quantity, instrument.make_qty(0.15, None));
1038        assert_eq!(
1039            report.avg_px_open,
1040            Some(Decimal::try_from(28500.50).unwrap())
1041        );
1042        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_317_038_000_000));
1043        assert_eq!(report.ts_init, TS);
1044    }
1045
1046    #[rstest]
1047    fn parse_ws_position_short_into_position_status_report() {
1048        // Create ETHUSDT instrument
1049        let instruments_json = load_test_json("http_get_instruments_linear.json");
1050        let instruments_response: crate::http::models::BybitInstrumentLinearResponse =
1051            serde_json::from_str(&instruments_json).unwrap();
1052        let eth_def = &instruments_response.result.list[1]; // ETHUSDT is second in the list
1053        let fee_rate = crate::http::models::BybitFeeRate {
1054            symbol: ustr::Ustr::from("ETHUSDT"),
1055            taker_fee_rate: "0.00055".to_string(),
1056            maker_fee_rate: "0.0001".to_string(),
1057            base_coin: Some(ustr::Ustr::from("ETH")),
1058        };
1059        let instrument =
1060            crate::common::parse::parse_linear_instrument(eth_def, &fee_rate, TS, TS).unwrap();
1061
1062        let json = load_test_json("ws_account_position_short.json");
1063        let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
1064            serde_json::from_str(&json).unwrap();
1065        let position = &msg.data[0];
1066        let account_id = AccountId::new("BYBIT-001");
1067
1068        let report =
1069            parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();
1070
1071        assert_eq!(report.account_id, account_id);
1072        assert_eq!(report.instrument_id.symbol.as_str(), "ETHUSDT-LINEAR");
1073        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1074        assert_eq!(report.quantity, instrument.make_qty(2.5, None));
1075        assert_eq!(
1076            report.avg_px_open,
1077            Some(Decimal::try_from(2450.75).unwrap())
1078        );
1079        assert_eq!(report.ts_last, UnixNanos::new(1_697_682_417_038_000_000));
1080        assert_eq!(report.ts_init, TS);
1081    }
1082
1083    #[rstest]
1084    fn parse_ws_wallet_into_account_state() {
1085        let json = load_test_json("ws_account_wallet.json");
1086        let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1087            serde_json::from_str(&json).unwrap();
1088        let wallet = &msg.data[0];
1089        let account_id = AccountId::new("BYBIT-001");
1090        let ts_event = UnixNanos::new(1_700_034_722_104_000_000);
1091
1092        let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();
1093
1094        assert_eq!(state.account_id, account_id);
1095        assert_eq!(state.account_type, AccountType::Margin);
1096        assert_eq!(state.balances.len(), 2);
1097        assert!(state.is_reported);
1098
1099        // Check BTC balance
1100        let btc_balance = &state.balances[0];
1101        assert_eq!(btc_balance.currency.code.as_str(), "BTC");
1102        assert!((btc_balance.total.as_f64() - 0.00102964).abs() < 1e-8);
1103        assert!((btc_balance.free.as_f64() - 0.00092964).abs() < 1e-8);
1104        assert!((btc_balance.locked.as_f64() - 0.0001).abs() < 1e-8);
1105
1106        // Check USDT balance
1107        let usdt_balance = &state.balances[1];
1108        assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
1109        assert!((usdt_balance.total.as_f64() - 9647.75537647).abs() < 1e-6);
1110        assert!((usdt_balance.free.as_f64() - 9519.89806037).abs() < 1e-6);
1111        assert!((usdt_balance.locked.as_f64() - 127.8573161).abs() < 1e-6);
1112
1113        assert_eq!(state.ts_event, ts_event);
1114        assert_eq!(state.ts_init, TS);
1115    }
1116}