1use std::sync::{Arc, Mutex, atomic::AtomicU64};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use dashmap::DashMap;
23use nautilus_common::{
24 messages::{
25 ExecutionEvent,
26 execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
29 QueryOrder, SubmitOrder, SubmitOrderList,
30 },
31 },
32 msgbus,
33 runner::get_exec_event_sender,
34 runtime::get_runtime,
35};
36use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
37use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
38use nautilus_live::execution::client::LiveExecutionClient;
39use nautilus_model::{
40 accounts::AccountAny,
41 enums::{OmsType, OrderSide, OrderType, TimeInForce},
42 events::{OrderCancelRejected, OrderEventAny, OrderRejected},
43 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
44 instruments::{Instrument, InstrumentAny},
45 orders::Order,
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use rust_decimal::Decimal;
50use tokio::task::JoinHandle;
51
52use crate::{
53 common::{consts::DYDX_VENUE, credential::DydxCredential},
54 config::DydxAdapterConfig,
55 execution::submitter::OrderSubmitter,
56 grpc::{DydxGrpcClient, OrderBuilder, Wallet},
57 http::client::DydxHttpClient,
58 websocket::client::DydxWebSocketClient,
59};
60
61pub mod submitter;
62
/// Modulus applied when a `u32` dYdX client ID is derived from a hashed client order ID.
pub const MAX_CLIENT_ID: u32 = u32::MAX;
65
66enum ExecutionReport {
71 Order(Box<OrderStatusReport>),
72 Fill(Box<FillReport>),
73}
74
75#[derive(Debug)]
90#[allow(dead_code)] pub struct DydxExecutionClient {
92 core: ExecutionClientCore,
93 config: DydxAdapterConfig,
94 http_client: DydxHttpClient,
95 ws_client: DydxWebSocketClient,
96 grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
97 wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
98 order_builders: DashMap<InstrumentId, OrderBuilder>,
99 instruments: DashMap<InstrumentId, InstrumentAny>,
101 market_to_instrument: DashMap<String, InstrumentId>,
102 clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
103 block_height: AtomicU64,
104 oracle_prices: DashMap<InstrumentId, Decimal>,
105 client_id_to_int: DashMap<String, u32>,
106 int_to_client_id: DashMap<u32, String>,
107 wallet_address: String,
108 subaccount_number: u32,
109 started: bool,
110 connected: bool,
111 instruments_initialized: bool,
112 ws_stream_handle: Option<JoinHandle<()>>,
113 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
114}
115
116impl DydxExecutionClient {
117 pub fn new(
123 core: ExecutionClientCore,
124 config: DydxAdapterConfig,
125 wallet_address: String,
126 subaccount_number: u32,
127 ) -> anyhow::Result<Self> {
128 let http_client = DydxHttpClient::default();
129
130 let ws_client = if let Some(ref mnemonic) = config.mnemonic {
132 let credential = DydxCredential::from_mnemonic(mnemonic, subaccount_number, vec![])?;
133 DydxWebSocketClient::new_private(
134 config.ws_url.clone(),
135 credential,
136 core.account_id,
137 Some(20),
138 )
139 } else {
140 DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
141 };
142
143 let grpc_urls = config.get_grpc_urls();
144 let grpc_client = Arc::new(tokio::sync::RwLock::new(
145 get_runtime()
146 .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
147 .context("failed to construct dYdX gRPC client")?,
148 ));
149
150 Ok(Self {
151 core,
152 config,
153 http_client,
154 ws_client,
155 grpc_client,
156 wallet: Arc::new(tokio::sync::RwLock::new(None)),
157 order_builders: DashMap::new(),
158 instruments: DashMap::new(),
159 market_to_instrument: DashMap::new(),
160 clob_pair_id_to_instrument: DashMap::new(),
161 block_height: AtomicU64::new(0),
162 oracle_prices: DashMap::new(),
163 client_id_to_int: DashMap::new(),
164 int_to_client_id: DashMap::new(),
165 wallet_address,
166 subaccount_number,
167 started: false,
168 connected: false,
169 instruments_initialized: false,
170 ws_stream_handle: None,
171 pending_tasks: Mutex::new(Vec::new()),
172 })
173 }
174
175 #[allow(dead_code)] fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
181 if let Ok(id) = client_order_id.parse::<u32>() {
182 self.client_id_to_int
183 .insert(client_order_id.to_string(), id);
184 self.int_to_client_id
185 .insert(id, client_order_id.to_string());
186 return id;
187 }
188
189 let id = rand::random::<u32>();
191 self.client_id_to_int
192 .insert(client_order_id.to_string(), id);
193 self.int_to_client_id
194 .insert(id, client_order_id.to_string());
195 id
196 }
197
198 #[allow(dead_code)] fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
203 if let Ok(id) = client_order_id.parse::<u32>() {
205 return Some(id);
206 }
207
208 self.client_id_to_int
210 .get(client_order_id)
211 .map(|entry| *entry.value())
212 }
213
214 #[allow(dead_code)] fn get_client_order_id(&self, client_order_id_int: u32) -> String {
219 self.int_to_client_id.get(&client_order_id_int).map_or_else(
220 || client_order_id_int.to_string(),
221 |entry| entry.value().clone(),
222 )
223 }
224
225 fn cache_instruments_from_http(&mut self) {
232 use nautilus_model::instruments::InstrumentAny;
233
234 let instruments: Vec<InstrumentAny> = self
236 .http_client
237 .instruments_cache
238 .iter()
239 .map(|entry| entry.value().clone())
240 .collect();
241
242 tracing::debug!(
243 "Caching {} instruments in execution client",
244 instruments.len()
245 );
246
247 for instrument in instruments {
248 let instrument_id = instrument.id();
249 let symbol = instrument_id.symbol.as_str();
250
251 self.instruments.insert(instrument_id, instrument.clone());
253
254 self.market_to_instrument
256 .insert(symbol.to_string(), instrument_id);
257 }
258
259 let http_mapping = self.http_client.clob_pair_id_mapping();
262 for entry in http_mapping.iter() {
263 self.clob_pair_id_to_instrument
264 .insert(*entry.key(), *entry.value());
265 }
266
267 self.instruments_initialized = true;
268 tracing::info!(
269 "Cached {} instruments ({} CLOB pair IDs) with market mappings",
270 self.instruments.len(),
271 self.clob_pair_id_to_instrument.len()
272 );
273 }
274
275 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
277 self.market_to_instrument
278 .get(market)
279 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
280 }
281
282 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
284 let instrument = self
285 .clob_pair_id_to_instrument
286 .get(&clob_pair_id)
287 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
288
289 if instrument.is_none() {
290 self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
291 }
292
293 instrument
294 }
295
296 fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
297 let known: Vec<(u32, String)> = self
298 .clob_pair_id_to_instrument
299 .iter()
300 .filter_map(|entry| {
301 let instrument_id = entry.value();
302 self.instruments.get(instrument_id).map(|inst_entry| {
303 (
304 *entry.key(),
305 inst_entry.value().id().symbol.as_str().to_string(),
306 )
307 })
308 })
309 .collect();
310
311 tracing::warn!(
312 "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
313 clob_pair_id,
314 known
315 );
316 }
317
318 fn spawn_task<F>(&self, label: &'static str, fut: F)
319 where
320 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
321 {
322 let handle = tokio::spawn(async move {
323 if let Err(e) = fut.await {
324 tracing::error!("{label}: {e:?}");
325 }
326 });
327
328 self.pending_tasks
329 .lock()
330 .expect(MUTEX_POISONED)
331 .push(handle);
332 }
333
334 fn spawn_order_task<F>(
338 &self,
339 label: &'static str,
340 strategy_id: StrategyId,
341 instrument_id: InstrumentId,
342 client_order_id: ClientOrderId,
343 fut: F,
344 ) where
345 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
346 {
347 let trader_id = self.core.trader_id;
349 let account_id = self.core.account_id;
350 let sender = get_exec_event_sender();
351
352 let handle = tokio::spawn(async move {
353 if let Err(e) = fut.await {
354 let error_msg = format!("{label} failed: {e:?}");
355 tracing::error!("{}", error_msg);
356
357 let ts_now = UnixNanos::default(); let event = OrderRejected::new(
360 trader_id,
361 strategy_id,
362 instrument_id,
363 client_order_id,
364 account_id,
365 error_msg.into(),
366 UUID4::new(),
367 ts_now,
368 ts_now,
369 false,
370 false,
371 );
372
373 if let Err(send_err) =
374 sender.send(nautilus_common::messages::ExecutionEvent::Order(
375 OrderEventAny::Rejected(event),
376 ))
377 {
378 tracing::error!("Failed to send OrderRejected event: {send_err}");
379 }
380 }
381 });
382
383 self.pending_tasks
384 .lock()
385 .expect(MUTEX_POISONED)
386 .push(handle);
387 }
388
389 fn abort_pending_tasks(&self) {
390 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
391 for handle in guard.drain(..) {
392 handle.abort();
393 }
394 }
395}
396impl ExecutionClient for DydxExecutionClient {
397 fn is_connected(&self) -> bool {
398 self.connected
399 }
400
401 fn client_id(&self) -> ClientId {
402 self.core.client_id
403 }
404
405 fn account_id(&self) -> AccountId {
406 self.core.account_id
407 }
408
409 fn venue(&self) -> Venue {
410 *DYDX_VENUE
411 }
412
413 fn oms_type(&self) -> OmsType {
414 self.core.oms_type
415 }
416
417 fn get_account(&self) -> Option<AccountAny> {
418 self.core.get_account()
419 }
420
421 fn generate_account_state(
422 &self,
423 balances: Vec<AccountBalance>,
424 margins: Vec<MarginBalance>,
425 reported: bool,
426 ts_event: UnixNanos,
427 ) -> anyhow::Result<()> {
428 self.core
429 .generate_account_state(balances, margins, reported, ts_event)
430 }
431
432 fn start(&mut self) -> anyhow::Result<()> {
433 if self.started {
434 tracing::warn!("dYdX execution client already started");
435 return Ok(());
436 }
437
438 tracing::info!("Starting dYdX execution client");
439 self.started = true;
440 Ok(())
441 }
442
443 fn stop(&mut self) -> anyhow::Result<()> {
444 if !self.started {
445 tracing::warn!("dYdX execution client not started");
446 return Ok(());
447 }
448
449 tracing::info!("Stopping dYdX execution client");
450 self.abort_pending_tasks();
451 self.started = false;
452 self.connected = false;
453 Ok(())
454 }
455
456 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
465 let order = cmd.order.clone();
466
467 if !self.is_connected() {
469 let reason = "Cannot submit order: execution client not connected";
470 tracing::error!("{}", reason);
471 anyhow::bail!(reason);
472 }
473
474 if order.is_closed() {
476 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
477 return Ok(());
478 }
479
480 match order.order_type() {
482 OrderType::Market | OrderType::Limit => {
483 tracing::debug!(
485 "Submitting {} order: {}",
486 if matches!(order.order_type(), OrderType::Market) {
487 "MARKET"
488 } else {
489 "LIMIT"
490 },
491 order.client_order_id()
492 );
493 }
494 OrderType::StopMarket
496 | OrderType::StopLimit
497 | OrderType::MarketIfTouched
498 | OrderType::LimitIfTouched
499 | OrderType::TrailingStopMarket
500 | OrderType::TrailingStopLimit => {
501 self.core.generate_order_submitted(
502 order.strategy_id(),
503 order.instrument_id(),
504 order.client_order_id(),
505 cmd.ts_init,
506 );
507 tracing::warn!(
508 order_type = ?order.order_type(),
509 client_order_id = %order.client_order_id(),
510 "Conditional order stub: OrderSubmitted generated but not sent to exchange (proto implementation pending)"
511 );
512 return Ok(());
513 }
514 order_type => {
515 let reason = format!("Order type {:?} not supported by dYdX", order_type);
516 tracing::error!("{}", reason);
517 self.core.generate_order_rejected(
518 order.strategy_id(),
519 order.instrument_id(),
520 order.client_order_id(),
521 &reason,
522 cmd.ts_init,
523 false,
524 );
525 return Ok(());
526 }
527 }
528
529 self.core.generate_order_submitted(
531 order.strategy_id(),
532 order.instrument_id(),
533 order.client_order_id(),
534 cmd.ts_init,
535 );
536
537 let grpc_client = self.grpc_client.clone();
538 let wallet = self.wallet.clone();
539 let wallet_address = self.wallet_address.clone();
540 let subaccount_number = self.subaccount_number;
541 let client_order_id = order.client_order_id();
542 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
543 #[allow(clippy::redundant_clone)]
544 let order_clone = order.clone();
545
546 self.spawn_order_task(
547 "submit_order",
548 order.strategy_id(),
549 order.instrument_id(),
550 order.client_order_id(),
551 async move {
552 let wallet_guard = wallet.read().await;
553 let wallet_ref = wallet_guard
554 .as_ref()
555 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
556
557 let grpc_guard = grpc_client.read().await;
558 let submitter =
559 OrderSubmitter::new((*grpc_guard).clone(), wallet_address, subaccount_number);
560
561 let client_id_u32 = client_order_id.as_str().parse::<u32>().unwrap_or_else(|_| {
564 use std::{
566 collections::hash_map::DefaultHasher,
567 hash::{Hash, Hasher},
568 };
569 let mut hasher = DefaultHasher::new();
570 client_order_id.as_str().hash(&mut hasher);
571 (hasher.finish() % (MAX_CLIENT_ID as u64)) as u32
572 });
573
574 match order_clone.order_type() {
576 OrderType::Market => {
577 submitter
578 .submit_market_order(
579 wallet_ref,
580 client_id_u32,
581 order_clone.order_side(),
582 order_clone.quantity(),
583 block_height,
584 )
585 .await?;
586 tracing::info!("Successfully submitted market order: {}", client_order_id);
587 }
588 OrderType::Limit => {
589 let expire_time = order_clone
590 .expire_time()
591 .map(|t| (t.as_u64() / 1_000_000_000) as i64);
592 submitter
593 .submit_limit_order(
594 wallet_ref,
595 client_id_u32,
596 order_clone.order_side(),
597 order_clone
598 .price()
599 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
600 order_clone.quantity(),
601 order_clone.time_in_force(),
602 order_clone.is_post_only(),
603 order_clone.is_reduce_only(),
604 block_height,
605 expire_time,
606 )
607 .await?;
608 tracing::info!("Successfully submitted limit order: {}", client_order_id);
609 }
610 _ => unreachable!("Order type already validated"),
611 }
612
613 Ok(())
614 },
615 );
616
617 Ok(())
618 }
619
620 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
621 anyhow::bail!("Order lists not supported by dYdX")
622 }
623
624 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
625 anyhow::bail!("Order modification not supported by dYdX")
626 }
627
628 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
645 if !self.is_connected() {
646 anyhow::bail!("Cannot cancel order: not connected");
647 }
648
649 let client_order_id = cmd.client_order_id;
650
651 let cache = self.core.cache();
653 let cache_borrow = cache.borrow();
654
655 let order = match cache_borrow.order(&client_order_id) {
656 Some(order) => order,
657 None => {
658 tracing::error!(
659 "Cannot cancel order {}: not found in cache",
660 client_order_id
661 );
662 return Ok(()); }
664 };
665
666 if order.is_closed() {
668 tracing::warn!(
669 "CancelOrder command for {} when order already {} (will not send to exchange)",
670 client_order_id,
671 order.status()
672 );
673 return Ok(());
674 }
675
676 let instrument_id = cmd.instrument_id;
678 let instrument = match cache_borrow.instrument(&instrument_id) {
679 Some(instrument) => instrument,
680 None => {
681 tracing::error!(
682 "Cannot cancel order {}: instrument {} not found in cache",
683 client_order_id,
684 instrument_id
685 );
686 return Ok(()); }
688 };
689
690 tracing::debug!(
691 "Cancelling order {} for instrument {}",
692 client_order_id,
693 instrument.id()
694 );
695
696 let grpc_client = self.grpc_client.clone();
697 let wallet = self.wallet.clone();
698 let wallet_address = self.wallet_address.clone();
699 let subaccount_number = self.subaccount_number;
700 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
701 let trader_id = cmd.trader_id;
702 let strategy_id = cmd.strategy_id;
703 let venue_order_id = cmd.venue_order_id;
704
705 self.spawn_task("cancel_order", async move {
706 let wallet_guard = wallet.read().await;
707 let wallet_ref = wallet_guard
708 .as_ref()
709 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
710
711 let grpc_guard = grpc_client.read().await;
712 let submitter =
713 OrderSubmitter::new((*grpc_guard).clone(), wallet_address, subaccount_number);
714
715 let client_id_u32 = client_order_id.as_str().parse::<u32>().unwrap_or_else(|_| {
717 use std::{
719 collections::hash_map::DefaultHasher,
720 hash::{Hash, Hasher},
721 };
722 let mut hasher = DefaultHasher::new();
723 client_order_id.as_str().hash(&mut hasher);
724 (hasher.finish() % (MAX_CLIENT_ID as u64)) as u32
725 });
726
727 match submitter
729 .cancel_order(wallet_ref, client_id_u32, block_height)
730 .await
731 {
732 Ok(_) => {
733 tracing::info!("Successfully cancelled order: {}", client_order_id);
734 }
735 Err(e) => {
736 tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
737
738 let sender = get_exec_event_sender();
740 let ts_now = UnixNanos::default();
741 let event = OrderCancelRejected::new(
742 trader_id,
743 strategy_id,
744 instrument_id,
745 client_order_id,
746 format!("Cancel order failed: {e:?}").into(),
747 UUID4::new(),
748 ts_now,
749 ts_now,
750 false,
751 Some(venue_order_id),
752 None, );
754 sender
755 .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
756 .unwrap();
757 }
758 }
759
760 Ok(())
761 });
762
763 Ok(())
764 }
765
766 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
767 if !self.is_connected() {
768 anyhow::bail!("Cannot cancel orders: not connected");
769 }
770
771 let cache = self.core.cache().borrow();
773 let mut open_orders: Vec<_> = cache
774 .orders_open(None, None, None, None)
775 .into_iter()
776 .collect();
777
778 let instrument_id = cmd.instrument_id;
780 open_orders.retain(|order| order.instrument_id() == instrument_id);
781
782 if cmd.order_side != OrderSide::NoOrderSide {
784 let order_side = cmd.order_side;
785 open_orders.retain(|order| order.order_side() == order_side);
786 }
787
788 let mut short_term_orders = Vec::new();
792 let mut long_term_orders = Vec::new();
793
794 for order in &open_orders {
795 match order.time_in_force() {
796 TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
797 TimeInForce::Gtc
798 | TimeInForce::Gtd
799 | TimeInForce::Day
800 | TimeInForce::AtTheOpen
801 | TimeInForce::AtTheClose => long_term_orders.push(order),
802 }
803 }
804
805 tracing::info!(
806 "[STUB] Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
807 open_orders.len(),
808 short_term_orders.len(),
809 long_term_orders.len(),
810 instrument_id,
811 cmd.order_side
812 );
813
814 Ok(())
822 }
823
824 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
825 anyhow::bail!("Batch cancel not supported by dYdX")
826 }
827
828 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
829 Ok(())
830 }
831
832 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
833 Ok(())
834 }
835}
836
837fn dispatch_account_state(state: nautilus_model::events::AccountState) {
842 use std::any::Any;
843 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
844}
845
846fn dispatch_execution_report(report: ExecutionReport) {
861 let sender = get_exec_event_sender();
862 match report {
863 ExecutionReport::Order(order_report) => {
864 tracing::debug!(
865 "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
866 order_report.order_status,
867 order_report.venue_order_id,
868 order_report.client_order_id
869 );
870 let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(order_report);
871 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
872 tracing::warn!("Failed to send order status report: {e}");
873 }
874 }
875 ExecutionReport::Fill(fill_report) => {
876 tracing::debug!(
877 "Dispatching fill report: venue_order_id={}, trade_id={}",
878 fill_report.venue_order_id,
879 fill_report.trade_id
880 );
881 let exec_report = nautilus_common::messages::ExecutionReport::Fill(fill_report);
882 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
883 tracing::warn!("Failed to send fill report: {e}");
884 }
885 }
886 }
887}
888
889#[async_trait(?Send)]
890impl LiveExecutionClient for DydxExecutionClient {
891 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
892 get_exec_event_sender()
893 }
894
895 fn get_clock(&self) -> std::cell::Ref<'_, dyn nautilus_common::clock::Clock> {
896 self.core.clock().borrow()
897 }
898
899 async fn connect(&mut self) -> anyhow::Result<()> {
900 if self.connected {
901 tracing::warn!("dYdX execution client already connected");
902 return Ok(());
903 }
904
905 tracing::info!("Connecting to dYdX");
906
907 tracing::debug!("Loading instruments from HTTP API");
910 self.http_client.fetch_and_cache_instruments().await?;
911 tracing::info!(
912 "Loaded {} instruments from HTTP",
913 self.http_client.instruments_cache.len()
914 );
915
916 self.cache_instruments_from_http();
918
919 if let Some(mnemonic) = &self.config.mnemonic {
921 let wallet = Wallet::from_mnemonic(mnemonic)?;
922 *self.wallet.write().await = Some(wallet);
923 tracing::debug!("Wallet initialized");
924 }
925
926 self.ws_client.connect().await?;
928 tracing::debug!("WebSocket connected");
929
930 self.ws_client.subscribe_block_height().await?;
932 tracing::debug!("Subscribed to block height updates");
933
934 self.ws_client.subscribe_markets().await?;
936 tracing::debug!("Subscribed to markets");
937
938 if self.config.mnemonic.is_some() {
940 self.ws_client
941 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
942 .await?;
943 tracing::debug!(
944 "Subscribed to subaccount updates: {}/{}",
945 self.wallet_address,
946 self.subaccount_number
947 );
948
949 if let Some(mut rx) = self.ws_client.take_receiver() {
952 use crate::websocket::messages::NautilusWsMessage;
953
954 let account_id = self.core.account_id;
956 let instruments = self.instruments.clone();
957 let oracle_prices = self.oracle_prices.clone();
958 let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
959
960 let handle = tokio::spawn(async move {
961 while let Some(msg) = rx.recv().await {
962 match msg {
963 NautilusWsMessage::Order(report) => {
964 tracing::debug!("Received order update: {:?}", report.order_status);
965 dispatch_execution_report(ExecutionReport::Order(report));
966 }
967 NautilusWsMessage::Fill(report) => {
968 tracing::debug!("Received fill update");
969 dispatch_execution_report(ExecutionReport::Fill(report));
970 }
971 NautilusWsMessage::Position(report) => {
972 tracing::debug!("Received position update");
973 let sender = get_exec_event_sender();
975 let exec_report =
976 nautilus_common::messages::ExecutionReport::Position(Box::new(
977 *report,
978 ));
979 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
980 tracing::warn!("Failed to send position status report: {e}");
981 }
982 }
983 NautilusWsMessage::AccountState(state) => {
984 tracing::debug!("Received account state update");
985 dispatch_account_state(*state);
986 }
987 NautilusWsMessage::SubaccountSubscribed(msg) => {
988 tracing::debug!(
989 "Parsing subaccount subscription with full context"
990 );
991
992 let inst_map: std::collections::HashMap<_, _> = instruments
994 .iter()
995 .map(|entry| (*entry.key(), entry.value().clone()))
996 .collect();
997
998 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1000 .iter()
1001 .map(|entry| (*entry.key(), *entry.value()))
1002 .collect();
1003
1004 let ts_init =
1005 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1006 let ts_event = ts_init;
1007
1008 match crate::http::parse::parse_account_state(
1009 &msg.contents.subaccount,
1010 account_id,
1011 &inst_map,
1012 &oracle_map,
1013 ts_event,
1014 ts_init,
1015 ) {
1016 Ok(account_state) => {
1017 tracing::info!(
1018 "Parsed account state: {} balance(s), {} margin(s)",
1019 account_state.balances.len(),
1020 account_state.margins.len()
1021 );
1022 dispatch_account_state(account_state);
1023 }
1024 Err(e) => {
1025 tracing::error!("Failed to parse account state: {e}");
1026 }
1027 }
1028
1029 if let Some(ref positions) =
1031 msg.contents.subaccount.open_perpetual_positions
1032 {
1033 tracing::debug!(
1034 "Parsing {} position(s) from subscription",
1035 positions.len()
1036 );
1037
1038 for (market, ws_position) in positions.iter() {
1039 match crate::websocket::parse::parse_ws_position_report(
1040 ws_position,
1041 &instruments,
1042 account_id,
1043 ts_init,
1044 ) {
1045 Ok(report) => {
1046 tracing::debug!(
1047 "Parsed position report: {} {} {} {}",
1048 report.instrument_id,
1049 report.position_side,
1050 report.quantity,
1051 market
1052 );
1053 let sender = get_exec_event_sender();
1054 let exec_report =
1055 nautilus_common::messages::ExecutionReport::Position(
1056 Box::new(report),
1057 );
1058 if let Err(e) =
1059 sender.send(ExecutionEvent::Report(exec_report))
1060 {
1061 tracing::warn!(
1062 "Failed to send position status report: {e}"
1063 );
1064 }
1065 }
1066 Err(e) => {
1067 tracing::error!(
1068 "Failed to parse WebSocket position for {}: {e}",
1069 market
1070 );
1071 }
1072 }
1073 }
1074 }
1075 }
1076 NautilusWsMessage::SubaccountsChannelData(data) => {
1077 tracing::debug!("Processing subaccounts channel data");
1078 let ts_init =
1079 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1080
1081 if let Some(ref orders) = data.contents.orders {
1083 for ws_order in orders {
1084 match crate::websocket::parse::parse_ws_order_report(
1085 ws_order,
1086 &clob_pair_id_to_instrument,
1087 &instruments,
1088 account_id,
1089 ts_init,
1090 ) {
1091 Ok(report) => {
1092 tracing::debug!(
1093 "Parsed order report: {} {} {} @ {}",
1094 report.instrument_id,
1095 report.order_side,
1096 report.order_status,
1097 report.quantity
1098 );
1099 let sender = get_exec_event_sender();
1100 let exec_report =
1101 nautilus_common::messages::ExecutionReport::OrderStatus(
1102 Box::new(report),
1103 );
1104 if let Err(e) =
1105 sender.send(ExecutionEvent::Report(exec_report))
1106 {
1107 tracing::warn!(
1108 "Failed to send order status report: {e}"
1109 );
1110 }
1111 }
1112 Err(e) => {
1113 tracing::error!(
1114 "Failed to parse WebSocket order: {e}"
1115 );
1116 }
1117 }
1118 }
1119 }
1120
1121 if let Some(ref fills) = data.contents.fills {
1123 for ws_fill in fills {
1124 match crate::websocket::parse::parse_ws_fill_report(
1125 ws_fill,
1126 &instruments,
1127 account_id,
1128 ts_init,
1129 ) {
1130 Ok(report) => {
1131 tracing::debug!(
1132 "Parsed fill report: {} {} {} @ {}",
1133 report.instrument_id,
1134 report.venue_order_id,
1135 report.last_qty,
1136 report.last_px
1137 );
1138 let sender = get_exec_event_sender();
1139 let exec_report =
1140 nautilus_common::messages::ExecutionReport::Fill(
1141 Box::new(report),
1142 );
1143 if let Err(e) =
1144 sender.send(ExecutionEvent::Report(exec_report))
1145 {
1146 tracing::warn!(
1147 "Failed to send fill report: {e}"
1148 );
1149 }
1150 }
1151 Err(e) => {
1152 tracing::error!(
1153 "Failed to parse WebSocket fill: {e}"
1154 );
1155 }
1156 }
1157 }
1158 }
1159 }
1160 NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1161 tracing::debug!(
1162 "Processing oracle price updates for {} markets",
1163 oracle_prices_map.len()
1164 );
1165
1166 for (market_symbol, oracle_data) in oracle_prices_map.iter() {
1168 match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1170 {
1171 Ok(price) => {
1172 let symbol_with_perp =
1175 format!("{}-PERP", market_symbol);
1176
1177 if let Some(entry) = instruments.iter().find(|entry| {
1179 entry.value().id().symbol.as_str()
1180 == symbol_with_perp
1181 }) {
1182 let instrument_id = *entry.key();
1183 oracle_prices.insert(instrument_id, price);
1184 tracing::trace!(
1185 "Updated oracle price for {}: {}",
1186 instrument_id,
1187 price
1188 );
1189 } else {
1190 tracing::debug!(
1191 "No instrument found for market symbol '{}' (tried '{}')",
1192 market_symbol,
1193 symbol_with_perp
1194 );
1195 }
1196 }
1197 Err(e) => {
1198 tracing::warn!(
1199 "Failed to parse oracle price for {}: {}",
1200 market_symbol,
1201 e
1202 );
1203 }
1204 }
1205 }
1206 }
1207 NautilusWsMessage::Error(err) => {
1208 tracing::error!("WebSocket error: {:?}", err);
1209 }
1210 NautilusWsMessage::Reconnected => {
1211 tracing::info!("WebSocket reconnected");
1212 }
1213 _ => {
1214 }
1216 }
1217 }
1218 tracing::info!("WebSocket message processing task ended");
1219 });
1220
1221 self.ws_stream_handle = Some(handle);
1222 tracing::debug!("Spawned WebSocket message processing task");
1223 }
1224 }
1225 self.connected = true;
1226 tracing::info!(client_id = %self.core.client_id, "Connected");
1227 Ok(())
1228 }
1229
1230 async fn disconnect(&mut self) -> anyhow::Result<()> {
1231 if !self.connected {
1232 tracing::warn!("dYdX execution client not connected");
1233 return Ok(());
1234 }
1235
1236 tracing::info!("Disconnecting from dYdX");
1237
1238 if self.config.mnemonic.is_some() {
1240 let _ = self
1241 .ws_client
1242 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1243 .await
1244 .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1245 }
1246
1247 let _ = self
1249 .ws_client
1250 .unsubscribe_markets()
1251 .await
1252 .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1253
1254 let _ = self
1256 .ws_client
1257 .unsubscribe_block_height()
1258 .await
1259 .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1260
1261 self.ws_client.disconnect().await?;
1263
1264 if let Some(handle) = self.ws_stream_handle.take() {
1266 handle.abort();
1267 tracing::debug!("Aborted WebSocket message processing task");
1268 }
1269
1270 self.abort_pending_tasks();
1272
1273 self.connected = false;
1274 tracing::info!(client_id = %self.core.client_id, "Disconnected");
1275 Ok(())
1276 }
1277
1278 async fn generate_order_status_report(
1279 &self,
1280 cmd: &GenerateOrderStatusReport,
1281 ) -> anyhow::Result<Option<OrderStatusReport>> {
1282 use anyhow::Context;
1283
1284 let response = self
1286 .http_client
1287 .inner
1288 .get_orders(
1289 &self.wallet_address,
1290 self.subaccount_number,
1291 None, Some(1), )
1294 .await
1295 .context("failed to fetch order from dYdX API")?;
1296
1297 if response.orders.is_empty() {
1298 return Ok(None);
1299 }
1300
1301 let order = &response.orders[0];
1302 let ts_init = UnixNanos::default();
1303
1304 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1306 Some(inst) => inst,
1307 None => return Ok(None),
1308 };
1309
1310 let report = crate::http::parse::parse_order_status_report(
1312 order,
1313 &instrument,
1314 self.core.account_id,
1315 ts_init,
1316 )
1317 .context("failed to parse order status report")?;
1318
1319 if let Some(client_order_id) = cmd.client_order_id
1321 && report.client_order_id != Some(client_order_id)
1322 {
1323 return Ok(None);
1324 }
1325
1326 if let Some(venue_order_id) = cmd.venue_order_id
1328 && report.venue_order_id.as_str() != venue_order_id.as_str()
1329 {
1330 return Ok(None);
1331 }
1332
1333 if let Some(instrument_id) = cmd.instrument_id
1335 && report.instrument_id != instrument_id
1336 {
1337 return Ok(None);
1338 }
1339
1340 Ok(Some(report))
1341 }
1342
1343 async fn generate_order_status_reports(
1344 &self,
1345 cmd: &GenerateOrderStatusReport,
1346 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1347 use anyhow::Context;
1348
1349 let response = self
1351 .http_client
1352 .inner
1353 .get_orders(
1354 &self.wallet_address,
1355 self.subaccount_number,
1356 None, None, )
1359 .await
1360 .context("failed to fetch orders from dYdX API")?;
1361
1362 let mut reports = Vec::new();
1363 let ts_init = UnixNanos::default();
1364
1365 for order in response.orders {
1366 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1368 Some(inst) => inst,
1369 None => continue,
1370 };
1371
1372 if let Some(filter_id) = cmd.instrument_id
1374 && instrument.id() != filter_id
1375 {
1376 continue;
1377 }
1378
1379 match crate::http::parse::parse_order_status_report(
1381 &order,
1382 &instrument,
1383 self.core.account_id,
1384 ts_init,
1385 ) {
1386 Ok(report) => {
1387 if let Some(client_order_id) = cmd.client_order_id
1389 && report.client_order_id != Some(client_order_id)
1390 {
1391 continue;
1392 }
1393
1394 if let Some(venue_order_id) = cmd.venue_order_id
1396 && report.venue_order_id.as_str() != venue_order_id.as_str()
1397 {
1398 continue;
1399 }
1400
1401 reports.push(report);
1402 }
1403 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1404 }
1405 }
1406
1407 tracing::info!("Generated {} order status reports", reports.len());
1408 Ok(reports)
1409 }
1410
1411 async fn generate_fill_reports(
1412 &self,
1413 cmd: GenerateFillReports,
1414 ) -> anyhow::Result<Vec<FillReport>> {
1415 use anyhow::Context;
1416
1417 let response = self
1419 .http_client
1420 .inner
1421 .get_fills(
1422 &self.wallet_address,
1423 self.subaccount_number,
1424 None, None, )
1427 .await
1428 .context("failed to fetch fills from dYdX API")?;
1429
1430 let mut reports = Vec::new();
1431 let ts_init = UnixNanos::default();
1432
1433 for fill in response.fills {
1434 let instrument = match self.get_instrument_by_market(&fill.market) {
1436 Some(inst) => inst,
1437 None => {
1438 tracing::warn!(
1439 "Instrument for market {} not found in cache, skipping fill {}",
1440 fill.market,
1441 fill.id
1442 );
1443 continue;
1444 }
1445 };
1446
1447 if let Some(filter_id) = cmd.instrument_id
1449 && instrument.id() != filter_id
1450 {
1451 continue;
1452 }
1453
1454 match crate::http::parse::parse_fill_report(
1456 &fill,
1457 &instrument,
1458 self.core.account_id,
1459 ts_init,
1460 ) {
1461 Ok(report) => {
1462 if let Some(venue_order_id) = cmd.venue_order_id
1464 && report.venue_order_id.as_str() != venue_order_id.as_str()
1465 {
1466 continue;
1467 }
1468
1469 if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1471 if report.ts_event >= start && report.ts_event <= end {
1472 reports.push(report);
1473 }
1474 } else if let Some(start) = cmd.start {
1475 if report.ts_event >= start {
1476 reports.push(report);
1477 }
1478 } else if let Some(end) = cmd.end {
1479 if report.ts_event <= end {
1480 reports.push(report);
1481 }
1482 } else {
1483 reports.push(report);
1484 }
1485 }
1486 Err(e) => tracing::error!("Failed to parse fill report: {e}"),
1487 }
1488 }
1489
1490 tracing::info!("Generated {} fill reports", reports.len());
1491 Ok(reports)
1492 }
1493
1494 async fn generate_position_status_reports(
1495 &self,
1496 cmd: &GeneratePositionReports,
1497 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1498 use anyhow::Context;
1499
1500 let response = self
1502 .http_client
1503 .inner
1504 .get_subaccount(&self.wallet_address, self.subaccount_number)
1505 .await
1506 .context("failed to fetch subaccount from dYdX API")?;
1507
1508 let mut reports = Vec::new();
1509 let ts_init = UnixNanos::default();
1510
1511 for (market_ticker, position) in &response.subaccount.open_perpetual_positions {
1513 let instrument = match self.get_instrument_by_market(market_ticker) {
1515 Some(inst) => inst,
1516 None => {
1517 tracing::warn!(
1518 "Instrument for market {} not found in cache, skipping position",
1519 market_ticker
1520 );
1521 continue;
1522 }
1523 };
1524
1525 if let Some(filter_id) = cmd.instrument_id
1527 && instrument.id() != filter_id
1528 {
1529 continue;
1530 }
1531
1532 match crate::http::parse::parse_position_status_report(
1534 position,
1535 &instrument,
1536 self.core.account_id,
1537 ts_init,
1538 ) {
1539 Ok(report) => reports.push(report),
1540 Err(e) => {
1541 tracing::error!("Failed to parse position status report: {e}");
1542 }
1543 }
1544 }
1545
1546 tracing::info!("Generated {} position status reports", reports.len());
1547 Ok(reports)
1548 }
1549
1550 async fn generate_mass_status(
1551 &self,
1552 lookback_mins: Option<u64>,
1553 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1554 use anyhow::Context;
1555
1556 tracing::info!(
1557 "Generating mass execution status{}",
1558 lookback_mins.map_or_else(
1559 || " (unbounded)".to_string(),
1560 |mins| format!(" (lookback: {} minutes)", mins)
1561 )
1562 );
1563
1564 let cutoff_time =
1566 lookback_mins.map(|mins| chrono::Utc::now() - chrono::Duration::minutes(mins as i64));
1567
1568 let orders_response = self
1570 .http_client
1571 .inner
1572 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1573 .await
1574 .context("failed to fetch orders for mass status")?;
1575
1576 let subaccount_response = self
1578 .http_client
1579 .inner
1580 .get_subaccount(&self.wallet_address, self.subaccount_number)
1581 .await
1582 .context("failed to fetch subaccount for mass status")?;
1583
1584 let fills_response = self
1586 .http_client
1587 .inner
1588 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1589 .await
1590 .context("failed to fetch fills for mass status")?;
1591
1592 let ts_init = UnixNanos::default();
1593 let mut order_reports = Vec::new();
1594 let mut position_reports = Vec::new();
1595 let mut fill_reports = Vec::new();
1596
1597 let mut orders_filtered = 0;
1599 let mut fills_filtered = 0;
1600
1601 for order in orders_response.orders {
1603 if let Some(cutoff) = cutoff_time
1605 && order.updated_at < cutoff
1606 {
1607 orders_filtered += 1;
1608 continue;
1609 }
1610
1611 if let Some(instrument) = self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1612 match crate::http::parse::parse_order_status_report(
1613 &order,
1614 &instrument,
1615 self.core.account_id,
1616 ts_init,
1617 ) {
1618 Ok(report) => order_reports.push(report),
1619 Err(e) => tracing::error!("Failed to parse order in mass status: {e}"),
1620 }
1621 }
1622 }
1623
1624 for (market_ticker, position) in &subaccount_response.subaccount.open_perpetual_positions {
1626 if let Some(instrument) = self.get_instrument_by_market(market_ticker) {
1627 match crate::http::parse::parse_position_status_report(
1628 position,
1629 &instrument,
1630 self.core.account_id,
1631 ts_init,
1632 ) {
1633 Ok(report) => position_reports.push(report),
1634 Err(e) => tracing::error!("Failed to parse position in mass status: {e}"),
1635 }
1636 }
1637 }
1638
1639 for fill in fills_response.fills {
1641 if let Some(cutoff) = cutoff_time
1643 && fill.created_at < cutoff
1644 {
1645 fills_filtered += 1;
1646 continue;
1647 }
1648
1649 if let Some(instrument) = self.get_instrument_by_market(&fill.market) {
1650 match crate::http::parse::parse_fill_report(
1651 &fill,
1652 &instrument,
1653 self.core.account_id,
1654 ts_init,
1655 ) {
1656 Ok(report) => fill_reports.push(report),
1657 Err(e) => tracing::error!("Failed to parse fill in mass status: {e}"),
1658 }
1659 }
1660 }
1661
1662 if cutoff_time.is_some() {
1663 tracing::info!(
1664 "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1665 order_reports.len(),
1666 orders_filtered,
1667 position_reports.len(),
1668 fill_reports.len(),
1669 fills_filtered
1670 );
1671 } else {
1672 tracing::info!(
1673 "Generated mass status: {} orders, {} positions, {} fills",
1674 order_reports.len(),
1675 position_reports.len(),
1676 fill_reports.len()
1677 );
1678 }
1679
1680 let mut mass_status = ExecutionMassStatus::new(
1682 self.core.client_id,
1683 self.core.account_id,
1684 self.core.venue,
1685 ts_init,
1686 None, );
1688
1689 mass_status.add_order_reports(order_reports);
1690 mass_status.add_position_reports(position_reports);
1691 mass_status.add_fill_reports(fill_reports);
1692
1693 Ok(Some(mass_status))
1694 }
1695}
1696
#[cfg(test)]
mod tests {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
    };

    use nautilus_model::{
        enums::{OrderSide, OrderType, TimeInForce},
        events::order::initialized::OrderInitializedBuilder,
        identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
        orders::OrderAny,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Hashes a client order ID string and reduces it modulo `MAX_CLIENT_ID`,
    /// as the hash-based tests below require.
    fn hashed_client_id(raw: &str) -> u32 {
        let mut hasher = DefaultHasher::new();
        raw.hash(&mut hasher);
        (hasher.finish() % u64::from(MAX_CLIENT_ID)) as u32
    }

    /// A purely numeric client order ID parses directly as `u32`.
    #[rstest]
    fn test_client_order_id_numeric_parsing() {
        assert_eq!("12345".parse::<u32>(), Ok(12345));
    }

    /// A non-numeric client order ID fails to parse and is hashed instead.
    #[rstest]
    fn test_client_order_id_hash_fallback() {
        let client_id = "O-20241112-ABC123-001";
        assert!(client_id.parse::<u32>().is_err());

        let hash_result = hashed_client_id(client_id);

        assert!(hash_result < MAX_CLIENT_ID);
        assert!(hash_result > 0);
    }

    /// Stop order types fall outside the Market/Limit set.
    #[rstest]
    fn test_unsupported_order_type_rejection() {
        for order_type in [OrderType::StopMarket, OrderType::StopLimit] {
            let is_supported = matches!(order_type, OrderType::Market | OrderType::Limit);
            assert!(!is_supported);
        }
    }

    /// Market and Limit order types are inside the Market/Limit set.
    #[rstest]
    fn test_supported_order_types() {
        for order_type in [OrderType::Market, OrderType::Limit] {
            assert!(matches!(order_type, OrderType::Market | OrderType::Limit));
        }
    }

    /// Integer division by 1e9 converts nanosecond timestamps to whole seconds.
    #[rstest]
    fn test_unix_nanos_to_seconds_conversion() {
        use nautilus_core::UnixNanos;

        let cases: [(u64, i64); 3] = [
            (1_000_000_000, 1),
            (3_600_000_000_000, 3600),
            (1_731_398_400_000_000_000, 1_731_398_400),
        ];

        for (nanos, expected_seconds) in cases {
            let timestamp = UnixNanos::from(nanos);
            let seconds = (timestamp.as_u64() / 1_000_000_000) as i64;
            assert_eq!(seconds, expected_seconds);
        }
    }

    /// The `OrderAny` accessors return the values the order was built with.
    #[rstest]
    fn test_order_any_api_usage() {
        let initialized = OrderInitializedBuilder::default()
            .trader_id(TraderId::from("TRADER-001"))
            .strategy_id(StrategyId::from("STRATEGY-001"))
            .instrument_id(InstrumentId::from("ETH-USD-PERP.DYDX"))
            .client_order_id(ClientOrderId::from("O-001"))
            .order_side(OrderSide::Buy)
            .order_type(OrderType::Limit)
            .quantity(Quantity::from("10"))
            .price(Some(Price::from("2000.50")))
            .time_in_force(TimeInForce::Gtc)
            .build()
            .unwrap();

        let order_any: OrderAny = initialized.into();

        assert_eq!(order_any.order_side(), OrderSide::Buy);
        assert_eq!(order_any.order_type(), OrderType::Limit);
        assert_eq!(order_any.quantity(), Quantity::from("10"));
        assert_eq!(order_any.price(), Some(Price::from("2000.50")));
        assert_eq!(order_any.time_in_force(), TimeInForce::Gtc);
        assert!(!order_any.is_post_only());
        assert!(!order_any.is_reduce_only());
        assert_eq!(order_any.expire_time(), None);
    }

    /// `MAX_CLIENT_ID` spans the full `u32` range.
    #[rstest]
    fn test_max_client_id_limit() {
        assert_eq!(MAX_CLIENT_ID, u32::MAX);
    }

    /// Hashing the same client order ID twice yields the same numeric ID.
    #[rstest]
    fn test_cancel_order_id_consistency() {
        let client_id_str = "O-20241112-CANCEL-001";

        let id1 = hashed_client_id(client_id_str);
        let id2 = hashed_client_id(client_id_str);

        assert_eq!(id1, id2, "Client ID conversion must be deterministic");
    }

    /// Numeric raw symbols parse to the corresponding `u32`.
    #[rstest]
    fn test_clob_pair_id_extraction_from_raw_symbol() {
        let cases: [(&str, u32); 2] = [("1", 1), ("42", 42)];

        for (raw_symbol, expected) in cases {
            assert_eq!(raw_symbol.parse::<u32>(), Ok(expected));
        }
    }

    /// Non-numeric and empty raw symbols fail to parse as `u32`.
    #[rstest]
    fn test_clob_pair_id_extraction_invalid() {
        for raw_symbol in ["BTC-USD", ""] {
            assert!(raw_symbol.parse::<u32>().is_err());
        }
    }

    /// A `BASE-QUOTE` ticker splits into exactly its two components.
    #[rstest]
    fn test_market_ticker_parsing() {
        let cases = [("BTC-USD", ["BTC", "USD"]), ("ETH-USD", ["ETH", "USD"])];

        for (ticker, expected) in cases {
            let parts: Vec<&str> = ticker.split('-').collect();
            assert_eq!(parts, expected);
        }
    }

    /// Tickers without exactly one dash do not split into two parts.
    #[rstest]
    fn test_market_ticker_invalid_format() {
        assert_eq!("BTCUSD".split('-').count(), 1);
        assert_eq!("BTC-USD-PERP".split('-').count(), 3);
    }
}