nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    enums::{OrderSide, OrderStatus},
30    identifiers::{AccountId, InstrumentId},
31    instruments::{Instrument, InstrumentAny},
32    reports::{FillReport, OrderStatusReport, PositionStatusReport},
33};
34use rust_decimal::Decimal;
35
36use crate::{
37    common::enums::DydxOrderStatus,
38    http::{
39        models::{Fill, Order, PerpetualPosition},
40        parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
41    },
42    websocket::messages::{
43        DydxPerpetualPosition, DydxWsFillSubaccountMessageContents,
44        DydxWsOrderSubaccountMessageContents,
45    },
46};
47
48/// Parses a WebSocket order update into an OrderStatusReport.
49///
50/// Converts the WebSocket order format to the HTTP Order format, then delegates
51/// to the existing HTTP parser for consistency.
52///
53/// # Errors
54///
55/// Returns an error if:
56/// - clob_pair_id cannot be parsed from string
57/// - Instrument lookup fails for the clob_pair_id
58/// - Field parsing fails (price, size, etc.)
59/// - HTTP parser fails
60pub fn parse_ws_order_report(
61    ws_order: &DydxWsOrderSubaccountMessageContents,
62    clob_pair_id_to_instrument: &DashMap<u32, InstrumentId>,
63    instruments: &DashMap<InstrumentId, InstrumentAny>,
64    account_id: AccountId,
65    ts_init: UnixNanos,
66) -> anyhow::Result<OrderStatusReport> {
67    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
68        "Failed to parse clob_pair_id '{}'",
69        ws_order.clob_pair_id
70    ))?;
71
72    let instrument_id = *clob_pair_id_to_instrument
73        .get(&clob_pair_id)
74        .ok_or_else(|| {
75            let available: Vec<u32> = clob_pair_id_to_instrument
76                .iter()
77                .map(|entry| *entry.key())
78                .collect();
79            anyhow::anyhow!(
80                "No instrument cached for clob_pair_id {clob_pair_id}. Available: {available:?}"
81            )
82        })?
83        .value();
84
85    let instrument = instruments
86        .get(&instrument_id)
87        .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found in cache"))?
88        .value()
89        .clone();
90
91    let http_order = convert_ws_order_to_http(ws_order)?;
92    let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
93
94    // For untriggered conditional orders with an explicit trigger price we
95    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
96    // enum mapping.
97    if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
98        report.order_status = OrderStatus::PendingUpdate;
99    }
100
101    Ok(report)
102}
103
104/// Converts a WebSocket order message to HTTP Order format.
105///
106/// # Errors
107///
108/// Returns an error if any field parsing fails.
109fn convert_ws_order_to_http(
110    ws_order: &DydxWsOrderSubaccountMessageContents,
111) -> anyhow::Result<Order> {
112    let clob_pair_id: u32 = ws_order
113        .clob_pair_id
114        .parse()
115        .context("Failed to parse clob_pair_id")?;
116
117    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
118
119    let total_filled: Decimal = ws_order
120        .total_filled
121        .parse()
122        .context("Failed to parse total_filled")?;
123
124    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
125    let remaining_size = (size - total_filled).max(Decimal::ZERO);
126
127    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
128
129    let created_at_height: u64 = ws_order
130        .created_at_height
131        .parse()
132        .context("Failed to parse created_at_height")?;
133
134    let client_metadata: u32 = ws_order
135        .client_metadata
136        .parse()
137        .context("Failed to parse client_metadata")?;
138
139    let order_flags: u32 = ws_order
140        .order_flags
141        .parse()
142        .context("Failed to parse order_flags")?;
143
144    let good_til_block = ws_order
145        .good_til_block
146        .as_ref()
147        .and_then(|s| s.parse::<u64>().ok());
148
149    let good_til_block_time = ws_order
150        .good_til_block_time
151        .as_ref()
152        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
153        .map(|dt| dt.with_timezone(&Utc));
154
155    let trigger_price = ws_order
156        .trigger_price
157        .as_ref()
158        .and_then(|s| Decimal::from_str(s).ok());
159
160    // Parse updated_at (optional for BEST_EFFORT_OPENED orders)
161    let updated_at = ws_order
162        .updated_at
163        .as_ref()
164        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
165        .map(|dt| dt.with_timezone(&Utc));
166
167    // Parse updated_at_height (optional for BEST_EFFORT_OPENED orders)
168    let updated_at_height = ws_order
169        .updated_at_height
170        .as_ref()
171        .and_then(|s| s.parse::<u64>().ok());
172
173    let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
174
175    Ok(Order {
176        id: ws_order.id.clone(),
177        subaccount_id: ws_order.subaccount_id.clone(),
178        client_id: ws_order.client_id.clone(),
179        clob_pair_id,
180        side: ws_order.side,
181        size,
182        total_filled,
183        price,
184        status: ws_order.status,
185        order_type: ws_order.order_type,
186        time_in_force: ws_order.time_in_force,
187        reduce_only: ws_order.reduce_only,
188        post_only: ws_order.post_only,
189        order_flags,
190        good_til_block,
191        good_til_block_time,
192        created_at_height: Some(created_at_height),
193        client_metadata,
194        trigger_price,
195        condition_type: None, // Not provided in WebSocket messages
196        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
197        execution: None,      // Inferred from post_only flag by HTTP parser
198        updated_at,
199        updated_at_height,
200        ticker: None,               // Not provided in WebSocket messages
201        subaccount_number: 0,       // Default to 0 for WebSocket messages
202        order_router_address: None, // Not provided in WebSocket messages
203    })
204}
205
206/// Parses a WebSocket fill update into a FillReport.
207///
208/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
209/// to the existing HTTP parser for consistency.
210///
211/// # Errors
212///
213/// Returns an error if:
214/// - Instrument lookup fails for the market symbol
215/// - Field parsing fails (price, size, fee, etc.)
216/// - HTTP parser fails
217pub fn parse_ws_fill_report(
218    ws_fill: &DydxWsFillSubaccountMessageContents,
219    instruments: &DashMap<InstrumentId, InstrumentAny>,
220    account_id: AccountId,
221    ts_init: UnixNanos,
222) -> anyhow::Result<FillReport> {
223    let instrument = instruments
224        .iter()
225        .find(|entry| entry.value().id().symbol.as_str() == ws_fill.market.as_str())
226        .ok_or_else(|| {
227            let available: Vec<String> = instruments
228                .iter()
229                .map(|entry| entry.value().id().symbol.to_string())
230                .collect();
231            anyhow::anyhow!(
232                "No instrument cached for market '{}'. Available: {:?}",
233                ws_fill.market,
234                available
235            )
236        })?
237        .value()
238        .clone();
239
240    let http_fill = convert_ws_fill_to_http(ws_fill)?;
241    parse_fill_report(&http_fill, &instrument, account_id, ts_init)
242}
243
244/// Converts a WebSocket fill message to HTTP Fill format.
245///
246/// # Errors
247///
248/// Returns an error if any field parsing fails.
249fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
250    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
251
252    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
253
254    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
255
256    let created_at_height: u64 = ws_fill
257        .created_at_height
258        .parse()
259        .context("Failed to parse created_at_height")?;
260
261    let client_metadata: u32 = ws_fill
262        .client_metadata
263        .parse()
264        .context("Failed to parse client_metadata")?;
265
266    let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
267        .context("Failed to parse created_at")?
268        .with_timezone(&Utc);
269
270    Ok(Fill {
271        id: ws_fill.id.clone(),
272        side: ws_fill.side,
273        liquidity: ws_fill.liquidity,
274        fill_type: ws_fill.fill_type,
275        market: ws_fill.market.to_string(),
276        market_type: ws_fill.market_type,
277        price,
278        size,
279        fee,
280        created_at,
281        created_at_height,
282        order_id: ws_fill.order_id.clone(),
283        client_metadata,
284    })
285}
286
287/// Parses a WebSocket position into a PositionStatusReport.
288///
289/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
290/// then delegates to the existing HTTP parser for consistency.
291///
292/// # Errors
293///
294/// Returns an error if:
295/// - Instrument lookup fails for the market symbol
296/// - Field parsing fails (size, prices, etc.)
297/// - HTTP parser fails
298pub fn parse_ws_position_report(
299    ws_position: &DydxPerpetualPosition,
300    instruments: &DashMap<InstrumentId, InstrumentAny>,
301    account_id: AccountId,
302    ts_init: UnixNanos,
303) -> anyhow::Result<PositionStatusReport> {
304    let instrument = instruments
305        .iter()
306        .find(|entry| entry.value().id().symbol.as_str() == ws_position.market.as_str())
307        .ok_or_else(|| {
308            let available: Vec<String> = instruments
309                .iter()
310                .map(|entry| entry.value().id().symbol.to_string())
311                .collect();
312            anyhow::anyhow!(
313                "No instrument cached for market '{}'. Available: {:?}",
314                ws_position.market,
315                available
316            )
317        })?
318        .value()
319        .clone();
320
321    let http_position = convert_ws_position_to_http(ws_position)?;
322    parse_position_status_report(&http_position, &instrument, account_id, ts_init)
323}
324
325/// Converts a WebSocket position to HTTP PerpetualPosition format.
326///
327/// # Errors
328///
329/// Returns an error if any field parsing fails.
330fn convert_ws_position_to_http(
331    ws_position: &DydxPerpetualPosition,
332) -> anyhow::Result<PerpetualPosition> {
333    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
334
335    let max_size: Decimal = ws_position
336        .max_size
337        .parse()
338        .context("Failed to parse max_size")?;
339
340    let entry_price: Decimal = ws_position
341        .entry_price
342        .parse()
343        .context("Failed to parse entry_price")?;
344
345    let exit_price: Option<Decimal> = ws_position
346        .exit_price
347        .as_ref()
348        .map(|s| s.parse())
349        .transpose()
350        .context("Failed to parse exit_price")?;
351
352    let realized_pnl: Decimal = ws_position
353        .realized_pnl
354        .parse()
355        .context("Failed to parse realized_pnl")?;
356
357    let unrealized_pnl: Decimal = ws_position
358        .unrealized_pnl
359        .parse()
360        .context("Failed to parse unrealized_pnl")?;
361
362    let sum_open: Decimal = ws_position
363        .sum_open
364        .parse()
365        .context("Failed to parse sum_open")?;
366
367    let sum_close: Decimal = ws_position
368        .sum_close
369        .parse()
370        .context("Failed to parse sum_close")?;
371
372    let net_funding: Decimal = ws_position
373        .net_funding
374        .parse()
375        .context("Failed to parse net_funding")?;
376
377    let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
378        .context("Failed to parse created_at")?
379        .with_timezone(&Utc);
380
381    let closed_at = ws_position
382        .closed_at
383        .as_ref()
384        .map(|s| DateTime::parse_from_rfc3339(s))
385        .transpose()
386        .context("Failed to parse closed_at")?
387        .map(|dt| dt.with_timezone(&Utc));
388
389    // Determine side from size sign (HTTP format uses OrderSide, not PositionSide)
390    let side = if size.is_sign_positive() {
391        OrderSide::Buy
392    } else {
393        OrderSide::Sell
394    };
395
396    Ok(PerpetualPosition {
397        market: ws_position.market.to_string(),
398        status: ws_position.status,
399        side,
400        size,
401        max_size,
402        entry_price,
403        exit_price,
404        realized_pnl,
405        created_at_height: 0, // Not provided in WebSocket messages
406        created_at,
407        sum_open,
408        sum_close,
409        net_funding,
410        unrealized_pnl,
411        closed_at,
412    })
413}
414
415#[cfg(test)]
416mod tests {
417    use nautilus_model::{
418        enums::{LiquiditySide, OrderSide, OrderType, PositionSideSpecified},
419        identifiers::{AccountId, InstrumentId, Symbol, Venue},
420        instruments::CryptoPerpetual,
421        types::{Currency, Price, Quantity},
422    };
423    use rstest::rstest;
424
425    use super::*;
426    use crate::common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce};
427
428    fn create_test_instrument() -> InstrumentAny {
429        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
430
431        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
432            instrument_id,
433            Symbol::new("BTC-USD"),
434            Currency::BTC(),
435            Currency::USD(),
436            Currency::USD(),
437            false,
438            2,
439            8,
440            Price::new(0.01, 2),
441            Quantity::new(0.001, 8),
442            Some(Quantity::new(1.0, 0)),
443            Some(Quantity::new(0.001, 8)),
444            Some(Quantity::new(100000.0, 8)),
445            Some(Quantity::new(0.001, 8)),
446            None,
447            None,
448            Some(Price::new(1000000.0, 2)),
449            Some(Price::new(0.01, 2)),
450            Some(rust_decimal_macros::dec!(0.05)),
451            Some(rust_decimal_macros::dec!(0.03)),
452            Some(rust_decimal_macros::dec!(0.0002)),
453            Some(rust_decimal_macros::dec!(0.0005)),
454            UnixNanos::default(),
455            UnixNanos::default(),
456        ))
457    }
458
459    #[rstest]
460    fn test_convert_ws_order_to_http_basic() {
461        let ws_order = DydxWsOrderSubaccountMessageContents {
462            id: "order123".to_string(),
463            subaccount_id: "dydx1test/0".to_string(),
464            client_id: "12345".to_string(),
465            clob_pair_id: "1".to_string(),
466            side: OrderSide::Buy,
467            size: "1.5".to_string(),
468            price: "50000.0".to_string(),
469            status: DydxOrderStatus::PartiallyFilled,
470            order_type: DydxOrderType::Limit,
471            time_in_force: DydxTimeInForce::Gtt,
472            post_only: false,
473            reduce_only: false,
474            order_flags: "0".to_string(),
475            good_til_block: Some("1000".to_string()),
476            good_til_block_time: None,
477            created_at_height: "900".to_string(),
478            client_metadata: "0".to_string(),
479            trigger_price: None,
480            total_filled: "0.5".to_string(),
481            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
482            updated_at_height: Some("950".to_string()),
483        };
484
485        let result = convert_ws_order_to_http(&ws_order);
486        assert!(result.is_ok());
487
488        let http_order = result.unwrap();
489        assert_eq!(http_order.id, "order123");
490        assert_eq!(http_order.clob_pair_id, 1);
491        assert_eq!(http_order.size.to_string(), "1.5");
492        assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); // 0.5 filled
493        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
494    }
495
496    #[rstest]
497    fn test_parse_ws_order_report_success() {
498        let ws_order = DydxWsOrderSubaccountMessageContents {
499            id: "order456".to_string(),
500            subaccount_id: "dydx1test/0".to_string(),
501            client_id: "67890".to_string(),
502            clob_pair_id: "1".to_string(),
503            side: OrderSide::Sell,
504            size: "2.0".to_string(),
505            price: "51000.0".to_string(),
506            status: DydxOrderStatus::Open,
507            order_type: DydxOrderType::Limit,
508            time_in_force: DydxTimeInForce::Gtt,
509            post_only: true,
510            reduce_only: false,
511            order_flags: "0".to_string(),
512            good_til_block: Some("2000".to_string()),
513            good_til_block_time: None,
514            created_at_height: "1800".to_string(),
515            client_metadata: "0".to_string(),
516            trigger_price: None,
517            total_filled: "0.0".to_string(),
518            updated_at: None,
519            updated_at_height: None,
520        };
521
522        let clob_pair_id_to_instrument = DashMap::new();
523        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
524        clob_pair_id_to_instrument.insert(1, instrument_id);
525
526        let instruments = DashMap::new();
527        let instrument = create_test_instrument();
528        instruments.insert(instrument_id, instrument);
529
530        let account_id = AccountId::new("DYDX-001");
531        let ts_init = UnixNanos::default();
532
533        let result = parse_ws_order_report(
534            &ws_order,
535            &clob_pair_id_to_instrument,
536            &instruments,
537            account_id,
538            ts_init,
539        );
540
541        assert!(result.is_ok());
542        let report = result.unwrap();
543        assert_eq!(report.account_id, account_id);
544        assert_eq!(report.order_side, OrderSide::Sell);
545    }
546
547    #[rstest]
548    fn test_parse_ws_order_report_missing_instrument() {
549        let ws_order = DydxWsOrderSubaccountMessageContents {
550            id: "order789".to_string(),
551            subaccount_id: "dydx1test/0".to_string(),
552            client_id: "11111".to_string(),
553            clob_pair_id: "99".to_string(), // Non-existent
554            side: OrderSide::Buy,
555            size: "1.0".to_string(),
556            price: "50000.0".to_string(),
557            status: DydxOrderStatus::Open,
558            order_type: DydxOrderType::Market,
559            time_in_force: DydxTimeInForce::Ioc,
560            post_only: false,
561            reduce_only: false,
562            order_flags: "0".to_string(),
563            good_til_block: Some("1000".to_string()),
564            good_til_block_time: None,
565            created_at_height: "900".to_string(),
566            client_metadata: "0".to_string(),
567            trigger_price: None,
568            total_filled: "0.0".to_string(),
569            updated_at: None,
570            updated_at_height: None,
571        };
572
573        let clob_pair_id_to_instrument = DashMap::new();
574        let instruments = DashMap::new();
575        let account_id = AccountId::new("DYDX-001");
576        let ts_init = UnixNanos::default();
577
578        let result = parse_ws_order_report(
579            &ws_order,
580            &clob_pair_id_to_instrument,
581            &instruments,
582            account_id,
583            ts_init,
584        );
585
586        assert!(result.is_err());
587        assert!(
588            result
589                .unwrap_err()
590                .to_string()
591                .contains("No instrument cached")
592        );
593    }
594
595    #[rstest]
596    fn test_convert_ws_fill_to_http() {
597        use crate::{
598            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
599            websocket::messages::DydxWsFillSubaccountMessageContents,
600        };
601
602        let ws_fill = DydxWsFillSubaccountMessageContents {
603            id: "fill123".to_string(),
604            subaccount_id: "sub1".to_string(),
605            side: OrderSide::Buy,
606            liquidity: DydxLiquidity::Maker,
607            fill_type: DydxFillType::Limit,
608            market: "BTC-USD".into(),
609            market_type: DydxTickerType::Perpetual,
610            price: "50000.5".to_string(),
611            size: "0.1".to_string(),
612            fee: "-2.5".to_string(), // Negative for maker rebate
613            created_at: "2024-01-15T10:30:00Z".to_string(),
614            created_at_height: "12345".to_string(),
615            order_id: "order456".to_string(),
616            client_metadata: "999".to_string(),
617        };
618
619        let result = convert_ws_fill_to_http(&ws_fill);
620        assert!(result.is_ok());
621
622        let http_fill = result.unwrap();
623        assert_eq!(http_fill.id, "fill123");
624        assert_eq!(http_fill.side, OrderSide::Buy);
625        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
626        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
627        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
628        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
629        assert_eq!(http_fill.created_at_height, 12345);
630        assert_eq!(http_fill.order_id, "order456");
631        assert_eq!(http_fill.client_metadata, 999);
632    }
633
634    #[rstest]
635    fn test_parse_ws_fill_report_success() {
636        use crate::{
637            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
638            websocket::messages::DydxWsFillSubaccountMessageContents,
639        };
640
641        let instrument = create_test_instrument();
642        let instrument_id = instrument.id();
643
644        let instruments = DashMap::new();
645        instruments.insert(instrument_id, instrument);
646
647        let ws_fill = DydxWsFillSubaccountMessageContents {
648            id: "fill789".to_string(),
649            subaccount_id: "sub1".to_string(),
650            side: OrderSide::Sell,
651            liquidity: DydxLiquidity::Taker,
652            fill_type: DydxFillType::Limit,
653            market: "BTC-USD-PERP".into(),
654            market_type: DydxTickerType::Perpetual,
655            price: "49500.0".to_string(),
656            size: "0.5".to_string(),
657            fee: "12.375".to_string(), // Positive for taker fee
658            created_at: "2024-01-15T11:00:00Z".to_string(),
659            created_at_height: "12400".to_string(),
660            order_id: "order999".to_string(),
661            client_metadata: "888".to_string(),
662        };
663
664        let account_id = AccountId::new("DYDX-001");
665        let ts_init = UnixNanos::default();
666
667        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
668        assert!(result.is_ok());
669
670        let fill_report = result.unwrap();
671        assert_eq!(fill_report.instrument_id, instrument_id);
672        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
673        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
674        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
675        // Commission should be negative (cost to trader) after negating positive fee
676        assert!((fill_report.commission.as_f64() + 12.38).abs() < 0.01);
677    }
678
679    #[rstest]
680    fn test_parse_ws_fill_report_missing_instrument() {
681        use crate::{
682            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
683            websocket::messages::DydxWsFillSubaccountMessageContents,
684        };
685
686        let instruments = DashMap::new(); // Empty - no instruments cached
687
688        let ws_fill = DydxWsFillSubaccountMessageContents {
689            id: "fill000".to_string(),
690            subaccount_id: "sub1".to_string(),
691            side: OrderSide::Buy,
692            liquidity: DydxLiquidity::Maker,
693            fill_type: DydxFillType::Limit,
694            market: "ETH-USD-PERP".into(),
695            market_type: DydxTickerType::Perpetual,
696            price: "3000.0".to_string(),
697            size: "1.0".to_string(),
698            fee: "-1.5".to_string(),
699            created_at: "2024-01-15T12:00:00Z".to_string(),
700            created_at_height: "12500".to_string(),
701            order_id: "order111".to_string(),
702            client_metadata: "777".to_string(),
703        };
704
705        let account_id = AccountId::new("DYDX-001");
706        let ts_init = UnixNanos::default();
707
708        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
709        assert!(result.is_err());
710        assert!(
711            result
712                .unwrap_err()
713                .to_string()
714                .contains("No instrument cached for market")
715        );
716    }
717
718    #[rstest]
719    fn test_convert_ws_position_to_http() {
720        use nautilus_model::enums::PositionSide;
721
722        use crate::{
723            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
724        };
725
726        let ws_position = DydxPerpetualPosition {
727            market: "BTC-USD".into(),
728            status: DydxPositionStatus::Open,
729            side: PositionSide::Long,
730            size: "1.5".to_string(),
731            max_size: "2.0".to_string(),
732            entry_price: "50000.0".to_string(),
733            exit_price: None,
734            realized_pnl: "100.0".to_string(),
735            unrealized_pnl: "250.5".to_string(),
736            created_at: "2024-01-15T10:00:00Z".to_string(),
737            closed_at: None,
738            sum_open: "5.0".to_string(),
739            sum_close: "3.5".to_string(),
740            net_funding: "-10.25".to_string(),
741        };
742
743        let result = convert_ws_position_to_http(&ws_position);
744        assert!(result.is_ok());
745
746        let http_position = result.unwrap();
747        assert_eq!(http_position.market, "BTC-USD");
748        assert_eq!(http_position.status, DydxPositionStatus::Open);
749        assert_eq!(http_position.side, OrderSide::Buy); // Positive size = Buy
750        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
751        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
752        assert_eq!(
753            http_position.entry_price,
754            rust_decimal_macros::dec!(50000.0)
755        );
756        assert_eq!(http_position.exit_price, None);
757        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
758        assert_eq!(
759            http_position.unrealized_pnl,
760            rust_decimal_macros::dec!(250.5)
761        );
762        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
763        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
764        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
765    }
766
767    #[rstest]
768    fn test_parse_ws_position_report_success() {
769        use nautilus_model::enums::PositionSide;
770
771        use crate::{
772            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
773        };
774
775        let instrument = create_test_instrument();
776        let instrument_id = instrument.id();
777
778        let instruments = DashMap::new();
779        instruments.insert(instrument_id, instrument);
780
781        let ws_position = DydxPerpetualPosition {
782            market: "BTC-USD-PERP".into(),
783            status: DydxPositionStatus::Open,
784            side: PositionSide::Long,
785            size: "0.5".to_string(),
786            max_size: "1.0".to_string(),
787            entry_price: "49500.0".to_string(),
788            exit_price: None,
789            realized_pnl: "0.0".to_string(),
790            unrealized_pnl: "125.0".to_string(),
791            created_at: "2024-01-15T09:00:00Z".to_string(),
792            closed_at: None,
793            sum_open: "0.5".to_string(),
794            sum_close: "0.0".to_string(),
795            net_funding: "-2.5".to_string(),
796        };
797
798        let account_id = AccountId::new("DYDX-001");
799        let ts_init = UnixNanos::default();
800
801        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
802        assert!(result.is_ok());
803
804        let position_report = result.unwrap();
805        assert_eq!(position_report.instrument_id, instrument_id);
806        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
807        assert_eq!(position_report.quantity.as_f64(), 0.5);
808        // avg_px_open should be entry_price
809        assert!(position_report.avg_px_open.is_some());
810    }
811
812    #[rstest]
813    fn test_parse_ws_position_report_short() {
814        use nautilus_model::enums::PositionSide;
815
816        use crate::{
817            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
818        };
819
820        let instrument = create_test_instrument();
821        let instrument_id = instrument.id();
822
823        let instruments = DashMap::new();
824        instruments.insert(instrument_id, instrument);
825
826        let ws_position = DydxPerpetualPosition {
827            market: "BTC-USD-PERP".into(),
828            status: DydxPositionStatus::Open,
829            side: PositionSide::Short,
830            size: "-0.25".to_string(), // Negative for short
831            max_size: "0.5".to_string(),
832            entry_price: "51000.0".to_string(),
833            exit_price: None,
834            realized_pnl: "50.0".to_string(),
835            unrealized_pnl: "-75.25".to_string(),
836            created_at: "2024-01-15T08:00:00Z".to_string(),
837            closed_at: None,
838            sum_open: "0.25".to_string(),
839            sum_close: "0.0".to_string(),
840            net_funding: "1.5".to_string(),
841        };
842
843        let account_id = AccountId::new("DYDX-001");
844        let ts_init = UnixNanos::default();
845
846        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
847        assert!(result.is_ok());
848
849        let position_report = result.unwrap();
850        assert_eq!(position_report.instrument_id, instrument_id);
851        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
852        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
853    }
854
855    #[rstest]
856    fn test_parse_ws_position_report_missing_instrument() {
857        use nautilus_model::enums::PositionSide;
858
859        use crate::{
860            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
861        };
862
863        let instruments = DashMap::new(); // Empty - no instruments cached
864
865        let ws_position = DydxPerpetualPosition {
866            market: "ETH-USD-PERP".into(),
867            status: DydxPositionStatus::Open,
868            side: PositionSide::Long,
869            size: "5.0".to_string(),
870            max_size: "10.0".to_string(),
871            entry_price: "3000.0".to_string(),
872            exit_price: None,
873            realized_pnl: "0.0".to_string(),
874            unrealized_pnl: "500.0".to_string(),
875            created_at: "2024-01-15T07:00:00Z".to_string(),
876            closed_at: None,
877            sum_open: "5.0".to_string(),
878            sum_close: "0.0".to_string(),
879            net_funding: "-5.0".to_string(),
880        };
881
882        let account_id = AccountId::new("DYDX-001");
883        let ts_init = UnixNanos::default();
884
885        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
886        assert!(result.is_err());
887        assert!(
888            result
889                .unwrap_err()
890                .to_string()
891                .contains("No instrument cached for market")
892        );
893    }
894
895    #[rstest]
896    #[case(DydxOrderStatus::Filled, "2.0")]
897    #[case(DydxOrderStatus::Canceled, "0.0")]
898    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
899    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
900    #[case(DydxOrderStatus::Untriggered, "0.0")]
901    fn test_parse_ws_order_various_statuses(
902        #[case] status: DydxOrderStatus,
903        #[case] total_filled: &str,
904    ) {
905        let ws_order = DydxWsOrderSubaccountMessageContents {
906            id: format!("order_{status:?}"),
907            subaccount_id: "dydx1test/0".to_string(),
908            client_id: "99999".to_string(),
909            clob_pair_id: "1".to_string(),
910            side: OrderSide::Buy,
911            size: "2.0".to_string(),
912            price: "50000.0".to_string(),
913            status,
914            order_type: DydxOrderType::Limit,
915            time_in_force: DydxTimeInForce::Gtt,
916            post_only: false,
917            reduce_only: false,
918            order_flags: "0".to_string(),
919            good_til_block: Some("1000".to_string()),
920            good_til_block_time: None,
921            created_at_height: "900".to_string(),
922            client_metadata: "0".to_string(),
923            trigger_price: None,
924            total_filled: total_filled.to_string(),
925            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
926            updated_at_height: Some("950".to_string()),
927        };
928
929        let clob_pair_id_to_instrument = DashMap::new();
930        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
931        clob_pair_id_to_instrument.insert(1, instrument_id);
932
933        let instruments = DashMap::new();
934        let instrument = create_test_instrument();
935        instruments.insert(instrument_id, instrument);
936
937        let account_id = AccountId::new("DYDX-001");
938        let ts_init = UnixNanos::default();
939
940        let result = parse_ws_order_report(
941            &ws_order,
942            &clob_pair_id_to_instrument,
943            &instruments,
944            account_id,
945            ts_init,
946        );
947
948        assert!(
949            result.is_ok(),
950            "Failed to parse order with status {status:?}"
951        );
952        let report = result.unwrap();
953
954        // Verify status conversion
955        use nautilus_model::enums::OrderStatus;
956        let expected_status = match status {
957            DydxOrderStatus::Open
958            | DydxOrderStatus::BestEffortOpened
959            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
960            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
961            DydxOrderStatus::Filled => OrderStatus::Filled,
962            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
963                OrderStatus::Canceled
964            }
965        };
966        assert_eq!(report.order_status, expected_status);
967    }
968
969    #[rstest]
970    fn test_parse_ws_order_with_trigger_price() {
971        let ws_order = DydxWsOrderSubaccountMessageContents {
972            id: "conditional_order".to_string(),
973            subaccount_id: "dydx1test/0".to_string(),
974            client_id: "88888".to_string(),
975            clob_pair_id: "1".to_string(),
976            side: OrderSide::Sell,
977            size: "1.0".to_string(),
978            price: "52000.0".to_string(),
979            status: DydxOrderStatus::Untriggered,
980            order_type: DydxOrderType::StopLimit,
981            time_in_force: DydxTimeInForce::Gtt,
982            post_only: false,
983            reduce_only: true,
984            order_flags: "32".to_string(),
985            good_til_block: None,
986            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
987            created_at_height: "1000".to_string(),
988            client_metadata: "100".to_string(),
989            trigger_price: Some("51500.0".to_string()),
990            total_filled: "0.0".to_string(),
991            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
992            updated_at_height: Some("1050".to_string()),
993        };
994
995        let clob_pair_id_to_instrument = DashMap::new();
996        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
997        clob_pair_id_to_instrument.insert(1, instrument_id);
998
999        let instruments = DashMap::new();
1000        let instrument = create_test_instrument();
1001        instruments.insert(instrument_id, instrument);
1002
1003        let account_id = AccountId::new("DYDX-001");
1004        let ts_init = UnixNanos::default();
1005
1006        let result = parse_ws_order_report(
1007            &ws_order,
1008            &clob_pair_id_to_instrument,
1009            &instruments,
1010            account_id,
1011            ts_init,
1012        );
1013
1014        assert!(result.is_ok());
1015        let report = result.unwrap();
1016        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1017        // Trigger price should be parsed and available in the report
1018        assert!(report.trigger_price.is_some());
1019    }
1020
1021    #[rstest]
1022    fn test_parse_ws_order_market_type() {
1023        let ws_order = DydxWsOrderSubaccountMessageContents {
1024            id: "market_order".to_string(),
1025            subaccount_id: "dydx1test/0".to_string(),
1026            client_id: "77777".to_string(),
1027            clob_pair_id: "1".to_string(),
1028            side: OrderSide::Buy,
1029            size: "0.5".to_string(),
1030            price: "50000.0".to_string(), // Market orders still have a price
1031            status: DydxOrderStatus::Filled,
1032            order_type: DydxOrderType::Market,
1033            time_in_force: DydxTimeInForce::Ioc,
1034            post_only: false,
1035            reduce_only: false,
1036            order_flags: "0".to_string(),
1037            good_til_block: Some("1000".to_string()),
1038            good_til_block_time: None,
1039            created_at_height: "900".to_string(),
1040            client_metadata: "0".to_string(),
1041            trigger_price: None,
1042            total_filled: "0.5".to_string(),
1043            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1044            updated_at_height: Some("901".to_string()),
1045        };
1046
1047        let clob_pair_id_to_instrument = DashMap::new();
1048        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1049        clob_pair_id_to_instrument.insert(1, instrument_id);
1050
1051        let instruments = DashMap::new();
1052        let instrument = create_test_instrument();
1053        instruments.insert(instrument_id, instrument);
1054
1055        let account_id = AccountId::new("DYDX-001");
1056        let ts_init = UnixNanos::default();
1057
1058        let result = parse_ws_order_report(
1059            &ws_order,
1060            &clob_pair_id_to_instrument,
1061            &instruments,
1062            account_id,
1063            ts_init,
1064        );
1065
1066        assert!(result.is_ok());
1067        let report = result.unwrap();
1068        assert_eq!(report.order_type, OrderType::Market);
1069        assert_eq!(report.order_status, OrderStatus::Filled);
1070    }
1071
1072    #[rstest]
1073    fn test_parse_ws_order_invalid_clob_pair_id() {
1074        let ws_order = DydxWsOrderSubaccountMessageContents {
1075            id: "bad_order".to_string(),
1076            subaccount_id: "dydx1test/0".to_string(),
1077            client_id: "12345".to_string(),
1078            clob_pair_id: "not_a_number".to_string(), // Invalid
1079            side: OrderSide::Buy,
1080            size: "1.0".to_string(),
1081            price: "50000.0".to_string(),
1082            status: DydxOrderStatus::Open,
1083            order_type: DydxOrderType::Limit,
1084            time_in_force: DydxTimeInForce::Gtt,
1085            post_only: false,
1086            reduce_only: false,
1087            order_flags: "0".to_string(),
1088            good_til_block: Some("1000".to_string()),
1089            good_til_block_time: None,
1090            created_at_height: "900".to_string(),
1091            client_metadata: "0".to_string(),
1092            trigger_price: None,
1093            total_filled: "0.0".to_string(),
1094            updated_at: None,
1095            updated_at_height: None,
1096        };
1097
1098        let clob_pair_id_to_instrument = DashMap::new();
1099        let instruments = DashMap::new();
1100        let account_id = AccountId::new("DYDX-001");
1101        let ts_init = UnixNanos::default();
1102
1103        let result = parse_ws_order_report(
1104            &ws_order,
1105            &clob_pair_id_to_instrument,
1106            &instruments,
1107            account_id,
1108            ts_init,
1109        );
1110
1111        assert!(result.is_err());
1112        assert!(
1113            result
1114                .unwrap_err()
1115                .to_string()
1116                .contains("Failed to parse clob_pair_id")
1117        );
1118    }
1119
1120    #[rstest]
1121    fn test_parse_ws_position_closed() {
1122        use nautilus_model::enums::PositionSide;
1123
1124        use crate::{
1125            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
1126        };
1127
1128        let instrument = create_test_instrument();
1129        let instrument_id = instrument.id();
1130
1131        let instruments = DashMap::new();
1132        instruments.insert(instrument_id, instrument);
1133
1134        let ws_position = DydxPerpetualPosition {
1135            market: "BTC-USD-PERP".into(),
1136            status: DydxPositionStatus::Closed,
1137            side: PositionSide::Long,
1138            size: "0.0".to_string(), // Closed = zero size
1139            max_size: "2.0".to_string(),
1140            entry_price: "48000.0".to_string(),
1141            exit_price: Some("52000.0".to_string()),
1142            realized_pnl: "2000.0".to_string(),
1143            unrealized_pnl: "0.0".to_string(),
1144            created_at: "2024-01-10T09:00:00Z".to_string(),
1145            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1146            sum_open: "5.0".to_string(),
1147            sum_close: "5.0".to_string(), // Fully closed
1148            net_funding: "-25.5".to_string(),
1149        };
1150
1151        let account_id = AccountId::new("DYDX-001");
1152        let ts_init = UnixNanos::default();
1153
1154        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
1155        assert!(result.is_ok());
1156
1157        let position_report = result.unwrap();
1158        assert_eq!(position_report.instrument_id, instrument_id);
1159        // Closed position should have zero quantity
1160        assert_eq!(position_report.quantity.as_f64(), 0.0);
1161    }
1162
1163    #[rstest]
1164    fn test_parse_ws_fill_with_maker_rebate() {
1165        use crate::{
1166            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1167            websocket::messages::DydxWsFillSubaccountMessageContents,
1168        };
1169
1170        let instrument = create_test_instrument();
1171        let instrument_id = instrument.id();
1172
1173        let instruments = DashMap::new();
1174        instruments.insert(instrument_id, instrument);
1175
1176        let ws_fill = DydxWsFillSubaccountMessageContents {
1177            id: "fill_rebate".to_string(),
1178            subaccount_id: "sub1".to_string(),
1179            side: OrderSide::Buy,
1180            liquidity: DydxLiquidity::Maker,
1181            fill_type: DydxFillType::Limit,
1182            market: "BTC-USD-PERP".into(),
1183            market_type: DydxTickerType::Perpetual,
1184            price: "50000.0".to_string(),
1185            size: "1.0".to_string(),
1186            fee: "-15.0".to_string(), // Negative fee = rebate
1187            created_at: "2024-01-15T13:00:00Z".to_string(),
1188            created_at_height: "13000".to_string(),
1189            order_id: "order_maker".to_string(),
1190            client_metadata: "200".to_string(),
1191        };
1192
1193        let account_id = AccountId::new("DYDX-001");
1194        let ts_init = UnixNanos::default();
1195
1196        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1197        assert!(result.is_ok());
1198
1199        let fill_report = result.unwrap();
1200        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1201        // Commission should be positive (rebate) after negating dYdX's negative fee
1202        assert!(fill_report.commission.as_f64() > 0.0);
1203    }
1204
1205    #[rstest]
1206    fn test_parse_ws_fill_taker_with_fee() {
1207        use crate::{
1208            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1209            websocket::messages::DydxWsFillSubaccountMessageContents,
1210        };
1211
1212        let instrument = create_test_instrument();
1213        let instrument_id = instrument.id();
1214
1215        let instruments = DashMap::new();
1216        instruments.insert(instrument_id, instrument);
1217
1218        let ws_fill = DydxWsFillSubaccountMessageContents {
1219            id: "fill_taker".to_string(),
1220            subaccount_id: "sub2".to_string(),
1221            side: OrderSide::Sell,
1222            liquidity: DydxLiquidity::Taker,
1223            fill_type: DydxFillType::Limit,
1224            market: "BTC-USD-PERP".into(),
1225            market_type: DydxTickerType::Perpetual,
1226            price: "49800.0".to_string(),
1227            size: "0.75".to_string(),
1228            fee: "18.675".to_string(), // Positive fee for taker
1229            created_at: "2024-01-15T14:00:00Z".to_string(),
1230            created_at_height: "14000".to_string(),
1231            order_id: "order_taker".to_string(),
1232            client_metadata: "300".to_string(),
1233        };
1234
1235        let account_id = AccountId::new("DYDX-001");
1236        let ts_init = UnixNanos::default();
1237
1238        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1239        assert!(result.is_ok());
1240
1241        let fill_report = result.unwrap();
1242        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1243        assert_eq!(fill_report.order_side, OrderSide::Sell);
1244        // Commission should be negative (cost to trader) after negating positive fee
1245        assert!(fill_report.commission.as_f64() < 0.0);
1246    }
1247}