nautilus_dydx/execution/broadcaster.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction broadcaster for dYdX v4 protocol.
17//!
18//! This module handles gRPC transmission of transactions with automatic retry
19//! on sequence mismatch errors. It works in conjunction with `TransactionManager`
20//! to provide reliable transaction delivery.
21//!
22//! # Retry Logic
23//!
//! Uses the battle-tested [`RetryManager`] from `nautilus-network` with exponential
//! backoff. When a broadcast fails with "account sequence mismatch" (Cosmos SDK
//! error code 32) or with a transient error (for example a timeout), the broadcaster:
27//!
28//! 1. Resyncs the sequence counter from chain
29//! 2. Rebuilds the transaction with the new sequence
30//! 3. Applies exponential backoff (500ms → 1s → 2s → 4s)
31//! 4. Retries up to 5 times
32//!
33//! This handles the case where multiple transactions are submitted in parallel
34//! and one succeeds before the other, invalidating the sequence.
35
36use std::sync::{
37 Arc,
38 atomic::{AtomicBool, Ordering},
39};
40
41use cosmrs::Any;
42use nautilus_network::retry::{RetryConfig, RetryManager};
43
44use super::{tx_manager::TransactionManager, types::PreparedTransaction};
45use crate::{error::DydxError, grpc::DydxGrpcClient};
46
/// Upper bound on retry attempts after a sequence mismatch.
pub const MAX_SEQUENCE_RETRIES: u32 = 5;

/// First backoff delay, in milliseconds.
///
/// Later delays grow exponentially from this value: 500 → 1000 → 2000 → 4000ms.
const INITIAL_RETRY_DELAY_MS: u64 = 500;

/// Ceiling for a single backoff delay, in milliseconds (4 seconds).
const MAX_RETRY_DELAY_MS: u64 = 4 * 1_000;

