Skip to main content

nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    data::{Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick},
30    enums::{AggressorSide, BookAction, OrderSide, OrderStatus, RecordFlag},
31    identifiers::{AccountId, InstrumentId, TradeId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport, PositionStatusReport},
34    types::{Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use super::{DydxWsError, DydxWsResult};
39use crate::{
40    common::{
41        enums::{DydxOrderStatus, DydxTickerType},
42        instrument_cache::InstrumentCache,
43    },
44    execution::{encoder::ClientOrderIdEncoder, types::OrderContext},
45    http::{
46        models::{Fill, Order, PerpetualPosition},
47        parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
48    },
49    websocket::messages::{
50        DydxCandle, DydxOrderbookContents, DydxOrderbookSnapshotContents, DydxPerpetualPosition,
51        DydxTradeContents, DydxWsFillSubaccountMessageContents,
52        DydxWsOrderSubaccountMessageContents,
53    },
54};
55
56/// Parses a WebSocket order update into an OrderStatusReport.
57///
58/// Converts the WebSocket order format to the HTTP Order format, then delegates
59/// to the existing HTTP parser for consistency.
60///
61/// # Arguments
62///
63/// * `ws_order` - The WebSocket order message to parse
64/// * `instrument_cache` - Cache for looking up instruments by clob_pair_id
65/// * `order_contexts` - Map of dYdX u32 client IDs to order contexts
66/// * `encoder` - Bidirectional encoder for ClientOrderId ↔ u32 mapping
67/// * `account_id` - Account ID for the report
68/// * `ts_init` - Timestamp for initialization
69///
70/// # Errors
71///
72/// Returns an error if:
73/// - clob_pair_id cannot be parsed from string.
74/// - Instrument lookup fails for the clob_pair_id.
75/// - Field parsing fails (price, size, etc.).
76/// - HTTP parser fails.
77pub fn parse_ws_order_report(
78    ws_order: &DydxWsOrderSubaccountMessageContents,
79    instrument_cache: &InstrumentCache,
80    order_contexts: &DashMap<u32, OrderContext>,
81    encoder: &ClientOrderIdEncoder,
82    account_id: AccountId,
83    ts_init: UnixNanos,
84) -> anyhow::Result<OrderStatusReport> {
85    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
86        "Failed to parse clob_pair_id '{}'",
87        ws_order.clob_pair_id
88    ))?;
89
90    let instrument = instrument_cache
91        .get_by_clob_id(clob_pair_id)
92        .ok_or_else(|| {
93            instrument_cache.log_missing_clob_pair_id(clob_pair_id);
94            anyhow::anyhow!("No instrument cached for clob_pair_id {clob_pair_id}")
95        })?;
96
97    let http_order = convert_ws_order_to_http(ws_order)?;
98    let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
99
100    let dydx_client_id = ws_order.client_id.parse::<u32>().ok();
101    let dydx_client_metadata = ws_order
102        .client_metadata
103        .as_ref()
104        .and_then(|s| s.parse::<u32>().ok())
105        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
106
107    log::info!(
108        "[WS_ORDER_RECV] dYdX client_id='{}' meta={:#x} (parsed u32={:?}) | status={:?} | clob_pair={} | side={:?} | size={} | filled={}",
109        ws_order.client_id,
110        dydx_client_metadata,
111        dydx_client_id,
112        ws_order.status,
113        ws_order.clob_pair_id,
114        ws_order.side,
115        ws_order.size,
116        ws_order.total_filled.as_deref().unwrap_or("?")
117    );
118
119    // Look up the original Nautilus client_order_id from the order context first,
120    // then fall back to encoder.decode() if not found in context
121    if let Some(client_id) = dydx_client_id {
122        if let Some(ctx) = order_contexts.get(&client_id) {
123            log::info!(
124                "[WS_ORDER_RECV] DECODE via order_contexts: dYdX u32={} -> Nautilus '{}'",
125                client_id,
126                ctx.client_order_id
127            );
128            report.client_order_id = Some(ctx.client_order_id);
129        } else if let Some(client_order_id) = encoder.decode(client_id, dydx_client_metadata) {
130            // Fallback: use encoder's bidirectional decode with both client_id and client_metadata
131            log::info!(
132                "[WS_ORDER_RECV] DECODE via encoder fallback: dYdX u32={client_id} meta={dydx_client_metadata:#x} -> Nautilus '{client_order_id}'"
133            );
134            report.client_order_id = Some(client_order_id);
135        } else {
136            log::warn!(
137                "[WS_ORDER_RECV] DECODE FAILED: dYdX u32={client_id} meta={dydx_client_metadata:#x} not found in order_contexts or encoder!"
138            );
139        }
140    } else {
141        log::warn!(
142            "[WS_ORDER_RECV] Could not parse client_id '{}' as u32",
143            ws_order.client_id
144        );
145    }
146
147    // For untriggered conditional orders with an explicit trigger price we
148    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
149    // enum mapping.
150    if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
151        report.order_status = OrderStatus::PendingUpdate;
152    }
153
154    Ok(report)
155}
156
157/// Converts a WebSocket order message to HTTP Order format.
158///
159/// # Errors
160///
161/// Returns an error if any field parsing fails.
162fn convert_ws_order_to_http(
163    ws_order: &DydxWsOrderSubaccountMessageContents,
164) -> anyhow::Result<Order> {
165    let clob_pair_id: u32 = ws_order
166        .clob_pair_id
167        .parse()
168        .context("Failed to parse clob_pair_id")?;
169
170    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
171
172    let total_filled: Decimal = ws_order
173        .total_filled
174        .as_ref()
175        .map(|s| s.parse())
176        .transpose()
177        .context("Failed to parse total_filled")?
178        .unwrap_or(Decimal::ZERO);
179
180    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
181    let remaining_size = (size - total_filled).max(Decimal::ZERO);
182
183    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
184
185    let created_at_height: u64 = ws_order
186        .created_at_height
187        .as_ref()
188        .map(|s| s.parse())
189        .transpose()
190        .context("Failed to parse created_at_height")?
191        .unwrap_or(0);
192
193    let client_metadata: u32 = ws_order
194        .client_metadata
195        .as_ref()
196        .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
197        .parse()
198        .context("Failed to parse client_metadata")?;
199
200    let order_flags: u32 = ws_order
201        .order_flags
202        .parse()
203        .context("Failed to parse order_flags")?;
204
205    let good_til_block = ws_order
206        .good_til_block
207        .as_ref()
208        .and_then(|s| s.parse::<u64>().ok());
209
210    let good_til_block_time = ws_order
211        .good_til_block_time
212        .as_ref()
213        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
214        .map(|dt| dt.with_timezone(&Utc));
215
216    let trigger_price = ws_order
217        .trigger_price
218        .as_ref()
219        .and_then(|s| Decimal::from_str(s).ok());
220
221    // Parse updated_at (optional for BEST_EFFORT_OPENED orders)
222    let updated_at = ws_order
223        .updated_at
224        .as_ref()
225        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
226        .map(|dt| dt.with_timezone(&Utc));
227
228    // Parse updated_at_height (optional for BEST_EFFORT_OPENED orders)
229    let updated_at_height = ws_order
230        .updated_at_height
231        .as_ref()
232        .and_then(|s| s.parse::<u64>().ok());
233
234    let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
235
236    Ok(Order {
237        id: ws_order.id.clone(),
238        subaccount_id: ws_order.subaccount_id.clone(),
239        client_id: ws_order.client_id.clone(),
240        clob_pair_id,
241        side: ws_order.side,
242        size,
243        total_filled,
244        price,
245        status: ws_order.status,
246        order_type: ws_order.order_type,
247        time_in_force: ws_order.time_in_force,
248        reduce_only: ws_order.reduce_only,
249        post_only: ws_order.post_only,
250        order_flags,
251        good_til_block,
252        good_til_block_time,
253        created_at_height: Some(created_at_height),
254        client_metadata,
255        trigger_price,
256        condition_type: None, // Not provided in WebSocket messages
257        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
258        execution: None,      // Inferred from post_only flag by HTTP parser
259        updated_at,
260        updated_at_height,
261        ticker: None,               // Not provided in WebSocket messages
262        subaccount_number: 0,       // Default to 0 for WebSocket messages
263        order_router_address: None, // Not provided in WebSocket messages
264    })
265}
266
267/// Parses a WebSocket fill update into a FillReport.
268///
269/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
270/// to the existing HTTP parser for consistency. Correlates the fill back to the
271/// originating order using the `order_id_map` (built from WS order updates).
272///
273/// # Errors
274///
275/// Returns an error if:
276/// - Instrument lookup fails for the market symbol.
277/// - Field parsing fails (price, size, fee, etc.).
278/// - HTTP parser fails.
279pub fn parse_ws_fill_report(
280    ws_fill: &DydxWsFillSubaccountMessageContents,
281    instrument_cache: &InstrumentCache,
282    order_id_map: &DashMap<String, (u32, u32)>,
283    order_contexts: &DashMap<u32, OrderContext>,
284    encoder: &ClientOrderIdEncoder,
285    account_id: AccountId,
286    ts_init: UnixNanos,
287) -> anyhow::Result<FillReport> {
288    let instrument = instrument_cache
289        .get_by_market(&ws_fill.market)
290        .ok_or_else(|| {
291            let available: Vec<String> = instrument_cache
292                .all_instruments()
293                .into_iter()
294                .map(|inst| inst.id().symbol.to_string())
295                .collect();
296            anyhow::anyhow!(
297                "No instrument cached for market '{}'. Available: {:?}",
298                ws_fill.market,
299                available
300            )
301        })?;
302
303    let http_fill = convert_ws_fill_to_http(ws_fill)?;
304    let mut report = parse_fill_report(&http_fill, &instrument, account_id, ts_init)?;
305
306    // Correlate fill to order via order_id → (client_id, client_metadata) → client_order_id
307    if let Some(ref order_id) = ws_fill.order_id {
308        if let Some(entry) = order_id_map.get(order_id) {
309            let (client_id, client_metadata) = *entry.value();
310            if let Some(ctx) = order_contexts.get(&client_id) {
311                report.client_order_id = Some(ctx.client_order_id);
312            } else if let Some(client_order_id) = encoder.decode(client_id, client_metadata) {
313                report.client_order_id = Some(client_order_id);
314            } else {
315                log::warn!(
316                    "[WS_FILL_RECV] DECODE FAILED: order_id={order_id} -> client_id={client_id} meta={client_metadata:#x} not decodable",
317                );
318            }
319        } else {
320            log::warn!(
321                "[WS_FILL_RECV] No order_id mapping for '{order_id}', fill cannot be correlated",
322            );
323        }
324    }
325
326    Ok(report)
327}
328
329/// Converts a WebSocket fill message to HTTP Fill format.
330///
331/// # Errors
332///
333/// Returns an error if any field parsing fails.
334fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
335    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
336    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
337    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
338
339    let created_at_height: u64 = ws_fill
340        .created_at_height
341        .as_ref()
342        .map(|s| s.parse())
343        .transpose()
344        .context("Failed to parse created_at_height")?
345        .unwrap_or(0);
346
347    let client_metadata: u32 = ws_fill
348        .client_metadata
349        .as_ref()
350        .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
351        .parse()
352        .context("Failed to parse client_metadata")?;
353
354    let order_id = ws_fill
355        .order_id
356        .clone()
357        .ok_or_else(|| anyhow::anyhow!("Missing required field: order_id"))?;
358
359    let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
360        .context("Failed to parse created_at")?
361        .with_timezone(&Utc);
362
363    Ok(Fill {
364        id: ws_fill.id.clone(),
365        side: ws_fill.side,
366        liquidity: ws_fill.liquidity,
367        fill_type: ws_fill.fill_type,
368        market: ws_fill.market,
369        market_type: ws_fill.market_type.unwrap_or(DydxTickerType::Perpetual),
370        price,
371        size,
372        fee,
373        created_at,
374        created_at_height,
375        order_id,
376        client_metadata,
377    })
378}
379
380/// Parses a WebSocket position into a PositionStatusReport.
381///
382/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
383/// then delegates to the existing HTTP parser for consistency.
384///
385/// # Errors
386///
387/// Returns an error if:
388/// - Instrument lookup fails for the market symbol.
389/// - Field parsing fails (size, prices, etc.).
390/// - HTTP parser fails.
391pub fn parse_ws_position_report(
392    ws_position: &DydxPerpetualPosition,
393    instrument_cache: &InstrumentCache,
394    account_id: AccountId,
395    ts_init: UnixNanos,
396) -> anyhow::Result<PositionStatusReport> {
397    let instrument = instrument_cache
398        .get_by_market(&ws_position.market)
399        .ok_or_else(|| {
400            let available: Vec<String> = instrument_cache
401                .all_instruments()
402                .into_iter()
403                .map(|inst| inst.id().symbol.to_string())
404                .collect();
405            anyhow::anyhow!(
406                "No instrument cached for market '{}'. Available: {:?}",
407                ws_position.market,
408                available
409            )
410        })?;
411
412    let http_position = convert_ws_position_to_http(ws_position)?;
413    parse_position_status_report(&http_position, &instrument, account_id, ts_init)
414}
415
416/// Converts a WebSocket position to HTTP PerpetualPosition format.
417///
418/// # Errors
419///
420/// Returns an error if any field parsing fails.
421fn convert_ws_position_to_http(
422    ws_position: &DydxPerpetualPosition,
423) -> anyhow::Result<PerpetualPosition> {
424    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
425
426    let max_size: Decimal = ws_position
427        .max_size
428        .parse()
429        .context("Failed to parse max_size")?;
430
431    let entry_price: Decimal = ws_position
432        .entry_price
433        .parse()
434        .context("Failed to parse entry_price")?;
435
436    let exit_price: Option<Decimal> = ws_position
437        .exit_price
438        .as_ref()
439        .map(|s| s.parse())
440        .transpose()
441        .context("Failed to parse exit_price")?;
442
443    let realized_pnl: Decimal = ws_position
444        .realized_pnl
445        .parse()
446        .context("Failed to parse realized_pnl")?;
447
448    let unrealized_pnl: Decimal = ws_position
449        .unrealized_pnl
450        .parse()
451        .context("Failed to parse unrealized_pnl")?;
452
453    let sum_open: Decimal = ws_position
454        .sum_open
455        .parse()
456        .context("Failed to parse sum_open")?;
457
458    let sum_close: Decimal = ws_position
459        .sum_close
460        .parse()
461        .context("Failed to parse sum_close")?;
462
463    let net_funding: Decimal = ws_position
464        .net_funding
465        .parse()
466        .context("Failed to parse net_funding")?;
467
468    let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
469        .context("Failed to parse created_at")?
470        .with_timezone(&Utc);
471
472    let closed_at = ws_position
473        .closed_at
474        .as_ref()
475        .map(|s| DateTime::parse_from_rfc3339(s))
476        .transpose()
477        .context("Failed to parse closed_at")?
478        .map(|dt| dt.with_timezone(&Utc));
479
480    // Determine side from size sign (HTTP format uses OrderSide, not PositionSide)
481    let side = if size.is_sign_positive() {
482        OrderSide::Buy
483    } else {
484        OrderSide::Sell
485    };
486
487    Ok(PerpetualPosition {
488        market: ws_position.market,
489        status: ws_position.status,
490        side,
491        size,
492        max_size,
493        entry_price,
494        exit_price,
495        realized_pnl,
496        created_at_height: 0, // Not provided in WebSocket messages
497        created_at,
498        sum_open,
499        sum_close,
500        net_funding,
501        unrealized_pnl,
502        closed_at,
503    })
504}
505
506// ---------------------------------------------------------------------------
507//  Market data parsing functions
508// ---------------------------------------------------------------------------
509
510/// Parses an orderbook snapshot into [`OrderBookDeltas`].
511///
512/// # Errors
513///
514/// Returns an error if price/size parsing fails.
515pub fn parse_orderbook_snapshot(
516    instrument_id: &InstrumentId,
517    contents: &DydxOrderbookSnapshotContents,
518    price_precision: u8,
519    size_precision: u8,
520    ts_init: UnixNanos,
521) -> DydxWsResult<OrderBookDeltas> {
522    let mut deltas = Vec::new();
523    deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
524
525    let bids = contents.bids.as_deref().unwrap_or(&[]);
526    let asks = contents.asks.as_deref().unwrap_or(&[]);
527
528    let bids_len = bids.len();
529    let asks_len = asks.len();
530
531    for (idx, bid) in bids.iter().enumerate() {
532        let is_last = idx == bids_len - 1 && asks_len == 0;
533        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
534
535        let price = Decimal::from_str(&bid.price)
536            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
537
538        let size = Decimal::from_str(&bid.size)
539            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
540
541        let order = BookOrder::new(
542            OrderSide::Buy,
543            Price::from_decimal_dp(price, price_precision).map_err(|e| {
544                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
545            })?,
546            Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
547                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
548            })?,
549            0,
550        );
551
552        deltas.push(OrderBookDelta::new(
553            *instrument_id,
554            BookAction::Add,
555            order,
556            flags,
557            0,
558            ts_init,
559            ts_init,
560        ));
561    }
562
563    for (idx, ask) in asks.iter().enumerate() {
564        let is_last = idx == asks_len - 1;
565        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
566
567        let price = Decimal::from_str(&ask.price)
568            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
569
570        let size = Decimal::from_str(&ask.size)
571            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
572
573        let order = BookOrder::new(
574            OrderSide::Sell,
575            Price::from_decimal_dp(price, price_precision).map_err(|e| {
576                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
577            })?,
578            Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
579                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
580            })?,
581            0,
582        );
583
584        deltas.push(OrderBookDelta::new(
585            *instrument_id,
586            BookAction::Add,
587            order,
588            flags,
589            0,
590            ts_init,
591            ts_init,
592        ));
593    }
594
595    Ok(OrderBookDeltas::new(*instrument_id, deltas))
596}
597
598/// Parses orderbook deltas (marks as last message by default).
599///
600/// # Errors
601///
602/// Returns an error if price/size parsing fails.
603pub fn parse_orderbook_deltas(
604    instrument_id: &InstrumentId,
605    contents: &DydxOrderbookContents,
606    price_precision: u8,
607    size_precision: u8,
608    ts_init: UnixNanos,
609) -> DydxWsResult<OrderBookDeltas> {
610    let deltas = parse_orderbook_deltas_with_flag(
611        instrument_id,
612        contents,
613        price_precision,
614        size_precision,
615        ts_init,
616        true,
617    )?;
618    Ok(OrderBookDeltas::new(*instrument_id, deltas))
619}
620
621/// Parses orderbook deltas with explicit last-message flag for batch processing.
622///
623/// # Errors
624///
625/// Returns an error if price/size parsing fails.
626#[allow(clippy::too_many_arguments)]
627pub fn parse_orderbook_deltas_with_flag(
628    instrument_id: &InstrumentId,
629    contents: &DydxOrderbookContents,
630    price_precision: u8,
631    size_precision: u8,
632    ts_init: UnixNanos,
633    is_last_message: bool,
634) -> DydxWsResult<Vec<OrderBookDelta>> {
635    let mut deltas = Vec::new();
636
637    let bids = contents.bids.as_deref().unwrap_or(&[]);
638    let asks = contents.asks.as_deref().unwrap_or(&[]);
639
640    let bids_len = bids.len();
641    let asks_len = asks.len();
642
643    for (idx, (price_str, size_str)) in bids.iter().enumerate() {
644        let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
645        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
646
647        let price = Decimal::from_str(price_str)
648            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
649
650        let size = Decimal::from_str(size_str)
651            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
652
653        let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
654            DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
655        })?;
656        let action = if qty.is_zero() {
657            BookAction::Delete
658        } else {
659            BookAction::Update
660        };
661
662        let order = BookOrder::new(
663            OrderSide::Buy,
664            Price::from_decimal_dp(price, price_precision).map_err(|e| {
665                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
666            })?,
667            qty,
668            0,
669        );
670
671        deltas.push(OrderBookDelta::new(
672            *instrument_id,
673            action,
674            order,
675            flags,
676            0,
677            ts_init,
678            ts_init,
679        ));
680    }
681
682    for (idx, (price_str, size_str)) in asks.iter().enumerate() {
683        let is_last = is_last_message && idx == asks_len - 1;
684        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
685
686        let price = Decimal::from_str(price_str)
687            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
688
689        let size = Decimal::from_str(size_str)
690            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
691
692        let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
693            DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
694        })?;
695        let action = if qty.is_zero() {
696            BookAction::Delete
697        } else {
698            BookAction::Update
699        };
700
701        let order = BookOrder::new(
702            OrderSide::Sell,
703            Price::from_decimal_dp(price, price_precision).map_err(|e| {
704                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
705            })?,
706            qty,
707            0,
708        );
709
710        deltas.push(OrderBookDelta::new(
711            *instrument_id,
712            action,
713            order,
714            flags,
715            0,
716            ts_init,
717            ts_init,
718        ));
719    }
720
721    Ok(deltas)
722}
723
724/// Parses trade ticks from trade contents.
725///
726/// # Errors
727///
728/// Returns an error if price/size/timestamp parsing fails.
729pub fn parse_trade_ticks(
730    instrument_id: InstrumentId,
731    instrument: &InstrumentAny,
732    contents: &DydxTradeContents,
733    ts_init: UnixNanos,
734) -> DydxWsResult<Vec<Data>> {
735    let mut ticks = Vec::new();
736
737    for trade in &contents.trades {
738        let aggressor_side = match trade.side {
739            OrderSide::Buy => AggressorSide::Buyer,
740            OrderSide::Sell => AggressorSide::Seller,
741            _ => continue,
742        };
743
744        let price = Decimal::from_str(&trade.price)
745            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
746
747        let size = Decimal::from_str(&trade.size)
748            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
749
750        let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
751            DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
752        })?;
753
754        let tick = TradeTick::new(
755            instrument_id,
756            Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
757                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
758            })?,
759            Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
760                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
761            })?,
762            aggressor_side,
763            TradeId::new(&trade.id),
764            UnixNanos::from(trade_ts as u64),
765            ts_init,
766        );
767        ticks.push(Data::Trade(tick));
768    }
769
770    Ok(ticks)
771}
772
773/// Parses a single candle into a [`Bar`].
774///
775/// When `timestamp_on_close` is true, `ts_event` is set to bar close time
776/// (started_at + interval). When false, uses the venue-native open time.
777///
778/// # Errors
779///
780/// Returns an error if OHLCV/timestamp parsing fails.
781pub fn parse_candle_bar(
782    bar_type: BarType,
783    instrument: &InstrumentAny,
784    candle: &DydxCandle,
785    timestamp_on_close: bool,
786    ts_init: UnixNanos,
787) -> DydxWsResult<Bar> {
788    let open = Decimal::from_str(&candle.open)
789        .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
790    let high = Decimal::from_str(&candle.high)
791        .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
792    let low = Decimal::from_str(&candle.low)
793        .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
794    let close = Decimal::from_str(&candle.close)
795        .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
796    let volume = candle
797        .base_token_volume
798        .as_deref()
799        .map(Decimal::from_str)
800        .transpose()
801        .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?
802        .unwrap_or(Decimal::ZERO);
803
804    let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
805        DydxWsError::Parse(format!(
806            "Timestamp out of range for candle at {}",
807            candle.started_at
808        ))
809    })?;
810    let mut ts_event = UnixNanos::from(started_at_nanos as u64);
811    if timestamp_on_close {
812        let interval_ns = bar_type
813            .spec()
814            .timedelta()
815            .num_nanoseconds()
816            .ok_or_else(|| DydxWsError::Parse("Bar interval overflow".to_string()))?;
817        let updated = (started_at_nanos as u64)
818            .checked_add(interval_ns as u64)
819            .ok_or_else(|| {
820                DydxWsError::Parse("Bar timestamp overflowed adjusting to close time".to_string())
821            })?;
822        ts_event = UnixNanos::from(updated);
823    }
824
825    let bar = Bar::new(
826        bar_type,
827        Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
828            DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
829        })?,
830        Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
831            DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
832        })?,
833        Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
834            DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
835        })?,
836        Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
837            DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
838        })?,
839        Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
840            DydxWsError::Parse(format!(
841                "Failed to create volume Quantity from decimal: {e}"
842            ))
843        })?,
844        ts_event,
845        ts_init,
846    );
847
848    Ok(bar)
849}
850
851#[cfg(test)]
852mod tests {
853    use std::str::FromStr;
854
855    use nautilus_model::{
856        data::{BarType, Data},
857        enums::{
858            AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
859            PositionSideSpecified,
860        },
861        identifiers::{AccountId, InstrumentId, Symbol, Venue},
862        instruments::{CryptoPerpetual, InstrumentAny},
863        types::{Currency, Price, Quantity},
864    };
865    use rstest::rstest;
866    use rust_decimal_macros::dec;
867    use ustr::Ustr;
868
869    use super::*;
870    use crate::{
871        common::{
872            enums::{
873                DydxFillType, DydxLiquidity, DydxMarketStatus, DydxOrderStatus, DydxOrderType,
874                DydxPositionSide, DydxPositionStatus, DydxTickerType, DydxTimeInForce,
875            },
876            testing::load_json_fixture,
877        },
878        http::models::PerpetualMarket,
879        websocket::messages::{DydxPerpetualPosition, DydxWsFillSubaccountMessageContents},
880    };
881
882    /// Creates a test market with BTC-USD ticker and specified clob_pair_id.
883    fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
884        PerpetualMarket {
885            clob_pair_id,
886            ticker: Ustr::from(ticker),
887            status: DydxMarketStatus::Active,
888            base_asset: Some(Ustr::from("BTC")),
889            quote_asset: Some(Ustr::from("USD")),
890            step_size: dec!(0.001),
891            tick_size: dec!(0.01),
892            index_price: Some(dec!(50000)),
893            oracle_price: dec!(50000),
894            price_change_24h: dec!(0),
895            next_funding_rate: dec!(0),
896            next_funding_at: None,
897            min_order_size: Some(dec!(0.001)),
898            market_type: None,
899            initial_margin_fraction: dec!(0.05),
900            maintenance_margin_fraction: dec!(0.03),
901            base_position_notional: None,
902            incremental_position_size: None,
903            incremental_initial_margin_fraction: None,
904            max_position_size: None,
905            open_interest: dec!(1000),
906            atomic_resolution: -10,
907            quantum_conversion_exponent: -9,
908            subticks_per_tick: 1000000,
909            step_base_quantums: 1000000,
910            is_reduce_only: false,
911        }
912    }
913
914    /// Creates an InstrumentCache populated with the test instrument.
915    fn create_test_instrument_cache() -> InstrumentCache {
916        let cache = InstrumentCache::new();
917        let instrument = create_test_instrument();
918        let market = create_test_market("BTC-USD", 1);
919        cache.insert(instrument, market);
920        cache
921    }
922
923    fn create_test_instrument() -> InstrumentAny {
924        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
925
926        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
927            instrument_id,
928            Symbol::new("BTC-USD"),
929            Currency::BTC(),
930            Currency::USD(),
931            Currency::USD(),
932            false,
933            2,
934            8,
935            Price::new(0.01, 2),
936            Quantity::new(0.001, 8),
937            Some(Quantity::new(1.0, 0)),
938            Some(Quantity::new(0.001, 8)),
939            Some(Quantity::new(100000.0, 8)),
940            Some(Quantity::new(0.001, 8)),
941            None,
942            None,
943            Some(Price::new(1000000.0, 2)),
944            Some(Price::new(0.01, 2)),
945            Some(rust_decimal_macros::dec!(0.05)),
946            Some(rust_decimal_macros::dec!(0.03)),
947            Some(rust_decimal_macros::dec!(0.0002)),
948            Some(rust_decimal_macros::dec!(0.0005)),
949            UnixNanos::default(),
950            UnixNanos::default(),
951        ))
952    }
953
954    #[rstest]
955    fn test_convert_ws_order_to_http_basic() {
956        let ws_order = DydxWsOrderSubaccountMessageContents {
957            id: "order123".to_string(),
958            subaccount_id: "dydx1test/0".to_string(),
959            client_id: "12345".to_string(),
960            clob_pair_id: "1".to_string(),
961            side: OrderSide::Buy,
962            size: "1.5".to_string(),
963            price: "50000.0".to_string(),
964            status: DydxOrderStatus::PartiallyFilled,
965            order_type: DydxOrderType::Limit,
966            time_in_force: DydxTimeInForce::Gtt,
967            post_only: false,
968            reduce_only: false,
969            order_flags: "0".to_string(),
970            good_til_block: Some("1000".to_string()),
971            good_til_block_time: None,
972            created_at_height: Some("900".to_string()),
973            client_metadata: Some("0".to_string()),
974            trigger_price: None,
975            total_filled: Some("0.5".to_string()),
976            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
977            updated_at_height: Some("950".to_string()),
978        };
979
980        let result = convert_ws_order_to_http(&ws_order);
981        assert!(result.is_ok());
982
983        let http_order = result.unwrap();
984        assert_eq!(http_order.id, "order123");
985        assert_eq!(http_order.clob_pair_id, 1);
986        assert_eq!(http_order.size.to_string(), "1.5");
987        assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); // 0.5 filled
988        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
989    }
990
991    #[rstest]
992    fn test_parse_ws_order_report_success() {
993        let ws_order = DydxWsOrderSubaccountMessageContents {
994            id: "order456".to_string(),
995            subaccount_id: "dydx1test/0".to_string(),
996            client_id: "67890".to_string(),
997            clob_pair_id: "1".to_string(),
998            side: OrderSide::Sell,
999            size: "2.0".to_string(),
1000            price: "51000.0".to_string(),
1001            status: DydxOrderStatus::Open,
1002            order_type: DydxOrderType::Limit,
1003            time_in_force: DydxTimeInForce::Gtt,
1004            post_only: true,
1005            reduce_only: false,
1006            order_flags: "0".to_string(),
1007            good_til_block: Some("2000".to_string()),
1008            good_til_block_time: None,
1009            created_at_height: Some("1800".to_string()),
1010            client_metadata: Some("0".to_string()),
1011            trigger_price: None,
1012            total_filled: Some("0.0".to_string()),
1013            updated_at: None,
1014            updated_at_height: None,
1015        };
1016
1017        let instrument_cache = create_test_instrument_cache();
1018        let encoder = ClientOrderIdEncoder::new();
1019
1020        let account_id = AccountId::new("DYDX-001");
1021        let ts_init = UnixNanos::default();
1022        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1023
1024        let result = parse_ws_order_report(
1025            &ws_order,
1026            &instrument_cache,
1027            &order_contexts,
1028            &encoder,
1029            account_id,
1030            ts_init,
1031        );
1032
1033        assert!(result.is_ok());
1034        let report = result.unwrap();
1035        assert_eq!(report.account_id, account_id);
1036        assert_eq!(report.order_side, OrderSide::Sell);
1037    }
1038
1039    #[rstest]
1040    fn test_parse_ws_order_report_missing_instrument() {
1041        let ws_order = DydxWsOrderSubaccountMessageContents {
1042            id: "order789".to_string(),
1043            subaccount_id: "dydx1test/0".to_string(),
1044            client_id: "11111".to_string(),
1045            clob_pair_id: "99".to_string(), // Non-existent
1046            side: OrderSide::Buy,
1047            size: "1.0".to_string(),
1048            price: "50000.0".to_string(),
1049            status: DydxOrderStatus::Open,
1050            order_type: DydxOrderType::Market,
1051            time_in_force: DydxTimeInForce::Ioc,
1052            post_only: false,
1053            reduce_only: false,
1054            order_flags: "0".to_string(),
1055            good_til_block: Some("1000".to_string()),
1056            good_til_block_time: None,
1057            created_at_height: Some("900".to_string()),
1058            client_metadata: Some("0".to_string()),
1059            trigger_price: None,
1060            total_filled: Some("0.0".to_string()),
1061            updated_at: None,
1062            updated_at_height: None,
1063        };
1064
1065        let instrument_cache = InstrumentCache::new(); // Empty cache
1066        let encoder = ClientOrderIdEncoder::new();
1067        let account_id = AccountId::new("DYDX-001");
1068        let ts_init = UnixNanos::default();
1069        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1070
1071        let result = parse_ws_order_report(
1072            &ws_order,
1073            &instrument_cache,
1074            &order_contexts,
1075            &encoder,
1076            account_id,
1077            ts_init,
1078        );
1079
1080        assert!(result.is_err());
1081        assert!(
1082            result
1083                .unwrap_err()
1084                .to_string()
1085                .contains("No instrument cached")
1086        );
1087    }
1088
1089    #[rstest]
1090    fn test_convert_ws_fill_to_http() {
1091        let ws_fill = DydxWsFillSubaccountMessageContents {
1092            id: "fill123".to_string(),
1093            subaccount_id: "sub1".to_string(),
1094            side: OrderSide::Buy,
1095            liquidity: DydxLiquidity::Maker,
1096            fill_type: DydxFillType::Limit,
1097            market: "BTC-USD".into(),
1098            market_type: Some(DydxTickerType::Perpetual),
1099            price: "50000.5".to_string(),
1100            size: "0.1".to_string(),
1101            fee: "-2.5".to_string(), // Negative for maker rebate
1102            created_at: "2024-01-15T10:30:00Z".to_string(),
1103            created_at_height: Some("12345".to_string()),
1104            order_id: Some("order456".to_string()),
1105            client_metadata: Some("999".to_string()),
1106        };
1107
1108        let result = convert_ws_fill_to_http(&ws_fill);
1109        assert!(result.is_ok());
1110
1111        let http_fill = result.unwrap();
1112        assert_eq!(http_fill.id, "fill123");
1113        assert_eq!(http_fill.side, OrderSide::Buy);
1114        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
1115        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
1116        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
1117        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
1118        assert_eq!(http_fill.created_at_height, 12345);
1119        assert_eq!(http_fill.order_id, "order456");
1120        assert_eq!(http_fill.client_metadata, 999);
1121    }
1122
1123    #[rstest]
1124    fn test_parse_ws_fill_report_success() {
1125        let instrument_cache = create_test_instrument_cache();
1126        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1127
1128        // dYdX WS fills use market format "BTC-USD" (not "BTC-USD-PERP")
1129        // but the instrument symbol is "BTC-USD-PERP"
1130        let ws_fill = DydxWsFillSubaccountMessageContents {
1131            id: "fill789".to_string(),
1132            subaccount_id: "sub1".to_string(),
1133            side: OrderSide::Sell,
1134            liquidity: DydxLiquidity::Taker,
1135            fill_type: DydxFillType::Limit,
1136            market: "BTC-USD".into(),
1137            market_type: Some(DydxTickerType::Perpetual),
1138            price: "49500.0".to_string(),
1139            size: "0.5".to_string(),
1140            fee: "12.375".to_string(), // Positive for taker fee
1141            created_at: "2024-01-15T11:00:00Z".to_string(),
1142            created_at_height: Some("12400".to_string()),
1143            order_id: Some("order999".to_string()),
1144            client_metadata: Some("888".to_string()),
1145        };
1146
1147        let account_id = AccountId::new("DYDX-001");
1148        let ts_init = UnixNanos::default();
1149        let order_id_map = DashMap::new();
1150        let order_contexts = DashMap::new();
1151        let encoder = ClientOrderIdEncoder::new();
1152
1153        let result = parse_ws_fill_report(
1154            &ws_fill,
1155            &instrument_cache,
1156            &order_id_map,
1157            &order_contexts,
1158            &encoder,
1159            account_id,
1160            ts_init,
1161        );
1162        assert!(result.is_ok());
1163
1164        let fill_report = result.unwrap();
1165        assert_eq!(fill_report.instrument_id, instrument_id);
1166        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
1167        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
1168        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
1169        assert_eq!(fill_report.commission.as_decimal(), dec!(12.38));
1170    }
1171
1172    #[rstest]
1173    fn test_parse_ws_fill_report_missing_instrument() {
1174        let instrument_cache = InstrumentCache::new(); // Empty - no instruments cached
1175
1176        let ws_fill = DydxWsFillSubaccountMessageContents {
1177            id: "fill000".to_string(),
1178            subaccount_id: "sub1".to_string(),
1179            side: OrderSide::Buy,
1180            liquidity: DydxLiquidity::Maker,
1181            fill_type: DydxFillType::Limit,
1182            market: "ETH-USD-PERP".into(),
1183            market_type: Some(DydxTickerType::Perpetual),
1184            price: "3000.0".to_string(),
1185            size: "1.0".to_string(),
1186            fee: "-1.5".to_string(),
1187            created_at: "2024-01-15T12:00:00Z".to_string(),
1188            created_at_height: Some("12500".to_string()),
1189            order_id: Some("order111".to_string()),
1190            client_metadata: Some("777".to_string()),
1191        };
1192
1193        let account_id = AccountId::new("DYDX-001");
1194        let ts_init = UnixNanos::default();
1195        let order_id_map = DashMap::new();
1196        let order_contexts = DashMap::new();
1197        let encoder = ClientOrderIdEncoder::new();
1198
1199        let result = parse_ws_fill_report(
1200            &ws_fill,
1201            &instrument_cache,
1202            &order_id_map,
1203            &order_contexts,
1204            &encoder,
1205            account_id,
1206            ts_init,
1207        );
1208        assert!(result.is_err());
1209        assert!(
1210            result
1211                .unwrap_err()
1212                .to_string()
1213                .contains("No instrument cached for market")
1214        );
1215    }
1216
1217    #[rstest]
1218    fn test_convert_ws_position_to_http() {
1219        let ws_position = DydxPerpetualPosition {
1220            market: "BTC-USD".into(),
1221            status: DydxPositionStatus::Open,
1222            side: DydxPositionSide::Long,
1223            size: "1.5".to_string(),
1224            max_size: "2.0".to_string(),
1225            entry_price: "50000.0".to_string(),
1226            exit_price: None,
1227            realized_pnl: "100.0".to_string(),
1228            unrealized_pnl: "250.5".to_string(),
1229            created_at: "2024-01-15T10:00:00Z".to_string(),
1230            closed_at: None,
1231            sum_open: "5.0".to_string(),
1232            sum_close: "3.5".to_string(),
1233            net_funding: "-10.25".to_string(),
1234        };
1235
1236        let result = convert_ws_position_to_http(&ws_position);
1237        assert!(result.is_ok());
1238
1239        let http_position = result.unwrap();
1240        assert_eq!(http_position.market, "BTC-USD");
1241        assert_eq!(http_position.status, DydxPositionStatus::Open);
1242        assert_eq!(http_position.side, OrderSide::Buy); // Positive size = Buy
1243        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
1244        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
1245        assert_eq!(
1246            http_position.entry_price,
1247            rust_decimal_macros::dec!(50000.0)
1248        );
1249        assert_eq!(http_position.exit_price, None);
1250        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
1251        assert_eq!(
1252            http_position.unrealized_pnl,
1253            rust_decimal_macros::dec!(250.5)
1254        );
1255        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
1256        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
1257        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
1258    }
1259
1260    #[rstest]
1261    fn test_parse_ws_position_report_success() {
1262        let instrument_cache = create_test_instrument_cache();
1263        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1264
1265        let ws_position = DydxPerpetualPosition {
1266            market: "BTC-USD".into(),
1267            status: DydxPositionStatus::Open,
1268            side: DydxPositionSide::Long,
1269            size: "0.5".to_string(),
1270            max_size: "1.0".to_string(),
1271            entry_price: "49500.0".to_string(),
1272            exit_price: None,
1273            realized_pnl: "0.0".to_string(),
1274            unrealized_pnl: "125.0".to_string(),
1275            created_at: "2024-01-15T09:00:00Z".to_string(),
1276            closed_at: None,
1277            sum_open: "0.5".to_string(),
1278            sum_close: "0.0".to_string(),
1279            net_funding: "-2.5".to_string(),
1280        };
1281
1282        let account_id = AccountId::new("DYDX-001");
1283        let ts_init = UnixNanos::default();
1284
1285        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1286        assert!(result.is_ok());
1287
1288        let position_report = result.unwrap();
1289        assert_eq!(position_report.instrument_id, instrument_id);
1290        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
1291        assert_eq!(position_report.quantity.as_f64(), 0.5);
1292        // avg_px_open should be entry_price
1293        assert!(position_report.avg_px_open.is_some());
1294    }
1295
1296    #[rstest]
1297    fn test_parse_ws_position_report_short() {
1298        let instrument_cache = create_test_instrument_cache();
1299        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1300
1301        let ws_position = DydxPerpetualPosition {
1302            market: "BTC-USD".into(),
1303            status: DydxPositionStatus::Open,
1304            side: DydxPositionSide::Short,
1305            size: "-0.25".to_string(), // Negative for short
1306            max_size: "0.5".to_string(),
1307            entry_price: "51000.0".to_string(),
1308            exit_price: None,
1309            realized_pnl: "50.0".to_string(),
1310            unrealized_pnl: "-75.25".to_string(),
1311            created_at: "2024-01-15T08:00:00Z".to_string(),
1312            closed_at: None,
1313            sum_open: "0.25".to_string(),
1314            sum_close: "0.0".to_string(),
1315            net_funding: "1.5".to_string(),
1316        };
1317
1318        let account_id = AccountId::new("DYDX-001");
1319        let ts_init = UnixNanos::default();
1320
1321        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1322        assert!(result.is_ok());
1323
1324        let position_report = result.unwrap();
1325        assert_eq!(position_report.instrument_id, instrument_id);
1326        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
1327        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
1328    }
1329
1330    #[rstest]
1331    fn test_parse_ws_position_report_missing_instrument() {
1332        let instrument_cache = InstrumentCache::new(); // Empty - no instruments cached
1333
1334        let ws_position = DydxPerpetualPosition {
1335            market: "ETH-USD-PERP".into(),
1336            status: DydxPositionStatus::Open,
1337            side: DydxPositionSide::Long,
1338            size: "5.0".to_string(),
1339            max_size: "10.0".to_string(),
1340            entry_price: "3000.0".to_string(),
1341            exit_price: None,
1342            realized_pnl: "0.0".to_string(),
1343            unrealized_pnl: "500.0".to_string(),
1344            created_at: "2024-01-15T07:00:00Z".to_string(),
1345            closed_at: None,
1346            sum_open: "5.0".to_string(),
1347            sum_close: "0.0".to_string(),
1348            net_funding: "-5.0".to_string(),
1349        };
1350
1351        let account_id = AccountId::new("DYDX-001");
1352        let ts_init = UnixNanos::default();
1353
1354        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1355        assert!(result.is_err());
1356        assert!(
1357            result
1358                .unwrap_err()
1359                .to_string()
1360                .contains("No instrument cached for market")
1361        );
1362    }
1363
1364    #[rstest]
1365    #[case(DydxOrderStatus::Filled, "2.0")]
1366    #[case(DydxOrderStatus::Canceled, "0.0")]
1367    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
1368    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
1369    #[case(DydxOrderStatus::Untriggered, "0.0")]
1370    fn test_parse_ws_order_various_statuses(
1371        #[case] status: DydxOrderStatus,
1372        #[case] total_filled: &str,
1373    ) {
1374        let ws_order = DydxWsOrderSubaccountMessageContents {
1375            id: format!("order_{status:?}"),
1376            subaccount_id: "dydx1test/0".to_string(),
1377            client_id: "99999".to_string(),
1378            clob_pair_id: "1".to_string(),
1379            side: OrderSide::Buy,
1380            size: "2.0".to_string(),
1381            price: "50000.0".to_string(),
1382            status,
1383            order_type: DydxOrderType::Limit,
1384            time_in_force: DydxTimeInForce::Gtt,
1385            post_only: false,
1386            reduce_only: false,
1387            order_flags: "0".to_string(),
1388            good_til_block: Some("1000".to_string()),
1389            good_til_block_time: None,
1390            created_at_height: Some("900".to_string()),
1391            client_metadata: Some("0".to_string()),
1392            trigger_price: None,
1393            total_filled: Some(total_filled.to_string()),
1394            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1395            updated_at_height: Some("950".to_string()),
1396        };
1397
1398        let instrument_cache = create_test_instrument_cache();
1399        let encoder = ClientOrderIdEncoder::new();
1400
1401        let account_id = AccountId::new("DYDX-001");
1402        let ts_init = UnixNanos::default();
1403        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1404
1405        let result = parse_ws_order_report(
1406            &ws_order,
1407            &instrument_cache,
1408            &order_contexts,
1409            &encoder,
1410            account_id,
1411            ts_init,
1412        );
1413
1414        assert!(
1415            result.is_ok(),
1416            "Failed to parse order with status {status:?}"
1417        );
1418        let report = result.unwrap();
1419
1420        // Verify status conversion
1421        let expected_status = match status {
1422            DydxOrderStatus::Open
1423            | DydxOrderStatus::BestEffortOpened
1424            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
1425            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
1426            DydxOrderStatus::Filled => OrderStatus::Filled,
1427            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
1428                OrderStatus::Canceled
1429            }
1430        };
1431        assert_eq!(report.order_status, expected_status);
1432    }
1433
1434    #[rstest]
1435    fn test_parse_ws_order_with_trigger_price() {
1436        let ws_order = DydxWsOrderSubaccountMessageContents {
1437            id: "conditional_order".to_string(),
1438            subaccount_id: "dydx1test/0".to_string(),
1439            client_id: "88888".to_string(),
1440            clob_pair_id: "1".to_string(),
1441            side: OrderSide::Sell,
1442            size: "1.0".to_string(),
1443            price: "52000.0".to_string(),
1444            status: DydxOrderStatus::Untriggered,
1445            order_type: DydxOrderType::StopLimit,
1446            time_in_force: DydxTimeInForce::Gtt,
1447            post_only: false,
1448            reduce_only: true,
1449            order_flags: "32".to_string(),
1450            good_til_block: None,
1451            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1452            created_at_height: Some("1000".to_string()),
1453            client_metadata: Some("100".to_string()),
1454            trigger_price: Some("51500.0".to_string()),
1455            total_filled: Some("0.0".to_string()),
1456            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1457            updated_at_height: Some("1050".to_string()),
1458        };
1459
1460        let instrument_cache = create_test_instrument_cache();
1461        let encoder = ClientOrderIdEncoder::new();
1462
1463        let account_id = AccountId::new("DYDX-001");
1464        let ts_init = UnixNanos::default();
1465        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1466
1467        let result = parse_ws_order_report(
1468            &ws_order,
1469            &instrument_cache,
1470            &order_contexts,
1471            &encoder,
1472            account_id,
1473            ts_init,
1474        );
1475
1476        assert!(result.is_ok());
1477        let report = result.unwrap();
1478        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1479        // Trigger price should be parsed and available in the report
1480        assert!(report.trigger_price.is_some());
1481    }
1482
1483    #[rstest]
1484    fn test_parse_ws_order_market_type() {
1485        let ws_order = DydxWsOrderSubaccountMessageContents {
1486            id: "market_order".to_string(),
1487            subaccount_id: "dydx1test/0".to_string(),
1488            client_id: "77777".to_string(),
1489            clob_pair_id: "1".to_string(),
1490            side: OrderSide::Buy,
1491            size: "0.5".to_string(),
1492            price: "50000.0".to_string(), // Market orders still have a price
1493            status: DydxOrderStatus::Filled,
1494            order_type: DydxOrderType::Market,
1495            time_in_force: DydxTimeInForce::Ioc,
1496            post_only: false,
1497            reduce_only: false,
1498            order_flags: "0".to_string(),
1499            good_til_block: Some("1000".to_string()),
1500            good_til_block_time: None,
1501            created_at_height: Some("900".to_string()),
1502            client_metadata: Some("0".to_string()),
1503            trigger_price: None,
1504            total_filled: Some("0.5".to_string()),
1505            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1506            updated_at_height: Some("901".to_string()),
1507        };
1508
1509        let instrument_cache = create_test_instrument_cache();
1510        let encoder = ClientOrderIdEncoder::new();
1511
1512        let account_id = AccountId::new("DYDX-001");
1513        let ts_init = UnixNanos::default();
1514        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1515
1516        let result = parse_ws_order_report(
1517            &ws_order,
1518            &instrument_cache,
1519            &order_contexts,
1520            &encoder,
1521            account_id,
1522            ts_init,
1523        );
1524
1525        assert!(result.is_ok());
1526        let report = result.unwrap();
1527        assert_eq!(report.order_type, OrderType::Market);
1528        assert_eq!(report.order_status, OrderStatus::Filled);
1529    }
1530
1531    #[rstest]
1532    fn test_parse_ws_order_invalid_clob_pair_id() {
1533        let ws_order = DydxWsOrderSubaccountMessageContents {
1534            id: "bad_order".to_string(),
1535            subaccount_id: "dydx1test/0".to_string(),
1536            client_id: "12345".to_string(),
1537            clob_pair_id: "not_a_number".to_string(), // Invalid
1538            side: OrderSide::Buy,
1539            size: "1.0".to_string(),
1540            price: "50000.0".to_string(),
1541            status: DydxOrderStatus::Open,
1542            order_type: DydxOrderType::Limit,
1543            time_in_force: DydxTimeInForce::Gtt,
1544            post_only: false,
1545            reduce_only: false,
1546            order_flags: "0".to_string(),
1547            good_til_block: Some("1000".to_string()),
1548            good_til_block_time: None,
1549            created_at_height: Some("900".to_string()),
1550            client_metadata: Some("0".to_string()),
1551            trigger_price: None,
1552            total_filled: Some("0.0".to_string()),
1553            updated_at: None,
1554            updated_at_height: None,
1555        };
1556
1557        let instrument_cache = InstrumentCache::new(); // Empty cache
1558        let encoder = ClientOrderIdEncoder::new();
1559        let account_id = AccountId::new("DYDX-001");
1560        let ts_init = UnixNanos::default();
1561        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1562
1563        let result = parse_ws_order_report(
1564            &ws_order,
1565            &instrument_cache,
1566            &order_contexts,
1567            &encoder,
1568            account_id,
1569            ts_init,
1570        );
1571
1572        assert!(result.is_err());
1573        assert!(
1574            result
1575                .unwrap_err()
1576                .to_string()
1577                .contains("Failed to parse clob_pair_id")
1578        );
1579    }
1580
1581    #[rstest]
1582    fn test_parse_ws_position_closed() {
1583        let instrument_cache = create_test_instrument_cache();
1584        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1585
1586        let ws_position = DydxPerpetualPosition {
1587            market: "BTC-USD".into(),
1588            status: DydxPositionStatus::Closed,
1589            side: DydxPositionSide::Long,
1590            size: "0.0".to_string(), // Closed = zero size
1591            max_size: "2.0".to_string(),
1592            entry_price: "48000.0".to_string(),
1593            exit_price: Some("52000.0".to_string()),
1594            realized_pnl: "2000.0".to_string(),
1595            unrealized_pnl: "0.0".to_string(),
1596            created_at: "2024-01-10T09:00:00Z".to_string(),
1597            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1598            sum_open: "5.0".to_string(),
1599            sum_close: "5.0".to_string(), // Fully closed
1600            net_funding: "-25.5".to_string(),
1601        };
1602
1603        let account_id = AccountId::new("DYDX-001");
1604        let ts_init = UnixNanos::default();
1605
1606        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1607        assert!(result.is_ok());
1608
1609        let position_report = result.unwrap();
1610        assert_eq!(position_report.instrument_id, instrument_id);
1611        // Closed position should have zero quantity
1612        assert_eq!(position_report.quantity.as_f64(), 0.0);
1613    }
1614
1615    #[rstest]
1616    fn test_parse_ws_fill_with_maker_rebate() {
1617        let instrument_cache = create_test_instrument_cache();
1618
1619        let ws_fill = DydxWsFillSubaccountMessageContents {
1620            id: "fill_rebate".to_string(),
1621            subaccount_id: "sub1".to_string(),
1622            side: OrderSide::Buy,
1623            liquidity: DydxLiquidity::Maker,
1624            fill_type: DydxFillType::Limit,
1625            market: "BTC-USD".into(),
1626            market_type: Some(DydxTickerType::Perpetual),
1627            price: "50000.0".to_string(),
1628            size: "1.0".to_string(),
1629            fee: "-15.0".to_string(), // Negative fee = rebate
1630            created_at: "2024-01-15T13:00:00Z".to_string(),
1631            created_at_height: Some("13000".to_string()),
1632            order_id: Some("order_maker".to_string()),
1633            client_metadata: Some("200".to_string()),
1634        };
1635
1636        let account_id = AccountId::new("DYDX-001");
1637        let ts_init = UnixNanos::default();
1638        let order_id_map = DashMap::new();
1639        let order_contexts = DashMap::new();
1640        let encoder = ClientOrderIdEncoder::new();
1641
1642        let result = parse_ws_fill_report(
1643            &ws_fill,
1644            &instrument_cache,
1645            &order_id_map,
1646            &order_contexts,
1647            &encoder,
1648            account_id,
1649            ts_init,
1650        );
1651        assert!(result.is_ok());
1652
1653        let fill_report = result.unwrap();
1654        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1655        assert!(fill_report.commission.as_decimal() < dec!(0));
1656    }
1657
1658    #[rstest]
1659    fn test_parse_ws_fill_taker_with_fee() {
1660        let instrument_cache = create_test_instrument_cache();
1661
1662        let ws_fill = DydxWsFillSubaccountMessageContents {
1663            id: "fill_taker".to_string(),
1664            subaccount_id: "sub2".to_string(),
1665            side: OrderSide::Sell,
1666            liquidity: DydxLiquidity::Taker,
1667            fill_type: DydxFillType::Limit,
1668            market: "BTC-USD".into(),
1669            market_type: Some(DydxTickerType::Perpetual),
1670            price: "49800.0".to_string(),
1671            size: "0.75".to_string(),
1672            fee: "18.675".to_string(), // Positive fee for taker
1673            created_at: "2024-01-15T14:00:00Z".to_string(),
1674            created_at_height: Some("14000".to_string()),
1675            order_id: Some("order_taker".to_string()),
1676            client_metadata: Some("300".to_string()),
1677        };
1678
1679        let account_id = AccountId::new("DYDX-001");
1680        let ts_init = UnixNanos::default();
1681        let order_id_map = DashMap::new();
1682        let order_contexts = DashMap::new();
1683        let encoder = ClientOrderIdEncoder::new();
1684
1685        let result = parse_ws_fill_report(
1686            &ws_fill,
1687            &instrument_cache,
1688            &order_id_map,
1689            &order_contexts,
1690            &encoder,
1691            account_id,
1692            ts_init,
1693        );
1694        assert!(result.is_ok());
1695
1696        let fill_report = result.unwrap();
1697        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1698        assert_eq!(fill_report.order_side, OrderSide::Sell);
1699        assert!(fill_report.commission.as_decimal() > dec!(0));
1700    }
1701
1702    #[rstest]
1703    fn test_parse_orderbook_snapshot() {
1704        let json = load_json_fixture("ws_orderbook_subscribed.json");
1705        let contents: DydxOrderbookSnapshotContents =
1706            serde_json::from_value(json["contents"].clone())
1707                .expect("Failed to parse orderbook snapshot contents");
1708
1709        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1710        let ts_init = UnixNanos::from(1_000_000_000u64);
1711
1712        let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1713            .expect("Failed to parse orderbook snapshot");
1714
1715        // 1 clear + 3 bids + 3 asks = 7 deltas
1716        assert_eq!(deltas.deltas.len(), 7);
1717
1718        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1719        assert_eq!(deltas.deltas[1].action, BookAction::Add);
1720        assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
1721        assert_eq!(deltas.deltas[1].order.price.to_string(), "43240.00");
1722        assert_eq!(deltas.deltas[1].order.size.to_string(), "1.50000000");
1723
1724        assert_eq!(deltas.deltas[4].action, BookAction::Add);
1725        assert_eq!(deltas.deltas[4].order.side, OrderSide::Sell);
1726        assert_eq!(deltas.deltas[4].order.price.to_string(), "43250.00");
1727        assert_eq!(deltas.deltas[4].order.size.to_string(), "1.20000000");
1728
1729        let last = deltas.deltas.last().unwrap();
1730        assert_ne!(last.flags, 0);
1731    }
1732
1733    #[rstest]
1734    fn test_parse_orderbook_deltas_update() {
1735        let json = load_json_fixture("ws_orderbook_update.json");
1736        let contents: DydxOrderbookContents = serde_json::from_value(json["contents"].clone())
1737            .expect("Failed to parse orderbook update contents");
1738
1739        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1740        let ts_init = UnixNanos::from(1_000_000_000u64);
1741
1742        let deltas = parse_orderbook_deltas(&instrument_id, &contents, 2, 8, ts_init)
1743            .expect("Failed to parse orderbook deltas");
1744
1745        // 2 bids + 2 asks = 4 deltas
1746        assert_eq!(deltas.deltas.len(), 4);
1747
1748        assert_eq!(deltas.deltas[0].action, BookAction::Update);
1749        assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
1750        assert_eq!(deltas.deltas[0].order.price.to_string(), "43240.00");
1751
1752        // First ask with size 0.0 should be a Delete
1753        assert_eq!(deltas.deltas[2].action, BookAction::Delete);
1754        assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
1755        assert_eq!(deltas.deltas[2].order.price.to_string(), "43250.00");
1756
1757        assert_eq!(deltas.deltas[3].action, BookAction::Update);
1758        assert_eq!(deltas.deltas[3].order.side, OrderSide::Sell);
1759    }
1760
1761    #[rstest]
1762    fn test_parse_trade_ticks_ws() {
1763        let json = load_json_fixture("ws_trades_subscribed.json");
1764        let contents: DydxTradeContents = serde_json::from_value(json["contents"].clone())
1765            .expect("Failed to parse trade contents");
1766
1767        let instrument = create_test_instrument();
1768        let instrument_id = instrument.id();
1769        let ts_init = UnixNanos::from(1_000_000_000u64);
1770
1771        let ticks = parse_trade_ticks(instrument_id, &instrument, &contents, ts_init)
1772            .expect("Failed to parse trade ticks");
1773
1774        assert_eq!(ticks.len(), 1);
1775        if let Data::Trade(tick) = &ticks[0] {
1776            assert_eq!(tick.instrument_id, instrument_id);
1777            assert_eq!(tick.price.to_string(), "43250.00");
1778            assert_eq!(tick.size.to_string(), "0.50000000");
1779            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1780            assert_eq!(tick.trade_id.to_string(), "trade-001");
1781        } else {
1782            panic!("Expected Trade data");
1783        }
1784    }
1785
1786    #[rstest]
1787    #[case(true)]
1788    #[case(false)]
1789    fn test_parse_candle_bar_timestamp_on_close(#[case] timestamp_on_close: bool) {
1790        let json = load_json_fixture("ws_candles_subscribed.json");
1791        let candles_value = &json["contents"]["candles"];
1792        let candles: Vec<DydxCandle> =
1793            serde_json::from_value(candles_value.clone()).expect("Failed to parse candle array");
1794
1795        let instrument = create_test_instrument();
1796        let bar_type = BarType::from_str("BTC-USD-PERP.DYDX-1-MINUTE-LAST-EXTERNAL")
1797            .expect("Failed to parse bar type");
1798        let ts_init = UnixNanos::from(1_000_000_000u64);
1799
1800        let bar = parse_candle_bar(
1801            bar_type,
1802            &instrument,
1803            &candles[0],
1804            timestamp_on_close,
1805            ts_init,
1806        )
1807        .expect("Failed to parse candle bar");
1808
1809        assert_eq!(bar.bar_type, bar_type);
1810        assert_eq!(bar.open.to_string(), "43100.00");
1811        assert_eq!(bar.high.to_string(), "43500.00");
1812        assert_eq!(bar.low.to_string(), "43000.00");
1813        assert_eq!(bar.close.to_string(), "43400.00");
1814        assert_eq!(bar.volume.to_string(), "12.34500000");
1815
1816        // 2024-01-01T00:00:00.000Z = 1_704_067_200_000_000_000 ns
1817        let started_at_ns = 1_704_067_200_000_000_000u64;
1818        let one_min_ns = 60_000_000_000u64;
1819        if timestamp_on_close {
1820            assert_eq!(bar.ts_event.as_u64(), started_at_ns + one_min_ns);
1821        } else {
1822            assert_eq!(bar.ts_event.as_u64(), started_at_ns);
1823        }
1824    }
1825}