nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    enums::{OrderSide, OrderStatus},
30    identifiers::{AccountId, InstrumentId},
31    instruments::{Instrument, InstrumentAny},
32    reports::{FillReport, OrderStatusReport, PositionStatusReport},
33};
34use rust_decimal::Decimal;
35
36use crate::{
37    common::{enums::DydxOrderStatus, parse::extract_raw_symbol},
38    http::{
39        models::{Fill, Order, PerpetualPosition},
40        parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
41    },
42    websocket::messages::{
43        DydxPerpetualPosition, DydxWsFillSubaccountMessageContents,
44        DydxWsOrderSubaccountMessageContents,
45    },
46};
47
48/// Parses a WebSocket order update into an OrderStatusReport.
49///
50/// Converts the WebSocket order format to the HTTP Order format, then delegates
51/// to the existing HTTP parser for consistency.
52///
53/// # Errors
54///
55/// Returns an error if:
56/// - clob_pair_id cannot be parsed from string.
57/// - Instrument lookup fails for the clob_pair_id.
58/// - Field parsing fails (price, size, etc.).
59/// - HTTP parser fails.
60pub fn parse_ws_order_report(
61    ws_order: &DydxWsOrderSubaccountMessageContents,
62    clob_pair_id_to_instrument: &DashMap<u32, InstrumentId>,
63    instruments: &DashMap<InstrumentId, InstrumentAny>,
64    account_id: AccountId,
65    ts_init: UnixNanos,
66) -> anyhow::Result<OrderStatusReport> {
67    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
68        "Failed to parse clob_pair_id '{}'",
69        ws_order.clob_pair_id
70    ))?;
71
72    let instrument_id = *clob_pair_id_to_instrument
73        .get(&clob_pair_id)
74        .ok_or_else(|| {
75            let available: Vec<u32> = clob_pair_id_to_instrument
76                .iter()
77                .map(|entry| *entry.key())
78                .collect();
79            anyhow::anyhow!(
80                "No instrument cached for clob_pair_id {clob_pair_id}. Available: {available:?}"
81            )
82        })?
83        .value();
84
85    let instrument = instruments
86        .get(&instrument_id)
87        .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found in cache"))?
88        .value()
89        .clone();
90
91    let http_order = convert_ws_order_to_http(ws_order)?;
92    let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
93
94    // For untriggered conditional orders with an explicit trigger price we
95    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
96    // enum mapping.
97    if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
98        report.order_status = OrderStatus::PendingUpdate;
99    }
100
101    Ok(report)
102}
103
104/// Converts a WebSocket order message to HTTP Order format.
105///
106/// # Errors
107///
108/// Returns an error if any field parsing fails.
109fn convert_ws_order_to_http(
110    ws_order: &DydxWsOrderSubaccountMessageContents,
111) -> anyhow::Result<Order> {
112    let clob_pair_id: u32 = ws_order
113        .clob_pair_id
114        .parse()
115        .context("Failed to parse clob_pair_id")?;
116
117    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
118
119    let total_filled: Decimal = ws_order
120        .total_filled
121        .parse()
122        .context("Failed to parse total_filled")?;
123
124    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
125    let remaining_size = (size - total_filled).max(Decimal::ZERO);
126
127    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
128
129    let created_at_height: u64 = ws_order
130        .created_at_height
131        .parse()
132        .context("Failed to parse created_at_height")?;
133
134    let client_metadata: u32 = ws_order
135        .client_metadata
136        .parse()
137        .context("Failed to parse client_metadata")?;
138
139    let order_flags: u32 = ws_order
140        .order_flags
141        .parse()
142        .context("Failed to parse order_flags")?;
143
144    let good_til_block = ws_order
145        .good_til_block
146        .as_ref()
147        .and_then(|s| s.parse::<u64>().ok());
148
149    let good_til_block_time = ws_order
150        .good_til_block_time
151        .as_ref()
152        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
153        .map(|dt| dt.with_timezone(&Utc));
154
155    let trigger_price = ws_order
156        .trigger_price
157        .as_ref()
158        .and_then(|s| Decimal::from_str(s).ok());
159
160    // Parse updated_at (optional for BEST_EFFORT_OPENED orders)
161    let updated_at = ws_order
162        .updated_at
163        .as_ref()
164        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
165        .map(|dt| dt.with_timezone(&Utc));
166
167    // Parse updated_at_height (optional for BEST_EFFORT_OPENED orders)
168    let updated_at_height = ws_order
169        .updated_at_height
170        .as_ref()
171        .and_then(|s| s.parse::<u64>().ok());
172
173    let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
174
175    Ok(Order {
176        id: ws_order.id.clone(),
177        subaccount_id: ws_order.subaccount_id.clone(),
178        client_id: ws_order.client_id.clone(),
179        clob_pair_id,
180        side: ws_order.side,
181        size,
182        total_filled,
183        price,
184        status: ws_order.status,
185        order_type: ws_order.order_type,
186        time_in_force: ws_order.time_in_force,
187        reduce_only: ws_order.reduce_only,
188        post_only: ws_order.post_only,
189        order_flags,
190        good_til_block,
191        good_til_block_time,
192        created_at_height: Some(created_at_height),
193        client_metadata,
194        trigger_price,
195        condition_type: None, // Not provided in WebSocket messages
196        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
197        execution: None,      // Inferred from post_only flag by HTTP parser
198        updated_at,
199        updated_at_height,
200        ticker: None,               // Not provided in WebSocket messages
201        subaccount_number: 0,       // Default to 0 for WebSocket messages
202        order_router_address: None, // Not provided in WebSocket messages
203    })
204}
205
206/// Parses a WebSocket fill update into a FillReport.
207///
208/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
209/// to the existing HTTP parser for consistency.
210///
211/// # Errors
212///
213/// Returns an error if:
214/// - Instrument lookup fails for the market symbol.
215/// - Field parsing fails (price, size, fee, etc.).
216/// - HTTP parser fails.
217pub fn parse_ws_fill_report(
218    ws_fill: &DydxWsFillSubaccountMessageContents,
219    instruments: &DashMap<InstrumentId, InstrumentAny>,
220    account_id: AccountId,
221    ts_init: UnixNanos,
222) -> anyhow::Result<FillReport> {
223    let instrument = instruments
224        .iter()
225        .find(|entry| {
226            extract_raw_symbol(entry.value().id().symbol.as_str()) == ws_fill.market.as_str()
227        })
228        .ok_or_else(|| {
229            let available: Vec<String> = instruments
230                .iter()
231                .map(|entry| entry.value().id().symbol.to_string())
232                .collect();
233            anyhow::anyhow!(
234                "No instrument cached for market '{}'. Available: {:?}",
235                ws_fill.market,
236                available
237            )
238        })?
239        .value()
240        .clone();
241
242    let http_fill = convert_ws_fill_to_http(ws_fill)?;
243    parse_fill_report(&http_fill, &instrument, account_id, ts_init)
244}
245
246/// Converts a WebSocket fill message to HTTP Fill format.
247///
248/// # Errors
249///
250/// Returns an error if any field parsing fails.
251fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
252    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
253    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
254    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
255
256    let created_at_height: u64 = ws_fill
257        .created_at_height
258        .parse()
259        .context("Failed to parse created_at_height")?;
260
261    let client_metadata: u32 = ws_fill
262        .client_metadata
263        .parse()
264        .context("Failed to parse client_metadata")?;
265
266    let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
267        .context("Failed to parse created_at")?
268        .with_timezone(&Utc);
269
270    Ok(Fill {
271        id: ws_fill.id.clone(),
272        side: ws_fill.side,
273        liquidity: ws_fill.liquidity,
274        fill_type: ws_fill.fill_type,
275        market: ws_fill.market.to_string(),
276        market_type: ws_fill.market_type,
277        price,
278        size,
279        fee,
280        created_at,
281        created_at_height,
282        order_id: ws_fill.order_id.clone(),
283        client_metadata,
284    })
285}
286
287/// Parses a WebSocket position into a PositionStatusReport.
288///
289/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
290/// then delegates to the existing HTTP parser for consistency.
291///
292/// # Errors
293///
294/// Returns an error if:
295/// - Instrument lookup fails for the market symbol.
296/// - Field parsing fails (size, prices, etc.).
297/// - HTTP parser fails.
298pub fn parse_ws_position_report(
299    ws_position: &DydxPerpetualPosition,
300    instruments: &DashMap<InstrumentId, InstrumentAny>,
301    account_id: AccountId,
302    ts_init: UnixNanos,
303) -> anyhow::Result<PositionStatusReport> {
304    let instrument = instruments
305        .iter()
306        .find(|entry| {
307            extract_raw_symbol(entry.value().id().symbol.as_str()) == ws_position.market.as_str()
308        })
309        .ok_or_else(|| {
310            let available: Vec<String> = instruments
311                .iter()
312                .map(|entry| entry.value().id().symbol.to_string())
313                .collect();
314            anyhow::anyhow!(
315                "No instrument cached for market '{}'. Available: {:?}",
316                ws_position.market,
317                available
318            )
319        })?
320        .value()
321        .clone();
322
323    let http_position = convert_ws_position_to_http(ws_position)?;
324    parse_position_status_report(&http_position, &instrument, account_id, ts_init)
325}
326
327/// Converts a WebSocket position to HTTP PerpetualPosition format.
328///
329/// # Errors
330///
331/// Returns an error if any field parsing fails.
332fn convert_ws_position_to_http(
333    ws_position: &DydxPerpetualPosition,
334) -> anyhow::Result<PerpetualPosition> {
335    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
336
337    let max_size: Decimal = ws_position
338        .max_size
339        .parse()
340        .context("Failed to parse max_size")?;
341
342    let entry_price: Decimal = ws_position
343        .entry_price
344        .parse()
345        .context("Failed to parse entry_price")?;
346
347    let exit_price: Option<Decimal> = ws_position
348        .exit_price
349        .as_ref()
350        .map(|s| s.parse())
351        .transpose()
352        .context("Failed to parse exit_price")?;
353
354    let realized_pnl: Decimal = ws_position
355        .realized_pnl
356        .parse()
357        .context("Failed to parse realized_pnl")?;
358
359    let unrealized_pnl: Decimal = ws_position
360        .unrealized_pnl
361        .parse()
362        .context("Failed to parse unrealized_pnl")?;
363
364    let sum_open: Decimal = ws_position
365        .sum_open
366        .parse()
367        .context("Failed to parse sum_open")?;
368
369    let sum_close: Decimal = ws_position
370        .sum_close
371        .parse()
372        .context("Failed to parse sum_close")?;
373
374    let net_funding: Decimal = ws_position
375        .net_funding
376        .parse()
377        .context("Failed to parse net_funding")?;
378
379    let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
380        .context("Failed to parse created_at")?
381        .with_timezone(&Utc);
382
383    let closed_at = ws_position
384        .closed_at
385        .as_ref()
386        .map(|s| DateTime::parse_from_rfc3339(s))
387        .transpose()
388        .context("Failed to parse closed_at")?
389        .map(|dt| dt.with_timezone(&Utc));
390
391    // Determine side from size sign (HTTP format uses OrderSide, not PositionSide)
392    let side = if size.is_sign_positive() {
393        OrderSide::Buy
394    } else {
395        OrderSide::Sell
396    };
397
398    Ok(PerpetualPosition {
399        market: ws_position.market.to_string(),
400        status: ws_position.status,
401        side,
402        size,
403        max_size,
404        entry_price,
405        exit_price,
406        realized_pnl,
407        created_at_height: 0, // Not provided in WebSocket messages
408        created_at,
409        sum_open,
410        sum_close,
411        net_funding,
412        unrealized_pnl,
413        closed_at,
414    })
415}
416
417#[cfg(test)]
418mod tests {
419    use nautilus_model::{
420        enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified},
421        identifiers::{AccountId, InstrumentId, Symbol, Venue},
422        instruments::CryptoPerpetual,
423        types::{Currency, Price, Quantity},
424    };
425    use rstest::rstest;
426
427    use super::*;
428    use crate::common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce};
429
430    fn create_test_instrument() -> InstrumentAny {
431        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
432
433        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
434            instrument_id,
435            Symbol::new("BTC-USD"),
436            Currency::BTC(),
437            Currency::USD(),
438            Currency::USD(),
439            false,
440            2,
441            8,
442            Price::new(0.01, 2),
443            Quantity::new(0.001, 8),
444            Some(Quantity::new(1.0, 0)),
445            Some(Quantity::new(0.001, 8)),
446            Some(Quantity::new(100000.0, 8)),
447            Some(Quantity::new(0.001, 8)),
448            None,
449            None,
450            Some(Price::new(1000000.0, 2)),
451            Some(Price::new(0.01, 2)),
452            Some(rust_decimal_macros::dec!(0.05)),
453            Some(rust_decimal_macros::dec!(0.03)),
454            Some(rust_decimal_macros::dec!(0.0002)),
455            Some(rust_decimal_macros::dec!(0.0005)),
456            UnixNanos::default(),
457            UnixNanos::default(),
458        ))
459    }
460
461    #[rstest]
462    fn test_convert_ws_order_to_http_basic() {
463        let ws_order = DydxWsOrderSubaccountMessageContents {
464            id: "order123".to_string(),
465            subaccount_id: "dydx1test/0".to_string(),
466            client_id: "12345".to_string(),
467            clob_pair_id: "1".to_string(),
468            side: OrderSide::Buy,
469            size: "1.5".to_string(),
470            price: "50000.0".to_string(),
471            status: DydxOrderStatus::PartiallyFilled,
472            order_type: DydxOrderType::Limit,
473            time_in_force: DydxTimeInForce::Gtt,
474            post_only: false,
475            reduce_only: false,
476            order_flags: "0".to_string(),
477            good_til_block: Some("1000".to_string()),
478            good_til_block_time: None,
479            created_at_height: "900".to_string(),
480            client_metadata: "0".to_string(),
481            trigger_price: None,
482            total_filled: "0.5".to_string(),
483            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
484            updated_at_height: Some("950".to_string()),
485        };
486
487        let result = convert_ws_order_to_http(&ws_order);
488        assert!(result.is_ok());
489
490        let http_order = result.unwrap();
491        assert_eq!(http_order.id, "order123");
492        assert_eq!(http_order.clob_pair_id, 1);
493        assert_eq!(http_order.size.to_string(), "1.5");
494        assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); // 0.5 filled
495        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
496    }
497
498    #[rstest]
499    fn test_parse_ws_order_report_success() {
500        let ws_order = DydxWsOrderSubaccountMessageContents {
501            id: "order456".to_string(),
502            subaccount_id: "dydx1test/0".to_string(),
503            client_id: "67890".to_string(),
504            clob_pair_id: "1".to_string(),
505            side: OrderSide::Sell,
506            size: "2.0".to_string(),
507            price: "51000.0".to_string(),
508            status: DydxOrderStatus::Open,
509            order_type: DydxOrderType::Limit,
510            time_in_force: DydxTimeInForce::Gtt,
511            post_only: true,
512            reduce_only: false,
513            order_flags: "0".to_string(),
514            good_til_block: Some("2000".to_string()),
515            good_til_block_time: None,
516            created_at_height: "1800".to_string(),
517            client_metadata: "0".to_string(),
518            trigger_price: None,
519            total_filled: "0.0".to_string(),
520            updated_at: None,
521            updated_at_height: None,
522        };
523
524        let clob_pair_id_to_instrument = DashMap::new();
525        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
526        clob_pair_id_to_instrument.insert(1, instrument_id);
527
528        let instruments = DashMap::new();
529        let instrument = create_test_instrument();
530        instruments.insert(instrument_id, instrument);
531
532        let account_id = AccountId::new("DYDX-001");
533        let ts_init = UnixNanos::default();
534
535        let result = parse_ws_order_report(
536            &ws_order,
537            &clob_pair_id_to_instrument,
538            &instruments,
539            account_id,
540            ts_init,
541        );
542
543        assert!(result.is_ok());
544        let report = result.unwrap();
545        assert_eq!(report.account_id, account_id);
546        assert_eq!(report.order_side, OrderSide::Sell);
547    }
548
549    #[rstest]
550    fn test_parse_ws_order_report_missing_instrument() {
551        let ws_order = DydxWsOrderSubaccountMessageContents {
552            id: "order789".to_string(),
553            subaccount_id: "dydx1test/0".to_string(),
554            client_id: "11111".to_string(),
555            clob_pair_id: "99".to_string(), // Non-existent
556            side: OrderSide::Buy,
557            size: "1.0".to_string(),
558            price: "50000.0".to_string(),
559            status: DydxOrderStatus::Open,
560            order_type: DydxOrderType::Market,
561            time_in_force: DydxTimeInForce::Ioc,
562            post_only: false,
563            reduce_only: false,
564            order_flags: "0".to_string(),
565            good_til_block: Some("1000".to_string()),
566            good_til_block_time: None,
567            created_at_height: "900".to_string(),
568            client_metadata: "0".to_string(),
569            trigger_price: None,
570            total_filled: "0.0".to_string(),
571            updated_at: None,
572            updated_at_height: None,
573        };
574
575        let clob_pair_id_to_instrument = DashMap::new();
576        let instruments = DashMap::new();
577        let account_id = AccountId::new("DYDX-001");
578        let ts_init = UnixNanos::default();
579
580        let result = parse_ws_order_report(
581            &ws_order,
582            &clob_pair_id_to_instrument,
583            &instruments,
584            account_id,
585            ts_init,
586        );
587
588        assert!(result.is_err());
589        assert!(
590            result
591                .unwrap_err()
592                .to_string()
593                .contains("No instrument cached")
594        );
595    }
596
597    #[rstest]
598    fn test_convert_ws_fill_to_http() {
599        use crate::{
600            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
601            websocket::messages::DydxWsFillSubaccountMessageContents,
602        };
603
604        let ws_fill = DydxWsFillSubaccountMessageContents {
605            id: "fill123".to_string(),
606            subaccount_id: "sub1".to_string(),
607            side: OrderSide::Buy,
608            liquidity: DydxLiquidity::Maker,
609            fill_type: DydxFillType::Limit,
610            market: "BTC-USD".into(),
611            market_type: DydxTickerType::Perpetual,
612            price: "50000.5".to_string(),
613            size: "0.1".to_string(),
614            fee: "-2.5".to_string(), // Negative for maker rebate
615            created_at: "2024-01-15T10:30:00Z".to_string(),
616            created_at_height: "12345".to_string(),
617            order_id: "order456".to_string(),
618            client_metadata: "999".to_string(),
619        };
620
621        let result = convert_ws_fill_to_http(&ws_fill);
622        assert!(result.is_ok());
623
624        let http_fill = result.unwrap();
625        assert_eq!(http_fill.id, "fill123");
626        assert_eq!(http_fill.side, OrderSide::Buy);
627        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
628        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
629        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
630        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
631        assert_eq!(http_fill.created_at_height, 12345);
632        assert_eq!(http_fill.order_id, "order456");
633        assert_eq!(http_fill.client_metadata, 999);
634    }
635
636    #[rstest]
637    fn test_parse_ws_fill_report_success() {
638        use crate::{
639            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
640            websocket::messages::DydxWsFillSubaccountMessageContents,
641        };
642
643        let instrument = create_test_instrument();
644        let instrument_id = instrument.id();
645
646        let instruments = DashMap::new();
647        instruments.insert(instrument_id, instrument);
648
649        // dYdX WS fills use market format "BTC-USD" (not "BTC-USD-PERP")
650        // but the instrument symbol is "BTC-USD-PERP"
651        let ws_fill = DydxWsFillSubaccountMessageContents {
652            id: "fill789".to_string(),
653            subaccount_id: "sub1".to_string(),
654            side: OrderSide::Sell,
655            liquidity: DydxLiquidity::Taker,
656            fill_type: DydxFillType::Limit,
657            market: "BTC-USD".into(),
658            market_type: DydxTickerType::Perpetual,
659            price: "49500.0".to_string(),
660            size: "0.5".to_string(),
661            fee: "12.375".to_string(), // Positive for taker fee
662            created_at: "2024-01-15T11:00:00Z".to_string(),
663            created_at_height: "12400".to_string(),
664            order_id: "order999".to_string(),
665            client_metadata: "888".to_string(),
666        };
667
668        let account_id = AccountId::new("DYDX-001");
669        let ts_init = UnixNanos::default();
670
671        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
672        assert!(result.is_ok());
673
674        let fill_report = result.unwrap();
675        assert_eq!(fill_report.instrument_id, instrument_id);
676        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
677        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
678        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
679        // Commission should be negative (cost to trader) after negating positive fee
680        assert!((fill_report.commission.as_f64() + 12.38).abs() < 0.01);
681    }
682
683    #[rstest]
684    fn test_parse_ws_fill_report_missing_instrument() {
685        use crate::{
686            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
687            websocket::messages::DydxWsFillSubaccountMessageContents,
688        };
689
690        let instruments = DashMap::new(); // Empty - no instruments cached
691
692        let ws_fill = DydxWsFillSubaccountMessageContents {
693            id: "fill000".to_string(),
694            subaccount_id: "sub1".to_string(),
695            side: OrderSide::Buy,
696            liquidity: DydxLiquidity::Maker,
697            fill_type: DydxFillType::Limit,
698            market: "ETH-USD-PERP".into(),
699            market_type: DydxTickerType::Perpetual,
700            price: "3000.0".to_string(),
701            size: "1.0".to_string(),
702            fee: "-1.5".to_string(),
703            created_at: "2024-01-15T12:00:00Z".to_string(),
704            created_at_height: "12500".to_string(),
705            order_id: "order111".to_string(),
706            client_metadata: "777".to_string(),
707        };
708
709        let account_id = AccountId::new("DYDX-001");
710        let ts_init = UnixNanos::default();
711
712        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
713        assert!(result.is_err());
714        assert!(
715            result
716                .unwrap_err()
717                .to_string()
718                .contains("No instrument cached for market")
719        );
720    }
721
722    #[rstest]
723    fn test_convert_ws_position_to_http() {
724        use nautilus_model::enums::PositionSide;
725
726        use crate::{
727            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
728        };
729
730        let ws_position = DydxPerpetualPosition {
731            market: "BTC-USD".into(),
732            status: DydxPositionStatus::Open,
733            side: PositionSide::Long,
734            size: "1.5".to_string(),
735            max_size: "2.0".to_string(),
736            entry_price: "50000.0".to_string(),
737            exit_price: None,
738            realized_pnl: "100.0".to_string(),
739            unrealized_pnl: "250.5".to_string(),
740            created_at: "2024-01-15T10:00:00Z".to_string(),
741            closed_at: None,
742            sum_open: "5.0".to_string(),
743            sum_close: "3.5".to_string(),
744            net_funding: "-10.25".to_string(),
745        };
746
747        let result = convert_ws_position_to_http(&ws_position);
748        assert!(result.is_ok());
749
750        let http_position = result.unwrap();
751        assert_eq!(http_position.market, "BTC-USD");
752        assert_eq!(http_position.status, DydxPositionStatus::Open);
753        assert_eq!(http_position.side, OrderSide::Buy); // Positive size = Buy
754        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
755        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
756        assert_eq!(
757            http_position.entry_price,
758            rust_decimal_macros::dec!(50000.0)
759        );
760        assert_eq!(http_position.exit_price, None);
761        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
762        assert_eq!(
763            http_position.unrealized_pnl,
764            rust_decimal_macros::dec!(250.5)
765        );
766        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
767        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
768        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
769    }
770
771    #[rstest]
772    fn test_parse_ws_position_report_success() {
773        use nautilus_model::enums::PositionSide;
774
775        use crate::{
776            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
777        };
778
779        let instrument = create_test_instrument();
780        let instrument_id = instrument.id();
781
782        let instruments = DashMap::new();
783        instruments.insert(instrument_id, instrument);
784
785        let ws_position = DydxPerpetualPosition {
786            market: "BTC-USD".into(),
787            status: DydxPositionStatus::Open,
788            side: PositionSide::Long,
789            size: "0.5".to_string(),
790            max_size: "1.0".to_string(),
791            entry_price: "49500.0".to_string(),
792            exit_price: None,
793            realized_pnl: "0.0".to_string(),
794            unrealized_pnl: "125.0".to_string(),
795            created_at: "2024-01-15T09:00:00Z".to_string(),
796            closed_at: None,
797            sum_open: "0.5".to_string(),
798            sum_close: "0.0".to_string(),
799            net_funding: "-2.5".to_string(),
800        };
801
802        let account_id = AccountId::new("DYDX-001");
803        let ts_init = UnixNanos::default();
804
805        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
806        assert!(result.is_ok());
807
808        let position_report = result.unwrap();
809        assert_eq!(position_report.instrument_id, instrument_id);
810        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
811        assert_eq!(position_report.quantity.as_f64(), 0.5);
812        // avg_px_open should be entry_price
813        assert!(position_report.avg_px_open.is_some());
814    }
815
816    #[rstest]
817    fn test_parse_ws_position_report_short() {
818        use nautilus_model::enums::PositionSide;
819
820        use crate::{
821            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
822        };
823
824        let instrument = create_test_instrument();
825        let instrument_id = instrument.id();
826
827        let instruments = DashMap::new();
828        instruments.insert(instrument_id, instrument);
829
830        let ws_position = DydxPerpetualPosition {
831            market: "BTC-USD".into(),
832            status: DydxPositionStatus::Open,
833            side: PositionSide::Short,
834            size: "-0.25".to_string(), // Negative for short
835            max_size: "0.5".to_string(),
836            entry_price: "51000.0".to_string(),
837            exit_price: None,
838            realized_pnl: "50.0".to_string(),
839            unrealized_pnl: "-75.25".to_string(),
840            created_at: "2024-01-15T08:00:00Z".to_string(),
841            closed_at: None,
842            sum_open: "0.25".to_string(),
843            sum_close: "0.0".to_string(),
844            net_funding: "1.5".to_string(),
845        };
846
847        let account_id = AccountId::new("DYDX-001");
848        let ts_init = UnixNanos::default();
849
850        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
851        assert!(result.is_ok());
852
853        let position_report = result.unwrap();
854        assert_eq!(position_report.instrument_id, instrument_id);
855        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
856        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
857    }
858
859    #[rstest]
860    fn test_parse_ws_position_report_missing_instrument() {
861        use nautilus_model::enums::PositionSide;
862
863        use crate::{
864            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
865        };
866
867        let instruments = DashMap::new(); // Empty - no instruments cached
868
869        let ws_position = DydxPerpetualPosition {
870            market: "ETH-USD-PERP".into(),
871            status: DydxPositionStatus::Open,
872            side: PositionSide::Long,
873            size: "5.0".to_string(),
874            max_size: "10.0".to_string(),
875            entry_price: "3000.0".to_string(),
876            exit_price: None,
877            realized_pnl: "0.0".to_string(),
878            unrealized_pnl: "500.0".to_string(),
879            created_at: "2024-01-15T07:00:00Z".to_string(),
880            closed_at: None,
881            sum_open: "5.0".to_string(),
882            sum_close: "0.0".to_string(),
883            net_funding: "-5.0".to_string(),
884        };
885
886        let account_id = AccountId::new("DYDX-001");
887        let ts_init = UnixNanos::default();
888
889        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
890        assert!(result.is_err());
891        assert!(
892            result
893                .unwrap_err()
894                .to_string()
895                .contains("No instrument cached for market")
896        );
897    }
898
899    #[rstest]
900    #[case(DydxOrderStatus::Filled, "2.0")]
901    #[case(DydxOrderStatus::Canceled, "0.0")]
902    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
903    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
904    #[case(DydxOrderStatus::Untriggered, "0.0")]
905    fn test_parse_ws_order_various_statuses(
906        #[case] status: DydxOrderStatus,
907        #[case] total_filled: &str,
908    ) {
909        let ws_order = DydxWsOrderSubaccountMessageContents {
910            id: format!("order_{status:?}"),
911            subaccount_id: "dydx1test/0".to_string(),
912            client_id: "99999".to_string(),
913            clob_pair_id: "1".to_string(),
914            side: OrderSide::Buy,
915            size: "2.0".to_string(),
916            price: "50000.0".to_string(),
917            status,
918            order_type: DydxOrderType::Limit,
919            time_in_force: DydxTimeInForce::Gtt,
920            post_only: false,
921            reduce_only: false,
922            order_flags: "0".to_string(),
923            good_til_block: Some("1000".to_string()),
924            good_til_block_time: None,
925            created_at_height: "900".to_string(),
926            client_metadata: "0".to_string(),
927            trigger_price: None,
928            total_filled: total_filled.to_string(),
929            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
930            updated_at_height: Some("950".to_string()),
931        };
932
933        let clob_pair_id_to_instrument = DashMap::new();
934        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
935        clob_pair_id_to_instrument.insert(1, instrument_id);
936
937        let instruments = DashMap::new();
938        let instrument = create_test_instrument();
939        instruments.insert(instrument_id, instrument);
940
941        let account_id = AccountId::new("DYDX-001");
942        let ts_init = UnixNanos::default();
943
944        let result = parse_ws_order_report(
945            &ws_order,
946            &clob_pair_id_to_instrument,
947            &instruments,
948            account_id,
949            ts_init,
950        );
951
952        assert!(
953            result.is_ok(),
954            "Failed to parse order with status {status:?}"
955        );
956        let report = result.unwrap();
957
958        // Verify status conversion
959        let expected_status = match status {
960            DydxOrderStatus::Open
961            | DydxOrderStatus::BestEffortOpened
962            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
963            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
964            DydxOrderStatus::Filled => OrderStatus::Filled,
965            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
966                OrderStatus::Canceled
967            }
968        };
969        assert_eq!(report.order_status, expected_status);
970    }
971
972    #[rstest]
973    fn test_parse_ws_order_with_trigger_price() {
974        let ws_order = DydxWsOrderSubaccountMessageContents {
975            id: "conditional_order".to_string(),
976            subaccount_id: "dydx1test/0".to_string(),
977            client_id: "88888".to_string(),
978            clob_pair_id: "1".to_string(),
979            side: OrderSide::Sell,
980            size: "1.0".to_string(),
981            price: "52000.0".to_string(),
982            status: DydxOrderStatus::Untriggered,
983            order_type: DydxOrderType::StopLimit,
984            time_in_force: DydxTimeInForce::Gtt,
985            post_only: false,
986            reduce_only: true,
987            order_flags: "32".to_string(),
988            good_til_block: None,
989            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
990            created_at_height: "1000".to_string(),
991            client_metadata: "100".to_string(),
992            trigger_price: Some("51500.0".to_string()),
993            total_filled: "0.0".to_string(),
994            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
995            updated_at_height: Some("1050".to_string()),
996        };
997
998        let clob_pair_id_to_instrument = DashMap::new();
999        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1000        clob_pair_id_to_instrument.insert(1, instrument_id);
1001
1002        let instruments = DashMap::new();
1003        let instrument = create_test_instrument();
1004        instruments.insert(instrument_id, instrument);
1005
1006        let account_id = AccountId::new("DYDX-001");
1007        let ts_init = UnixNanos::default();
1008
1009        let result = parse_ws_order_report(
1010            &ws_order,
1011            &clob_pair_id_to_instrument,
1012            &instruments,
1013            account_id,
1014            ts_init,
1015        );
1016
1017        assert!(result.is_ok());
1018        let report = result.unwrap();
1019        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1020        // Trigger price should be parsed and available in the report
1021        assert!(report.trigger_price.is_some());
1022    }
1023
1024    #[rstest]
1025    fn test_parse_ws_order_market_type() {
1026        let ws_order = DydxWsOrderSubaccountMessageContents {
1027            id: "market_order".to_string(),
1028            subaccount_id: "dydx1test/0".to_string(),
1029            client_id: "77777".to_string(),
1030            clob_pair_id: "1".to_string(),
1031            side: OrderSide::Buy,
1032            size: "0.5".to_string(),
1033            price: "50000.0".to_string(), // Market orders still have a price
1034            status: DydxOrderStatus::Filled,
1035            order_type: DydxOrderType::Market,
1036            time_in_force: DydxTimeInForce::Ioc,
1037            post_only: false,
1038            reduce_only: false,
1039            order_flags: "0".to_string(),
1040            good_til_block: Some("1000".to_string()),
1041            good_til_block_time: None,
1042            created_at_height: "900".to_string(),
1043            client_metadata: "0".to_string(),
1044            trigger_price: None,
1045            total_filled: "0.5".to_string(),
1046            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1047            updated_at_height: Some("901".to_string()),
1048        };
1049
1050        let clob_pair_id_to_instrument = DashMap::new();
1051        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1052        clob_pair_id_to_instrument.insert(1, instrument_id);
1053
1054        let instruments = DashMap::new();
1055        let instrument = create_test_instrument();
1056        instruments.insert(instrument_id, instrument);
1057
1058        let account_id = AccountId::new("DYDX-001");
1059        let ts_init = UnixNanos::default();
1060
1061        let result = parse_ws_order_report(
1062            &ws_order,
1063            &clob_pair_id_to_instrument,
1064            &instruments,
1065            account_id,
1066            ts_init,
1067        );
1068
1069        assert!(result.is_ok());
1070        let report = result.unwrap();
1071        assert_eq!(report.order_type, OrderType::Market);
1072        assert_eq!(report.order_status, OrderStatus::Filled);
1073    }
1074
1075    #[rstest]
1076    fn test_parse_ws_order_invalid_clob_pair_id() {
1077        let ws_order = DydxWsOrderSubaccountMessageContents {
1078            id: "bad_order".to_string(),
1079            subaccount_id: "dydx1test/0".to_string(),
1080            client_id: "12345".to_string(),
1081            clob_pair_id: "not_a_number".to_string(), // Invalid
1082            side: OrderSide::Buy,
1083            size: "1.0".to_string(),
1084            price: "50000.0".to_string(),
1085            status: DydxOrderStatus::Open,
1086            order_type: DydxOrderType::Limit,
1087            time_in_force: DydxTimeInForce::Gtt,
1088            post_only: false,
1089            reduce_only: false,
1090            order_flags: "0".to_string(),
1091            good_til_block: Some("1000".to_string()),
1092            good_til_block_time: None,
1093            created_at_height: "900".to_string(),
1094            client_metadata: "0".to_string(),
1095            trigger_price: None,
1096            total_filled: "0.0".to_string(),
1097            updated_at: None,
1098            updated_at_height: None,
1099        };
1100
1101        let clob_pair_id_to_instrument = DashMap::new();
1102        let instruments = DashMap::new();
1103        let account_id = AccountId::new("DYDX-001");
1104        let ts_init = UnixNanos::default();
1105
1106        let result = parse_ws_order_report(
1107            &ws_order,
1108            &clob_pair_id_to_instrument,
1109            &instruments,
1110            account_id,
1111            ts_init,
1112        );
1113
1114        assert!(result.is_err());
1115        assert!(
1116            result
1117                .unwrap_err()
1118                .to_string()
1119                .contains("Failed to parse clob_pair_id")
1120        );
1121    }
1122
1123    #[rstest]
1124    fn test_parse_ws_position_closed() {
1125        use nautilus_model::enums::PositionSide;
1126
1127        use crate::{
1128            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
1129        };
1130
1131        let instrument = create_test_instrument();
1132        let instrument_id = instrument.id();
1133
1134        let instruments = DashMap::new();
1135        instruments.insert(instrument_id, instrument);
1136
1137        let ws_position = DydxPerpetualPosition {
1138            market: "BTC-USD".into(),
1139            status: DydxPositionStatus::Closed,
1140            side: PositionSide::Long,
1141            size: "0.0".to_string(), // Closed = zero size
1142            max_size: "2.0".to_string(),
1143            entry_price: "48000.0".to_string(),
1144            exit_price: Some("52000.0".to_string()),
1145            realized_pnl: "2000.0".to_string(),
1146            unrealized_pnl: "0.0".to_string(),
1147            created_at: "2024-01-10T09:00:00Z".to_string(),
1148            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1149            sum_open: "5.0".to_string(),
1150            sum_close: "5.0".to_string(), // Fully closed
1151            net_funding: "-25.5".to_string(),
1152        };
1153
1154        let account_id = AccountId::new("DYDX-001");
1155        let ts_init = UnixNanos::default();
1156
1157        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
1158        assert!(result.is_ok());
1159
1160        let position_report = result.unwrap();
1161        assert_eq!(position_report.instrument_id, instrument_id);
1162        // Closed position should have zero quantity
1163        assert_eq!(position_report.quantity.as_f64(), 0.0);
1164    }
1165
1166    #[rstest]
1167    fn test_parse_ws_fill_with_maker_rebate() {
1168        use crate::{
1169            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1170            websocket::messages::DydxWsFillSubaccountMessageContents,
1171        };
1172
1173        let instrument = create_test_instrument();
1174        let instrument_id = instrument.id();
1175
1176        let instruments = DashMap::new();
1177        instruments.insert(instrument_id, instrument);
1178
1179        let ws_fill = DydxWsFillSubaccountMessageContents {
1180            id: "fill_rebate".to_string(),
1181            subaccount_id: "sub1".to_string(),
1182            side: OrderSide::Buy,
1183            liquidity: DydxLiquidity::Maker,
1184            fill_type: DydxFillType::Limit,
1185            market: "BTC-USD".into(),
1186            market_type: DydxTickerType::Perpetual,
1187            price: "50000.0".to_string(),
1188            size: "1.0".to_string(),
1189            fee: "-15.0".to_string(), // Negative fee = rebate
1190            created_at: "2024-01-15T13:00:00Z".to_string(),
1191            created_at_height: "13000".to_string(),
1192            order_id: "order_maker".to_string(),
1193            client_metadata: "200".to_string(),
1194        };
1195
1196        let account_id = AccountId::new("DYDX-001");
1197        let ts_init = UnixNanos::default();
1198
1199        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1200        assert!(result.is_ok());
1201
1202        let fill_report = result.unwrap();
1203        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1204        // Commission should be positive (rebate) after negating dYdX's negative fee
1205        assert!(fill_report.commission.as_f64() > 0.0);
1206    }
1207
1208    #[rstest]
1209    fn test_parse_ws_fill_taker_with_fee() {
1210        use crate::{
1211            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1212            websocket::messages::DydxWsFillSubaccountMessageContents,
1213        };
1214
1215        let instrument = create_test_instrument();
1216        let instrument_id = instrument.id();
1217
1218        let instruments = DashMap::new();
1219        instruments.insert(instrument_id, instrument);
1220
1221        let ws_fill = DydxWsFillSubaccountMessageContents {
1222            id: "fill_taker".to_string(),
1223            subaccount_id: "sub2".to_string(),
1224            side: OrderSide::Sell,
1225            liquidity: DydxLiquidity::Taker,
1226            fill_type: DydxFillType::Limit,
1227            market: "BTC-USD".into(),
1228            market_type: DydxTickerType::Perpetual,
1229            price: "49800.0".to_string(),
1230            size: "0.75".to_string(),
1231            fee: "18.675".to_string(), // Positive fee for taker
1232            created_at: "2024-01-15T14:00:00Z".to_string(),
1233            created_at_height: "14000".to_string(),
1234            order_id: "order_taker".to_string(),
1235            client_metadata: "300".to_string(),
1236        };
1237
1238        let account_id = AccountId::new("DYDX-001");
1239        let ts_init = UnixNanos::default();
1240
1241        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1242        assert!(result.is_ok());
1243
1244        let fill_report = result.unwrap();
1245        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1246        assert_eq!(fill_report.order_side, OrderSide::Sell);
1247        // Commission should be negative (cost to trader) after negating positive fee
1248        assert!(fill_report.commission.as_f64() < 0.0);
1249    }
1250}