nautilus_dydx/execution/tx_manager.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction manager for dYdX v4 protocol.
17//!
18//! This module provides centralized transaction management including:
19//! - Atomic sequence number tracking for stateful (long-term/conditional) orders
20//! - Transaction building and signing
21//! - Chain synchronization for sequence recovery
22//!
23//! # Sequence Management
24//!
25//! dYdX has two transaction types with different sequence behavior:
26//!
27//! - **Stateful orders** (long-term, conditional): Use Cosmos SDK sequences for replay
28//! protection. Each transaction requires a unique, incrementing sequence number.
29//! - **Short-term orders**: Use Good-Til-Block (GTB) for replay protection. The chain's
30//! `ClobDecorator` ante handler skips sequence checking, so sequences are not consumed.
31//! Use [`TransactionManager::get_cached_sequence`] for these — it returns the current value
32//! without incrementing.
33//!
34//! For stateful orders, this module provides:
35//! 1. `AtomicU64` for lock-free sequence allocation via [`TransactionManager::allocate_sequence`]
36//! 2. Lazy initialization from chain on first use
37//! 3. [`TransactionManager::resync_sequence`] for recovery after mismatch errors
38//! 4. Batch allocation via [`TransactionManager::allocate_sequences`] for parallel stateful
39//! broadcasts
40
41use std::sync::{
42 Arc, RwLock,
43 atomic::{AtomicU64, Ordering},
44};
45
46use cosmrs::Any;
47
48use super::{types::PreparedTransaction, wallet::Wallet};
49use crate::{
50 error::DydxError,
51 grpc::{DydxGrpcClient, TxBuilder, types::ChainId},
52 proto::AccountAuthenticator,
53};
54
55/// Sentinel value indicating sequence is uninitialized.
56pub const SEQUENCE_UNINITIALIZED: u64 = u64::MAX;
57
58/// Default fee denomination for dYdX transactions.
59const FEE_DENOM: &str = "adydx";
60
61/// Transaction manager responsible for wallet, sequence tracking, and transaction building.
62///
63/// This is the single source of truth for:
64/// - Wallet and signing operations
65/// - Sequence numbers (ensuring concurrent order operations don't race)
66/// - Authenticator resolution for permissioned key trading
67///
68/// # Thread Safety
69///
70/// All methods are safe to call from multiple tasks concurrently. Sequence
71/// allocation uses atomic compare-exchange operations for lock-free performance.
72#[derive(Debug)]
73pub struct TransactionManager {
74 /// gRPC client for chain queries.
75 grpc_client: DydxGrpcClient,
76 /// Wallet for transaction signing (created from private key).
77 wallet: Wallet,
78 /// Main account address (for account lookups).
79 /// May differ from wallet's signing address when using permissioned keys.
80 wallet_address: String,
81 /// Chain ID for transaction building.
82 chain_id: ChainId,
83 /// Authenticator IDs for permissioned key trading.
84 authenticator_ids: RwLock<Vec<u64>>,
85 /// Atomic sequence counter. Value `SEQUENCE_UNINITIALIZED` means uninitialized.
86 sequence_number: Arc<AtomicU64>,
87 /// Cached account number (never changes for a given address).
88 /// Value 0 means uninitialized.
89 account_number: AtomicU64,
90}
91
92impl TransactionManager {
93 /// Creates a new transaction manager.
94 ///
95 /// Creates wallet from private key internally. The sequence number is initialized
96 /// to `SEQUENCE_UNINITIALIZED` and will be fetched from chain on first use, or
97 /// can be proactively initialized by calling [`Self::initialize_sequence`].
98 ///
99 /// # Errors
100 ///
101 /// Returns error if wallet creation from private key fails.
102 pub fn new(
103 grpc_client: DydxGrpcClient,
104 private_key: &str,
105 wallet_address: String,
106 chain_id: ChainId,
107 ) -> Result<Self, DydxError> {
108 let wallet = Wallet::from_private_key(private_key)
109 .map_err(|e| DydxError::Wallet(format!("Failed to create wallet: {e}")))?;
110
111 Ok(Self {
112 grpc_client,
113 wallet,
114 wallet_address,
115 chain_id,
116 authenticator_ids: RwLock::new(Vec::new()),
117 sequence_number: Arc::new(AtomicU64::new(SEQUENCE_UNINITIALIZED)),
118 account_number: AtomicU64::new(0),
119 })
120 }
121
122 /// Proactively initializes the sequence number from chain.
123 ///
124 /// Call this during connect() to ensure orders can be submitted immediately
125 /// without first-transaction latency penalty. Also catches auth errors early.
126 ///
127 /// Returns the initialized sequence number.
128 ///
129 /// # Errors
130 ///
131 /// Returns error if chain query fails.
132 pub async fn initialize_sequence(&self) -> Result<u64, DydxError> {
133 let mut grpc = self.grpc_client.clone();
134 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
135 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
136 "Failed to fetch account for sequence init: {e}"
137 ))))
138 })?;
139
140 let chain_seq = base_account.sequence;
141 self.sequence_number.store(chain_seq, Ordering::SeqCst);
142 log::info!("Initialized sequence from chain: {chain_seq}");
143 Ok(chain_seq)
144 }
145
146 /// Resolves authenticator IDs if using permissioned keys (API wallet).
147 ///
148 /// Compares the wallet's signing address with the main account address.
149 /// If they differ, fetches authenticators from chain and finds the one
150 /// matching this wallet's public key.
151 ///
152 /// Call this during connect() after creating the TransactionManager.
153 ///
154 /// # Errors
155 ///
156 /// Returns error if:
157 /// - Using permissioned key but no authenticators found for main account
158 /// - No authenticator matches the wallet's public key
159 /// - gRPC query fails
160 ///
161 /// # Panics
162 ///
163 /// Panics if the internal `RwLock` is poisoned.
164 pub async fn resolve_authenticators(&self) -> Result<(), DydxError> {
165 // Check if we already have authenticator IDs configured
166 {
167 let ids = self.authenticator_ids.read().expect("RwLock poisoned");
168 if !ids.is_empty() {
169 log::debug!("Using pre-configured authenticator IDs: {:?}", *ids);
170 return Ok(());
171 }
172 }
173
174 // Get the wallet's address (derived from private key)
175 let account = self
176 .wallet
177 .account_offline()
178 .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
179 let signing_address = account.address.clone();
180 let signing_pubkey = account.public_key();
181
182 // Check if we're using an API wallet (signing address != main account)
183 if signing_address == self.wallet_address {
184 log::debug!(
185 "Signing wallet matches main account {}, no authenticator needed",
186 self.wallet_address
187 );
188 return Ok(());
189 }
190
191 log::info!(
192 "Detected permissioned key setup: signing with {} for main account {}",
193 signing_address,
194 self.wallet_address
195 );
196
197 // Fetch authenticators for the main account
198 let mut grpc = self.grpc_client.clone();
199 let authenticators = grpc
200 .get_authenticators(&self.wallet_address)
201 .await
202 .map_err(|e| {
203 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
204 "Failed to fetch authenticators from chain: {e}"
205 ))))
206 })?;
207
208 if authenticators.is_empty() {
209 return Err(DydxError::Config(format!(
210 "No authenticators found for {}. \
211 Please create an API Trading Key in the dYdX UI first.",
212 self.wallet_address
213 )));
214 }
215
216 log::debug!(
217 "Found {} authenticator(s) for {}",
218 authenticators.len(),
219 self.wallet_address
220 );
221
222 // Find authenticators matching the API wallet's public key
223 let signing_pubkey_bytes = signing_pubkey.to_bytes();
224 let signing_pubkey_b64 = base64::Engine::encode(
225 &base64::engine::general_purpose::STANDARD,
226 &signing_pubkey_bytes,
227 );
228
229 let mut matching_ids = Vec::new();
230 for auth in &authenticators {
231 if Self::authenticator_matches_pubkey(auth, &signing_pubkey_b64) {
232 matching_ids.push(auth.id);
233 log::info!("Found matching authenticator: id={}", auth.id);
234 }
235 }
236
237 if matching_ids.is_empty() {
238 return Err(DydxError::Config(format!(
239 "No authenticator matches the API wallet's public key. \
240 Ensure the API Trading Key was created for wallet {}. \
241 Available authenticators: {:?}",
242 signing_address,
243 authenticators.iter().map(|a| a.id).collect::<Vec<_>>()
244 )));
245 }
246
247 // Store the resolved authenticator IDs
248 {
249 let mut ids = self.authenticator_ids.write().expect("RwLock poisoned");
250 *ids = matching_ids.clone();
251 }
252 log::info!("Resolved authenticator IDs: {matching_ids:?}");
253
254 Ok(())
255 }
256
257 /// Checks if an authenticator contains a SignatureVerification matching the public key.
258 ///
259 /// Expected authenticator config format (JSON array of sub-authenticators):
260 /// ```json
261 /// [{"type": "SignatureVerification", "config": "<base64-pubkey>"}, ...]
262 /// ```
263 fn authenticator_matches_pubkey(auth: &AccountAuthenticator, pubkey_b64: &str) -> bool {
264 #[derive(serde::Deserialize)]
265 struct SubAuth {
266 #[serde(rename = "type")]
267 auth_type: String,
268 config: String,
269 }
270
271 // auth.config is raw bytes (Vec<u8>) containing JSON
272 let config_str = match String::from_utf8(auth.config.clone()) {
273 Ok(s) => s,
274 Err(e) => {
275 log::warn!(
276 "Authenticator id={} has invalid UTF-8 config (len={}): {}",
277 auth.id,
278 auth.config.len(),
279 e
280 );
281 return false;
282 }
283 };
284
285 log::debug!(
286 "Checking authenticator id={}, type={}, config={}",
287 auth.id,
288 auth.r#type,
289 config_str
290 );
291
292 match serde_json::from_str::<Vec<SubAuth>>(&config_str) {
293 Ok(sub_auths) => {
294 for sub in sub_auths {
295 log::debug!(
296 " Sub-authenticator: type={}, config={}",
297 sub.auth_type,
298 sub.config
299 );
300 if sub.auth_type == "SignatureVerification" && sub.config == pubkey_b64 {
301 log::debug!(" -> MATCH! pubkey_b64={pubkey_b64}");
302 return true;
303 }
304 }
305 }
306 Err(e) => {
307 log::warn!(
308 "Authenticator id={} config is not in expected JSON array format: {} (config={})",
309 auth.id,
310 e,
311 config_str
312 );
313 }
314 }
315
316 false
317 }
318
319 /// Allocates the next sequence number atomically.
320 ///
321 /// If the sequence is uninitialized (0), fetches from chain first.
322 /// Uses compare-exchange for lock-free concurrent access.
323 ///
324 /// # Errors
325 ///
326 /// Returns error if chain query fails during initialization.
327 pub async fn allocate_sequence(&self) -> Result<u64, DydxError> {
328 loop {
329 let current = self.sequence_number.load(Ordering::SeqCst);
330 if current == SEQUENCE_UNINITIALIZED {
331 // Initialize from chain
332 self.initialize_sequence_from_chain().await?;
333 continue;
334 }
335 // Atomic get-and-increment
336 if self
337 .sequence_number
338 .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
339 .is_ok()
340 {
341 return Ok(current);
342 }
343 // Another thread modified it, retry
344 }
345 }
346
347 /// Allocates N sequence numbers for optimistic parallel broadcast.
348 ///
349 /// Returns a vector of consecutive sequences that can be used concurrently.
350 /// The caller is responsible for handling partial failures by resyncing.
351 ///
352 /// # Arguments
353 ///
354 /// * `count` - Number of sequences to allocate
355 ///
356 /// # Errors
357 ///
358 /// Returns error if chain query fails during initialization.
359 ///
360 /// # Example
361 ///
362 /// ```ignore
363 /// let sequences = tx_manager.allocate_sequences(3).await?;
364 /// // sequences = [10, 11, 12] - three consecutive sequence numbers
365 /// ```
366 pub async fn allocate_sequences(&self, count: usize) -> Result<Vec<u64>, DydxError> {
367 if count == 0 {
368 return Ok(Vec::new());
369 }
370
371 loop {
372 let current = self.sequence_number.load(Ordering::SeqCst);
373 if current == SEQUENCE_UNINITIALIZED {
374 self.initialize_sequence_from_chain().await?;
375 continue;
376 }
377 let new_value = current + count as u64;
378 if self
379 .sequence_number
380 .compare_exchange(current, new_value, Ordering::SeqCst, Ordering::SeqCst)
381 .is_ok()
382 {
383 return Ok((current..new_value).collect());
384 }
385 // Another thread modified it, retry
386 }
387 }
388
389 /// Initializes the sequence counter from chain state.
390 ///
391 /// Only sets the value if it's still 0 (another thread might have set it).
392 async fn initialize_sequence_from_chain(&self) -> Result<(), DydxError> {
393 let mut grpc = self.grpc_client.clone();
394 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
395 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
396 "Failed to fetch account for sequence init: {e}"
397 ))))
398 })?;
399
400 let chain_seq = base_account.sequence;
401 // Only set if still uninitialized (another thread might have set it)
402 if self
403 .sequence_number
404 .compare_exchange(
405 SEQUENCE_UNINITIALIZED,
406 chain_seq,
407 Ordering::SeqCst,
408 Ordering::SeqCst,
409 )
410 .is_ok()
411 {
412 log::info!("Initialized sequence from chain: {chain_seq}");
413 }
414 Ok(())
415 }
416
417 /// Resyncs the sequence counter from chain after a mismatch error.
418 ///
419 /// Called by the broadcaster's retry logic when a sequence mismatch is detected.
420 /// Unconditionally stores the chain's current sequence.
421 ///
422 /// # Errors
423 ///
424 /// Returns error if chain query fails.
425 pub async fn resync_sequence(&self) -> Result<(), DydxError> {
426 let mut grpc = self.grpc_client.clone();
427 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
428 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
429 "Failed to fetch account for resync: {e}"
430 ))))
431 })?;
432
433 let chain_seq = base_account.sequence;
434 self.sequence_number.store(chain_seq, Ordering::SeqCst);
435 log::info!("Resynced sequence from chain: {chain_seq}");
436 Ok(())
437 }
438
439 /// Returns the current sequence value without allocation.
440 ///
441 /// Useful for logging and debugging. Returns `SEQUENCE_UNINITIALIZED` if not yet initialized.
442 #[must_use]
443 pub fn current_sequence(&self) -> u64 {
444 self.sequence_number.load(Ordering::SeqCst)
445 }
446
447 /// Returns the cached sequence for short-term orders without incrementing.
448 ///
449 /// # Errors
450 ///
451 /// Returns error if chain query fails during initialization.
452 pub async fn get_cached_sequence(&self) -> Result<u64, DydxError> {
453 let current = self.sequence_number.load(Ordering::SeqCst);
454 if current == SEQUENCE_UNINITIALIZED {
455 self.initialize_sequence_from_chain().await?;
456 return Ok(self.sequence_number.load(Ordering::SeqCst));
457 }
458 Ok(current)
459 }
460
461 /// Builds and signs a transaction with the given messages and sequence.
462 ///
463 /// Uses cached account_number (fetched once from chain) to avoid repeated queries.
464 ///
465 /// # Arguments
466 ///
467 /// * `msgs` - Proto messages to include in transaction
468 /// * `sequence` - Pre-allocated sequence number
469 /// * `operation` - Human-readable name for logging
470 ///
471 /// # Errors
472 ///
473 /// Returns error if account lookup fails or transaction building fails.
474 ///
475 /// # Panics
476 ///
477 /// Panics if the internal `RwLock` is poisoned.
478 pub async fn build_transaction(
479 &self,
480 msgs: Vec<Any>,
481 sequence: u64,
482 operation: &str,
483 ) -> Result<PreparedTransaction, DydxError> {
484 // Derive account for signing (address/account_id are cached in wallet)
485 let mut account = self
486 .wallet
487 .account_offline()
488 .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
489
490 // Read authenticator IDs (resolved during connect if using permissioned keys)
491 let auth_ids_snapshot: Vec<u64> = {
492 let ids = self.authenticator_ids.read().expect("RwLock poisoned");
493 ids.clone()
494 };
495
496 if !auth_ids_snapshot.is_empty() {
497 log::debug!(
498 "Using permissioned key mode: signing with {} for main account {}",
499 account.address,
500 self.wallet_address
501 );
502 }
503
504 // Get or cache account number (it never changes for a given address)
505 let account_num = self.get_or_fetch_account_number().await?;
506
507 // Set account info for signing
508 account.set_account_info(account_num, sequence);
509
510 // Build transaction
511 let tx_builder =
512 TxBuilder::new(self.chain_id.clone(), FEE_DENOM.to_string()).map_err(|e| {
513 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
514 "TxBuilder init failed: {e}"
515 ))))
516 })?;
517
518 // For permissioned key trading, each message needs an authenticator ID.
519 // Repeat the configured authenticator ID(s) for each message in the batch.
520 let expanded_auth_ids: Vec<u64> = if auth_ids_snapshot.is_empty() {
521 Vec::new()
522 } else {
523 // For each message, use the first authenticator ID
524 // (typically there's only one configured for the trading key)
525 std::iter::repeat_n(auth_ids_snapshot[0], msgs.len()).collect()
526 };
527
528 let auth_ids = if expanded_auth_ids.is_empty() {
529 None
530 } else {
531 Some(expanded_auth_ids.as_slice())
532 };
533
534 let tx_raw = tx_builder
535 .build_transaction(&account, msgs, None, auth_ids)
536 .map_err(|e| {
537 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
538 "Failed to build tx: {e}"
539 ))))
540 })?;
541
542 let tx_bytes = tx_raw.to_bytes().map_err(|e| {
543 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
544 "Failed to serialize tx: {e}"
545 ))))
546 })?;
547
548 log::debug!(
549 "Built {} with {} bytes, sequence={}",
550 operation,
551 tx_bytes.len(),
552 sequence
553 );
554
555 Ok(PreparedTransaction {
556 tx_bytes,
557 sequence,
558 operation: operation.to_string(),
559 })
560 }
561
562 /// Gets the cached account number, or fetches it from chain if not yet cached.
563 ///
564 /// Account numbers are immutable on-chain, so we only need to fetch once.
565 async fn get_or_fetch_account_number(&self) -> Result<u64, DydxError> {
566 let cached = self.account_number.load(Ordering::SeqCst);
567 if cached != 0 {
568 return Ok(cached);
569 }
570
571 // Fetch from chain
572 let mut grpc = self.grpc_client.clone();
573 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
574 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
575 "Failed to fetch account: {e}"
576 ))))
577 })?;
578
579 let account_num = base_account.account_number;
580
581 // Cache it (CAS to handle concurrent fetches)
582 let _ = self.account_number.compare_exchange(
583 0,
584 account_num,
585 Ordering::SeqCst,
586 Ordering::SeqCst,
587 );
588
589 log::debug!("Cached account_number from chain: {account_num}");
590 Ok(account_num)
591 }
592
593 /// Convenience method: allocate sequence, build, and return prepared transaction.
594 ///
595 /// This is the typical flow for single transaction submission.
596 ///
597 /// # Arguments
598 ///
599 /// * `msgs` - Proto messages to include in transaction
600 /// * `operation` - Human-readable name for logging
601 ///
602 /// # Errors
603 ///
604 /// Returns error if sequence allocation or transaction building fails.
605 pub async fn prepare_transaction(
606 &self,
607 msgs: Vec<Any>,
608 operation: &str,
609 ) -> Result<PreparedTransaction, DydxError> {
610 let sequence = self.allocate_sequence().await?;
611 self.build_transaction(msgs, sequence, operation).await
612 }
613
614 /// Returns the wallet address.
615 #[must_use]
616 pub fn wallet_address(&self) -> &str {
617 &self.wallet_address
618 }
619
620 /// Returns the chain ID.
621 #[must_use]
622 pub fn chain_id(&self) -> &ChainId {
623 &self.chain_id
624 }
625}