nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    enums::{OrderSide, OrderStatus},
30    identifiers::{AccountId, InstrumentId},
31    instruments::{Instrument, InstrumentAny},
32    reports::{FillReport, OrderStatusReport, PositionStatusReport},
33};
34use rust_decimal::Decimal;
35
36use crate::{
37    common::enums::DydxOrderStatus,
38    http::{
39        models::{Fill, Order, PerpetualPosition},
40        parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
41    },
42    websocket::messages::{
43        DydxPerpetualPosition, DydxWsFillSubaccountMessageContents,
44        DydxWsOrderSubaccountMessageContents,
45    },
46};
47
48/// Parses a WebSocket order update into an OrderStatusReport.
49///
50/// Converts the WebSocket order format to the HTTP Order format, then delegates
51/// to the existing HTTP parser for consistency.
52///
53/// # Errors
54///
55/// Returns an error if:
56/// - clob_pair_id cannot be parsed from string
57/// - Instrument lookup fails for the clob_pair_id
58/// - Field parsing fails (price, size, etc.)
59/// - HTTP parser fails
60pub fn parse_ws_order_report(
61    ws_order: &DydxWsOrderSubaccountMessageContents,
62    clob_pair_id_to_instrument: &DashMap<u32, InstrumentId>,
63    instruments: &DashMap<InstrumentId, InstrumentAny>,
64    account_id: AccountId,
65    ts_init: UnixNanos,
66) -> anyhow::Result<OrderStatusReport> {
67    // Parse clob_pair_id from string
68    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
69        "Failed to parse clob_pair_id '{}'",
70        ws_order.clob_pair_id
71    ))?;
72
73    // Lookup instrument by clob_pair_id
74    let instrument_id = *clob_pair_id_to_instrument
75        .get(&clob_pair_id)
76        .ok_or_else(|| {
77            let available: Vec<u32> = clob_pair_id_to_instrument
78                .iter()
79                .map(|entry| *entry.key())
80                .collect();
81            anyhow::anyhow!(
82                "No instrument cached for clob_pair_id {clob_pair_id}. Available: {available:?}"
83            )
84        })?
85        .value();
86
87    let instrument = instruments
88        .get(&instrument_id)
89        .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found in cache"))?
90        .value()
91        .clone();
92
93    // Convert WebSocket order to HTTP Order format
94    let http_order = convert_ws_order_to_http(ws_order)?;
95
96    // Delegate to existing HTTP parser
97    let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
98
99    // For untriggered conditional orders with an explicit trigger price we
100    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
101    // enum mapping.
102    if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
103        report.order_status = OrderStatus::PendingUpdate;
104    }
105
106    Ok(report)
107}
108
109/// Converts a WebSocket order message to HTTP Order format.
110///
111/// # Errors
112///
113/// Returns an error if any field parsing fails.
114fn convert_ws_order_to_http(
115    ws_order: &DydxWsOrderSubaccountMessageContents,
116) -> anyhow::Result<Order> {
117    // Parse numeric fields
118    let clob_pair_id: u32 = ws_order
119        .clob_pair_id
120        .parse()
121        .context("Failed to parse clob_pair_id")?;
122
123    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
124
125    let total_filled: Decimal = ws_order
126        .total_filled
127        .parse()
128        .context("Failed to parse total_filled")?;
129
130    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
131    let remaining_size = (size - total_filled).max(Decimal::ZERO);
132
133    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
134
135    let created_at_height: u64 = ws_order
136        .created_at_height
137        .parse()
138        .context("Failed to parse created_at_height")?;
139
140    let client_metadata: u32 = ws_order
141        .client_metadata
142        .parse()
143        .context("Failed to parse client_metadata")?;
144
145    let order_flags: u32 = ws_order
146        .order_flags
147        .parse()
148        .context("Failed to parse order_flags")?;
149
150    // Parse optional fields
151    let good_til_block = ws_order
152        .good_til_block
153        .as_ref()
154        .and_then(|s| s.parse::<u64>().ok());
155
156    let good_til_block_time = ws_order
157        .good_til_block_time
158        .as_ref()
159        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
160        .map(|dt| dt.with_timezone(&Utc));
161
162    let trigger_price = ws_order
163        .trigger_price
164        .as_ref()
165        .and_then(|s| Decimal::from_str(s).ok());
166
167    // Parse updated_at (optional for BEST_EFFORT_OPENED orders)
168    let updated_at = ws_order
169        .updated_at
170        .as_ref()
171        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
172        .map(|dt| dt.with_timezone(&Utc));
173
174    // Parse updated_at_height (optional for BEST_EFFORT_OPENED orders)
175    let updated_at_height = ws_order
176        .updated_at_height
177        .as_ref()
178        .and_then(|s| s.parse::<u64>().ok());
179
180    // Calculate total filled from size - remaining_size
181    let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
182
183    Ok(Order {
184        id: ws_order.id.clone(),
185        subaccount_id: ws_order.subaccount_id.clone(),
186        client_id: ws_order.client_id.clone(),
187        clob_pair_id,
188        side: ws_order.side,
189        size,
190        total_filled,
191        price,
192        status: ws_order.status,
193        order_type: ws_order.order_type,
194        time_in_force: ws_order.time_in_force,
195        reduce_only: ws_order.reduce_only,
196        post_only: ws_order.post_only,
197        order_flags,
198        good_til_block,
199        good_til_block_time,
200        created_at_height: Some(created_at_height),
201        client_metadata,
202        trigger_price,
203        condition_type: None, // Not provided in WebSocket messages
204        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
205        execution: None,      // Inferred from post_only flag by HTTP parser
206        updated_at,
207        updated_at_height,
208        ticker: None,               // Not provided in WebSocket messages
209        subaccount_number: 0,       // Default to 0 for WebSocket messages
210        order_router_address: None, // Not provided in WebSocket messages
211    })
212}
213
214/// Parses a WebSocket fill update into a FillReport.
215///
216/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
217/// to the existing HTTP parser for consistency.
218///
219/// # Errors
220///
221/// Returns an error if:
222/// - Instrument lookup fails for the market symbol
223/// - Field parsing fails (price, size, fee, etc.)
224/// - HTTP parser fails
225pub fn parse_ws_fill_report(
226    ws_fill: &DydxWsFillSubaccountMessageContents,
227    instruments: &DashMap<InstrumentId, InstrumentAny>,
228    account_id: AccountId,
229    ts_init: UnixNanos,
230) -> anyhow::Result<FillReport> {
231    // Lookup instrument by market symbol
232    let instrument = instruments
233        .iter()
234        .find(|entry| entry.value().id().symbol.as_str() == ws_fill.market.as_str())
235        .ok_or_else(|| {
236            let available: Vec<String> = instruments
237                .iter()
238                .map(|entry| entry.value().id().symbol.to_string())
239                .collect();
240            anyhow::anyhow!(
241                "No instrument cached for market '{}'. Available: {:?}",
242                ws_fill.market,
243                available
244            )
245        })?
246        .value()
247        .clone();
248
249    // Convert WebSocket fill to HTTP Fill format
250    let http_fill = convert_ws_fill_to_http(ws_fill)?;
251
252    // Delegate to existing HTTP parser
253    parse_fill_report(&http_fill, &instrument, account_id, ts_init)
254}
255
256/// Converts a WebSocket fill message to HTTP Fill format.
257///
258/// # Errors
259///
260/// Returns an error if any field parsing fails.
261fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
262    // Parse numeric fields
263    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
264
265    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
266
267    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
268
269    let created_at_height: u64 = ws_fill
270        .created_at_height
271        .parse()
272        .context("Failed to parse created_at_height")?;
273
274    let client_metadata: u32 = ws_fill
275        .client_metadata
276        .parse()
277        .context("Failed to parse client_metadata")?;
278
279    // Parse timestamp
280    let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
281        .context("Failed to parse created_at")?
282        .with_timezone(&Utc);
283
284    Ok(Fill {
285        id: ws_fill.id.clone(),
286        side: ws_fill.side,
287        liquidity: ws_fill.liquidity,
288        fill_type: ws_fill.fill_type,
289        market: ws_fill.market.to_string(),
290        market_type: ws_fill.market_type,
291        price,
292        size,
293        fee,
294        created_at,
295        created_at_height,
296        order_id: ws_fill.order_id.clone(),
297        client_metadata,
298    })
299}
300
301/// Parses a WebSocket position into a PositionStatusReport.
302///
303/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
304/// then delegates to the existing HTTP parser for consistency.
305///
306/// # Errors
307///
308/// Returns an error if:
309/// - Instrument lookup fails for the market symbol
310/// - Field parsing fails (size, prices, etc.)
311/// - HTTP parser fails
312pub fn parse_ws_position_report(
313    ws_position: &DydxPerpetualPosition,
314    instruments: &DashMap<InstrumentId, InstrumentAny>,
315    account_id: AccountId,
316    ts_init: UnixNanos,
317) -> anyhow::Result<PositionStatusReport> {
318    // Lookup instrument by market symbol
319    let instrument = instruments
320        .iter()
321        .find(|entry| entry.value().id().symbol.as_str() == ws_position.market.as_str())
322        .ok_or_else(|| {
323            let available: Vec<String> = instruments
324                .iter()
325                .map(|entry| entry.value().id().symbol.to_string())
326                .collect();
327            anyhow::anyhow!(
328                "No instrument cached for market '{}'. Available: {:?}",
329                ws_position.market,
330                available
331            )
332        })?
333        .value()
334        .clone();
335
336    // Convert WebSocket position to HTTP PerpetualPosition format
337    let http_position = convert_ws_position_to_http(ws_position)?;
338
339    // Delegate to existing HTTP parser
340    parse_position_status_report(&http_position, &instrument, account_id, ts_init)
341}
342
343/// Converts a WebSocket position to HTTP PerpetualPosition format.
344///
345/// # Errors
346///
347/// Returns an error if any field parsing fails.
348fn convert_ws_position_to_http(
349    ws_position: &DydxPerpetualPosition,
350) -> anyhow::Result<PerpetualPosition> {
351    // Parse numeric fields
352    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
353
354    let max_size: Decimal = ws_position
355        .max_size
356        .parse()
357        .context("Failed to parse max_size")?;
358
359    let entry_price: Decimal = ws_position
360        .entry_price
361        .parse()
362        .context("Failed to parse entry_price")?;
363
364    let exit_price: Option<Decimal> = ws_position
365        .exit_price
366        .as_ref()
367        .map(|s| s.parse())
368        .transpose()
369        .context("Failed to parse exit_price")?;
370
371    let realized_pnl: Decimal = ws_position
372        .realized_pnl
373        .parse()
374        .context("Failed to parse realized_pnl")?;
375
376    let unrealized_pnl: Decimal = ws_position
377        .unrealized_pnl
378        .parse()
379        .context("Failed to parse unrealized_pnl")?;
380
381    let sum_open: Decimal = ws_position
382        .sum_open
383        .parse()
384        .context("Failed to parse sum_open")?;
385
386    let sum_close: Decimal = ws_position
387        .sum_close
388        .parse()
389        .context("Failed to parse sum_close")?;
390
391    let net_funding: Decimal = ws_position
392        .net_funding
393        .parse()
394        .context("Failed to parse net_funding")?;
395
396    // Parse timestamps
397    let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
398        .context("Failed to parse created_at")?
399        .with_timezone(&Utc);
400
401    let closed_at = ws_position
402        .closed_at
403        .as_ref()
404        .map(|s| DateTime::parse_from_rfc3339(s))
405        .transpose()
406        .context("Failed to parse closed_at")?
407        .map(|dt| dt.with_timezone(&Utc));
408
409    // Determine side from size sign (HTTP format uses OrderSide, not PositionSide)
410    let side = if size.is_sign_positive() {
411        OrderSide::Buy
412    } else {
413        OrderSide::Sell
414    };
415
416    Ok(PerpetualPosition {
417        market: ws_position.market.to_string(),
418        status: ws_position.status,
419        side,
420        size,
421        max_size,
422        entry_price,
423        exit_price,
424        realized_pnl,
425        created_at_height: 0, // Not provided in WebSocket messages
426        created_at,
427        sum_open,
428        sum_close,
429        net_funding,
430        unrealized_pnl,
431        closed_at,
432    })
433}
434
435#[cfg(test)]
436mod tests {
437    use nautilus_model::{
438        enums::{LiquiditySide, OrderSide, OrderType, PositionSideSpecified},
439        identifiers::{AccountId, InstrumentId, Symbol, Venue},
440        instruments::CryptoPerpetual,
441        types::{Currency, Price, Quantity},
442    };
443    use rstest::rstest;
444
445    use super::*;
446    use crate::common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce};
447
448    fn create_test_instrument() -> InstrumentAny {
449        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
450
451        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
452            instrument_id,
453            Symbol::new("BTC-USD"),
454            Currency::BTC(),
455            Currency::USD(),
456            Currency::USD(),
457            false,
458            2,
459            8,
460            Price::new(0.01, 2),
461            Quantity::new(0.001, 8),
462            Some(Quantity::new(1.0, 0)),
463            Some(Quantity::new(0.001, 8)),
464            Some(Quantity::new(100000.0, 8)),
465            Some(Quantity::new(0.001, 8)),
466            None,
467            None,
468            Some(Price::new(1000000.0, 2)),
469            Some(Price::new(0.01, 2)),
470            Some(rust_decimal_macros::dec!(0.05)),
471            Some(rust_decimal_macros::dec!(0.03)),
472            Some(rust_decimal_macros::dec!(0.0002)),
473            Some(rust_decimal_macros::dec!(0.0005)),
474            UnixNanos::default(),
475            UnixNanos::default(),
476        ))
477    }
478
479    #[rstest]
480    fn test_convert_ws_order_to_http_basic() {
481        let ws_order = DydxWsOrderSubaccountMessageContents {
482            id: "order123".to_string(),
483            subaccount_id: "dydx1test/0".to_string(),
484            client_id: "12345".to_string(),
485            clob_pair_id: "1".to_string(),
486            side: OrderSide::Buy,
487            size: "1.5".to_string(),
488            price: "50000.0".to_string(),
489            status: DydxOrderStatus::PartiallyFilled,
490            order_type: DydxOrderType::Limit,
491            time_in_force: DydxTimeInForce::Gtt,
492            post_only: false,
493            reduce_only: false,
494            order_flags: "0".to_string(),
495            good_til_block: Some("1000".to_string()),
496            good_til_block_time: None,
497            created_at_height: "900".to_string(),
498            client_metadata: "0".to_string(),
499            trigger_price: None,
500            total_filled: "0.5".to_string(),
501            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
502            updated_at_height: Some("950".to_string()),
503        };
504
505        let result = convert_ws_order_to_http(&ws_order);
506        assert!(result.is_ok());
507
508        let http_order = result.unwrap();
509        assert_eq!(http_order.id, "order123");
510        assert_eq!(http_order.clob_pair_id, 1);
511        assert_eq!(http_order.size.to_string(), "1.5");
512        assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); // 0.5 filled
513        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
514    }
515
516    #[rstest]
517    fn test_parse_ws_order_report_success() {
518        let ws_order = DydxWsOrderSubaccountMessageContents {
519            id: "order456".to_string(),
520            subaccount_id: "dydx1test/0".to_string(),
521            client_id: "67890".to_string(),
522            clob_pair_id: "1".to_string(),
523            side: OrderSide::Sell,
524            size: "2.0".to_string(),
525            price: "51000.0".to_string(),
526            status: DydxOrderStatus::Open,
527            order_type: DydxOrderType::Limit,
528            time_in_force: DydxTimeInForce::Gtt,
529            post_only: true,
530            reduce_only: false,
531            order_flags: "0".to_string(),
532            good_til_block: Some("2000".to_string()),
533            good_til_block_time: None,
534            created_at_height: "1800".to_string(),
535            client_metadata: "0".to_string(),
536            trigger_price: None,
537            total_filled: "0.0".to_string(),
538            updated_at: None,
539            updated_at_height: None,
540        };
541
542        let clob_pair_id_to_instrument = DashMap::new();
543        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
544        clob_pair_id_to_instrument.insert(1, instrument_id);
545
546        let instruments = DashMap::new();
547        let instrument = create_test_instrument();
548        instruments.insert(instrument_id, instrument);
549
550        let account_id = AccountId::new("DYDX-001");
551        let ts_init = UnixNanos::default();
552
553        let result = parse_ws_order_report(
554            &ws_order,
555            &clob_pair_id_to_instrument,
556            &instruments,
557            account_id,
558            ts_init,
559        );
560
561        assert!(result.is_ok());
562        let report = result.unwrap();
563        assert_eq!(report.account_id, account_id);
564        assert_eq!(report.order_side, OrderSide::Sell);
565    }
566
567    #[rstest]
568    fn test_parse_ws_order_report_missing_instrument() {
569        let ws_order = DydxWsOrderSubaccountMessageContents {
570            id: "order789".to_string(),
571            subaccount_id: "dydx1test/0".to_string(),
572            client_id: "11111".to_string(),
573            clob_pair_id: "99".to_string(), // Non-existent
574            side: OrderSide::Buy,
575            size: "1.0".to_string(),
576            price: "50000.0".to_string(),
577            status: DydxOrderStatus::Open,
578            order_type: DydxOrderType::Market,
579            time_in_force: DydxTimeInForce::Ioc,
580            post_only: false,
581            reduce_only: false,
582            order_flags: "0".to_string(),
583            good_til_block: Some("1000".to_string()),
584            good_til_block_time: None,
585            created_at_height: "900".to_string(),
586            client_metadata: "0".to_string(),
587            trigger_price: None,
588            total_filled: "0.0".to_string(),
589            updated_at: None,
590            updated_at_height: None,
591        };
592
593        let clob_pair_id_to_instrument = DashMap::new();
594        let instruments = DashMap::new();
595        let account_id = AccountId::new("DYDX-001");
596        let ts_init = UnixNanos::default();
597
598        let result = parse_ws_order_report(
599            &ws_order,
600            &clob_pair_id_to_instrument,
601            &instruments,
602            account_id,
603            ts_init,
604        );
605
606        assert!(result.is_err());
607        assert!(
608            result
609                .unwrap_err()
610                .to_string()
611                .contains("No instrument cached")
612        );
613    }
614
615    // ========== Fill Parsing Tests ==========
616
617    #[rstest]
618    fn test_convert_ws_fill_to_http() {
619        use crate::{
620            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
621            websocket::messages::DydxWsFillSubaccountMessageContents,
622        };
623
624        let ws_fill = DydxWsFillSubaccountMessageContents {
625            id: "fill123".to_string(),
626            subaccount_id: "sub1".to_string(),
627            side: OrderSide::Buy,
628            liquidity: DydxLiquidity::Maker,
629            fill_type: DydxFillType::Limit,
630            market: "BTC-USD".into(),
631            market_type: DydxTickerType::Perpetual,
632            price: "50000.5".to_string(),
633            size: "0.1".to_string(),
634            fee: "-2.5".to_string(), // Negative for maker rebate
635            created_at: "2024-01-15T10:30:00Z".to_string(),
636            created_at_height: "12345".to_string(),
637            order_id: "order456".to_string(),
638            client_metadata: "999".to_string(),
639        };
640
641        let result = convert_ws_fill_to_http(&ws_fill);
642        assert!(result.is_ok());
643
644        let http_fill = result.unwrap();
645        assert_eq!(http_fill.id, "fill123");
646        assert_eq!(http_fill.side, OrderSide::Buy);
647        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
648        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
649        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
650        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
651        assert_eq!(http_fill.created_at_height, 12345);
652        assert_eq!(http_fill.order_id, "order456");
653        assert_eq!(http_fill.client_metadata, 999);
654    }
655
656    #[rstest]
657    fn test_parse_ws_fill_report_success() {
658        use crate::{
659            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
660            websocket::messages::DydxWsFillSubaccountMessageContents,
661        };
662
663        let instrument = create_test_instrument();
664        let instrument_id = instrument.id();
665
666        let instruments = DashMap::new();
667        instruments.insert(instrument_id, instrument);
668
669        let ws_fill = DydxWsFillSubaccountMessageContents {
670            id: "fill789".to_string(),
671            subaccount_id: "sub1".to_string(),
672            side: OrderSide::Sell,
673            liquidity: DydxLiquidity::Taker,
674            fill_type: DydxFillType::Limit,
675            market: "BTC-USD-PERP".into(),
676            market_type: DydxTickerType::Perpetual,
677            price: "49500.0".to_string(),
678            size: "0.5".to_string(),
679            fee: "12.375".to_string(), // Positive for taker fee
680            created_at: "2024-01-15T11:00:00Z".to_string(),
681            created_at_height: "12400".to_string(),
682            order_id: "order999".to_string(),
683            client_metadata: "888".to_string(),
684        };
685
686        let account_id = AccountId::new("DYDX-001");
687        let ts_init = UnixNanos::default();
688
689        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
690        assert!(result.is_ok());
691
692        let fill_report = result.unwrap();
693        assert_eq!(fill_report.instrument_id, instrument_id);
694        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
695        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
696        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
697        // Commission should be negative (cost to trader) after negating positive fee
698        assert!((fill_report.commission.as_f64() + 12.38).abs() < 0.01);
699    }
700
701    #[rstest]
702    fn test_parse_ws_fill_report_missing_instrument() {
703        use crate::{
704            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
705            websocket::messages::DydxWsFillSubaccountMessageContents,
706        };
707
708        let instruments = DashMap::new(); // Empty - no instruments cached
709
710        let ws_fill = DydxWsFillSubaccountMessageContents {
711            id: "fill000".to_string(),
712            subaccount_id: "sub1".to_string(),
713            side: OrderSide::Buy,
714            liquidity: DydxLiquidity::Maker,
715            fill_type: DydxFillType::Limit,
716            market: "ETH-USD-PERP".into(),
717            market_type: DydxTickerType::Perpetual,
718            price: "3000.0".to_string(),
719            size: "1.0".to_string(),
720            fee: "-1.5".to_string(),
721            created_at: "2024-01-15T12:00:00Z".to_string(),
722            created_at_height: "12500".to_string(),
723            order_id: "order111".to_string(),
724            client_metadata: "777".to_string(),
725        };
726
727        let account_id = AccountId::new("DYDX-001");
728        let ts_init = UnixNanos::default();
729
730        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
731        assert!(result.is_err());
732        assert!(
733            result
734                .unwrap_err()
735                .to_string()
736                .contains("No instrument cached for market")
737        );
738    }
739
740    // ========== Position Parsing Tests ==========
741
742    #[rstest]
743    fn test_convert_ws_position_to_http() {
744        use nautilus_model::enums::PositionSide;
745
746        use crate::{
747            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
748        };
749
750        let ws_position = DydxPerpetualPosition {
751            market: "BTC-USD".into(),
752            status: DydxPositionStatus::Open,
753            side: PositionSide::Long,
754            size: "1.5".to_string(),
755            max_size: "2.0".to_string(),
756            entry_price: "50000.0".to_string(),
757            exit_price: None,
758            realized_pnl: "100.0".to_string(),
759            unrealized_pnl: "250.5".to_string(),
760            created_at: "2024-01-15T10:00:00Z".to_string(),
761            closed_at: None,
762            sum_open: "5.0".to_string(),
763            sum_close: "3.5".to_string(),
764            net_funding: "-10.25".to_string(),
765        };
766
767        let result = convert_ws_position_to_http(&ws_position);
768        assert!(result.is_ok());
769
770        let http_position = result.unwrap();
771        assert_eq!(http_position.market, "BTC-USD");
772        assert_eq!(http_position.status, DydxPositionStatus::Open);
773        assert_eq!(http_position.side, OrderSide::Buy); // Positive size = Buy
774        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
775        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
776        assert_eq!(
777            http_position.entry_price,
778            rust_decimal_macros::dec!(50000.0)
779        );
780        assert_eq!(http_position.exit_price, None);
781        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
782        assert_eq!(
783            http_position.unrealized_pnl,
784            rust_decimal_macros::dec!(250.5)
785        );
786        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
787        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
788        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
789    }
790
791    #[rstest]
792    fn test_parse_ws_position_report_success() {
793        use nautilus_model::enums::PositionSide;
794
795        use crate::{
796            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
797        };
798
799        let instrument = create_test_instrument();
800        let instrument_id = instrument.id();
801
802        let instruments = DashMap::new();
803        instruments.insert(instrument_id, instrument);
804
805        let ws_position = DydxPerpetualPosition {
806            market: "BTC-USD-PERP".into(),
807            status: DydxPositionStatus::Open,
808            side: PositionSide::Long,
809            size: "0.5".to_string(),
810            max_size: "1.0".to_string(),
811            entry_price: "49500.0".to_string(),
812            exit_price: None,
813            realized_pnl: "0.0".to_string(),
814            unrealized_pnl: "125.0".to_string(),
815            created_at: "2024-01-15T09:00:00Z".to_string(),
816            closed_at: None,
817            sum_open: "0.5".to_string(),
818            sum_close: "0.0".to_string(),
819            net_funding: "-2.5".to_string(),
820        };
821
822        let account_id = AccountId::new("DYDX-001");
823        let ts_init = UnixNanos::default();
824
825        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
826        assert!(result.is_ok());
827
828        let position_report = result.unwrap();
829        assert_eq!(position_report.instrument_id, instrument_id);
830        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
831        assert_eq!(position_report.quantity.as_f64(), 0.5);
832        // avg_px_open should be entry_price
833        assert!(position_report.avg_px_open.is_some());
834    }
835
836    #[rstest]
837    fn test_parse_ws_position_report_short() {
838        use nautilus_model::enums::PositionSide;
839
840        use crate::{
841            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
842        };
843
844        let instrument = create_test_instrument();
845        let instrument_id = instrument.id();
846
847        let instruments = DashMap::new();
848        instruments.insert(instrument_id, instrument);
849
850        let ws_position = DydxPerpetualPosition {
851            market: "BTC-USD-PERP".into(),
852            status: DydxPositionStatus::Open,
853            side: PositionSide::Short,
854            size: "-0.25".to_string(), // Negative for short
855            max_size: "0.5".to_string(),
856            entry_price: "51000.0".to_string(),
857            exit_price: None,
858            realized_pnl: "50.0".to_string(),
859            unrealized_pnl: "-75.25".to_string(),
860            created_at: "2024-01-15T08:00:00Z".to_string(),
861            closed_at: None,
862            sum_open: "0.25".to_string(),
863            sum_close: "0.0".to_string(),
864            net_funding: "1.5".to_string(),
865        };
866
867        let account_id = AccountId::new("DYDX-001");
868        let ts_init = UnixNanos::default();
869
870        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
871        assert!(result.is_ok());
872
873        let position_report = result.unwrap();
874        assert_eq!(position_report.instrument_id, instrument_id);
875        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
876        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
877    }
878
879    #[rstest]
880    fn test_parse_ws_position_report_missing_instrument() {
881        use nautilus_model::enums::PositionSide;
882
883        use crate::{
884            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
885        };
886
887        let instruments = DashMap::new(); // Empty - no instruments cached
888
889        let ws_position = DydxPerpetualPosition {
890            market: "ETH-USD-PERP".into(),
891            status: DydxPositionStatus::Open,
892            side: PositionSide::Long,
893            size: "5.0".to_string(),
894            max_size: "10.0".to_string(),
895            entry_price: "3000.0".to_string(),
896            exit_price: None,
897            realized_pnl: "0.0".to_string(),
898            unrealized_pnl: "500.0".to_string(),
899            created_at: "2024-01-15T07:00:00Z".to_string(),
900            closed_at: None,
901            sum_open: "5.0".to_string(),
902            sum_close: "0.0".to_string(),
903            net_funding: "-5.0".to_string(),
904        };
905
906        let account_id = AccountId::new("DYDX-001");
907        let ts_init = UnixNanos::default();
908
909        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
910        assert!(result.is_err());
911        assert!(
912            result
913                .unwrap_err()
914                .to_string()
915                .contains("No instrument cached for market")
916        );
917    }
918
919    #[rstest]
920    #[case(DydxOrderStatus::Filled, "2.0")]
921    #[case(DydxOrderStatus::Canceled, "0.0")]
922    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
923    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
924    #[case(DydxOrderStatus::Untriggered, "0.0")]
925    fn test_parse_ws_order_various_statuses(
926        #[case] status: DydxOrderStatus,
927        #[case] total_filled: &str,
928    ) {
929        let ws_order = DydxWsOrderSubaccountMessageContents {
930            id: format!("order_{status:?}"),
931            subaccount_id: "dydx1test/0".to_string(),
932            client_id: "99999".to_string(),
933            clob_pair_id: "1".to_string(),
934            side: OrderSide::Buy,
935            size: "2.0".to_string(),
936            price: "50000.0".to_string(),
937            status,
938            order_type: DydxOrderType::Limit,
939            time_in_force: DydxTimeInForce::Gtt,
940            post_only: false,
941            reduce_only: false,
942            order_flags: "0".to_string(),
943            good_til_block: Some("1000".to_string()),
944            good_til_block_time: None,
945            created_at_height: "900".to_string(),
946            client_metadata: "0".to_string(),
947            trigger_price: None,
948            total_filled: total_filled.to_string(),
949            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
950            updated_at_height: Some("950".to_string()),
951        };
952
953        let clob_pair_id_to_instrument = DashMap::new();
954        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
955        clob_pair_id_to_instrument.insert(1, instrument_id);
956
957        let instruments = DashMap::new();
958        let instrument = create_test_instrument();
959        instruments.insert(instrument_id, instrument);
960
961        let account_id = AccountId::new("DYDX-001");
962        let ts_init = UnixNanos::default();
963
964        let result = parse_ws_order_report(
965            &ws_order,
966            &clob_pair_id_to_instrument,
967            &instruments,
968            account_id,
969            ts_init,
970        );
971
972        assert!(
973            result.is_ok(),
974            "Failed to parse order with status {status:?}"
975        );
976        let report = result.unwrap();
977
978        // Verify status conversion
979        use nautilus_model::enums::OrderStatus;
980        let expected_status = match status {
981            DydxOrderStatus::Open
982            | DydxOrderStatus::BestEffortOpened
983            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
984            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
985            DydxOrderStatus::Filled => OrderStatus::Filled,
986            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
987                OrderStatus::Canceled
988            }
989        };
990        assert_eq!(report.order_status, expected_status);
991    }
992
993    #[rstest]
994    fn test_parse_ws_order_with_trigger_price() {
995        let ws_order = DydxWsOrderSubaccountMessageContents {
996            id: "conditional_order".to_string(),
997            subaccount_id: "dydx1test/0".to_string(),
998            client_id: "88888".to_string(),
999            clob_pair_id: "1".to_string(),
1000            side: OrderSide::Sell,
1001            size: "1.0".to_string(),
1002            price: "52000.0".to_string(),
1003            status: DydxOrderStatus::Untriggered,
1004            order_type: DydxOrderType::StopLimit,
1005            time_in_force: DydxTimeInForce::Gtt,
1006            post_only: false,
1007            reduce_only: true,
1008            order_flags: "32".to_string(),
1009            good_til_block: None,
1010            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1011            created_at_height: "1000".to_string(),
1012            client_metadata: "100".to_string(),
1013            trigger_price: Some("51500.0".to_string()),
1014            total_filled: "0.0".to_string(),
1015            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1016            updated_at_height: Some("1050".to_string()),
1017        };
1018
1019        let clob_pair_id_to_instrument = DashMap::new();
1020        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1021        clob_pair_id_to_instrument.insert(1, instrument_id);
1022
1023        let instruments = DashMap::new();
1024        let instrument = create_test_instrument();
1025        instruments.insert(instrument_id, instrument);
1026
1027        let account_id = AccountId::new("DYDX-001");
1028        let ts_init = UnixNanos::default();
1029
1030        let result = parse_ws_order_report(
1031            &ws_order,
1032            &clob_pair_id_to_instrument,
1033            &instruments,
1034            account_id,
1035            ts_init,
1036        );
1037
1038        assert!(result.is_ok());
1039        let report = result.unwrap();
1040        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1041        // Trigger price should be parsed and available in the report
1042        assert!(report.trigger_price.is_some());
1043    }
1044
1045    #[rstest]
1046    fn test_parse_ws_order_market_type() {
1047        let ws_order = DydxWsOrderSubaccountMessageContents {
1048            id: "market_order".to_string(),
1049            subaccount_id: "dydx1test/0".to_string(),
1050            client_id: "77777".to_string(),
1051            clob_pair_id: "1".to_string(),
1052            side: OrderSide::Buy,
1053            size: "0.5".to_string(),
1054            price: "50000.0".to_string(), // Market orders still have a price
1055            status: DydxOrderStatus::Filled,
1056            order_type: DydxOrderType::Market,
1057            time_in_force: DydxTimeInForce::Ioc,
1058            post_only: false,
1059            reduce_only: false,
1060            order_flags: "0".to_string(),
1061            good_til_block: Some("1000".to_string()),
1062            good_til_block_time: None,
1063            created_at_height: "900".to_string(),
1064            client_metadata: "0".to_string(),
1065            trigger_price: None,
1066            total_filled: "0.5".to_string(),
1067            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1068            updated_at_height: Some("901".to_string()),
1069        };
1070
1071        let clob_pair_id_to_instrument = DashMap::new();
1072        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1073        clob_pair_id_to_instrument.insert(1, instrument_id);
1074
1075        let instruments = DashMap::new();
1076        let instrument = create_test_instrument();
1077        instruments.insert(instrument_id, instrument);
1078
1079        let account_id = AccountId::new("DYDX-001");
1080        let ts_init = UnixNanos::default();
1081
1082        let result = parse_ws_order_report(
1083            &ws_order,
1084            &clob_pair_id_to_instrument,
1085            &instruments,
1086            account_id,
1087            ts_init,
1088        );
1089
1090        assert!(result.is_ok());
1091        let report = result.unwrap();
1092        assert_eq!(report.order_type, OrderType::Market);
1093        assert_eq!(report.order_status, OrderStatus::Filled);
1094    }
1095
1096    #[rstest]
1097    fn test_parse_ws_order_invalid_clob_pair_id() {
1098        let ws_order = DydxWsOrderSubaccountMessageContents {
1099            id: "bad_order".to_string(),
1100            subaccount_id: "dydx1test/0".to_string(),
1101            client_id: "12345".to_string(),
1102            clob_pair_id: "not_a_number".to_string(), // Invalid
1103            side: OrderSide::Buy,
1104            size: "1.0".to_string(),
1105            price: "50000.0".to_string(),
1106            status: DydxOrderStatus::Open,
1107            order_type: DydxOrderType::Limit,
1108            time_in_force: DydxTimeInForce::Gtt,
1109            post_only: false,
1110            reduce_only: false,
1111            order_flags: "0".to_string(),
1112            good_til_block: Some("1000".to_string()),
1113            good_til_block_time: None,
1114            created_at_height: "900".to_string(),
1115            client_metadata: "0".to_string(),
1116            trigger_price: None,
1117            total_filled: "0.0".to_string(),
1118            updated_at: None,
1119            updated_at_height: None,
1120        };
1121
1122        let clob_pair_id_to_instrument = DashMap::new();
1123        let instruments = DashMap::new();
1124        let account_id = AccountId::new("DYDX-001");
1125        let ts_init = UnixNanos::default();
1126
1127        let result = parse_ws_order_report(
1128            &ws_order,
1129            &clob_pair_id_to_instrument,
1130            &instruments,
1131            account_id,
1132            ts_init,
1133        );
1134
1135        assert!(result.is_err());
1136        assert!(
1137            result
1138                .unwrap_err()
1139                .to_string()
1140                .contains("Failed to parse clob_pair_id")
1141        );
1142    }
1143
1144    #[rstest]
1145    fn test_parse_ws_position_closed() {
1146        use nautilus_model::enums::PositionSide;
1147
1148        use crate::{
1149            common::enums::DydxPositionStatus, websocket::messages::DydxPerpetualPosition,
1150        };
1151
1152        let instrument = create_test_instrument();
1153        let instrument_id = instrument.id();
1154
1155        let instruments = DashMap::new();
1156        instruments.insert(instrument_id, instrument);
1157
1158        let ws_position = DydxPerpetualPosition {
1159            market: "BTC-USD-PERP".into(),
1160            status: DydxPositionStatus::Closed,
1161            side: PositionSide::Long,
1162            size: "0.0".to_string(), // Closed = zero size
1163            max_size: "2.0".to_string(),
1164            entry_price: "48000.0".to_string(),
1165            exit_price: Some("52000.0".to_string()),
1166            realized_pnl: "2000.0".to_string(),
1167            unrealized_pnl: "0.0".to_string(),
1168            created_at: "2024-01-10T09:00:00Z".to_string(),
1169            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1170            sum_open: "5.0".to_string(),
1171            sum_close: "5.0".to_string(), // Fully closed
1172            net_funding: "-25.5".to_string(),
1173        };
1174
1175        let account_id = AccountId::new("DYDX-001");
1176        let ts_init = UnixNanos::default();
1177
1178        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
1179        assert!(result.is_ok());
1180
1181        let position_report = result.unwrap();
1182        assert_eq!(position_report.instrument_id, instrument_id);
1183        // Closed position should have zero quantity
1184        assert_eq!(position_report.quantity.as_f64(), 0.0);
1185    }
1186
1187    #[rstest]
1188    fn test_parse_ws_fill_with_maker_rebate() {
1189        use crate::{
1190            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1191            websocket::messages::DydxWsFillSubaccountMessageContents,
1192        };
1193
1194        let instrument = create_test_instrument();
1195        let instrument_id = instrument.id();
1196
1197        let instruments = DashMap::new();
1198        instruments.insert(instrument_id, instrument);
1199
1200        let ws_fill = DydxWsFillSubaccountMessageContents {
1201            id: "fill_rebate".to_string(),
1202            subaccount_id: "sub1".to_string(),
1203            side: OrderSide::Buy,
1204            liquidity: DydxLiquidity::Maker,
1205            fill_type: DydxFillType::Limit,
1206            market: "BTC-USD-PERP".into(),
1207            market_type: DydxTickerType::Perpetual,
1208            price: "50000.0".to_string(),
1209            size: "1.0".to_string(),
1210            fee: "-15.0".to_string(), // Negative fee = rebate
1211            created_at: "2024-01-15T13:00:00Z".to_string(),
1212            created_at_height: "13000".to_string(),
1213            order_id: "order_maker".to_string(),
1214            client_metadata: "200".to_string(),
1215        };
1216
1217        let account_id = AccountId::new("DYDX-001");
1218        let ts_init = UnixNanos::default();
1219
1220        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1221        assert!(result.is_ok());
1222
1223        let fill_report = result.unwrap();
1224        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1225        // Commission should be positive (rebate) after negating dYdX's negative fee
1226        assert!(fill_report.commission.as_f64() > 0.0);
1227    }
1228
1229    #[rstest]
1230    fn test_parse_ws_fill_taker_with_fee() {
1231        use crate::{
1232            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1233            websocket::messages::DydxWsFillSubaccountMessageContents,
1234        };
1235
1236        let instrument = create_test_instrument();
1237        let instrument_id = instrument.id();
1238
1239        let instruments = DashMap::new();
1240        instruments.insert(instrument_id, instrument);
1241
1242        let ws_fill = DydxWsFillSubaccountMessageContents {
1243            id: "fill_taker".to_string(),
1244            subaccount_id: "sub2".to_string(),
1245            side: OrderSide::Sell,
1246            liquidity: DydxLiquidity::Taker,
1247            fill_type: DydxFillType::Limit,
1248            market: "BTC-USD-PERP".into(),
1249            market_type: DydxTickerType::Perpetual,
1250            price: "49800.0".to_string(),
1251            size: "0.75".to_string(),
1252            fee: "18.675".to_string(), // Positive fee for taker
1253            created_at: "2024-01-15T14:00:00Z".to_string(),
1254            created_at_height: "14000".to_string(),
1255            order_id: "order_taker".to_string(),
1256            client_metadata: "300".to_string(),
1257        };
1258
1259        let account_id = AccountId::new("DYDX-001");
1260        let ts_init = UnixNanos::default();
1261
1262        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1263        assert!(result.is_ok());
1264
1265        let fill_report = result.unwrap();
1266        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1267        assert_eq!(fill_report.order_side, OrderSide::Sell);
1268        // Commission should be negative (cost to trader) after negating positive fee
1269        assert!(fill_report.commission.as_f64() < 0.0);
1270    }
1271}