Skip to main content

nautilus_dydx/execution/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Order submission facade for dYdX v4.
17//!
18//! This module provides [`OrderSubmitter`], a unified facade for submitting orders to dYdX.
19//! It internally uses the extracted components:
20//! - [`TransactionManager`]: Sequence tracking and transaction signing
21//! - [`TxBroadcaster`]: gRPC broadcast with retry logic
22//! - [`OrderMessageBuilder`]: Proto message construction
23//!
24//! The wallet is owned internally by `TransactionManager`, so method signatures
25//! don't require passing `&wallet` on each call.
26
27use std::sync::Arc;
28
29use nautilus_common::live::get_runtime;
30use nautilus_model::{
31    enums::{OrderSide, TimeInForce},
32    identifiers::InstrumentId,
33    types::{Price, Quantity},
34};
35
36use crate::{
37    error::DydxError,
38    execution::{
39        block_time::BlockTimeMonitor,
40        broadcaster::TxBroadcaster,
41        order_builder::OrderMessageBuilder,
42        tx_manager::TransactionManager,
43        types::{ConditionalOrderType, LimitOrderParams, OrderLifetime},
44    },
45    grpc::{DydxGrpcClient, types::ChainId},
46    http::client::DydxHttpClient,
47};
48
49/// Order submission facade for dYdX v4.
50///
51/// Provides a clean API for order submission, internally coordinating:
52/// - [`TransactionManager`]: Owns wallet, handles sequence + signing
53/// - [`TxBroadcaster`]: Handles gRPC broadcast with retry
54/// - [`OrderMessageBuilder`]: Constructs proto messages
55///
56/// # Wallet Ownership
57///
58/// The wallet is owned by `TransactionManager` (passed at construction via `private_key`).
59/// This eliminates the need to pass `&wallet` to every method.
60///
61/// # Block Time Monitor
62///
63/// `block_time_monitor` provides current block height and dynamic block time estimation.
64/// Updated externally by WebSocket, read by order methods.
65///
66/// # Thread Safety
67///
68/// All methods are safe to call from multiple tasks concurrently.
69#[derive(Debug)]
70pub struct OrderSubmitter {
71    /// Transaction manager - owns wallet, handles sequence and signing.
72    tx_manager: Arc<TransactionManager>,
73    /// Transaction broadcaster with retry logic.
74    broadcaster: Arc<TxBroadcaster>,
75    /// Order message builder for proto construction.
76    order_builder: Arc<OrderMessageBuilder>,
77    /// Block time monitor - provides current height and block time estimation.
78    block_time_monitor: Arc<BlockTimeMonitor>,
79}
80
81impl OrderSubmitter {
82    /// Creates a new order submitter with wallet owned internally.
83    ///
84    /// # Arguments
85    ///
86    /// * `grpc_client` - gRPC client for chain queries and broadcasting
87    /// * `http_client` - HTTP client (provides market params cache)
88    /// * `private_key` - Private key (hex-encoded) - wallet created internally
89    /// * `wallet_address` - Main account address (may differ from derived address for permissioned keys)
90    /// * `subaccount_number` - dYdX subaccount number (typically 0)
91    /// * `chain_id` - dYdX chain ID
92    /// * `block_time_monitor` - Block time monitor (provides current height and dynamic block time)
93    ///
94    /// # Errors
95    ///
96    /// Returns error if wallet creation from private key fails.
97    pub fn new(
98        grpc_client: DydxGrpcClient,
99        http_client: DydxHttpClient,
100        private_key: &str,
101        wallet_address: String,
102        subaccount_number: u32,
103        chain_id: ChainId,
104        block_time_monitor: Arc<BlockTimeMonitor>,
105    ) -> Result<Self, DydxError> {
106        // Create transaction manager (owns wallet and sequence management)
107        let tx_manager = Arc::new(TransactionManager::new(
108            grpc_client.clone(),
109            private_key,
110            wallet_address.clone(),
111            chain_id,
112        )?);
113
114        let broadcaster = Arc::new(TxBroadcaster::new(grpc_client));
115
116        let order_builder = Arc::new(OrderMessageBuilder::new(
117            http_client,
118            wallet_address,
119            subaccount_number,
120            block_time_monitor.clone(),
121        ));
122
123        Ok(Self {
124            tx_manager,
125            broadcaster,
126            order_builder,
127            block_time_monitor,
128        })
129    }
130
131    /// Creates a new order submitter from pre-built components.
132    ///
133    /// Use this when you already have initialized components (e.g., from `DydxExecutionClient`).
134    pub fn from_components(
135        tx_manager: Arc<TransactionManager>,
136        broadcaster: Arc<TxBroadcaster>,
137        order_builder: Arc<OrderMessageBuilder>,
138        block_time_monitor: Arc<BlockTimeMonitor>,
139    ) -> Self {
140        Self {
141            tx_manager,
142            broadcaster,
143            order_builder,
144            block_time_monitor,
145        }
146    }
147
148    /// Returns the current block height.
149    #[must_use]
150    pub fn current_block_height(&self) -> u32 {
151        self.block_time_monitor.current_block_height() as u32
152    }
153
154    /// Returns a reference to the block time monitor.
155    #[must_use]
156    pub fn block_time_monitor(&self) -> &BlockTimeMonitor {
157        &self.block_time_monitor
158    }
159
160    /// Returns the wallet address.
161    #[must_use]
162    pub fn wallet_address(&self) -> &str {
163        self.tx_manager.wallet_address()
164    }
165
166    /// Returns a reference to the order builder.
167    #[must_use]
168    pub fn order_builder(&self) -> &OrderMessageBuilder {
169        &self.order_builder
170    }
171
172    /// Returns a reference to the transaction manager.
173    #[must_use]
174    pub fn tx_manager(&self) -> &TransactionManager {
175        &self.tx_manager
176    }
177
178    /// Submits a market order to dYdX via gRPC.
179    ///
180    /// Market orders execute immediately at the best available price.
181    /// Block height is read from the shared `block_height` state.
182    ///
183    /// # Returns
184    ///
185    /// The transaction hash on success.
186    ///
187    /// # Errors
188    ///
189    /// Returns `DydxError` if gRPC submission fails.
190    pub async fn submit_market_order(
191        &self,
192        instrument_id: InstrumentId,
193        client_order_id: u32,
194        client_metadata: u32,
195        side: OrderSide,
196        quantity: Quantity,
197    ) -> Result<String, DydxError> {
198        log::info!(
199            "Submitting market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, quantity={quantity}"
200        );
201
202        let block_height = self.current_block_height();
203
204        // Build proto message
205        let msg = self.order_builder.build_market_order(
206            instrument_id,
207            client_order_id,
208            client_metadata,
209            side,
210            quantity,
211            block_height,
212        )?;
213
214        // Market orders are always short-term — use cached sequence (no increment)
215        let operation = format!("Submit market order {client_order_id}");
216        let tx_hash = self
217            .broadcaster
218            .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
219            .await?;
220
221        Ok(tx_hash)
222    }
223
224    /// Submits a limit order to dYdX via gRPC.
225    ///
226    /// Limit orders execute only at the specified price or better.
227    /// Block height is read from the shared `block_height` state.
228    ///
229    /// # Returns
230    ///
231    /// The transaction hash on success.
232    ///
233    /// # Errors
234    ///
235    /// Returns `DydxError` if gRPC submission fails.
236    #[allow(clippy::too_many_arguments)]
237    pub async fn submit_limit_order(
238        &self,
239        instrument_id: InstrumentId,
240        client_order_id: u32,
241        client_metadata: u32,
242        side: OrderSide,
243        price: Price,
244        quantity: Quantity,
245        time_in_force: TimeInForce,
246        post_only: bool,
247        reduce_only: bool,
248        expire_time: Option<i64>,
249    ) -> Result<String, DydxError> {
250        log::info!(
251            "Submitting limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, price={price}, \
252             quantity={quantity}, tif={time_in_force:?}, post_only={post_only}, reduce_only={reduce_only}"
253        );
254
255        let block_height = self.current_block_height();
256
257        // Build proto message
258        let msg = self.order_builder.build_limit_order(
259            instrument_id,
260            client_order_id,
261            client_metadata,
262            side,
263            price,
264            quantity,
265            time_in_force,
266            post_only,
267            reduce_only,
268            block_height,
269            expire_time,
270        )?;
271
272        // Determine if short-term based on time_in_force and expire_time
273        let is_short_term = OrderLifetime::from_time_in_force(
274            time_in_force,
275            expire_time,
276            false,
277            self.order_builder.max_short_term_secs(),
278        )
279        .is_short_term();
280
281        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
282        let operation = format!("Submit limit order {client_order_id}");
283        let tx_hash = if is_short_term {
284            self.broadcaster
285                .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
286                .await?
287        } else {
288            self.broadcaster
289                .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
290                .await?
291        };
292
293        Ok(tx_hash)
294    }
295
296    /// Submits a batch of limit orders.
297    ///
298    /// # Protocol Constraints
299    ///
300    /// - **Short-term orders cannot be batched**: If any order is short-term (IOC, FOK, or
301    ///   expire_time within 60s), each order is submitted in a separate transaction.
302    /// - **Long-term orders can be batched**: All orders in a single transaction.
303    ///
304    /// # Returns
305    ///
306    /// A vector of transaction hashes (one per transaction).
307    ///
308    /// # Errors
309    ///
310    /// Returns `DydxError` if any submission fails.
311    pub async fn submit_limit_orders_batch(
312        &self,
313        orders: Vec<LimitOrderParams>,
314    ) -> Result<Vec<String>, DydxError> {
315        if orders.is_empty() {
316            return Ok(Vec::new());
317        }
318
319        let block_height = self.current_block_height();
320
321        // Check if any orders are short-term (cannot be batched)
322        let has_short_term = orders
323            .iter()
324            .any(|params| self.order_builder.is_short_term_order(params));
325
326        if has_short_term {
327            // Short-term orders must be submitted individually.
328            // They don't consume Cosmos SDK sequences (GTB replay protection),
329            // so we use broadcast_short_term for concurrent submission.
330            log::debug!(
331                "Submitting {} short-term limit orders concurrently (sequence not consumed)",
332                orders.len()
333            );
334
335            let mut tx_hashes = Vec::with_capacity(orders.len());
336            let mut handles = Vec::with_capacity(orders.len());
337
338            for params in orders {
339                let tx_manager = Arc::clone(&self.tx_manager);
340                let broadcaster = Arc::clone(&self.broadcaster);
341                let order_builder = Arc::clone(&self.order_builder);
342
343                let handle = get_runtime().spawn(async move {
344                    let msg = order_builder.build_limit_order_from_params(&params, block_height)?;
345                    let operation = format!("Submit short-term order {}", params.client_order_id);
346                    broadcaster
347                        .broadcast_short_term(&tx_manager, vec![msg], &operation)
348                        .await
349                });
350
351                handles.push(handle);
352            }
353
354            // Collect results
355            for handle in handles {
356                match handle.await {
357                    Ok(Ok(tx_hash)) => tx_hashes.push(tx_hash),
358                    Ok(Err(e)) => return Err(e),
359                    Err(e) => {
360                        return Err(DydxError::Nautilus(anyhow::anyhow!("Task join error: {e}")));
361                    }
362                }
363            }
364
365            Ok(tx_hashes)
366        } else {
367            // Long-term orders can be batched in a single transaction
368            log::info!(
369                "Batch submitting {} long-term limit orders in single transaction",
370                orders.len()
371            );
372
373            let msgs = self
374                .order_builder
375                .build_limit_orders_batch(&orders, block_height)?;
376
377            let operation = format!("Submit batch of {} limit orders", msgs.len());
378            let tx_hash = self
379                .broadcaster
380                .broadcast_with_retry(&self.tx_manager, msgs, &operation)
381                .await?;
382
383            Ok(vec![tx_hash])
384        }
385    }
386
387    /// Cancels an order on dYdX via gRPC.
388    ///
389    /// Block height is read from the shared `block_height` state.
390    ///
391    /// # Returns
392    ///
393    /// The transaction hash on success.
394    ///
395    /// # Errors
396    ///
397    /// Returns `DydxError` if gRPC cancellation fails or market params not found.
398    pub async fn cancel_order(
399        &self,
400        instrument_id: InstrumentId,
401        client_order_id: u32,
402        time_in_force: TimeInForce,
403        expire_time_ns: Option<nautilus_core::UnixNanos>,
404    ) -> Result<String, DydxError> {
405        log::info!("Cancelling order: client_id={client_order_id}, instrument={instrument_id}");
406
407        let block_height = self.current_block_height();
408
409        // Build cancel message
410        let msg = self.order_builder.build_cancel_order(
411            instrument_id,
412            client_order_id,
413            time_in_force,
414            expire_time_ns,
415            block_height,
416        )?;
417
418        // Determine if this is a short-term cancel
419        let is_short_term = self
420            .order_builder
421            .is_short_term_cancel(time_in_force, expire_time_ns);
422
423        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
424        let operation = format!("Cancel order {client_order_id}");
425        let tx_hash = if is_short_term {
426            self.broadcaster
427                .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
428                .await?
429        } else {
430            self.broadcaster
431                .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
432                .await?
433        };
434
435        Ok(tx_hash)
436    }
437
438    /// Cancels multiple orders in a single blockchain transaction.
439    ///
440    /// Batches all cancellation messages into one transaction for efficiency.
441    ///
442    /// # Arguments
443    ///
444    /// * `orders` - Slice of (instrument_id, client_order_id, time_in_force, expire_time_ns) tuples
445    ///
446    /// # Returns
447    ///
448    /// The transaction hash on success.
449    ///
450    /// # Errors
451    ///
452    /// Returns `DydxError` if transaction broadcast fails or market params not found.
453    pub async fn cancel_orders_batch(
454        &self,
455        orders: &[(
456            InstrumentId,
457            u32,
458            TimeInForce,
459            Option<nautilus_core::UnixNanos>,
460        )],
461    ) -> Result<String, DydxError> {
462        if orders.is_empty() {
463            return Err(DydxError::Order("No orders to cancel".to_string()));
464        }
465
466        log::info!(
467            "Batch cancelling {} orders in single transaction",
468            orders.len()
469        );
470
471        let block_height = self.current_block_height();
472
473        // Build all cancel messages
474        let msgs = self
475            .order_builder
476            .build_cancel_orders_batch(orders, block_height)?;
477
478        // Broadcast with retry
479        let operation = format!("Cancel batch of {} orders", msgs.len());
480        let tx_hash = self
481            .broadcaster
482            .broadcast_with_retry(&self.tx_manager, msgs, &operation)
483            .await?;
484
485        Ok(tx_hash)
486    }
487
488    /// Submits a stop market order to dYdX via gRPC.
489    ///
490    /// Stop market orders are triggered when the price reaches `trigger_price`.
491    ///
492    /// # Returns
493    ///
494    /// The transaction hash on success.
495    ///
496    /// # Errors
497    ///
498    /// Returns `DydxError` if gRPC submission fails.
499    #[allow(clippy::too_many_arguments)]
500    pub async fn submit_stop_market_order(
501        &self,
502        instrument_id: InstrumentId,
503        client_order_id: u32,
504        client_metadata: u32,
505        side: OrderSide,
506        trigger_price: Price,
507        quantity: Quantity,
508        reduce_only: bool,
509        expire_time: Option<i64>,
510    ) -> Result<String, DydxError> {
511        log::info!(
512            "Submitting stop market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
513             trigger={trigger_price}, qty={quantity}"
514        );
515
516        // Build proto message
517        let msg = self.order_builder.build_stop_market_order(
518            instrument_id,
519            client_order_id,
520            client_metadata,
521            side,
522            trigger_price,
523            quantity,
524            reduce_only,
525            expire_time,
526        )?;
527
528        // Broadcast with retry
529        let operation = format!("Submit stop market order {client_order_id}");
530        let tx_hash = self
531            .broadcaster
532            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
533            .await?;
534
535        Ok(tx_hash)
536    }
537
538    /// Submits a stop limit order to dYdX via gRPC.
539    ///
540    /// Stop limit orders are triggered when the price reaches `trigger_price`,
541    /// then placed as a limit order at `limit_price`.
542    ///
543    /// # Returns
544    ///
545    /// The transaction hash on success.
546    ///
547    /// # Errors
548    ///
549    /// Returns `DydxError` if gRPC submission fails.
550    #[allow(clippy::too_many_arguments)]
551    pub async fn submit_stop_limit_order(
552        &self,
553        instrument_id: InstrumentId,
554        client_order_id: u32,
555        client_metadata: u32,
556        side: OrderSide,
557        trigger_price: Price,
558        limit_price: Price,
559        quantity: Quantity,
560        time_in_force: TimeInForce,
561        post_only: bool,
562        reduce_only: bool,
563        expire_time: Option<i64>,
564    ) -> Result<String, DydxError> {
565        log::info!(
566            "Submitting stop limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
567             trigger={trigger_price}, limit={limit_price}, qty={quantity}"
568        );
569
570        // Build proto message
571        let msg = self.order_builder.build_stop_limit_order(
572            instrument_id,
573            client_order_id,
574            client_metadata,
575            side,
576            trigger_price,
577            limit_price,
578            quantity,
579            time_in_force,
580            post_only,
581            reduce_only,
582            expire_time,
583        )?;
584
585        // Broadcast with retry
586        let operation = format!("Submit stop limit order {client_order_id}");
587        let tx_hash = self
588            .broadcaster
589            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
590            .await?;
591
592        Ok(tx_hash)
593    }
594
595    /// Submits a take profit market order to dYdX via gRPC.
596    ///
597    /// Take profit market orders are triggered when the price reaches `trigger_price`,
598    /// then executed as a market order.
599    ///
600    /// # Returns
601    ///
602    /// The transaction hash on success.
603    ///
604    /// # Errors
605    ///
606    /// Returns `DydxError` if gRPC submission fails.
607    #[allow(clippy::too_many_arguments)]
608    pub async fn submit_take_profit_market_order(
609        &self,
610        instrument_id: InstrumentId,
611        client_order_id: u32,
612        client_metadata: u32,
613        side: OrderSide,
614        trigger_price: Price,
615        quantity: Quantity,
616        reduce_only: bool,
617        expire_time: Option<i64>,
618    ) -> Result<String, DydxError> {
619        log::info!(
620            "Submitting take profit market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
621             trigger={trigger_price}, qty={quantity}"
622        );
623
624        // Build proto message
625        let msg = self.order_builder.build_take_profit_market_order(
626            instrument_id,
627            client_order_id,
628            client_metadata,
629            side,
630            trigger_price,
631            quantity,
632            reduce_only,
633            expire_time,
634        )?;
635
636        // Broadcast with retry
637        let operation = format!("Submit take profit market order {client_order_id}");
638        let tx_hash = self
639            .broadcaster
640            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
641            .await?;
642
643        Ok(tx_hash)
644    }
645
646    /// Submits a take profit limit order to dYdX via gRPC.
647    ///
648    /// Take profit limit orders are triggered when the price reaches `trigger_price`,
649    /// then placed as a limit order at `limit_price`.
650    ///
651    /// # Returns
652    ///
653    /// The transaction hash on success.
654    ///
655    /// # Errors
656    ///
657    /// Returns `DydxError` if gRPC submission fails.
658    #[allow(clippy::too_many_arguments)]
659    pub async fn submit_take_profit_limit_order(
660        &self,
661        instrument_id: InstrumentId,
662        client_order_id: u32,
663        client_metadata: u32,
664        side: OrderSide,
665        trigger_price: Price,
666        limit_price: Price,
667        quantity: Quantity,
668        time_in_force: TimeInForce,
669        post_only: bool,
670        reduce_only: bool,
671        expire_time: Option<i64>,
672    ) -> Result<String, DydxError> {
673        log::info!(
674            "Submitting take profit limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
675             trigger={trigger_price}, limit={limit_price}, qty={quantity}"
676        );
677
678        // Build proto message
679        let msg = self.order_builder.build_take_profit_limit_order(
680            instrument_id,
681            client_order_id,
682            client_metadata,
683            side,
684            trigger_price,
685            limit_price,
686            quantity,
687            time_in_force,
688            post_only,
689            reduce_only,
690            expire_time,
691        )?;
692
693        // Broadcast with retry
694        let operation = format!("Submit take profit limit order {client_order_id}");
695        let tx_hash = self
696            .broadcaster
697            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
698            .await?;
699
700        Ok(tx_hash)
701    }
702
703    /// Submits a conditional order (generic interface).
704    ///
705    /// This method handles all conditional order types: StopMarket, StopLimit,
706    /// TakeProfitMarket, and TakeProfitLimit.
707    ///
708    /// # Returns
709    ///
710    /// The transaction hash on success.
711    ///
712    /// # Errors
713    ///
714    /// Returns `DydxError` if gRPC submission fails or `limit_price` is missing for limit orders.
715    #[allow(clippy::too_many_arguments)]
716    pub async fn submit_conditional_order(
717        &self,
718        instrument_id: InstrumentId,
719        client_order_id: u32,
720        client_metadata: u32,
721        order_type: ConditionalOrderType,
722        side: OrderSide,
723        trigger_price: Price,
724        limit_price: Option<Price>,
725        quantity: Quantity,
726        time_in_force: Option<TimeInForce>,
727        post_only: bool,
728        reduce_only: bool,
729        expire_time: Option<i64>,
730    ) -> Result<String, DydxError> {
731        // Build proto message
732        let msg = self.order_builder.build_conditional_order(
733            instrument_id,
734            client_order_id,
735            client_metadata,
736            order_type,
737            side,
738            trigger_price,
739            limit_price,
740            quantity,
741            time_in_force,
742            post_only,
743            reduce_only,
744            expire_time,
745        )?;
746
747        // Broadcast with retry
748        let operation = format!("Submit {order_type:?} order {client_order_id}");
749        let tx_hash = self
750            .broadcaster
751            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
752            .await?;
753
754        Ok(tx_hash)
755    }
756}