1use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29 data::{Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick},
30 enums::{AggressorSide, BookAction, OrderSide, OrderStatus, RecordFlag},
31 identifiers::{AccountId, InstrumentId, TradeId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport, PositionStatusReport},
34 types::{Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use super::{DydxWsError, DydxWsResult};
39use crate::{
40 common::{
41 enums::{DydxOrderStatus, DydxTickerType},
42 instrument_cache::InstrumentCache,
43 },
44 execution::{encoder::ClientOrderIdEncoder, types::OrderContext},
45 http::{
46 models::{Fill, Order, PerpetualPosition},
47 parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
48 },
49 websocket::messages::{
50 DydxCandle, DydxOrderbookContents, DydxOrderbookSnapshotContents, DydxPerpetualPosition,
51 DydxTradeContents, DydxWsFillSubaccountMessageContents,
52 DydxWsOrderSubaccountMessageContents,
53 },
54};
55
56pub fn parse_ws_order_report(
78 ws_order: &DydxWsOrderSubaccountMessageContents,
79 instrument_cache: &InstrumentCache,
80 order_contexts: &DashMap<u32, OrderContext>,
81 encoder: &ClientOrderIdEncoder,
82 account_id: AccountId,
83 ts_init: UnixNanos,
84) -> anyhow::Result<OrderStatusReport> {
85 let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
86 "Failed to parse clob_pair_id '{}'",
87 ws_order.clob_pair_id
88 ))?;
89
90 let instrument = instrument_cache
91 .get_by_clob_id(clob_pair_id)
92 .ok_or_else(|| {
93 instrument_cache.log_missing_clob_pair_id(clob_pair_id);
94 anyhow::anyhow!("No instrument cached for clob_pair_id {clob_pair_id}")
95 })?;
96
97 let http_order = convert_ws_order_to_http(ws_order)?;
98 let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
99
100 let dydx_client_id = ws_order.client_id.parse::<u32>().ok();
101 let dydx_client_metadata = ws_order
102 .client_metadata
103 .as_ref()
104 .and_then(|s| s.parse::<u32>().ok())
105 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
106
107 log::info!(
108 "[WS_ORDER_RECV] dYdX client_id='{}' meta={:#x} (parsed u32={:?}) | status={:?} | clob_pair={} | side={:?} | size={} | filled={}",
109 ws_order.client_id,
110 dydx_client_metadata,
111 dydx_client_id,
112 ws_order.status,
113 ws_order.clob_pair_id,
114 ws_order.side,
115 ws_order.size,
116 ws_order.total_filled.as_deref().unwrap_or("?")
117 );
118
119 if let Some(client_id) = dydx_client_id {
122 if let Some(ctx) = order_contexts.get(&client_id) {
123 log::info!(
124 "[WS_ORDER_RECV] DECODE via order_contexts: dYdX u32={} -> Nautilus '{}'",
125 client_id,
126 ctx.client_order_id
127 );
128 report.client_order_id = Some(ctx.client_order_id);
129 } else if let Some(client_order_id) = encoder.decode(client_id, dydx_client_metadata) {
130 log::info!(
132 "[WS_ORDER_RECV] DECODE via encoder fallback: dYdX u32={client_id} meta={dydx_client_metadata:#x} -> Nautilus '{client_order_id}'"
133 );
134 report.client_order_id = Some(client_order_id);
135 } else {
136 log::warn!(
137 "[WS_ORDER_RECV] DECODE FAILED: dYdX u32={client_id} meta={dydx_client_metadata:#x} not found in order_contexts or encoder!"
138 );
139 }
140 } else {
141 log::warn!(
142 "[WS_ORDER_RECV] Could not parse client_id '{}' as u32",
143 ws_order.client_id
144 );
145 }
146
147 if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
151 report.order_status = OrderStatus::PendingUpdate;
152 }
153
154 Ok(report)
155}
156
157fn convert_ws_order_to_http(
163 ws_order: &DydxWsOrderSubaccountMessageContents,
164) -> anyhow::Result<Order> {
165 let clob_pair_id: u32 = ws_order
166 .clob_pair_id
167 .parse()
168 .context("Failed to parse clob_pair_id")?;
169
170 let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
171
172 let total_filled: Decimal = ws_order
173 .total_filled
174 .as_ref()
175 .map(|s| s.parse())
176 .transpose()
177 .context("Failed to parse total_filled")?
178 .unwrap_or(Decimal::ZERO);
179
180 let remaining_size = (size - total_filled).max(Decimal::ZERO);
182
183 let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
184
185 let created_at_height: u64 = ws_order
186 .created_at_height
187 .as_ref()
188 .map(|s| s.parse())
189 .transpose()
190 .context("Failed to parse created_at_height")?
191 .unwrap_or(0);
192
193 let client_metadata: u32 = ws_order
194 .client_metadata
195 .as_ref()
196 .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
197 .parse()
198 .context("Failed to parse client_metadata")?;
199
200 let order_flags: u32 = ws_order
201 .order_flags
202 .parse()
203 .context("Failed to parse order_flags")?;
204
205 let good_til_block = ws_order
206 .good_til_block
207 .as_ref()
208 .and_then(|s| s.parse::<u64>().ok());
209
210 let good_til_block_time = ws_order
211 .good_til_block_time
212 .as_ref()
213 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
214 .map(|dt| dt.with_timezone(&Utc));
215
216 let trigger_price = ws_order
217 .trigger_price
218 .as_ref()
219 .and_then(|s| Decimal::from_str(s).ok());
220
221 let updated_at = ws_order
223 .updated_at
224 .as_ref()
225 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
226 .map(|dt| dt.with_timezone(&Utc));
227
228 let updated_at_height = ws_order
230 .updated_at_height
231 .as_ref()
232 .and_then(|s| s.parse::<u64>().ok());
233
234 let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
235
236 Ok(Order {
237 id: ws_order.id.clone(),
238 subaccount_id: ws_order.subaccount_id.clone(),
239 client_id: ws_order.client_id.clone(),
240 clob_pair_id,
241 side: ws_order.side,
242 size,
243 total_filled,
244 price,
245 status: ws_order.status,
246 order_type: ws_order.order_type,
247 time_in_force: ws_order.time_in_force,
248 reduce_only: ws_order.reduce_only,
249 post_only: ws_order.post_only,
250 order_flags,
251 good_til_block,
252 good_til_block_time,
253 created_at_height: Some(created_at_height),
254 client_metadata,
255 trigger_price,
256 condition_type: None, conditional_order_trigger_subticks: None, execution: None, updated_at,
260 updated_at_height,
261 ticker: None, subaccount_number: 0, order_router_address: None, })
265}
266
267pub fn parse_ws_fill_report(
280 ws_fill: &DydxWsFillSubaccountMessageContents,
281 instrument_cache: &InstrumentCache,
282 order_id_map: &DashMap<String, (u32, u32)>,
283 order_contexts: &DashMap<u32, OrderContext>,
284 encoder: &ClientOrderIdEncoder,
285 account_id: AccountId,
286 ts_init: UnixNanos,
287) -> anyhow::Result<FillReport> {
288 let instrument = instrument_cache
289 .get_by_market(&ws_fill.market)
290 .ok_or_else(|| {
291 let available: Vec<String> = instrument_cache
292 .all_instruments()
293 .into_iter()
294 .map(|inst| inst.id().symbol.to_string())
295 .collect();
296 anyhow::anyhow!(
297 "No instrument cached for market '{}'. Available: {:?}",
298 ws_fill.market,
299 available
300 )
301 })?;
302
303 let http_fill = convert_ws_fill_to_http(ws_fill)?;
304 let mut report = parse_fill_report(&http_fill, &instrument, account_id, ts_init)?;
305
306 if let Some(ref order_id) = ws_fill.order_id {
308 if let Some(entry) = order_id_map.get(order_id) {
309 let (client_id, client_metadata) = *entry.value();
310 if let Some(ctx) = order_contexts.get(&client_id) {
311 report.client_order_id = Some(ctx.client_order_id);
312 } else if let Some(client_order_id) = encoder.decode(client_id, client_metadata) {
313 report.client_order_id = Some(client_order_id);
314 } else {
315 log::warn!(
316 "[WS_FILL_RECV] DECODE FAILED: order_id={order_id} -> client_id={client_id} meta={client_metadata:#x} not decodable",
317 );
318 }
319 } else {
320 log::warn!(
321 "[WS_FILL_RECV] No order_id mapping for '{order_id}', fill cannot be correlated",
322 );
323 }
324 }
325
326 Ok(report)
327}
328
329fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
335 let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
336 let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
337 let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
338
339 let created_at_height: u64 = ws_fill
340 .created_at_height
341 .as_ref()
342 .map(|s| s.parse())
343 .transpose()
344 .context("Failed to parse created_at_height")?
345 .unwrap_or(0);
346
347 let client_metadata: u32 = ws_fill
348 .client_metadata
349 .as_ref()
350 .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
351 .parse()
352 .context("Failed to parse client_metadata")?;
353
354 let order_id = ws_fill
355 .order_id
356 .clone()
357 .ok_or_else(|| anyhow::anyhow!("Missing required field: order_id"))?;
358
359 let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
360 .context("Failed to parse created_at")?
361 .with_timezone(&Utc);
362
363 Ok(Fill {
364 id: ws_fill.id.clone(),
365 side: ws_fill.side,
366 liquidity: ws_fill.liquidity,
367 fill_type: ws_fill.fill_type,
368 market: ws_fill.market,
369 market_type: ws_fill.market_type.unwrap_or(DydxTickerType::Perpetual),
370 price,
371 size,
372 fee,
373 created_at,
374 created_at_height,
375 order_id,
376 client_metadata,
377 })
378}
379
380pub fn parse_ws_position_report(
392 ws_position: &DydxPerpetualPosition,
393 instrument_cache: &InstrumentCache,
394 account_id: AccountId,
395 ts_init: UnixNanos,
396) -> anyhow::Result<PositionStatusReport> {
397 let instrument = instrument_cache
398 .get_by_market(&ws_position.market)
399 .ok_or_else(|| {
400 let available: Vec<String> = instrument_cache
401 .all_instruments()
402 .into_iter()
403 .map(|inst| inst.id().symbol.to_string())
404 .collect();
405 anyhow::anyhow!(
406 "No instrument cached for market '{}'. Available: {:?}",
407 ws_position.market,
408 available
409 )
410 })?;
411
412 let http_position = convert_ws_position_to_http(ws_position)?;
413 parse_position_status_report(&http_position, &instrument, account_id, ts_init)
414}
415
416fn convert_ws_position_to_http(
422 ws_position: &DydxPerpetualPosition,
423) -> anyhow::Result<PerpetualPosition> {
424 let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
425
426 let max_size: Decimal = ws_position
427 .max_size
428 .parse()
429 .context("Failed to parse max_size")?;
430
431 let entry_price: Decimal = ws_position
432 .entry_price
433 .parse()
434 .context("Failed to parse entry_price")?;
435
436 let exit_price: Option<Decimal> = ws_position
437 .exit_price
438 .as_ref()
439 .map(|s| s.parse())
440 .transpose()
441 .context("Failed to parse exit_price")?;
442
443 let realized_pnl: Decimal = ws_position
444 .realized_pnl
445 .parse()
446 .context("Failed to parse realized_pnl")?;
447
448 let unrealized_pnl: Decimal = ws_position
449 .unrealized_pnl
450 .parse()
451 .context("Failed to parse unrealized_pnl")?;
452
453 let sum_open: Decimal = ws_position
454 .sum_open
455 .parse()
456 .context("Failed to parse sum_open")?;
457
458 let sum_close: Decimal = ws_position
459 .sum_close
460 .parse()
461 .context("Failed to parse sum_close")?;
462
463 let net_funding: Decimal = ws_position
464 .net_funding
465 .parse()
466 .context("Failed to parse net_funding")?;
467
468 let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
469 .context("Failed to parse created_at")?
470 .with_timezone(&Utc);
471
472 let closed_at = ws_position
473 .closed_at
474 .as_ref()
475 .map(|s| DateTime::parse_from_rfc3339(s))
476 .transpose()
477 .context("Failed to parse closed_at")?
478 .map(|dt| dt.with_timezone(&Utc));
479
480 let side = if size.is_sign_positive() {
482 OrderSide::Buy
483 } else {
484 OrderSide::Sell
485 };
486
487 Ok(PerpetualPosition {
488 market: ws_position.market,
489 status: ws_position.status,
490 side,
491 size,
492 max_size,
493 entry_price,
494 exit_price,
495 realized_pnl,
496 created_at_height: 0, created_at,
498 sum_open,
499 sum_close,
500 net_funding,
501 unrealized_pnl,
502 closed_at,
503 })
504}
505
506pub fn parse_orderbook_snapshot(
516 instrument_id: &InstrumentId,
517 contents: &DydxOrderbookSnapshotContents,
518 price_precision: u8,
519 size_precision: u8,
520 ts_init: UnixNanos,
521) -> DydxWsResult<OrderBookDeltas> {
522 let mut deltas = Vec::new();
523 deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
524
525 let bids = contents.bids.as_deref().unwrap_or(&[]);
526 let asks = contents.asks.as_deref().unwrap_or(&[]);
527
528 let bids_len = bids.len();
529 let asks_len = asks.len();
530
531 for (idx, bid) in bids.iter().enumerate() {
532 let is_last = idx == bids_len - 1 && asks_len == 0;
533 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
534
535 let price = Decimal::from_str(&bid.price)
536 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
537
538 let size = Decimal::from_str(&bid.size)
539 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
540
541 let order = BookOrder::new(
542 OrderSide::Buy,
543 Price::from_decimal_dp(price, price_precision).map_err(|e| {
544 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
545 })?,
546 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
547 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
548 })?,
549 0,
550 );
551
552 deltas.push(OrderBookDelta::new(
553 *instrument_id,
554 BookAction::Add,
555 order,
556 flags,
557 0,
558 ts_init,
559 ts_init,
560 ));
561 }
562
563 for (idx, ask) in asks.iter().enumerate() {
564 let is_last = idx == asks_len - 1;
565 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
566
567 let price = Decimal::from_str(&ask.price)
568 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
569
570 let size = Decimal::from_str(&ask.size)
571 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
572
573 let order = BookOrder::new(
574 OrderSide::Sell,
575 Price::from_decimal_dp(price, price_precision).map_err(|e| {
576 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
577 })?,
578 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
579 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
580 })?,
581 0,
582 );
583
584 deltas.push(OrderBookDelta::new(
585 *instrument_id,
586 BookAction::Add,
587 order,
588 flags,
589 0,
590 ts_init,
591 ts_init,
592 ));
593 }
594
595 Ok(OrderBookDeltas::new(*instrument_id, deltas))
596}
597
598pub fn parse_orderbook_deltas(
604 instrument_id: &InstrumentId,
605 contents: &DydxOrderbookContents,
606 price_precision: u8,
607 size_precision: u8,
608 ts_init: UnixNanos,
609) -> DydxWsResult<OrderBookDeltas> {
610 let deltas = parse_orderbook_deltas_with_flag(
611 instrument_id,
612 contents,
613 price_precision,
614 size_precision,
615 ts_init,
616 true,
617 )?;
618 Ok(OrderBookDeltas::new(*instrument_id, deltas))
619}
620
621#[allow(clippy::too_many_arguments)]
627pub fn parse_orderbook_deltas_with_flag(
628 instrument_id: &InstrumentId,
629 contents: &DydxOrderbookContents,
630 price_precision: u8,
631 size_precision: u8,
632 ts_init: UnixNanos,
633 is_last_message: bool,
634) -> DydxWsResult<Vec<OrderBookDelta>> {
635 let mut deltas = Vec::new();
636
637 let bids = contents.bids.as_deref().unwrap_or(&[]);
638 let asks = contents.asks.as_deref().unwrap_or(&[]);
639
640 let bids_len = bids.len();
641 let asks_len = asks.len();
642
643 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
644 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
645 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
646
647 let price = Decimal::from_str(price_str)
648 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
649
650 let size = Decimal::from_str(size_str)
651 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
652
653 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
654 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
655 })?;
656 let action = if qty.is_zero() {
657 BookAction::Delete
658 } else {
659 BookAction::Update
660 };
661
662 let order = BookOrder::new(
663 OrderSide::Buy,
664 Price::from_decimal_dp(price, price_precision).map_err(|e| {
665 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
666 })?,
667 qty,
668 0,
669 );
670
671 deltas.push(OrderBookDelta::new(
672 *instrument_id,
673 action,
674 order,
675 flags,
676 0,
677 ts_init,
678 ts_init,
679 ));
680 }
681
682 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
683 let is_last = is_last_message && idx == asks_len - 1;
684 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
685
686 let price = Decimal::from_str(price_str)
687 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
688
689 let size = Decimal::from_str(size_str)
690 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
691
692 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
693 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
694 })?;
695 let action = if qty.is_zero() {
696 BookAction::Delete
697 } else {
698 BookAction::Update
699 };
700
701 let order = BookOrder::new(
702 OrderSide::Sell,
703 Price::from_decimal_dp(price, price_precision).map_err(|e| {
704 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
705 })?,
706 qty,
707 0,
708 );
709
710 deltas.push(OrderBookDelta::new(
711 *instrument_id,
712 action,
713 order,
714 flags,
715 0,
716 ts_init,
717 ts_init,
718 ));
719 }
720
721 Ok(deltas)
722}
723
724pub fn parse_trade_ticks(
730 instrument_id: InstrumentId,
731 instrument: &InstrumentAny,
732 contents: &DydxTradeContents,
733 ts_init: UnixNanos,
734) -> DydxWsResult<Vec<Data>> {
735 let mut ticks = Vec::new();
736
737 for trade in &contents.trades {
738 let aggressor_side = match trade.side {
739 OrderSide::Buy => AggressorSide::Buyer,
740 OrderSide::Sell => AggressorSide::Seller,
741 _ => continue,
742 };
743
744 let price = Decimal::from_str(&trade.price)
745 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
746
747 let size = Decimal::from_str(&trade.size)
748 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
749
750 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
751 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
752 })?;
753
754 let tick = TradeTick::new(
755 instrument_id,
756 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
757 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
758 })?,
759 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
760 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
761 })?,
762 aggressor_side,
763 TradeId::new(&trade.id),
764 UnixNanos::from(trade_ts as u64),
765 ts_init,
766 );
767 ticks.push(Data::Trade(tick));
768 }
769
770 Ok(ticks)
771}
772
773pub fn parse_candle_bar(
782 bar_type: BarType,
783 instrument: &InstrumentAny,
784 candle: &DydxCandle,
785 timestamp_on_close: bool,
786 ts_init: UnixNanos,
787) -> DydxWsResult<Bar> {
788 let open = Decimal::from_str(&candle.open)
789 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
790 let high = Decimal::from_str(&candle.high)
791 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
792 let low = Decimal::from_str(&candle.low)
793 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
794 let close = Decimal::from_str(&candle.close)
795 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
796 let volume = candle
797 .base_token_volume
798 .as_deref()
799 .map(Decimal::from_str)
800 .transpose()
801 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?
802 .unwrap_or(Decimal::ZERO);
803
804 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
805 DydxWsError::Parse(format!(
806 "Timestamp out of range for candle at {}",
807 candle.started_at
808 ))
809 })?;
810 let mut ts_event = UnixNanos::from(started_at_nanos as u64);
811 if timestamp_on_close {
812 let interval_ns = bar_type
813 .spec()
814 .timedelta()
815 .num_nanoseconds()
816 .ok_or_else(|| DydxWsError::Parse("Bar interval overflow".to_string()))?;
817 let updated = (started_at_nanos as u64)
818 .checked_add(interval_ns as u64)
819 .ok_or_else(|| {
820 DydxWsError::Parse("Bar timestamp overflowed adjusting to close time".to_string())
821 })?;
822 ts_event = UnixNanos::from(updated);
823 }
824
825 let bar = Bar::new(
826 bar_type,
827 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
828 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
829 })?,
830 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
831 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
832 })?,
833 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
834 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
835 })?,
836 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
837 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
838 })?,
839 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
840 DydxWsError::Parse(format!(
841 "Failed to create volume Quantity from decimal: {e}"
842 ))
843 })?,
844 ts_event,
845 ts_init,
846 );
847
848 Ok(bar)
849}
850
851#[cfg(test)]
852mod tests {
853 use std::str::FromStr;
854
855 use nautilus_model::{
856 data::{BarType, Data},
857 enums::{
858 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
859 PositionSideSpecified,
860 },
861 identifiers::{AccountId, InstrumentId, Symbol, Venue},
862 instruments::{CryptoPerpetual, InstrumentAny},
863 types::{Currency, Price, Quantity},
864 };
865 use rstest::rstest;
866 use rust_decimal_macros::dec;
867 use ustr::Ustr;
868
869 use super::*;
870 use crate::{
871 common::{
872 enums::{
873 DydxFillType, DydxLiquidity, DydxMarketStatus, DydxOrderStatus, DydxOrderType,
874 DydxPositionSide, DydxPositionStatus, DydxTickerType, DydxTimeInForce,
875 },
876 testing::load_json_fixture,
877 },
878 http::models::PerpetualMarket,
879 websocket::messages::{DydxPerpetualPosition, DydxWsFillSubaccountMessageContents},
880 };
881
882 fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
884 PerpetualMarket {
885 clob_pair_id,
886 ticker: Ustr::from(ticker),
887 status: DydxMarketStatus::Active,
888 base_asset: Some(Ustr::from("BTC")),
889 quote_asset: Some(Ustr::from("USD")),
890 step_size: dec!(0.001),
891 tick_size: dec!(0.01),
892 index_price: Some(dec!(50000)),
893 oracle_price: dec!(50000),
894 price_change_24h: dec!(0),
895 next_funding_rate: dec!(0),
896 next_funding_at: None,
897 min_order_size: Some(dec!(0.001)),
898 market_type: None,
899 initial_margin_fraction: dec!(0.05),
900 maintenance_margin_fraction: dec!(0.03),
901 base_position_notional: None,
902 incremental_position_size: None,
903 incremental_initial_margin_fraction: None,
904 max_position_size: None,
905 open_interest: dec!(1000),
906 atomic_resolution: -10,
907 quantum_conversion_exponent: -9,
908 subticks_per_tick: 1000000,
909 step_base_quantums: 1000000,
910 is_reduce_only: false,
911 }
912 }
913
914 fn create_test_instrument_cache() -> InstrumentCache {
916 let cache = InstrumentCache::new();
917 let instrument = create_test_instrument();
918 let market = create_test_market("BTC-USD", 1);
919 cache.insert(instrument, market);
920 cache
921 }
922
923 fn create_test_instrument() -> InstrumentAny {
924 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
925
926 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
927 instrument_id,
928 Symbol::new("BTC-USD"),
929 Currency::BTC(),
930 Currency::USD(),
931 Currency::USD(),
932 false,
933 2,
934 8,
935 Price::new(0.01, 2),
936 Quantity::new(0.001, 8),
937 Some(Quantity::new(1.0, 0)),
938 Some(Quantity::new(0.001, 8)),
939 Some(Quantity::new(100000.0, 8)),
940 Some(Quantity::new(0.001, 8)),
941 None,
942 None,
943 Some(Price::new(1000000.0, 2)),
944 Some(Price::new(0.01, 2)),
945 Some(rust_decimal_macros::dec!(0.05)),
946 Some(rust_decimal_macros::dec!(0.03)),
947 Some(rust_decimal_macros::dec!(0.0002)),
948 Some(rust_decimal_macros::dec!(0.0005)),
949 UnixNanos::default(),
950 UnixNanos::default(),
951 ))
952 }
953
954 #[rstest]
955 fn test_convert_ws_order_to_http_basic() {
956 let ws_order = DydxWsOrderSubaccountMessageContents {
957 id: "order123".to_string(),
958 subaccount_id: "dydx1test/0".to_string(),
959 client_id: "12345".to_string(),
960 clob_pair_id: "1".to_string(),
961 side: OrderSide::Buy,
962 size: "1.5".to_string(),
963 price: "50000.0".to_string(),
964 status: DydxOrderStatus::PartiallyFilled,
965 order_type: DydxOrderType::Limit,
966 time_in_force: DydxTimeInForce::Gtt,
967 post_only: false,
968 reduce_only: false,
969 order_flags: "0".to_string(),
970 good_til_block: Some("1000".to_string()),
971 good_til_block_time: None,
972 created_at_height: Some("900".to_string()),
973 client_metadata: Some("0".to_string()),
974 trigger_price: None,
975 total_filled: Some("0.5".to_string()),
976 updated_at: Some("2024-11-14T10:00:00Z".to_string()),
977 updated_at_height: Some("950".to_string()),
978 };
979
980 let result = convert_ws_order_to_http(&ws_order);
981 assert!(result.is_ok());
982
983 let http_order = result.unwrap();
984 assert_eq!(http_order.id, "order123");
985 assert_eq!(http_order.clob_pair_id, 1);
986 assert_eq!(http_order.size.to_string(), "1.5");
987 assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
989 }
990
991 #[rstest]
992 fn test_parse_ws_order_report_success() {
993 let ws_order = DydxWsOrderSubaccountMessageContents {
994 id: "order456".to_string(),
995 subaccount_id: "dydx1test/0".to_string(),
996 client_id: "67890".to_string(),
997 clob_pair_id: "1".to_string(),
998 side: OrderSide::Sell,
999 size: "2.0".to_string(),
1000 price: "51000.0".to_string(),
1001 status: DydxOrderStatus::Open,
1002 order_type: DydxOrderType::Limit,
1003 time_in_force: DydxTimeInForce::Gtt,
1004 post_only: true,
1005 reduce_only: false,
1006 order_flags: "0".to_string(),
1007 good_til_block: Some("2000".to_string()),
1008 good_til_block_time: None,
1009 created_at_height: Some("1800".to_string()),
1010 client_metadata: Some("0".to_string()),
1011 trigger_price: None,
1012 total_filled: Some("0.0".to_string()),
1013 updated_at: None,
1014 updated_at_height: None,
1015 };
1016
1017 let instrument_cache = create_test_instrument_cache();
1018 let encoder = ClientOrderIdEncoder::new();
1019
1020 let account_id = AccountId::new("DYDX-001");
1021 let ts_init = UnixNanos::default();
1022 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1023
1024 let result = parse_ws_order_report(
1025 &ws_order,
1026 &instrument_cache,
1027 &order_contexts,
1028 &encoder,
1029 account_id,
1030 ts_init,
1031 );
1032
1033 assert!(result.is_ok());
1034 let report = result.unwrap();
1035 assert_eq!(report.account_id, account_id);
1036 assert_eq!(report.order_side, OrderSide::Sell);
1037 }
1038
1039 #[rstest]
1040 fn test_parse_ws_order_report_missing_instrument() {
1041 let ws_order = DydxWsOrderSubaccountMessageContents {
1042 id: "order789".to_string(),
1043 subaccount_id: "dydx1test/0".to_string(),
1044 client_id: "11111".to_string(),
1045 clob_pair_id: "99".to_string(), side: OrderSide::Buy,
1047 size: "1.0".to_string(),
1048 price: "50000.0".to_string(),
1049 status: DydxOrderStatus::Open,
1050 order_type: DydxOrderType::Market,
1051 time_in_force: DydxTimeInForce::Ioc,
1052 post_only: false,
1053 reduce_only: false,
1054 order_flags: "0".to_string(),
1055 good_til_block: Some("1000".to_string()),
1056 good_til_block_time: None,
1057 created_at_height: Some("900".to_string()),
1058 client_metadata: Some("0".to_string()),
1059 trigger_price: None,
1060 total_filled: Some("0.0".to_string()),
1061 updated_at: None,
1062 updated_at_height: None,
1063 };
1064
1065 let instrument_cache = InstrumentCache::new(); let encoder = ClientOrderIdEncoder::new();
1067 let account_id = AccountId::new("DYDX-001");
1068 let ts_init = UnixNanos::default();
1069 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1070
1071 let result = parse_ws_order_report(
1072 &ws_order,
1073 &instrument_cache,
1074 &order_contexts,
1075 &encoder,
1076 account_id,
1077 ts_init,
1078 );
1079
1080 assert!(result.is_err());
1081 assert!(
1082 result
1083 .unwrap_err()
1084 .to_string()
1085 .contains("No instrument cached")
1086 );
1087 }
1088
1089 #[rstest]
1090 fn test_convert_ws_fill_to_http() {
1091 let ws_fill = DydxWsFillSubaccountMessageContents {
1092 id: "fill123".to_string(),
1093 subaccount_id: "sub1".to_string(),
1094 side: OrderSide::Buy,
1095 liquidity: DydxLiquidity::Maker,
1096 fill_type: DydxFillType::Limit,
1097 market: "BTC-USD".into(),
1098 market_type: Some(DydxTickerType::Perpetual),
1099 price: "50000.5".to_string(),
1100 size: "0.1".to_string(),
1101 fee: "-2.5".to_string(), created_at: "2024-01-15T10:30:00Z".to_string(),
1103 created_at_height: Some("12345".to_string()),
1104 order_id: Some("order456".to_string()),
1105 client_metadata: Some("999".to_string()),
1106 };
1107
1108 let result = convert_ws_fill_to_http(&ws_fill);
1109 assert!(result.is_ok());
1110
1111 let http_fill = result.unwrap();
1112 assert_eq!(http_fill.id, "fill123");
1113 assert_eq!(http_fill.side, OrderSide::Buy);
1114 assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
1115 assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
1116 assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
1117 assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
1118 assert_eq!(http_fill.created_at_height, 12345);
1119 assert_eq!(http_fill.order_id, "order456");
1120 assert_eq!(http_fill.client_metadata, 999);
1121 }
1122
1123 #[rstest]
1124 fn test_parse_ws_fill_report_success() {
1125 let instrument_cache = create_test_instrument_cache();
1126 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1127
1128 let ws_fill = DydxWsFillSubaccountMessageContents {
1131 id: "fill789".to_string(),
1132 subaccount_id: "sub1".to_string(),
1133 side: OrderSide::Sell,
1134 liquidity: DydxLiquidity::Taker,
1135 fill_type: DydxFillType::Limit,
1136 market: "BTC-USD".into(),
1137 market_type: Some(DydxTickerType::Perpetual),
1138 price: "49500.0".to_string(),
1139 size: "0.5".to_string(),
1140 fee: "12.375".to_string(), created_at: "2024-01-15T11:00:00Z".to_string(),
1142 created_at_height: Some("12400".to_string()),
1143 order_id: Some("order999".to_string()),
1144 client_metadata: Some("888".to_string()),
1145 };
1146
1147 let account_id = AccountId::new("DYDX-001");
1148 let ts_init = UnixNanos::default();
1149 let order_id_map = DashMap::new();
1150 let order_contexts = DashMap::new();
1151 let encoder = ClientOrderIdEncoder::new();
1152
1153 let result = parse_ws_fill_report(
1154 &ws_fill,
1155 &instrument_cache,
1156 &order_id_map,
1157 &order_contexts,
1158 &encoder,
1159 account_id,
1160 ts_init,
1161 );
1162 assert!(result.is_ok());
1163
1164 let fill_report = result.unwrap();
1165 assert_eq!(fill_report.instrument_id, instrument_id);
1166 assert_eq!(fill_report.venue_order_id.as_str(), "order999");
1167 assert_eq!(fill_report.last_qty.as_f64(), 0.5);
1168 assert_eq!(fill_report.last_px.as_f64(), 49500.0);
1169 assert_eq!(fill_report.commission.as_decimal(), dec!(12.38));
1170 }
1171
1172 #[rstest]
1173 fn test_parse_ws_fill_report_missing_instrument() {
1174 let instrument_cache = InstrumentCache::new(); let ws_fill = DydxWsFillSubaccountMessageContents {
1177 id: "fill000".to_string(),
1178 subaccount_id: "sub1".to_string(),
1179 side: OrderSide::Buy,
1180 liquidity: DydxLiquidity::Maker,
1181 fill_type: DydxFillType::Limit,
1182 market: "ETH-USD-PERP".into(),
1183 market_type: Some(DydxTickerType::Perpetual),
1184 price: "3000.0".to_string(),
1185 size: "1.0".to_string(),
1186 fee: "-1.5".to_string(),
1187 created_at: "2024-01-15T12:00:00Z".to_string(),
1188 created_at_height: Some("12500".to_string()),
1189 order_id: Some("order111".to_string()),
1190 client_metadata: Some("777".to_string()),
1191 };
1192
1193 let account_id = AccountId::new("DYDX-001");
1194 let ts_init = UnixNanos::default();
1195 let order_id_map = DashMap::new();
1196 let order_contexts = DashMap::new();
1197 let encoder = ClientOrderIdEncoder::new();
1198
1199 let result = parse_ws_fill_report(
1200 &ws_fill,
1201 &instrument_cache,
1202 &order_id_map,
1203 &order_contexts,
1204 &encoder,
1205 account_id,
1206 ts_init,
1207 );
1208 assert!(result.is_err());
1209 assert!(
1210 result
1211 .unwrap_err()
1212 .to_string()
1213 .contains("No instrument cached for market")
1214 );
1215 }
1216
1217 #[rstest]
1218 fn test_convert_ws_position_to_http() {
1219 let ws_position = DydxPerpetualPosition {
1220 market: "BTC-USD".into(),
1221 status: DydxPositionStatus::Open,
1222 side: DydxPositionSide::Long,
1223 size: "1.5".to_string(),
1224 max_size: "2.0".to_string(),
1225 entry_price: "50000.0".to_string(),
1226 exit_price: None,
1227 realized_pnl: "100.0".to_string(),
1228 unrealized_pnl: "250.5".to_string(),
1229 created_at: "2024-01-15T10:00:00Z".to_string(),
1230 closed_at: None,
1231 sum_open: "5.0".to_string(),
1232 sum_close: "3.5".to_string(),
1233 net_funding: "-10.25".to_string(),
1234 };
1235
1236 let result = convert_ws_position_to_http(&ws_position);
1237 assert!(result.is_ok());
1238
1239 let http_position = result.unwrap();
1240 assert_eq!(http_position.market, "BTC-USD");
1241 assert_eq!(http_position.status, DydxPositionStatus::Open);
1242 assert_eq!(http_position.side, OrderSide::Buy); assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
1244 assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
1245 assert_eq!(
1246 http_position.entry_price,
1247 rust_decimal_macros::dec!(50000.0)
1248 );
1249 assert_eq!(http_position.exit_price, None);
1250 assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
1251 assert_eq!(
1252 http_position.unrealized_pnl,
1253 rust_decimal_macros::dec!(250.5)
1254 );
1255 assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
1256 assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
1257 assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
1258 }
1259
1260 #[rstest]
1261 fn test_parse_ws_position_report_success() {
1262 let instrument_cache = create_test_instrument_cache();
1263 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1264
1265 let ws_position = DydxPerpetualPosition {
1266 market: "BTC-USD".into(),
1267 status: DydxPositionStatus::Open,
1268 side: DydxPositionSide::Long,
1269 size: "0.5".to_string(),
1270 max_size: "1.0".to_string(),
1271 entry_price: "49500.0".to_string(),
1272 exit_price: None,
1273 realized_pnl: "0.0".to_string(),
1274 unrealized_pnl: "125.0".to_string(),
1275 created_at: "2024-01-15T09:00:00Z".to_string(),
1276 closed_at: None,
1277 sum_open: "0.5".to_string(),
1278 sum_close: "0.0".to_string(),
1279 net_funding: "-2.5".to_string(),
1280 };
1281
1282 let account_id = AccountId::new("DYDX-001");
1283 let ts_init = UnixNanos::default();
1284
1285 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1286 assert!(result.is_ok());
1287
1288 let position_report = result.unwrap();
1289 assert_eq!(position_report.instrument_id, instrument_id);
1290 assert_eq!(position_report.position_side, PositionSideSpecified::Long);
1291 assert_eq!(position_report.quantity.as_f64(), 0.5);
1292 assert!(position_report.avg_px_open.is_some());
1294 }
1295
1296 #[rstest]
1297 fn test_parse_ws_position_report_short() {
1298 let instrument_cache = create_test_instrument_cache();
1299 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1300
1301 let ws_position = DydxPerpetualPosition {
1302 market: "BTC-USD".into(),
1303 status: DydxPositionStatus::Open,
1304 side: DydxPositionSide::Short,
1305 size: "-0.25".to_string(), max_size: "0.5".to_string(),
1307 entry_price: "51000.0".to_string(),
1308 exit_price: None,
1309 realized_pnl: "50.0".to_string(),
1310 unrealized_pnl: "-75.25".to_string(),
1311 created_at: "2024-01-15T08:00:00Z".to_string(),
1312 closed_at: None,
1313 sum_open: "0.25".to_string(),
1314 sum_close: "0.0".to_string(),
1315 net_funding: "1.5".to_string(),
1316 };
1317
1318 let account_id = AccountId::new("DYDX-001");
1319 let ts_init = UnixNanos::default();
1320
1321 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1322 assert!(result.is_ok());
1323
1324 let position_report = result.unwrap();
1325 assert_eq!(position_report.instrument_id, instrument_id);
1326 assert_eq!(position_report.position_side, PositionSideSpecified::Short);
1327 assert_eq!(position_report.quantity.as_f64(), 0.25); }
1329
1330 #[rstest]
1331 fn test_parse_ws_position_report_missing_instrument() {
1332 let instrument_cache = InstrumentCache::new(); let ws_position = DydxPerpetualPosition {
1335 market: "ETH-USD-PERP".into(),
1336 status: DydxPositionStatus::Open,
1337 side: DydxPositionSide::Long,
1338 size: "5.0".to_string(),
1339 max_size: "10.0".to_string(),
1340 entry_price: "3000.0".to_string(),
1341 exit_price: None,
1342 realized_pnl: "0.0".to_string(),
1343 unrealized_pnl: "500.0".to_string(),
1344 created_at: "2024-01-15T07:00:00Z".to_string(),
1345 closed_at: None,
1346 sum_open: "5.0".to_string(),
1347 sum_close: "0.0".to_string(),
1348 net_funding: "-5.0".to_string(),
1349 };
1350
1351 let account_id = AccountId::new("DYDX-001");
1352 let ts_init = UnixNanos::default();
1353
1354 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1355 assert!(result.is_err());
1356 assert!(
1357 result
1358 .unwrap_err()
1359 .to_string()
1360 .contains("No instrument cached for market")
1361 );
1362 }
1363
1364 #[rstest]
1365 #[case(DydxOrderStatus::Filled, "2.0")]
1366 #[case(DydxOrderStatus::Canceled, "0.0")]
1367 #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
1368 #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
1369 #[case(DydxOrderStatus::Untriggered, "0.0")]
1370 fn test_parse_ws_order_various_statuses(
1371 #[case] status: DydxOrderStatus,
1372 #[case] total_filled: &str,
1373 ) {
1374 let ws_order = DydxWsOrderSubaccountMessageContents {
1375 id: format!("order_{status:?}"),
1376 subaccount_id: "dydx1test/0".to_string(),
1377 client_id: "99999".to_string(),
1378 clob_pair_id: "1".to_string(),
1379 side: OrderSide::Buy,
1380 size: "2.0".to_string(),
1381 price: "50000.0".to_string(),
1382 status,
1383 order_type: DydxOrderType::Limit,
1384 time_in_force: DydxTimeInForce::Gtt,
1385 post_only: false,
1386 reduce_only: false,
1387 order_flags: "0".to_string(),
1388 good_til_block: Some("1000".to_string()),
1389 good_til_block_time: None,
1390 created_at_height: Some("900".to_string()),
1391 client_metadata: Some("0".to_string()),
1392 trigger_price: None,
1393 total_filled: Some(total_filled.to_string()),
1394 updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1395 updated_at_height: Some("950".to_string()),
1396 };
1397
1398 let instrument_cache = create_test_instrument_cache();
1399 let encoder = ClientOrderIdEncoder::new();
1400
1401 let account_id = AccountId::new("DYDX-001");
1402 let ts_init = UnixNanos::default();
1403 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1404
1405 let result = parse_ws_order_report(
1406 &ws_order,
1407 &instrument_cache,
1408 &order_contexts,
1409 &encoder,
1410 account_id,
1411 ts_init,
1412 );
1413
1414 assert!(
1415 result.is_ok(),
1416 "Failed to parse order with status {status:?}"
1417 );
1418 let report = result.unwrap();
1419
1420 let expected_status = match status {
1422 DydxOrderStatus::Open
1423 | DydxOrderStatus::BestEffortOpened
1424 | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
1425 DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
1426 DydxOrderStatus::Filled => OrderStatus::Filled,
1427 DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
1428 OrderStatus::Canceled
1429 }
1430 };
1431 assert_eq!(report.order_status, expected_status);
1432 }
1433
1434 #[rstest]
1435 fn test_parse_ws_order_with_trigger_price() {
1436 let ws_order = DydxWsOrderSubaccountMessageContents {
1437 id: "conditional_order".to_string(),
1438 subaccount_id: "dydx1test/0".to_string(),
1439 client_id: "88888".to_string(),
1440 clob_pair_id: "1".to_string(),
1441 side: OrderSide::Sell,
1442 size: "1.0".to_string(),
1443 price: "52000.0".to_string(),
1444 status: DydxOrderStatus::Untriggered,
1445 order_type: DydxOrderType::StopLimit,
1446 time_in_force: DydxTimeInForce::Gtt,
1447 post_only: false,
1448 reduce_only: true,
1449 order_flags: "32".to_string(),
1450 good_til_block: None,
1451 good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1452 created_at_height: Some("1000".to_string()),
1453 client_metadata: Some("100".to_string()),
1454 trigger_price: Some("51500.0".to_string()),
1455 total_filled: Some("0.0".to_string()),
1456 updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1457 updated_at_height: Some("1050".to_string()),
1458 };
1459
1460 let instrument_cache = create_test_instrument_cache();
1461 let encoder = ClientOrderIdEncoder::new();
1462
1463 let account_id = AccountId::new("DYDX-001");
1464 let ts_init = UnixNanos::default();
1465 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1466
1467 let result = parse_ws_order_report(
1468 &ws_order,
1469 &instrument_cache,
1470 &order_contexts,
1471 &encoder,
1472 account_id,
1473 ts_init,
1474 );
1475
1476 assert!(result.is_ok());
1477 let report = result.unwrap();
1478 assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1479 assert!(report.trigger_price.is_some());
1481 }
1482
1483 #[rstest]
1484 fn test_parse_ws_order_market_type() {
1485 let ws_order = DydxWsOrderSubaccountMessageContents {
1486 id: "market_order".to_string(),
1487 subaccount_id: "dydx1test/0".to_string(),
1488 client_id: "77777".to_string(),
1489 clob_pair_id: "1".to_string(),
1490 side: OrderSide::Buy,
1491 size: "0.5".to_string(),
1492 price: "50000.0".to_string(), status: DydxOrderStatus::Filled,
1494 order_type: DydxOrderType::Market,
1495 time_in_force: DydxTimeInForce::Ioc,
1496 post_only: false,
1497 reduce_only: false,
1498 order_flags: "0".to_string(),
1499 good_til_block: Some("1000".to_string()),
1500 good_til_block_time: None,
1501 created_at_height: Some("900".to_string()),
1502 client_metadata: Some("0".to_string()),
1503 trigger_price: None,
1504 total_filled: Some("0.5".to_string()),
1505 updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1506 updated_at_height: Some("901".to_string()),
1507 };
1508
1509 let instrument_cache = create_test_instrument_cache();
1510 let encoder = ClientOrderIdEncoder::new();
1511
1512 let account_id = AccountId::new("DYDX-001");
1513 let ts_init = UnixNanos::default();
1514 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1515
1516 let result = parse_ws_order_report(
1517 &ws_order,
1518 &instrument_cache,
1519 &order_contexts,
1520 &encoder,
1521 account_id,
1522 ts_init,
1523 );
1524
1525 assert!(result.is_ok());
1526 let report = result.unwrap();
1527 assert_eq!(report.order_type, OrderType::Market);
1528 assert_eq!(report.order_status, OrderStatus::Filled);
1529 }
1530
1531 #[rstest]
1532 fn test_parse_ws_order_invalid_clob_pair_id() {
1533 let ws_order = DydxWsOrderSubaccountMessageContents {
1534 id: "bad_order".to_string(),
1535 subaccount_id: "dydx1test/0".to_string(),
1536 client_id: "12345".to_string(),
1537 clob_pair_id: "not_a_number".to_string(), side: OrderSide::Buy,
1539 size: "1.0".to_string(),
1540 price: "50000.0".to_string(),
1541 status: DydxOrderStatus::Open,
1542 order_type: DydxOrderType::Limit,
1543 time_in_force: DydxTimeInForce::Gtt,
1544 post_only: false,
1545 reduce_only: false,
1546 order_flags: "0".to_string(),
1547 good_til_block: Some("1000".to_string()),
1548 good_til_block_time: None,
1549 created_at_height: Some("900".to_string()),
1550 client_metadata: Some("0".to_string()),
1551 trigger_price: None,
1552 total_filled: Some("0.0".to_string()),
1553 updated_at: None,
1554 updated_at_height: None,
1555 };
1556
1557 let instrument_cache = InstrumentCache::new(); let encoder = ClientOrderIdEncoder::new();
1559 let account_id = AccountId::new("DYDX-001");
1560 let ts_init = UnixNanos::default();
1561 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1562
1563 let result = parse_ws_order_report(
1564 &ws_order,
1565 &instrument_cache,
1566 &order_contexts,
1567 &encoder,
1568 account_id,
1569 ts_init,
1570 );
1571
1572 assert!(result.is_err());
1573 assert!(
1574 result
1575 .unwrap_err()
1576 .to_string()
1577 .contains("Failed to parse clob_pair_id")
1578 );
1579 }
1580
1581 #[rstest]
1582 fn test_parse_ws_position_closed() {
1583 let instrument_cache = create_test_instrument_cache();
1584 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1585
1586 let ws_position = DydxPerpetualPosition {
1587 market: "BTC-USD".into(),
1588 status: DydxPositionStatus::Closed,
1589 side: DydxPositionSide::Long,
1590 size: "0.0".to_string(), max_size: "2.0".to_string(),
1592 entry_price: "48000.0".to_string(),
1593 exit_price: Some("52000.0".to_string()),
1594 realized_pnl: "2000.0".to_string(),
1595 unrealized_pnl: "0.0".to_string(),
1596 created_at: "2024-01-10T09:00:00Z".to_string(),
1597 closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1598 sum_open: "5.0".to_string(),
1599 sum_close: "5.0".to_string(), net_funding: "-25.5".to_string(),
1601 };
1602
1603 let account_id = AccountId::new("DYDX-001");
1604 let ts_init = UnixNanos::default();
1605
1606 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1607 assert!(result.is_ok());
1608
1609 let position_report = result.unwrap();
1610 assert_eq!(position_report.instrument_id, instrument_id);
1611 assert_eq!(position_report.quantity.as_f64(), 0.0);
1613 }
1614
1615 #[rstest]
1616 fn test_parse_ws_fill_with_maker_rebate() {
1617 let instrument_cache = create_test_instrument_cache();
1618
1619 let ws_fill = DydxWsFillSubaccountMessageContents {
1620 id: "fill_rebate".to_string(),
1621 subaccount_id: "sub1".to_string(),
1622 side: OrderSide::Buy,
1623 liquidity: DydxLiquidity::Maker,
1624 fill_type: DydxFillType::Limit,
1625 market: "BTC-USD".into(),
1626 market_type: Some(DydxTickerType::Perpetual),
1627 price: "50000.0".to_string(),
1628 size: "1.0".to_string(),
1629 fee: "-15.0".to_string(), created_at: "2024-01-15T13:00:00Z".to_string(),
1631 created_at_height: Some("13000".to_string()),
1632 order_id: Some("order_maker".to_string()),
1633 client_metadata: Some("200".to_string()),
1634 };
1635
1636 let account_id = AccountId::new("DYDX-001");
1637 let ts_init = UnixNanos::default();
1638 let order_id_map = DashMap::new();
1639 let order_contexts = DashMap::new();
1640 let encoder = ClientOrderIdEncoder::new();
1641
1642 let result = parse_ws_fill_report(
1643 &ws_fill,
1644 &instrument_cache,
1645 &order_id_map,
1646 &order_contexts,
1647 &encoder,
1648 account_id,
1649 ts_init,
1650 );
1651 assert!(result.is_ok());
1652
1653 let fill_report = result.unwrap();
1654 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1655 assert!(fill_report.commission.as_decimal() < dec!(0));
1656 }
1657
1658 #[rstest]
1659 fn test_parse_ws_fill_taker_with_fee() {
1660 let instrument_cache = create_test_instrument_cache();
1661
1662 let ws_fill = DydxWsFillSubaccountMessageContents {
1663 id: "fill_taker".to_string(),
1664 subaccount_id: "sub2".to_string(),
1665 side: OrderSide::Sell,
1666 liquidity: DydxLiquidity::Taker,
1667 fill_type: DydxFillType::Limit,
1668 market: "BTC-USD".into(),
1669 market_type: Some(DydxTickerType::Perpetual),
1670 price: "49800.0".to_string(),
1671 size: "0.75".to_string(),
1672 fee: "18.675".to_string(), created_at: "2024-01-15T14:00:00Z".to_string(),
1674 created_at_height: Some("14000".to_string()),
1675 order_id: Some("order_taker".to_string()),
1676 client_metadata: Some("300".to_string()),
1677 };
1678
1679 let account_id = AccountId::new("DYDX-001");
1680 let ts_init = UnixNanos::default();
1681 let order_id_map = DashMap::new();
1682 let order_contexts = DashMap::new();
1683 let encoder = ClientOrderIdEncoder::new();
1684
1685 let result = parse_ws_fill_report(
1686 &ws_fill,
1687 &instrument_cache,
1688 &order_id_map,
1689 &order_contexts,
1690 &encoder,
1691 account_id,
1692 ts_init,
1693 );
1694 assert!(result.is_ok());
1695
1696 let fill_report = result.unwrap();
1697 assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1698 assert_eq!(fill_report.order_side, OrderSide::Sell);
1699 assert!(fill_report.commission.as_decimal() > dec!(0));
1700 }
1701
1702 #[rstest]
1703 fn test_parse_orderbook_snapshot() {
1704 let json = load_json_fixture("ws_orderbook_subscribed.json");
1705 let contents: DydxOrderbookSnapshotContents =
1706 serde_json::from_value(json["contents"].clone())
1707 .expect("Failed to parse orderbook snapshot contents");
1708
1709 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1710 let ts_init = UnixNanos::from(1_000_000_000u64);
1711
1712 let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1713 .expect("Failed to parse orderbook snapshot");
1714
1715 assert_eq!(deltas.deltas.len(), 7);
1717
1718 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1719 assert_eq!(deltas.deltas[1].action, BookAction::Add);
1720 assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
1721 assert_eq!(deltas.deltas[1].order.price.to_string(), "43240.00");
1722 assert_eq!(deltas.deltas[1].order.size.to_string(), "1.50000000");
1723
1724 assert_eq!(deltas.deltas[4].action, BookAction::Add);
1725 assert_eq!(deltas.deltas[4].order.side, OrderSide::Sell);
1726 assert_eq!(deltas.deltas[4].order.price.to_string(), "43250.00");
1727 assert_eq!(deltas.deltas[4].order.size.to_string(), "1.20000000");
1728
1729 let last = deltas.deltas.last().unwrap();
1730 assert_ne!(last.flags, 0);
1731 }
1732
1733 #[rstest]
1734 fn test_parse_orderbook_deltas_update() {
1735 let json = load_json_fixture("ws_orderbook_update.json");
1736 let contents: DydxOrderbookContents = serde_json::from_value(json["contents"].clone())
1737 .expect("Failed to parse orderbook update contents");
1738
1739 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1740 let ts_init = UnixNanos::from(1_000_000_000u64);
1741
1742 let deltas = parse_orderbook_deltas(&instrument_id, &contents, 2, 8, ts_init)
1743 .expect("Failed to parse orderbook deltas");
1744
1745 assert_eq!(deltas.deltas.len(), 4);
1747
1748 assert_eq!(deltas.deltas[0].action, BookAction::Update);
1749 assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
1750 assert_eq!(deltas.deltas[0].order.price.to_string(), "43240.00");
1751
1752 assert_eq!(deltas.deltas[2].action, BookAction::Delete);
1754 assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
1755 assert_eq!(deltas.deltas[2].order.price.to_string(), "43250.00");
1756
1757 assert_eq!(deltas.deltas[3].action, BookAction::Update);
1758 assert_eq!(deltas.deltas[3].order.side, OrderSide::Sell);
1759 }
1760
1761 #[rstest]
1762 fn test_parse_trade_ticks_ws() {
1763 let json = load_json_fixture("ws_trades_subscribed.json");
1764 let contents: DydxTradeContents = serde_json::from_value(json["contents"].clone())
1765 .expect("Failed to parse trade contents");
1766
1767 let instrument = create_test_instrument();
1768 let instrument_id = instrument.id();
1769 let ts_init = UnixNanos::from(1_000_000_000u64);
1770
1771 let ticks = parse_trade_ticks(instrument_id, &instrument, &contents, ts_init)
1772 .expect("Failed to parse trade ticks");
1773
1774 assert_eq!(ticks.len(), 1);
1775 if let Data::Trade(tick) = &ticks[0] {
1776 assert_eq!(tick.instrument_id, instrument_id);
1777 assert_eq!(tick.price.to_string(), "43250.00");
1778 assert_eq!(tick.size.to_string(), "0.50000000");
1779 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1780 assert_eq!(tick.trade_id.to_string(), "trade-001");
1781 } else {
1782 panic!("Expected Trade data");
1783 }
1784 }
1785
1786 #[rstest]
1787 #[case(true)]
1788 #[case(false)]
1789 fn test_parse_candle_bar_timestamp_on_close(#[case] timestamp_on_close: bool) {
1790 let json = load_json_fixture("ws_candles_subscribed.json");
1791 let candles_value = &json["contents"]["candles"];
1792 let candles: Vec<DydxCandle> =
1793 serde_json::from_value(candles_value.clone()).expect("Failed to parse candle array");
1794
1795 let instrument = create_test_instrument();
1796 let bar_type = BarType::from_str("BTC-USD-PERP.DYDX-1-MINUTE-LAST-EXTERNAL")
1797 .expect("Failed to parse bar type");
1798 let ts_init = UnixNanos::from(1_000_000_000u64);
1799
1800 let bar = parse_candle_bar(
1801 bar_type,
1802 &instrument,
1803 &candles[0],
1804 timestamp_on_close,
1805 ts_init,
1806 )
1807 .expect("Failed to parse candle bar");
1808
1809 assert_eq!(bar.bar_type, bar_type);
1810 assert_eq!(bar.open.to_string(), "43100.00");
1811 assert_eq!(bar.high.to_string(), "43500.00");
1812 assert_eq!(bar.low.to_string(), "43000.00");
1813 assert_eq!(bar.close.to_string(), "43400.00");
1814 assert_eq!(bar.volume.to_string(), "12.34500000");
1815
1816 let started_at_ns = 1_704_067_200_000_000_000u64;
1818 let one_min_ns = 60_000_000_000u64;
1819 if timestamp_on_close {
1820 assert_eq!(bar.ts_event.as_u64(), started_at_ns + one_min_ns);
1821 } else {
1822 assert_eq!(bar.ts_event.as_u64(), started_at_ns);
1823 }
1824 }
1825}