/// Overall time budget across all retries, in milliseconds (10 seconds).
///
/// Bounds the retry loop so chain congestion cannot stall a caller indefinitely.
const MAX_ELAPSED_MS: u64 = 10 * 1_000;
60
61/// Creates a retry manager configured for blockchain transaction broadcasting.
62///
63/// Configuration optimized for Cosmos SDK sequence management:
64/// - 5 retries with exponential backoff (500ms → 4s max)
65/// - Small jitter (100ms) to avoid thundering herd
66/// - No operation timeout (chain responses can be slow)
67/// - 10 second total budget to prevent indefinite waits
68#[must_use]
69pub fn create_tx_retry_manager() -> RetryManager<DydxError> {
70 let config = RetryConfig {
71 max_retries: MAX_SEQUENCE_RETRIES,
72 initial_delay_ms: INITIAL_RETRY_DELAY_MS,
73 max_delay_ms: MAX_RETRY_DELAY_MS,
74 backoff_factor: 2.0,
75 jitter_ms: 100,
76 operation_timeout_ms: None, // Blockchain responses can be slow
77 immediate_first: false, // Always wait before retry (block needs time)
78 max_elapsed_ms: Some(MAX_ELAPSED_MS),
79 };
80 RetryManager::new(config)
81}
82
83/// Transaction broadcaster responsible for gRPC transmission with retry logic.
84///
85/// Works with `TransactionManager` to handle sequence mismatch errors gracefully.
86/// Uses [`RetryManager`] with exponential backoff for reliable delivery.
87///
88/// # Broadcast Modes
89///
90/// ## Stateful Orders (long-term/conditional): `broadcast_with_retry`
91///
92/// Serialized through a semaphore to prevent sequence races. Cosmos SDK requires
93/// stateful transactions to have unique, incrementing sequence numbers. The semaphore
94/// ensures allocate → build → broadcast happens atomically for each operation.
95///
96/// On sequence mismatch (Cosmos SDK error code 32 or dYdX code 104):
97/// 1. The `should_retry` callback sets a flag indicating resync is needed
98/// 2. The `RetryManager` applies exponential backoff
99/// 3. On next attempt, the operation checks the flag and resyncs sequence from chain
100/// 4. A new transaction is built with the fresh sequence and broadcast
101///
102/// ## Short-term Orders: `broadcast_short_term`
103///
104/// dYdX short-term orders use Good-Til-Block (GTB) for replay protection instead of
105/// Cosmos SDK sequences. The chain's `ClobDecorator` ante handler skips sequence
106/// checking for short-term messages. This means:
107/// - No semaphore needed (fully concurrent)
108/// - Cached sequence used (no increment, no allocation)
109/// - No sequence-based retry logic needed
110#[derive(Debug)]
111pub struct TxBroadcaster {
112 /// gRPC client for broadcasting transactions.
113 grpc_client: DydxGrpcClient,
114 /// Retry manager for handling transient failures.
115 retry_manager: RetryManager<DydxError>,
116 /// Semaphore for serializing broadcasts (permits=1 acts as mutex).
117 /// Ensures sequence allocation → build → broadcast are atomic.
118 broadcast_semaphore: Arc<tokio::sync::Semaphore>,
119}
120
121impl TxBroadcaster {
122 /// Creates a new transaction broadcaster.
123 #[must_use]
124 pub fn new(grpc_client: DydxGrpcClient) -> Self {
125 Self {
126 grpc_client,
127 retry_manager: create_tx_retry_manager(),
128 broadcast_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
129 }
130 }
131
132 /// Broadcasts a prepared transaction with automatic retry on sequence mismatch.
133 ///
134 /// **Serialization**: Acquires a semaphore permit before allocating sequence,
135 /// building, and broadcasting. This ensures transactions are broadcast in
136 /// sequence order, preventing "sequence mismatch" errors from concurrent calls.
137 ///
138 /// On sequence mismatch (code=32), resyncs from chain, allocates new sequence,
139 /// rebuilds the transaction, and retries with exponential backoff.
140 ///
141 /// # Arguments
142 ///
143 /// * `tx_manager` - Transaction manager for sequence resync and rebuilding
144 /// * `msgs` - Original messages to rebuild on retry
145 /// * `operation` - Human-readable operation name for logging
146 ///
147 /// # Returns
148 ///
149 /// The transaction hash on success.
150 ///
151 /// # Errors
152 ///
153 /// Returns error if all retries are exhausted or a non-retryable error occurs.
154 pub async fn broadcast_with_retry(
155 &self,
156 tx_manager: &TransactionManager,
157 msgs: Vec<Any>,
158 operation_name: &str,
159 ) -> Result<String, DydxError> {
160 // Acquire semaphore to serialize broadcasts.
161 // This ensures sequence N is fully broadcast before sequence N+1 is allocated.
162 let _permit =
163 self.broadcast_semaphore.acquire().await.map_err(|e| {
164 DydxError::Nautilus(anyhow::anyhow!("Broadcast semaphore closed: {e}"))
165 })?;
166
167 log::debug!("Acquired broadcast permit for {operation_name}");
168
169 // Flag to track if we need to resync sequence before the next attempt.
170 // Set by should_retry when a sequence mismatch is detected.
171 let needs_resync = Arc::new(AtomicBool::new(false));
172 let needs_resync_for_retry = Arc::clone(&needs_resync);
173
174 // Clone values that need to be moved into closures
175 let grpc_client = self.grpc_client.clone();
176 let op_name = operation_name.to_string();
177
178 let operation = || {
179 // Clone captures for the async block
180 let needs_resync = Arc::clone(&needs_resync);
181 let grpc_client = grpc_client.clone();
182 let msgs = msgs.clone();
183 let op_name = op_name.clone();
184
185 async move {
186 // Resync sequence if previous attempt failed with sequence mismatch
187 if needs_resync.swap(false, Ordering::SeqCst) {
188 log::debug!("Resyncing sequence from chain before retry");
189 tx_manager.resync_sequence().await?;
190 }
191
192 // Prepare transaction (allocates new sequence)
193 let prepared = tx_manager.prepare_transaction(msgs, &op_name).await?;
194
195 // Broadcast
196 let mut grpc = grpc_client;
197 let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
198 log::error!("gRPC broadcast failed for {op_name}: {e}");
199 DydxError::Nautilus(e)
200 })?;
201
202 log::info!("{op_name} successfully: tx_hash={tx_hash}");
203 Ok(tx_hash)
204 }
205 };
206
207 let should_retry = move |e: &DydxError| -> bool {
208 if e.is_sequence_mismatch() {
209 // Set flag so next attempt will resync
210 needs_resync_for_retry.store(true, Ordering::SeqCst);
211 log::warn!("Sequence mismatch detected, will resync and retry");
212 true
213 } else if e.is_transient() {
214 // Also resync on transient errors (timeout, unavailable).
215 // Without this, each retry allocates a NEW sequence, causing drift
216 // (e.g., timeout → alloc 314, timeout → alloc 315, then sequence mismatch).
217 needs_resync_for_retry.store(true, Ordering::SeqCst);
218 log::warn!("Transient error detected, will resync and retry: {e}");
219 true
220 } else {
221 false
222 }
223 };
224
225 let create_error = |msg: String| -> DydxError { DydxError::Nautilus(anyhow::anyhow!(msg)) };
226
227 // Permit is held throughout retry loop, released when _permit drops
228 self.retry_manager
229 .execute_with_retry(operation_name, operation, should_retry, create_error)
230 .await
231 }
232
233 /// Broadcasts a short-term order transaction without sequence management.
234 ///
235 /// # Errors
236 ///
237 /// Returns error if building or broadcasting fails.
238 pub async fn broadcast_short_term(
239 &self,
240 tx_manager: &TransactionManager,
241 msgs: Vec<Any>,
242 operation_name: &str,
243 ) -> Result<String, DydxError> {
244 let cached_sequence = tx_manager.get_cached_sequence().await?;
245 let prepared = tx_manager
246 .build_transaction(msgs, cached_sequence, operation_name)
247 .await?;
248
249 let mut grpc = self.grpc_client.clone();
250 let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
251 log::error!("gRPC broadcast failed for {operation_name}: {e}");
252 DydxError::Nautilus(e)
253 })?;
254
255 log::info!("{operation_name} successfully: tx_hash={tx_hash}");
256 Ok(tx_hash)
257 }
258
259 /// Broadcasts a prepared transaction without retry.
260 ///
261 /// Use this for optimistic batching where you handle failures externally,
262 /// or when you've already prepared a transaction and want direct control.
263 ///
264 /// # Returns
265 ///
266 /// The transaction hash on success.
267 ///
268 /// # Errors
269 ///
270 /// Returns error if the gRPC broadcast fails.
271 pub async fn broadcast_once(
272 &self,
273 prepared: &PreparedTransaction,
274 ) -> Result<String, DydxError> {
275 let mut grpc = self.grpc_client.clone();
276 let operation = &prepared.operation;
277
278 let tx_hash = grpc
279 .broadcast_tx(prepared.tx_bytes.clone())
280 .await
281 .map_err(|e| {
282 log::error!("gRPC broadcast failed for {operation}: {e}");
283 DydxError::Nautilus(e)
284 })?;
285
286 log::info!("{operation} successfully: tx_hash={tx_hash}");
287 Ok(tx_hash)
288 }
289}