Skip to main content

nautilus_dydx/execution/
broadcaster.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Transaction broadcaster for dYdX v4 protocol.
//!
//! This module handles gRPC transmission of transactions, with automatic retry
//! of stateful transactions on sequence mismatch and transient errors. It works
//! in conjunction with `TransactionManager` to provide reliable transaction delivery.
21//!
//! # Retry Logic
//!
//! Stateful (sequence-bearing) broadcasts use [`RetryManager`] from `nautilus-network`
//! with exponential backoff. When such a broadcast fails with "account sequence mismatch"
//! (Cosmos SDK error code 32, or dYdX code 104) or with a transient error (timeout,
//! unavailable), the broadcaster:
//!
//! 1. Waits for the backoff delay (500ms → 1s → 2s → 4s, capped at 4s, plus jitter)
//! 2. Resyncs the sequence counter from chain
//! 3. Rebuilds the transaction with the fresh sequence and broadcasts it again
//!
//! At most 5 retries are attempted, and the whole retry loop is bounded by a
//! 10 second budget. Short-term orders bypass this path entirely and are not retried.
//!
//! This handles the case where multiple transactions are submitted in parallel
//! and one succeeds before the other, invalidating the sequence.
35
36use std::sync::{
37    Arc,
38    atomic::{AtomicBool, Ordering},
39};
40
41use cosmrs::Any;
42use nautilus_network::retry::{RetryConfig, RetryManager};
43
44use super::{tx_manager::TransactionManager, types::PreparedTransaction};
45use crate::{error::DydxError, grpc::DydxGrpcClient};
46
47/// Maximum retries for sequence mismatch errors.
48pub const MAX_SEQUENCE_RETRIES: u32 = 5;
49
50/// Initial delay between retries in milliseconds.
51/// Exponential backoff will increase this: 500 → 1000 → 2000 → 4000ms
52const INITIAL_RETRY_DELAY_MS: u64 = 500;
53
54/// Maximum delay between retries in milliseconds.
55const MAX_RETRY_DELAY_MS: u64 = 4_000;
56
57/// Maximum total time for all retries in milliseconds (10 seconds).
58/// Prevents indefinite retry loops during chain congestion.
59const MAX_ELAPSED_MS: u64 = 10_000;
60
61/// Creates a retry manager configured for blockchain transaction broadcasting.
62///
63/// Configuration optimized for Cosmos SDK sequence management:
64/// - 5 retries with exponential backoff (500ms → 4s max)
65/// - Small jitter (100ms) to avoid thundering herd
66/// - No operation timeout (chain responses can be slow)
67/// - 10 second total budget to prevent indefinite waits
68#[must_use]
69pub fn create_tx_retry_manager() -> RetryManager<DydxError> {
70    let config = RetryConfig {
71        max_retries: MAX_SEQUENCE_RETRIES,
72        initial_delay_ms: INITIAL_RETRY_DELAY_MS,
73        max_delay_ms: MAX_RETRY_DELAY_MS,
74        backoff_factor: 2.0,
75        jitter_ms: 100,
76        operation_timeout_ms: None, // Blockchain responses can be slow
77        immediate_first: false,     // Always wait before retry (block needs time)
78        max_elapsed_ms: Some(MAX_ELAPSED_MS),
79    };
80    RetryManager::new(config)
81}
82
83/// Transaction broadcaster responsible for gRPC transmission with retry logic.
84///
85/// Works with `TransactionManager` to handle sequence mismatch errors gracefully.
86/// Uses [`RetryManager`] with exponential backoff for reliable delivery.
87///
88/// # Broadcast Modes
89///
90/// ## Stateful Orders (long-term/conditional): `broadcast_with_retry`
91///
92/// Serialized through a semaphore to prevent sequence races. Cosmos SDK requires
93/// stateful transactions to have unique, incrementing sequence numbers. The semaphore
94/// ensures allocate → build → broadcast happens atomically for each operation.
95///
96/// On sequence mismatch (Cosmos SDK error code 32 or dYdX code 104):
97/// 1. The `should_retry` callback sets a flag indicating resync is needed
98/// 2. The `RetryManager` applies exponential backoff
99/// 3. On next attempt, the operation checks the flag and resyncs sequence from chain
100/// 4. A new transaction is built with the fresh sequence and broadcast
101///
102/// ## Short-term Orders: `broadcast_short_term`
103///
104/// dYdX short-term orders use Good-Til-Block (GTB) for replay protection instead of
105/// Cosmos SDK sequences. The chain's `ClobDecorator` ante handler skips sequence
106/// checking for short-term messages. This means:
107/// - No semaphore needed (fully concurrent)
108/// - Cached sequence used (no increment, no allocation)
109/// - No sequence-based retry logic needed
110#[derive(Debug)]
111pub struct TxBroadcaster {
112    /// gRPC client for broadcasting transactions.
113    grpc_client: DydxGrpcClient,
114    /// Retry manager for handling transient failures.
115    retry_manager: RetryManager<DydxError>,
116    /// Semaphore for serializing broadcasts (permits=1 acts as mutex).
117    /// Ensures sequence allocation → build → broadcast are atomic.
118    broadcast_semaphore: Arc<tokio::sync::Semaphore>,
119}
120
121impl TxBroadcaster {
122    /// Creates a new transaction broadcaster.
123    #[must_use]
124    pub fn new(grpc_client: DydxGrpcClient) -> Self {
125        Self {
126            grpc_client,
127            retry_manager: create_tx_retry_manager(),
128            broadcast_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
129        }
130    }
131
132    /// Broadcasts a prepared transaction with automatic retry on sequence mismatch.
133    ///
134    /// **Serialization**: Acquires a semaphore permit before allocating sequence,
135    /// building, and broadcasting. This ensures transactions are broadcast in
136    /// sequence order, preventing "sequence mismatch" errors from concurrent calls.
137    ///
138    /// On sequence mismatch (code=32), resyncs from chain, allocates new sequence,
139    /// rebuilds the transaction, and retries with exponential backoff.
140    ///
141    /// # Arguments
142    ///
143    /// * `tx_manager` - Transaction manager for sequence resync and rebuilding
144    /// * `msgs` - Original messages to rebuild on retry
145    /// * `operation` - Human-readable operation name for logging
146    ///
147    /// # Returns
148    ///
149    /// The transaction hash on success.
150    ///
151    /// # Errors
152    ///
153    /// Returns error if all retries are exhausted or a non-retryable error occurs.
154    pub async fn broadcast_with_retry(
155        &self,
156        tx_manager: &TransactionManager,
157        msgs: Vec<Any>,
158        operation_name: &str,
159    ) -> Result<String, DydxError> {
160        // Acquire semaphore to serialize broadcasts.
161        // This ensures sequence N is fully broadcast before sequence N+1 is allocated.
162        let _permit =
163            self.broadcast_semaphore.acquire().await.map_err(|e| {
164                DydxError::Nautilus(anyhow::anyhow!("Broadcast semaphore closed: {e}"))
165            })?;
166
167        log::debug!("Acquired broadcast permit for {operation_name}");
168
169        // Flag to track if we need to resync sequence before the next attempt.
170        // Set by should_retry when a sequence mismatch is detected.
171        let needs_resync = Arc::new(AtomicBool::new(false));
172        let needs_resync_for_retry = Arc::clone(&needs_resync);
173
174        // Clone values that need to be moved into closures
175        let grpc_client = self.grpc_client.clone();
176        let op_name = operation_name.to_string();
177
178        let operation = || {
179            // Clone captures for the async block
180            let needs_resync = Arc::clone(&needs_resync);
181            let grpc_client = grpc_client.clone();
182            let msgs = msgs.clone();
183            let op_name = op_name.clone();
184
185            async move {
186                // Resync sequence if previous attempt failed with sequence mismatch
187                if needs_resync.swap(false, Ordering::SeqCst) {
188                    log::debug!("Resyncing sequence from chain before retry");
189                    tx_manager.resync_sequence().await?;
190                }
191
192                // Prepare transaction (allocates new sequence)
193                let prepared = tx_manager.prepare_transaction(msgs, &op_name).await?;
194
195                // Broadcast
196                let mut grpc = grpc_client;
197                let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
198                    log::error!("gRPC broadcast failed for {op_name}: {e}");
199                    DydxError::Nautilus(e)
200                })?;
201
202                log::info!("{op_name} successfully: tx_hash={tx_hash}");
203                Ok(tx_hash)
204            }
205        };
206
207        let should_retry = move |e: &DydxError| -> bool {
208            if e.is_sequence_mismatch() {
209                // Set flag so next attempt will resync
210                needs_resync_for_retry.store(true, Ordering::SeqCst);
211                log::warn!("Sequence mismatch detected, will resync and retry");
212                true
213            } else if e.is_transient() {
214                // Also resync on transient errors (timeout, unavailable).
215                // Without this, each retry allocates a NEW sequence, causing drift
216                // (e.g., timeout → alloc 314, timeout → alloc 315, then sequence mismatch).
217                needs_resync_for_retry.store(true, Ordering::SeqCst);
218                log::warn!("Transient error detected, will resync and retry: {e}");
219                true
220            } else {
221                false
222            }
223        };
224
225        let create_error = |msg: String| -> DydxError { DydxError::Nautilus(anyhow::anyhow!(msg)) };
226
227        // Permit is held throughout retry loop, released when _permit drops
228        self.retry_manager
229            .execute_with_retry(operation_name, operation, should_retry, create_error)
230            .await
231    }
232
233    /// Broadcasts a short-term order transaction without sequence management.
234    ///
235    /// # Errors
236    ///
237    /// Returns error if building or broadcasting fails.
238    pub async fn broadcast_short_term(
239        &self,
240        tx_manager: &TransactionManager,
241        msgs: Vec<Any>,
242        operation_name: &str,
243    ) -> Result<String, DydxError> {
244        let cached_sequence = tx_manager.get_cached_sequence().await?;
245        let prepared = tx_manager
246            .build_transaction(msgs, cached_sequence, operation_name)
247            .await?;
248
249        let mut grpc = self.grpc_client.clone();
250        let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
251            log::error!("gRPC broadcast failed for {operation_name}: {e}");
252            DydxError::Nautilus(e)
253        })?;
254
255        log::info!("{operation_name} successfully: tx_hash={tx_hash}");
256        Ok(tx_hash)
257    }
258
259    /// Broadcasts a prepared transaction without retry.
260    ///
261    /// Use this for optimistic batching where you handle failures externally,
262    /// or when you've already prepared a transaction and want direct control.
263    ///
264    /// # Returns
265    ///
266    /// The transaction hash on success.
267    ///
268    /// # Errors
269    ///
270    /// Returns error if the gRPC broadcast fails.
271    pub async fn broadcast_once(
272        &self,
273        prepared: &PreparedTransaction,
274    ) -> Result<String, DydxError> {
275        let mut grpc = self.grpc_client.clone();
276        let operation = &prepared.operation;
277
278        let tx_hash = grpc
279            .broadcast_tx(prepared.tx_bytes.clone())
280            .await
281            .map_err(|e| {
282                log::error!("gRPC broadcast failed for {operation}: {e}");
283                DydxError::Nautilus(e)
284            })?;
285
286        log::info!("{operation} successfully: tx_hash={tx_hash}");
287        Ok(tx_hash)
288    }
289}