nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::Utc;
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    enums::{OrderSide, OrderStatus},
30    identifiers::{AccountId, InstrumentId},
31    instruments::{Instrument, InstrumentAny},
32    reports::{FillReport, OrderStatusReport, PositionStatusReport},
33};
34use rust_decimal::Decimal;
35
36use crate::{http::models::Order, schemas::ws::DydxWsOrderSubaccountMessageContents};
37
38/// Parses a WebSocket order update into an OrderStatusReport.
39///
40/// Converts the WebSocket order format to the HTTP Order format, then delegates
41/// to the existing HTTP parser for consistency.
42///
43/// # Errors
44///
45/// Returns an error if:
46/// - clob_pair_id cannot be parsed from string
47/// - Instrument lookup fails for the clob_pair_id
48/// - Field parsing fails (price, size, etc.)
49/// - HTTP parser fails
50pub fn parse_ws_order_report(
51    ws_order: &DydxWsOrderSubaccountMessageContents,
52    clob_pair_id_to_instrument: &DashMap<u32, InstrumentId>,
53    instruments: &DashMap<InstrumentId, InstrumentAny>,
54    account_id: AccountId,
55    ts_init: UnixNanos,
56) -> anyhow::Result<OrderStatusReport> {
57    // Parse clob_pair_id from string
58    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
59        "Failed to parse clob_pair_id '{}'",
60        ws_order.clob_pair_id
61    ))?;
62
63    // Lookup instrument by clob_pair_id
64    let instrument_id = *clob_pair_id_to_instrument
65        .get(&clob_pair_id)
66        .ok_or_else(|| {
67            let available: Vec<u32> = clob_pair_id_to_instrument
68                .iter()
69                .map(|entry| *entry.key())
70                .collect();
71            anyhow::anyhow!(
72                "No instrument cached for clob_pair_id {}. Available: {:?}",
73                clob_pair_id,
74                available
75            )
76        })?
77        .value();
78
79    let instrument = instruments
80        .get(&instrument_id)
81        .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", instrument_id))?
82        .value()
83        .clone();
84
85    // Convert WebSocket order to HTTP Order format
86    let http_order = convert_ws_order_to_http(ws_order)?;
87
88    // Delegate to existing HTTP parser
89    let mut report = crate::http::parse::parse_order_status_report(
90        &http_order,
91        &instrument,
92        account_id,
93        ts_init,
94    )?;
95
96    // For untriggered conditional orders with an explicit trigger price we
97    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
98    // enum mapping.
99    if matches!(
100        ws_order.status,
101        crate::common::enums::DydxOrderStatus::Untriggered
102    ) && ws_order.trigger_price.is_some()
103    {
104        report.order_status = OrderStatus::PendingUpdate;
105    }
106
107    Ok(report)
108}
109
110/// Converts a WebSocket order message to HTTP Order format.
111///
112/// # Errors
113///
114/// Returns an error if any field parsing fails.
115fn convert_ws_order_to_http(
116    ws_order: &DydxWsOrderSubaccountMessageContents,
117) -> anyhow::Result<Order> {
118    // Parse numeric fields
119    let clob_pair_id: u32 = ws_order
120        .clob_pair_id
121        .parse()
122        .context("Failed to parse clob_pair_id")?;
123
124    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
125
126    let total_filled: Decimal = ws_order
127        .total_filled
128        .parse()
129        .context("Failed to parse total_filled")?;
130
131    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
132    let remaining_size = (size - total_filled).max(Decimal::ZERO);
133
134    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
135
136    let created_at_height: u64 = ws_order
137        .created_at_height
138        .parse()
139        .context("Failed to parse created_at_height")?;
140
141    let order_flags: u32 = ws_order
142        .order_flags
143        .parse()
144        .context("Failed to parse order_flags")?;
145
146    let client_metadata: u32 = ws_order
147        .client_metadata
148        .parse()
149        .context("Failed to parse client_metadata")?;
150
151    // Parse optional fields
152    let good_til_block = ws_order
153        .good_til_block
154        .as_ref()
155        .and_then(|s| s.parse::<u64>().ok());
156
157    let good_til_block_time = ws_order
158        .good_til_block_time
159        .as_ref()
160        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
161        .map(|dt| dt.with_timezone(&Utc));
162
163    let trigger_price = ws_order
164        .trigger_price
165        .as_ref()
166        .and_then(|s| Decimal::from_str(s).ok());
167
168    // Parse updated_at - if missing, we must use current time since created_at
169    // is not available in WebSocket messages (only created_at_height exists)
170    let updated_at = ws_order
171        .updated_at
172        .as_ref()
173        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
174        .map_or_else(Utc::now, |dt| dt.with_timezone(&Utc));
175
176    let updated_at_height = ws_order
177        .updated_at_height
178        .as_ref()
179        .and_then(|s| s.parse::<u64>().ok())
180        .unwrap_or(created_at_height);
181
182    // Convert order type to string using Display (gives PascalCase like "Limit", "Market")
183    let order_type = ws_order.order_type.to_string();
184
185    Ok(Order {
186        id: ws_order.id.clone(),
187        subaccount_id: ws_order.subaccount_id.clone(),
188        client_id: ws_order.client_id.clone(),
189        clob_pair_id,
190        side: ws_order.side,
191        size,
192        remaining_size,
193        price,
194        status: ws_order.status,
195        order_type,
196        time_in_force: ws_order.time_in_force,
197        reduce_only: ws_order.reduce_only,
198        post_only: ws_order.post_only,
199        order_flags,
200        good_til_block,
201        good_til_block_time,
202        created_at_height,
203        client_metadata,
204        trigger_price,
205        condition_type: None, // Not provided in WebSocket messages
206        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
207        execution: None,      // Inferred from post_only flag by HTTP parser
208        updated_at,
209        updated_at_height,
210    })
211}
212
213/// Parses a WebSocket fill update into a FillReport.
214///
215/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
216/// to the existing HTTP parser for consistency.
217///
218/// # Errors
219///
220/// Returns an error if:
221/// - Instrument lookup fails for the market symbol
222/// - Field parsing fails (price, size, fee, etc.)
223/// - HTTP parser fails
224pub fn parse_ws_fill_report(
225    ws_fill: &crate::schemas::ws::DydxWsFillSubaccountMessageContents,
226    instruments: &DashMap<InstrumentId, InstrumentAny>,
227    account_id: AccountId,
228    ts_init: UnixNanos,
229) -> anyhow::Result<FillReport> {
230    // Lookup instrument by market symbol
231    let instrument = instruments
232        .iter()
233        .find(|entry| entry.value().id().symbol.as_str() == ws_fill.market.as_str())
234        .ok_or_else(|| {
235            let available: Vec<String> = instruments
236                .iter()
237                .map(|entry| entry.value().id().symbol.to_string())
238                .collect();
239            anyhow::anyhow!(
240                "No instrument cached for market '{}'. Available: {:?}",
241                ws_fill.market,
242                available
243            )
244        })?
245        .value()
246        .clone();
247
248    // Convert WebSocket fill to HTTP Fill format
249    let http_fill = convert_ws_fill_to_http(ws_fill)?;
250
251    // Delegate to existing HTTP parser
252    crate::http::parse::parse_fill_report(&http_fill, &instrument, account_id, ts_init)
253}
254
255/// Converts a WebSocket fill message to HTTP Fill format.
256///
257/// # Errors
258///
259/// Returns an error if any field parsing fails.
260fn convert_ws_fill_to_http(
261    ws_fill: &crate::schemas::ws::DydxWsFillSubaccountMessageContents,
262) -> anyhow::Result<crate::http::models::Fill> {
263    use crate::http::models::Fill;
264
265    // Parse numeric fields
266    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
267
268    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
269
270    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
271
272    let created_at_height: u64 = ws_fill
273        .created_at_height
274        .parse()
275        .context("Failed to parse created_at_height")?;
276
277    let client_metadata: u32 = ws_fill
278        .client_metadata
279        .parse()
280        .context("Failed to parse client_metadata")?;
281
282    // Parse timestamp
283    let created_at = chrono::DateTime::parse_from_rfc3339(&ws_fill.created_at)
284        .context("Failed to parse created_at")?
285        .with_timezone(&Utc);
286
287    Ok(Fill {
288        id: ws_fill.id.clone(),
289        side: ws_fill.side,
290        liquidity: ws_fill.liquidity,
291        fill_type: ws_fill.fill_type,
292        market: ws_fill.market.to_string(),
293        market_type: ws_fill.market_type,
294        price,
295        size,
296        fee,
297        created_at,
298        created_at_height,
299        order_id: ws_fill.order_id.clone(),
300        client_metadata,
301    })
302}
303
304/// Parses a WebSocket position into a PositionStatusReport.
305///
306/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
307/// then delegates to the existing HTTP parser for consistency.
308///
309/// # Errors
310///
311/// Returns an error if:
312/// - Instrument lookup fails for the market symbol
313/// - Field parsing fails (size, prices, etc.)
314/// - HTTP parser fails
315pub fn parse_ws_position_report(
316    ws_position: &crate::schemas::ws::DydxPerpetualPosition,
317    instruments: &DashMap<InstrumentId, InstrumentAny>,
318    account_id: AccountId,
319    ts_init: UnixNanos,
320) -> anyhow::Result<PositionStatusReport> {
321    // Lookup instrument by market symbol
322    let instrument = instruments
323        .iter()
324        .find(|entry| entry.value().id().symbol.as_str() == ws_position.market.as_str())
325        .ok_or_else(|| {
326            let available: Vec<String> = instruments
327                .iter()
328                .map(|entry| entry.value().id().symbol.to_string())
329                .collect();
330            anyhow::anyhow!(
331                "No instrument cached for market '{}'. Available: {:?}",
332                ws_position.market,
333                available
334            )
335        })?
336        .value()
337        .clone();
338
339    // Convert WebSocket position to HTTP PerpetualPosition format
340    let http_position = convert_ws_position_to_http(ws_position)?;
341
342    // Delegate to existing HTTP parser
343    crate::http::parse::parse_position_status_report(
344        &http_position,
345        &instrument,
346        account_id,
347        ts_init,
348    )
349}
350
351/// Converts a WebSocket position to HTTP PerpetualPosition format.
352///
353/// # Errors
354///
355/// Returns an error if any field parsing fails.
356fn convert_ws_position_to_http(
357    ws_position: &crate::schemas::ws::DydxPerpetualPosition,
358) -> anyhow::Result<crate::http::models::PerpetualPosition> {
359    use crate::http::models::PerpetualPosition;
360
361    // Parse numeric fields
362    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
363
364    let max_size: Decimal = ws_position
365        .max_size
366        .parse()
367        .context("Failed to parse max_size")?;
368
369    let entry_price: Decimal = ws_position
370        .entry_price
371        .parse()
372        .context("Failed to parse entry_price")?;
373
374    let exit_price: Option<Decimal> = ws_position
375        .exit_price
376        .as_ref()
377        .map(|s| s.parse())
378        .transpose()
379        .context("Failed to parse exit_price")?;
380
381    let realized_pnl: Decimal = ws_position
382        .realized_pnl
383        .parse()
384        .context("Failed to parse realized_pnl")?;
385
386    let unrealized_pnl: Decimal = ws_position
387        .unrealized_pnl
388        .parse()
389        .context("Failed to parse unrealized_pnl")?;
390
391    let sum_open: Decimal = ws_position
392        .sum_open
393        .parse()
394        .context("Failed to parse sum_open")?;
395
396    let sum_close: Decimal = ws_position
397        .sum_close
398        .parse()
399        .context("Failed to parse sum_close")?;
400
401    let net_funding: Decimal = ws_position
402        .net_funding
403        .parse()
404        .context("Failed to parse net_funding")?;
405
406    // Parse timestamps
407    let created_at = chrono::DateTime::parse_from_rfc3339(&ws_position.created_at)
408        .context("Failed to parse created_at")?
409        .with_timezone(&Utc);
410
411    let closed_at = ws_position
412        .closed_at
413        .as_ref()
414        .map(|s| chrono::DateTime::parse_from_rfc3339(s))
415        .transpose()
416        .context("Failed to parse closed_at")?
417        .map(|dt| dt.with_timezone(&Utc));
418
419    // Determine side from size sign (HTTP format uses OrderSide, not PositionSide)
420    let side = if size.is_sign_positive() {
421        OrderSide::Buy
422    } else {
423        OrderSide::Sell
424    };
425
426    Ok(PerpetualPosition {
427        market: ws_position.market.to_string(),
428        status: ws_position.status,
429        side,
430        size,
431        max_size,
432        entry_price,
433        exit_price,
434        realized_pnl,
435        created_at_height: 0, // Not provided in WebSocket messages
436        created_at,
437        sum_open,
438        sum_close,
439        net_funding,
440        unrealized_pnl,
441        closed_at,
442    })
443}
444
445#[cfg(test)]
446mod tests {
447    use nautilus_model::{
448        enums::{LiquiditySide, OrderSide, OrderType, PositionSideSpecified},
449        identifiers::{AccountId, InstrumentId, Symbol, Venue},
450        instruments::CryptoPerpetual,
451        types::{Currency, Price, Quantity},
452    };
453    use rstest::rstest;
454
455    use super::*;
456    use crate::common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce};
457
458    fn create_test_instrument() -> InstrumentAny {
459        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
460
461        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
462            instrument_id,
463            Symbol::new("BTC-USD"),
464            Currency::BTC(),
465            Currency::USD(),
466            Currency::USD(),
467            false,
468            2,
469            8,
470            Price::new(0.01, 2),
471            Quantity::new(0.001, 8),
472            Some(Quantity::new(1.0, 0)),
473            Some(Quantity::new(0.001, 8)),
474            Some(Quantity::new(100000.0, 8)),
475            Some(Quantity::new(0.001, 8)),
476            None,
477            None,
478            Some(Price::new(1000000.0, 2)),
479            Some(Price::new(0.01, 2)),
480            Some(rust_decimal_macros::dec!(0.05)),
481            Some(rust_decimal_macros::dec!(0.03)),
482            Some(rust_decimal_macros::dec!(0.0002)),
483            Some(rust_decimal_macros::dec!(0.0005)),
484            UnixNanos::default(),
485            UnixNanos::default(),
486        ))
487    }
488
489    #[rstest]
490    fn test_convert_ws_order_to_http_basic() {
491        let ws_order = DydxWsOrderSubaccountMessageContents {
492            id: "order123".to_string(),
493            subaccount_id: "dydx1test/0".to_string(),
494            client_id: "12345".to_string(),
495            clob_pair_id: "1".to_string(),
496            side: OrderSide::Buy,
497            size: "1.5".to_string(),
498            price: "50000.0".to_string(),
499            status: DydxOrderStatus::PartiallyFilled,
500            order_type: DydxOrderType::Limit,
501            time_in_force: DydxTimeInForce::Gtt,
502            post_only: false,
503            reduce_only: false,
504            order_flags: "0".to_string(),
505            good_til_block: Some("1000".to_string()),
506            good_til_block_time: None,
507            created_at_height: "900".to_string(),
508            client_metadata: "0".to_string(),
509            trigger_price: None,
510            total_filled: "0.5".to_string(),
511            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
512            updated_at_height: Some("950".to_string()),
513        };
514
515        let result = convert_ws_order_to_http(&ws_order);
516        assert!(result.is_ok());
517
518        let http_order = result.unwrap();
519        assert_eq!(http_order.id, "order123");
520        assert_eq!(http_order.clob_pair_id, 1);
521        assert_eq!(http_order.size.to_string(), "1.5");
522        assert_eq!(http_order.remaining_size, rust_decimal_macros::dec!(1.0)); // 1.5 - 0.5
523        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
524    }
525
526    #[rstest]
527    fn test_parse_ws_order_report_success() {
528        let ws_order = DydxWsOrderSubaccountMessageContents {
529            id: "order456".to_string(),
530            subaccount_id: "dydx1test/0".to_string(),
531            client_id: "67890".to_string(),
532            clob_pair_id: "1".to_string(),
533            side: OrderSide::Sell,
534            size: "2.0".to_string(),
535            price: "51000.0".to_string(),
536            status: DydxOrderStatus::Open,
537            order_type: DydxOrderType::Limit,
538            time_in_force: DydxTimeInForce::Gtt,
539            post_only: true,
540            reduce_only: false,
541            order_flags: "0".to_string(),
542            good_til_block: Some("2000".to_string()),
543            good_til_block_time: None,
544            created_at_height: "1800".to_string(),
545            client_metadata: "0".to_string(),
546            trigger_price: None,
547            total_filled: "0.0".to_string(),
548            updated_at: None,
549            updated_at_height: None,
550        };
551
552        let clob_pair_id_to_instrument = DashMap::new();
553        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
554        clob_pair_id_to_instrument.insert(1, instrument_id);
555
556        let instruments = DashMap::new();
557        let instrument = create_test_instrument();
558        instruments.insert(instrument_id, instrument);
559
560        let account_id = AccountId::new("DYDX-001");
561        let ts_init = UnixNanos::default();
562
563        let result = parse_ws_order_report(
564            &ws_order,
565            &clob_pair_id_to_instrument,
566            &instruments,
567            account_id,
568            ts_init,
569        );
570
571        assert!(result.is_ok());
572        let report = result.unwrap();
573        assert_eq!(report.account_id, account_id);
574        assert_eq!(report.order_side, OrderSide::Sell);
575    }
576
577    #[rstest]
578    fn test_parse_ws_order_report_missing_instrument() {
579        let ws_order = DydxWsOrderSubaccountMessageContents {
580            id: "order789".to_string(),
581            subaccount_id: "dydx1test/0".to_string(),
582            client_id: "11111".to_string(),
583            clob_pair_id: "99".to_string(), // Non-existent
584            side: OrderSide::Buy,
585            size: "1.0".to_string(),
586            price: "50000.0".to_string(),
587            status: DydxOrderStatus::Open,
588            order_type: DydxOrderType::Market,
589            time_in_force: DydxTimeInForce::Ioc,
590            post_only: false,
591            reduce_only: false,
592            order_flags: "0".to_string(),
593            good_til_block: Some("1000".to_string()),
594            good_til_block_time: None,
595            created_at_height: "900".to_string(),
596            client_metadata: "0".to_string(),
597            trigger_price: None,
598            total_filled: "0.0".to_string(),
599            updated_at: None,
600            updated_at_height: None,
601        };
602
603        let clob_pair_id_to_instrument = DashMap::new();
604        let instruments = DashMap::new();
605        let account_id = AccountId::new("DYDX-001");
606        let ts_init = UnixNanos::default();
607
608        let result = parse_ws_order_report(
609            &ws_order,
610            &clob_pair_id_to_instrument,
611            &instruments,
612            account_id,
613            ts_init,
614        );
615
616        assert!(result.is_err());
617        assert!(
618            result
619                .unwrap_err()
620                .to_string()
621                .contains("No instrument cached")
622        );
623    }
624
625    // ========== Fill Parsing Tests ==========
626
627    #[rstest]
628    fn test_convert_ws_fill_to_http() {
629        use crate::{
630            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
631            schemas::ws::DydxWsFillSubaccountMessageContents,
632        };
633
634        let ws_fill = DydxWsFillSubaccountMessageContents {
635            id: "fill123".to_string(),
636            subaccount_id: "sub1".to_string(),
637            side: OrderSide::Buy,
638            liquidity: DydxLiquidity::Maker,
639            fill_type: DydxFillType::Limit,
640            market: "BTC-USD".into(),
641            market_type: DydxTickerType::Perpetual,
642            price: "50000.5".to_string(),
643            size: "0.1".to_string(),
644            fee: "-2.5".to_string(), // Negative for maker rebate
645            created_at: "2024-01-15T10:30:00Z".to_string(),
646            created_at_height: "12345".to_string(),
647            order_id: "order456".to_string(),
648            client_metadata: "999".to_string(),
649        };
650
651        let result = convert_ws_fill_to_http(&ws_fill);
652        assert!(result.is_ok());
653
654        let http_fill = result.unwrap();
655        assert_eq!(http_fill.id, "fill123");
656        assert_eq!(http_fill.side, OrderSide::Buy);
657        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
658        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
659        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
660        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
661        assert_eq!(http_fill.created_at_height, 12345);
662        assert_eq!(http_fill.order_id, "order456");
663        assert_eq!(http_fill.client_metadata, 999);
664    }
665
666    #[rstest]
667    fn test_parse_ws_fill_report_success() {
668        use crate::{
669            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
670            schemas::ws::DydxWsFillSubaccountMessageContents,
671        };
672
673        let instrument = create_test_instrument();
674        let instrument_id = instrument.id();
675
676        let instruments = DashMap::new();
677        instruments.insert(instrument_id, instrument);
678
679        let ws_fill = DydxWsFillSubaccountMessageContents {
680            id: "fill789".to_string(),
681            subaccount_id: "sub1".to_string(),
682            side: OrderSide::Sell,
683            liquidity: DydxLiquidity::Taker,
684            fill_type: DydxFillType::Limit,
685            market: "BTC-USD-PERP".into(),
686            market_type: DydxTickerType::Perpetual,
687            price: "49500.0".to_string(),
688            size: "0.5".to_string(),
689            fee: "12.375".to_string(), // Positive for taker fee
690            created_at: "2024-01-15T11:00:00Z".to_string(),
691            created_at_height: "12400".to_string(),
692            order_id: "order999".to_string(),
693            client_metadata: "888".to_string(),
694        };
695
696        let account_id = AccountId::new("DYDX-001");
697        let ts_init = UnixNanos::default();
698
699        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
700        assert!(result.is_ok());
701
702        let fill_report = result.unwrap();
703        assert_eq!(fill_report.instrument_id, instrument_id);
704        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
705        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
706        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
707        // Commission should be negative (cost to trader) after negating positive fee
708        assert!((fill_report.commission.as_f64() + 12.38).abs() < 0.01);
709    }
710
711    #[rstest]
712    fn test_parse_ws_fill_report_missing_instrument() {
713        use crate::{
714            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
715            schemas::ws::DydxWsFillSubaccountMessageContents,
716        };
717
718        let instruments = DashMap::new(); // Empty - no instruments cached
719
720        let ws_fill = DydxWsFillSubaccountMessageContents {
721            id: "fill000".to_string(),
722            subaccount_id: "sub1".to_string(),
723            side: OrderSide::Buy,
724            liquidity: DydxLiquidity::Maker,
725            fill_type: DydxFillType::Limit,
726            market: "ETH-USD-PERP".into(),
727            market_type: DydxTickerType::Perpetual,
728            price: "3000.0".to_string(),
729            size: "1.0".to_string(),
730            fee: "-1.5".to_string(),
731            created_at: "2024-01-15T12:00:00Z".to_string(),
732            created_at_height: "12500".to_string(),
733            order_id: "order111".to_string(),
734            client_metadata: "777".to_string(),
735        };
736
737        let account_id = AccountId::new("DYDX-001");
738        let ts_init = UnixNanos::default();
739
740        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
741        assert!(result.is_err());
742        assert!(
743            result
744                .unwrap_err()
745                .to_string()
746                .contains("No instrument cached for market")
747        );
748    }
749
750    // ========== Position Parsing Tests ==========
751
752    #[rstest]
753    fn test_convert_ws_position_to_http() {
754        use nautilus_model::enums::PositionSide;
755
756        use crate::{common::enums::DydxPositionStatus, schemas::ws::DydxPerpetualPosition};
757
758        let ws_position = DydxPerpetualPosition {
759            market: "BTC-USD".into(),
760            status: DydxPositionStatus::Open,
761            side: PositionSide::Long,
762            size: "1.5".to_string(),
763            max_size: "2.0".to_string(),
764            entry_price: "50000.0".to_string(),
765            exit_price: None,
766            realized_pnl: "100.0".to_string(),
767            unrealized_pnl: "250.5".to_string(),
768            created_at: "2024-01-15T10:00:00Z".to_string(),
769            closed_at: None,
770            sum_open: "5.0".to_string(),
771            sum_close: "3.5".to_string(),
772            net_funding: "-10.25".to_string(),
773        };
774
775        let result = convert_ws_position_to_http(&ws_position);
776        assert!(result.is_ok());
777
778        let http_position = result.unwrap();
779        assert_eq!(http_position.market, "BTC-USD");
780        assert_eq!(http_position.status, DydxPositionStatus::Open);
781        assert_eq!(http_position.side, OrderSide::Buy); // Positive size = Buy
782        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
783        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
784        assert_eq!(
785            http_position.entry_price,
786            rust_decimal_macros::dec!(50000.0)
787        );
788        assert_eq!(http_position.exit_price, None);
789        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
790        assert_eq!(
791            http_position.unrealized_pnl,
792            rust_decimal_macros::dec!(250.5)
793        );
794        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
795        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
796        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
797    }
798
799    #[rstest]
800    fn test_parse_ws_position_report_success() {
801        use nautilus_model::enums::PositionSide;
802
803        use crate::{common::enums::DydxPositionStatus, schemas::ws::DydxPerpetualPosition};
804
805        let instrument = create_test_instrument();
806        let instrument_id = instrument.id();
807
808        let instruments = DashMap::new();
809        instruments.insert(instrument_id, instrument);
810
811        let ws_position = DydxPerpetualPosition {
812            market: "BTC-USD-PERP".into(),
813            status: DydxPositionStatus::Open,
814            side: PositionSide::Long,
815            size: "0.5".to_string(),
816            max_size: "1.0".to_string(),
817            entry_price: "49500.0".to_string(),
818            exit_price: None,
819            realized_pnl: "0.0".to_string(),
820            unrealized_pnl: "125.0".to_string(),
821            created_at: "2024-01-15T09:00:00Z".to_string(),
822            closed_at: None,
823            sum_open: "0.5".to_string(),
824            sum_close: "0.0".to_string(),
825            net_funding: "-2.5".to_string(),
826        };
827
828        let account_id = AccountId::new("DYDX-001");
829        let ts_init = UnixNanos::default();
830
831        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
832        assert!(result.is_ok());
833
834        let position_report = result.unwrap();
835        assert_eq!(position_report.instrument_id, instrument_id);
836        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
837        assert_eq!(position_report.quantity.as_f64(), 0.5);
838        // avg_px_open should be entry_price
839        assert!(position_report.avg_px_open.is_some());
840    }
841
842    #[rstest]
843    fn test_parse_ws_position_report_short() {
844        use nautilus_model::enums::PositionSide;
845
846        use crate::{common::enums::DydxPositionStatus, schemas::ws::DydxPerpetualPosition};
847
848        let instrument = create_test_instrument();
849        let instrument_id = instrument.id();
850
851        let instruments = DashMap::new();
852        instruments.insert(instrument_id, instrument);
853
854        let ws_position = DydxPerpetualPosition {
855            market: "BTC-USD-PERP".into(),
856            status: DydxPositionStatus::Open,
857            side: PositionSide::Short,
858            size: "-0.25".to_string(), // Negative for short
859            max_size: "0.5".to_string(),
860            entry_price: "51000.0".to_string(),
861            exit_price: None,
862            realized_pnl: "50.0".to_string(),
863            unrealized_pnl: "-75.25".to_string(),
864            created_at: "2024-01-15T08:00:00Z".to_string(),
865            closed_at: None,
866            sum_open: "0.25".to_string(),
867            sum_close: "0.0".to_string(),
868            net_funding: "1.5".to_string(),
869        };
870
871        let account_id = AccountId::new("DYDX-001");
872        let ts_init = UnixNanos::default();
873
874        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
875        assert!(result.is_ok());
876
877        let position_report = result.unwrap();
878        assert_eq!(position_report.instrument_id, instrument_id);
879        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
880        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
881    }
882
883    #[rstest]
884    fn test_parse_ws_position_report_missing_instrument() {
885        use nautilus_model::enums::PositionSide;
886
887        use crate::{common::enums::DydxPositionStatus, schemas::ws::DydxPerpetualPosition};
888
889        let instruments = DashMap::new(); // Empty - no instruments cached
890
891        let ws_position = DydxPerpetualPosition {
892            market: "ETH-USD-PERP".into(),
893            status: DydxPositionStatus::Open,
894            side: PositionSide::Long,
895            size: "5.0".to_string(),
896            max_size: "10.0".to_string(),
897            entry_price: "3000.0".to_string(),
898            exit_price: None,
899            realized_pnl: "0.0".to_string(),
900            unrealized_pnl: "500.0".to_string(),
901            created_at: "2024-01-15T07:00:00Z".to_string(),
902            closed_at: None,
903            sum_open: "5.0".to_string(),
904            sum_close: "0.0".to_string(),
905            net_funding: "-5.0".to_string(),
906        };
907
908        let account_id = AccountId::new("DYDX-001");
909        let ts_init = UnixNanos::default();
910
911        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
912        assert!(result.is_err());
913        assert!(
914            result
915                .unwrap_err()
916                .to_string()
917                .contains("No instrument cached for market")
918        );
919    }
920
921    #[rstest]
922    #[case(DydxOrderStatus::Filled, "2.0")]
923    #[case(DydxOrderStatus::Canceled, "0.0")]
924    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
925    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
926    #[case(DydxOrderStatus::Untriggered, "0.0")]
927    fn test_parse_ws_order_various_statuses(
928        #[case] status: DydxOrderStatus,
929        #[case] total_filled: &str,
930    ) {
931        let ws_order = DydxWsOrderSubaccountMessageContents {
932            id: format!("order_{:?}", status),
933            subaccount_id: "dydx1test/0".to_string(),
934            client_id: "99999".to_string(),
935            clob_pair_id: "1".to_string(),
936            side: OrderSide::Buy,
937            size: "2.0".to_string(),
938            price: "50000.0".to_string(),
939            status,
940            order_type: DydxOrderType::Limit,
941            time_in_force: DydxTimeInForce::Gtt,
942            post_only: false,
943            reduce_only: false,
944            order_flags: "0".to_string(),
945            good_til_block: Some("1000".to_string()),
946            good_til_block_time: None,
947            created_at_height: "900".to_string(),
948            client_metadata: "0".to_string(),
949            trigger_price: None,
950            total_filled: total_filled.to_string(),
951            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
952            updated_at_height: Some("950".to_string()),
953        };
954
955        let clob_pair_id_to_instrument = DashMap::new();
956        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
957        clob_pair_id_to_instrument.insert(1, instrument_id);
958
959        let instruments = DashMap::new();
960        let instrument = create_test_instrument();
961        instruments.insert(instrument_id, instrument);
962
963        let account_id = AccountId::new("DYDX-001");
964        let ts_init = UnixNanos::default();
965
966        let result = parse_ws_order_report(
967            &ws_order,
968            &clob_pair_id_to_instrument,
969            &instruments,
970            account_id,
971            ts_init,
972        );
973
974        assert!(
975            result.is_ok(),
976            "Failed to parse order with status {:?}",
977            status
978        );
979        let report = result.unwrap();
980
981        // Verify status conversion
982        use nautilus_model::enums::OrderStatus;
983        let expected_status = match status {
984            DydxOrderStatus::Open
985            | DydxOrderStatus::BestEffortOpened
986            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
987            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
988            DydxOrderStatus::Filled => OrderStatus::Filled,
989            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
990                OrderStatus::Canceled
991            }
992        };
993        assert_eq!(report.order_status, expected_status);
994    }
995
996    #[rstest]
997    fn test_parse_ws_order_with_trigger_price() {
998        let ws_order = DydxWsOrderSubaccountMessageContents {
999            id: "conditional_order".to_string(),
1000            subaccount_id: "dydx1test/0".to_string(),
1001            client_id: "88888".to_string(),
1002            clob_pair_id: "1".to_string(),
1003            side: OrderSide::Sell,
1004            size: "1.0".to_string(),
1005            price: "52000.0".to_string(),
1006            status: DydxOrderStatus::Untriggered,
1007            order_type: DydxOrderType::StopLimit,
1008            time_in_force: DydxTimeInForce::Gtt,
1009            post_only: false,
1010            reduce_only: true,
1011            order_flags: "32".to_string(), // Conditional flag
1012            good_til_block: None,
1013            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1014            created_at_height: "1000".to_string(),
1015            client_metadata: "100".to_string(),
1016            trigger_price: Some("51500.0".to_string()),
1017            total_filled: "0.0".to_string(),
1018            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1019            updated_at_height: Some("1050".to_string()),
1020        };
1021
1022        let clob_pair_id_to_instrument = DashMap::new();
1023        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1024        clob_pair_id_to_instrument.insert(1, instrument_id);
1025
1026        let instruments = DashMap::new();
1027        let instrument = create_test_instrument();
1028        instruments.insert(instrument_id, instrument);
1029
1030        let account_id = AccountId::new("DYDX-001");
1031        let ts_init = UnixNanos::default();
1032
1033        let result = parse_ws_order_report(
1034            &ws_order,
1035            &clob_pair_id_to_instrument,
1036            &instruments,
1037            account_id,
1038            ts_init,
1039        );
1040
1041        assert!(result.is_ok());
1042        let report = result.unwrap();
1043        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1044        // Trigger price should be parsed and available in the report
1045        assert!(report.trigger_price.is_some());
1046    }
1047
1048    #[rstest]
1049    fn test_parse_ws_order_market_type() {
1050        let ws_order = DydxWsOrderSubaccountMessageContents {
1051            id: "market_order".to_string(),
1052            subaccount_id: "dydx1test/0".to_string(),
1053            client_id: "77777".to_string(),
1054            clob_pair_id: "1".to_string(),
1055            side: OrderSide::Buy,
1056            size: "0.5".to_string(),
1057            price: "50000.0".to_string(), // Market orders still have a price
1058            status: DydxOrderStatus::Filled,
1059            order_type: DydxOrderType::Market,
1060            time_in_force: DydxTimeInForce::Ioc,
1061            post_only: false,
1062            reduce_only: false,
1063            order_flags: "0".to_string(),
1064            good_til_block: Some("1000".to_string()),
1065            good_til_block_time: None,
1066            created_at_height: "900".to_string(),
1067            client_metadata: "0".to_string(),
1068            trigger_price: None,
1069            total_filled: "0.5".to_string(),
1070            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1071            updated_at_height: Some("901".to_string()),
1072        };
1073
1074        let clob_pair_id_to_instrument = DashMap::new();
1075        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1076        clob_pair_id_to_instrument.insert(1, instrument_id);
1077
1078        let instruments = DashMap::new();
1079        let instrument = create_test_instrument();
1080        instruments.insert(instrument_id, instrument);
1081
1082        let account_id = AccountId::new("DYDX-001");
1083        let ts_init = UnixNanos::default();
1084
1085        let result = parse_ws_order_report(
1086            &ws_order,
1087            &clob_pair_id_to_instrument,
1088            &instruments,
1089            account_id,
1090            ts_init,
1091        );
1092
1093        assert!(result.is_ok());
1094        let report = result.unwrap();
1095        assert_eq!(report.order_type, OrderType::Market);
1096        assert_eq!(report.order_status, OrderStatus::Filled);
1097    }
1098
1099    #[rstest]
1100    fn test_parse_ws_order_invalid_clob_pair_id() {
1101        let ws_order = DydxWsOrderSubaccountMessageContents {
1102            id: "bad_order".to_string(),
1103            subaccount_id: "dydx1test/0".to_string(),
1104            client_id: "12345".to_string(),
1105            clob_pair_id: "not_a_number".to_string(), // Invalid
1106            side: OrderSide::Buy,
1107            size: "1.0".to_string(),
1108            price: "50000.0".to_string(),
1109            status: DydxOrderStatus::Open,
1110            order_type: DydxOrderType::Limit,
1111            time_in_force: DydxTimeInForce::Gtt,
1112            post_only: false,
1113            reduce_only: false,
1114            order_flags: "0".to_string(),
1115            good_til_block: Some("1000".to_string()),
1116            good_til_block_time: None,
1117            created_at_height: "900".to_string(),
1118            client_metadata: "0".to_string(),
1119            trigger_price: None,
1120            total_filled: "0.0".to_string(),
1121            updated_at: None,
1122            updated_at_height: None,
1123        };
1124
1125        let clob_pair_id_to_instrument = DashMap::new();
1126        let instruments = DashMap::new();
1127        let account_id = AccountId::new("DYDX-001");
1128        let ts_init = UnixNanos::default();
1129
1130        let result = parse_ws_order_report(
1131            &ws_order,
1132            &clob_pair_id_to_instrument,
1133            &instruments,
1134            account_id,
1135            ts_init,
1136        );
1137
1138        assert!(result.is_err());
1139        assert!(
1140            result
1141                .unwrap_err()
1142                .to_string()
1143                .contains("Failed to parse clob_pair_id")
1144        );
1145    }
1146
1147    #[rstest]
1148    fn test_parse_ws_position_closed() {
1149        use nautilus_model::enums::PositionSide;
1150
1151        use crate::{common::enums::DydxPositionStatus, schemas::ws::DydxPerpetualPosition};
1152
1153        let instrument = create_test_instrument();
1154        let instrument_id = instrument.id();
1155
1156        let instruments = DashMap::new();
1157        instruments.insert(instrument_id, instrument);
1158
1159        let ws_position = DydxPerpetualPosition {
1160            market: "BTC-USD-PERP".into(),
1161            status: DydxPositionStatus::Closed,
1162            side: PositionSide::Long,
1163            size: "0.0".to_string(), // Closed = zero size
1164            max_size: "2.0".to_string(),
1165            entry_price: "48000.0".to_string(),
1166            exit_price: Some("52000.0".to_string()),
1167            realized_pnl: "2000.0".to_string(),
1168            unrealized_pnl: "0.0".to_string(),
1169            created_at: "2024-01-10T09:00:00Z".to_string(),
1170            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1171            sum_open: "5.0".to_string(),
1172            sum_close: "5.0".to_string(), // Fully closed
1173            net_funding: "-25.5".to_string(),
1174        };
1175
1176        let account_id = AccountId::new("DYDX-001");
1177        let ts_init = UnixNanos::default();
1178
1179        let result = parse_ws_position_report(&ws_position, &instruments, account_id, ts_init);
1180        assert!(result.is_ok());
1181
1182        let position_report = result.unwrap();
1183        assert_eq!(position_report.instrument_id, instrument_id);
1184        // Closed position should have zero quantity
1185        assert_eq!(position_report.quantity.as_f64(), 0.0);
1186    }
1187
1188    #[rstest]
1189    fn test_parse_ws_fill_with_maker_rebate() {
1190        use crate::{
1191            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1192            schemas::ws::DydxWsFillSubaccountMessageContents,
1193        };
1194
1195        let instrument = create_test_instrument();
1196        let instrument_id = instrument.id();
1197
1198        let instruments = DashMap::new();
1199        instruments.insert(instrument_id, instrument);
1200
1201        let ws_fill = DydxWsFillSubaccountMessageContents {
1202            id: "fill_rebate".to_string(),
1203            subaccount_id: "sub1".to_string(),
1204            side: OrderSide::Buy,
1205            liquidity: DydxLiquidity::Maker,
1206            fill_type: DydxFillType::Limit,
1207            market: "BTC-USD-PERP".into(),
1208            market_type: DydxTickerType::Perpetual,
1209            price: "50000.0".to_string(),
1210            size: "1.0".to_string(),
1211            fee: "-15.0".to_string(), // Negative fee = rebate
1212            created_at: "2024-01-15T13:00:00Z".to_string(),
1213            created_at_height: "13000".to_string(),
1214            order_id: "order_maker".to_string(),
1215            client_metadata: "200".to_string(),
1216        };
1217
1218        let account_id = AccountId::new("DYDX-001");
1219        let ts_init = UnixNanos::default();
1220
1221        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1222        assert!(result.is_ok());
1223
1224        let fill_report = result.unwrap();
1225        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1226        // Commission should be positive (rebate) after negating dYdX's negative fee
1227        assert!(fill_report.commission.as_f64() > 0.0);
1228    }
1229
1230    #[rstest]
1231    fn test_parse_ws_fill_taker_with_fee() {
1232        use crate::{
1233            common::enums::{DydxFillType, DydxLiquidity, DydxTickerType},
1234            schemas::ws::DydxWsFillSubaccountMessageContents,
1235        };
1236
1237        let instrument = create_test_instrument();
1238        let instrument_id = instrument.id();
1239
1240        let instruments = DashMap::new();
1241        instruments.insert(instrument_id, instrument);
1242
1243        let ws_fill = DydxWsFillSubaccountMessageContents {
1244            id: "fill_taker".to_string(),
1245            subaccount_id: "sub2".to_string(),
1246            side: OrderSide::Sell,
1247            liquidity: DydxLiquidity::Taker,
1248            fill_type: DydxFillType::Limit,
1249            market: "BTC-USD-PERP".into(),
1250            market_type: DydxTickerType::Perpetual,
1251            price: "49800.0".to_string(),
1252            size: "0.75".to_string(),
1253            fee: "18.675".to_string(), // Positive fee for taker
1254            created_at: "2024-01-15T14:00:00Z".to_string(),
1255            created_at_height: "14000".to_string(),
1256            order_id: "order_taker".to_string(),
1257            client_metadata: "300".to_string(),
1258        };
1259
1260        let account_id = AccountId::new("DYDX-001");
1261        let ts_init = UnixNanos::default();
1262
1263        let result = parse_ws_fill_report(&ws_fill, &instruments, account_id, ts_init);
1264        assert!(result.is_ok());
1265
1266        let fill_report = result.unwrap();
1267        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1268        assert_eq!(fill_report.order_side, OrderSide::Sell);
1269        // Commission should be negative (cost to trader) after negating positive fee
1270        assert!(fill_report.commission.as_f64() < 0.0);
1271    }
1272}