Skip to main content

nautilus_dydx/execution/
tx_manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction manager for dYdX v4 protocol.
17//!
18//! This module provides centralized transaction management including:
19//! - Atomic sequence number tracking for stateful (long-term/conditional) orders
20//! - Transaction building and signing
21//! - Chain synchronization for sequence recovery
22//!
23//! # Sequence Management
24//!
25//! dYdX has two transaction types with different sequence behavior:
26//!
27//! - **Stateful orders** (long-term, conditional): Use Cosmos SDK sequences for replay
28//!   protection. Each transaction requires a unique, incrementing sequence number.
29//! - **Short-term orders**: Use Good-Til-Block (GTB) for replay protection. The chain's
30//!   `ClobDecorator` ante handler skips sequence checking, so sequences are not consumed.
31//!   Use [`TransactionManager::get_cached_sequence`] for these — it returns the current value
32//!   without incrementing.
33//!
34//! For stateful orders, this module provides:
35//! 1. `AtomicU64` for lock-free sequence allocation via [`TransactionManager::allocate_sequence`]
36//! 2. Lazy initialization from chain on first use
37//! 3. [`TransactionManager::resync_sequence`] for recovery after mismatch errors
38//! 4. Batch allocation via [`TransactionManager::allocate_sequences`] for parallel stateful
39//!    broadcasts
40
41use std::sync::{
42    Arc, RwLock,
43    atomic::{AtomicU64, Ordering},
44};
45
46use cosmrs::Any;
47
48use super::{types::PreparedTransaction, wallet::Wallet};
49use crate::{
50    error::DydxError,
51    grpc::{DydxGrpcClient, TxBuilder, types::ChainId},
52    proto::AccountAuthenticator,
53};
54
55/// Sentinel value indicating sequence is uninitialized.
56pub const SEQUENCE_UNINITIALIZED: u64 = u64::MAX;
57
58/// Default fee denomination for dYdX transactions.
59const FEE_DENOM: &str = "adydx";
60
61/// Transaction manager responsible for wallet, sequence tracking, and transaction building.
62///
63/// This is the single source of truth for:
64/// - Wallet and signing operations
65/// - Sequence numbers (ensuring concurrent order operations don't race)
66/// - Authenticator resolution for permissioned key trading
67///
68/// # Thread Safety
69///
70/// All methods are safe to call from multiple tasks concurrently. Sequence
71/// allocation uses atomic compare-exchange operations for lock-free performance.
72#[derive(Debug)]
73pub struct TransactionManager {
74    /// gRPC client for chain queries.
75    grpc_client: DydxGrpcClient,
76    /// Wallet for transaction signing (created from private key).
77    wallet: Wallet,
78    /// Main account address (for account lookups).
79    /// May differ from wallet's signing address when using permissioned keys.
80    wallet_address: String,
81    /// Chain ID for transaction building.
82    chain_id: ChainId,
83    /// Authenticator IDs for permissioned key trading.
84    authenticator_ids: RwLock<Vec<u64>>,
85    /// Atomic sequence counter. Value `SEQUENCE_UNINITIALIZED` means uninitialized.
86    sequence_number: Arc<AtomicU64>,
87    /// Cached account number (never changes for a given address).
88    /// Value 0 means uninitialized.
89    account_number: AtomicU64,
90}
91
92impl TransactionManager {
93    /// Creates a new transaction manager.
94    ///
95    /// Creates wallet from private key internally. The sequence number is initialized
96    /// to `SEQUENCE_UNINITIALIZED` and will be fetched from chain on first use, or
97    /// can be proactively initialized by calling [`Self::initialize_sequence`].
98    ///
99    /// # Errors
100    ///
101    /// Returns error if wallet creation from private key fails.
102    pub fn new(
103        grpc_client: DydxGrpcClient,
104        private_key: &str,
105        wallet_address: String,
106        chain_id: ChainId,
107    ) -> Result<Self, DydxError> {
108        let wallet = Wallet::from_private_key(private_key)
109            .map_err(|e| DydxError::Wallet(format!("Failed to create wallet: {e}")))?;
110
111        Ok(Self {
112            grpc_client,
113            wallet,
114            wallet_address,
115            chain_id,
116            authenticator_ids: RwLock::new(Vec::new()),
117            sequence_number: Arc::new(AtomicU64::new(SEQUENCE_UNINITIALIZED)),
118            account_number: AtomicU64::new(0),
119        })
120    }
121
122    /// Proactively initializes the sequence number from chain.
123    ///
124    /// Call this during connect() to ensure orders can be submitted immediately
125    /// without first-transaction latency penalty. Also catches auth errors early.
126    ///
127    /// Returns the initialized sequence number.
128    ///
129    /// # Errors
130    ///
131    /// Returns error if chain query fails.
132    pub async fn initialize_sequence(&self) -> Result<u64, DydxError> {
133        let mut grpc = self.grpc_client.clone();
134        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
135            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
136                "Failed to fetch account for sequence init: {e}"
137            ))))
138        })?;
139
140        let chain_seq = base_account.sequence;
141        self.sequence_number.store(chain_seq, Ordering::SeqCst);
142        log::info!("Initialized sequence from chain: {chain_seq}");
143        Ok(chain_seq)
144    }
145
146    /// Resolves authenticator IDs if using permissioned keys (API wallet).
147    ///
148    /// Compares the wallet's signing address with the main account address.
149    /// If they differ, fetches authenticators from chain and finds the one
150    /// matching this wallet's public key.
151    ///
152    /// Call this during connect() after creating the TransactionManager.
153    ///
154    /// # Errors
155    ///
156    /// Returns error if:
157    /// - Using permissioned key but no authenticators found for main account
158    /// - No authenticator matches the wallet's public key
159    /// - gRPC query fails
160    ///
161    /// # Panics
162    ///
163    /// Panics if the internal `RwLock` is poisoned.
164    pub async fn resolve_authenticators(&self) -> Result<(), DydxError> {
165        // Check if we already have authenticator IDs configured
166        {
167            let ids = self.authenticator_ids.read().expect("RwLock poisoned");
168            if !ids.is_empty() {
169                log::debug!("Using pre-configured authenticator IDs: {:?}", *ids);
170                return Ok(());
171            }
172        }
173
174        // Get the wallet's address (derived from private key)
175        let account = self
176            .wallet
177            .account_offline()
178            .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
179        let signing_address = account.address.clone();
180        let signing_pubkey = account.public_key();
181
182        // Check if we're using an API wallet (signing address != main account)
183        if signing_address == self.wallet_address {
184            log::debug!(
185                "Signing wallet matches main account {}, no authenticator needed",
186                self.wallet_address
187            );
188            return Ok(());
189        }
190
191        log::info!(
192            "Detected permissioned key setup: signing with {} for main account {}",
193            signing_address,
194            self.wallet_address
195        );
196
197        // Fetch authenticators for the main account
198        let mut grpc = self.grpc_client.clone();
199        let authenticators = grpc
200            .get_authenticators(&self.wallet_address)
201            .await
202            .map_err(|e| {
203                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
204                    "Failed to fetch authenticators from chain: {e}"
205                ))))
206            })?;
207
208        if authenticators.is_empty() {
209            return Err(DydxError::Config(format!(
210                "No authenticators found for {}. \
211                 Please create an API Trading Key in the dYdX UI first.",
212                self.wallet_address
213            )));
214        }
215
216        log::debug!(
217            "Found {} authenticator(s) for {}",
218            authenticators.len(),
219            self.wallet_address
220        );
221
222        // Find authenticators matching the API wallet's public key
223        let signing_pubkey_bytes = signing_pubkey.to_bytes();
224        let signing_pubkey_b64 = base64::Engine::encode(
225            &base64::engine::general_purpose::STANDARD,
226            &signing_pubkey_bytes,
227        );
228
229        let mut matching_ids = Vec::new();
230        for auth in &authenticators {
231            if Self::authenticator_matches_pubkey(auth, &signing_pubkey_b64) {
232                matching_ids.push(auth.id);
233                log::info!("Found matching authenticator: id={}", auth.id);
234            }
235        }
236
237        if matching_ids.is_empty() {
238            return Err(DydxError::Config(format!(
239                "No authenticator matches the API wallet's public key. \
240                 Ensure the API Trading Key was created for wallet {}. \
241                 Available authenticators: {:?}",
242                signing_address,
243                authenticators.iter().map(|a| a.id).collect::<Vec<_>>()
244            )));
245        }
246
247        // Store the resolved authenticator IDs
248        {
249            let mut ids = self.authenticator_ids.write().expect("RwLock poisoned");
250            *ids = matching_ids.clone();
251        }
252        log::info!("Resolved authenticator IDs: {matching_ids:?}");
253
254        Ok(())
255    }
256
257    /// Checks if an authenticator contains a SignatureVerification matching the public key.
258    ///
259    /// Expected authenticator config format (JSON array of sub-authenticators):
260    /// ```json
261    /// [{"type": "SignatureVerification", "config": "<base64-pubkey>"}, ...]
262    /// ```
263    fn authenticator_matches_pubkey(auth: &AccountAuthenticator, pubkey_b64: &str) -> bool {
264        #[derive(serde::Deserialize)]
265        struct SubAuth {
266            #[serde(rename = "type")]
267            auth_type: String,
268            config: String,
269        }
270
271        // auth.config is raw bytes (Vec<u8>) containing JSON
272        let config_str = match String::from_utf8(auth.config.clone()) {
273            Ok(s) => s,
274            Err(e) => {
275                log::warn!(
276                    "Authenticator id={} has invalid UTF-8 config (len={}): {}",
277                    auth.id,
278                    auth.config.len(),
279                    e
280                );
281                return false;
282            }
283        };
284
285        log::debug!(
286            "Checking authenticator id={}, type={}, config={}",
287            auth.id,
288            auth.r#type,
289            config_str
290        );
291
292        match serde_json::from_str::<Vec<SubAuth>>(&config_str) {
293            Ok(sub_auths) => {
294                for sub in sub_auths {
295                    log::debug!(
296                        "  Sub-authenticator: type={}, config={}",
297                        sub.auth_type,
298                        sub.config
299                    );
300                    if sub.auth_type == "SignatureVerification" && sub.config == pubkey_b64 {
301                        log::debug!("  -> MATCH! pubkey_b64={pubkey_b64}");
302                        return true;
303                    }
304                }
305            }
306            Err(e) => {
307                log::warn!(
308                    "Authenticator id={} config is not in expected JSON array format: {} (config={})",
309                    auth.id,
310                    e,
311                    config_str
312                );
313            }
314        }
315
316        false
317    }
318
319    /// Allocates the next sequence number atomically.
320    ///
321    /// If the sequence is uninitialized (0), fetches from chain first.
322    /// Uses compare-exchange for lock-free concurrent access.
323    ///
324    /// # Errors
325    ///
326    /// Returns error if chain query fails during initialization.
327    pub async fn allocate_sequence(&self) -> Result<u64, DydxError> {
328        loop {
329            let current = self.sequence_number.load(Ordering::SeqCst);
330            if current == SEQUENCE_UNINITIALIZED {
331                // Initialize from chain
332                self.initialize_sequence_from_chain().await?;
333                continue;
334            }
335            // Atomic get-and-increment
336            if self
337                .sequence_number
338                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
339                .is_ok()
340            {
341                return Ok(current);
342            }
343            // Another thread modified it, retry
344        }
345    }
346
347    /// Allocates N sequence numbers for optimistic parallel broadcast.
348    ///
349    /// Returns a vector of consecutive sequences that can be used concurrently.
350    /// The caller is responsible for handling partial failures by resyncing.
351    ///
352    /// # Arguments
353    ///
354    /// * `count` - Number of sequences to allocate
355    ///
356    /// # Errors
357    ///
358    /// Returns error if chain query fails during initialization.
359    ///
360    /// # Example
361    ///
362    /// ```ignore
363    /// let sequences = tx_manager.allocate_sequences(3).await?;
364    /// // sequences = [10, 11, 12] - three consecutive sequence numbers
365    /// ```
366    pub async fn allocate_sequences(&self, count: usize) -> Result<Vec<u64>, DydxError> {
367        if count == 0 {
368            return Ok(Vec::new());
369        }
370
371        loop {
372            let current = self.sequence_number.load(Ordering::SeqCst);
373            if current == SEQUENCE_UNINITIALIZED {
374                self.initialize_sequence_from_chain().await?;
375                continue;
376            }
377            let new_value = current + count as u64;
378            if self
379                .sequence_number
380                .compare_exchange(current, new_value, Ordering::SeqCst, Ordering::SeqCst)
381                .is_ok()
382            {
383                return Ok((current..new_value).collect());
384            }
385            // Another thread modified it, retry
386        }
387    }
388
389    /// Initializes the sequence counter from chain state.
390    ///
391    /// Only sets the value if it's still 0 (another thread might have set it).
392    async fn initialize_sequence_from_chain(&self) -> Result<(), DydxError> {
393        let mut grpc = self.grpc_client.clone();
394        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
395            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
396                "Failed to fetch account for sequence init: {e}"
397            ))))
398        })?;
399
400        let chain_seq = base_account.sequence;
401        // Only set if still uninitialized (another thread might have set it)
402        if self
403            .sequence_number
404            .compare_exchange(
405                SEQUENCE_UNINITIALIZED,
406                chain_seq,
407                Ordering::SeqCst,
408                Ordering::SeqCst,
409            )
410            .is_ok()
411        {
412            log::info!("Initialized sequence from chain: {chain_seq}");
413        }
414        Ok(())
415    }
416
417    /// Resyncs the sequence counter from chain after a mismatch error.
418    ///
419    /// Called by the broadcaster's retry logic when a sequence mismatch is detected.
420    /// Unconditionally stores the chain's current sequence.
421    ///
422    /// # Errors
423    ///
424    /// Returns error if chain query fails.
425    pub async fn resync_sequence(&self) -> Result<(), DydxError> {
426        let mut grpc = self.grpc_client.clone();
427        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
428            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
429                "Failed to fetch account for resync: {e}"
430            ))))
431        })?;
432
433        let chain_seq = base_account.sequence;
434        self.sequence_number.store(chain_seq, Ordering::SeqCst);
435        log::info!("Resynced sequence from chain: {chain_seq}");
436        Ok(())
437    }
438
439    /// Returns the current sequence value without allocation.
440    ///
441    /// Useful for logging and debugging. Returns `SEQUENCE_UNINITIALIZED` if not yet initialized.
442    #[must_use]
443    pub fn current_sequence(&self) -> u64 {
444        self.sequence_number.load(Ordering::SeqCst)
445    }
446
447    /// Returns the cached sequence for short-term orders without incrementing.
448    ///
449    /// # Errors
450    ///
451    /// Returns error if chain query fails during initialization.
452    pub async fn get_cached_sequence(&self) -> Result<u64, DydxError> {
453        let current = self.sequence_number.load(Ordering::SeqCst);
454        if current == SEQUENCE_UNINITIALIZED {
455            self.initialize_sequence_from_chain().await?;
456            return Ok(self.sequence_number.load(Ordering::SeqCst));
457        }
458        Ok(current)
459    }
460
461    /// Builds and signs a transaction with the given messages and sequence.
462    ///
463    /// Uses cached account_number (fetched once from chain) to avoid repeated queries.
464    ///
465    /// # Arguments
466    ///
467    /// * `msgs` - Proto messages to include in transaction
468    /// * `sequence` - Pre-allocated sequence number
469    /// * `operation` - Human-readable name for logging
470    ///
471    /// # Errors
472    ///
473    /// Returns error if account lookup fails or transaction building fails.
474    ///
475    /// # Panics
476    ///
477    /// Panics if the internal `RwLock` is poisoned.
478    pub async fn build_transaction(
479        &self,
480        msgs: Vec<Any>,
481        sequence: u64,
482        operation: &str,
483    ) -> Result<PreparedTransaction, DydxError> {
484        // Derive account for signing (address/account_id are cached in wallet)
485        let mut account = self
486            .wallet
487            .account_offline()
488            .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
489
490        // Read authenticator IDs (resolved during connect if using permissioned keys)
491        let auth_ids_snapshot: Vec<u64> = {
492            let ids = self.authenticator_ids.read().expect("RwLock poisoned");
493            ids.clone()
494        };
495
496        if !auth_ids_snapshot.is_empty() {
497            log::debug!(
498                "Using permissioned key mode: signing with {} for main account {}",
499                account.address,
500                self.wallet_address
501            );
502        }
503
504        // Get or cache account number (it never changes for a given address)
505        let account_num = self.get_or_fetch_account_number().await?;
506
507        // Set account info for signing
508        account.set_account_info(account_num, sequence);
509
510        // Build transaction
511        let tx_builder =
512            TxBuilder::new(self.chain_id.clone(), FEE_DENOM.to_string()).map_err(|e| {
513                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
514                    "TxBuilder init failed: {e}"
515                ))))
516            })?;
517
518        // For permissioned key trading, each message needs an authenticator ID.
519        // Repeat the configured authenticator ID(s) for each message in the batch.
520        let expanded_auth_ids: Vec<u64> = if auth_ids_snapshot.is_empty() {
521            Vec::new()
522        } else {
523            // For each message, use the first authenticator ID
524            // (typically there's only one configured for the trading key)
525            std::iter::repeat_n(auth_ids_snapshot[0], msgs.len()).collect()
526        };
527
528        let auth_ids = if expanded_auth_ids.is_empty() {
529            None
530        } else {
531            Some(expanded_auth_ids.as_slice())
532        };
533
534        let tx_raw = tx_builder
535            .build_transaction(&account, msgs, None, auth_ids)
536            .map_err(|e| {
537                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
538                    "Failed to build tx: {e}"
539                ))))
540            })?;
541
542        let tx_bytes = tx_raw.to_bytes().map_err(|e| {
543            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
544                "Failed to serialize tx: {e}"
545            ))))
546        })?;
547
548        log::debug!(
549            "Built {} with {} bytes, sequence={}",
550            operation,
551            tx_bytes.len(),
552            sequence
553        );
554
555        Ok(PreparedTransaction {
556            tx_bytes,
557            sequence,
558            operation: operation.to_string(),
559        })
560    }
561
562    /// Gets the cached account number, or fetches it from chain if not yet cached.
563    ///
564    /// Account numbers are immutable on-chain, so we only need to fetch once.
565    async fn get_or_fetch_account_number(&self) -> Result<u64, DydxError> {
566        let cached = self.account_number.load(Ordering::SeqCst);
567        if cached != 0 {
568            return Ok(cached);
569        }
570
571        // Fetch from chain
572        let mut grpc = self.grpc_client.clone();
573        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
574            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
575                "Failed to fetch account: {e}"
576            ))))
577        })?;
578
579        let account_num = base_account.account_number;
580
581        // Cache it (CAS to handle concurrent fetches)
582        let _ = self.account_number.compare_exchange(
583            0,
584            account_num,
585            Ordering::SeqCst,
586            Ordering::SeqCst,
587        );
588
589        log::debug!("Cached account_number from chain: {account_num}");
590        Ok(account_num)
591    }
592
593    /// Convenience method: allocate sequence, build, and return prepared transaction.
594    ///
595    /// This is the typical flow for single transaction submission.
596    ///
597    /// # Arguments
598    ///
599    /// * `msgs` - Proto messages to include in transaction
600    /// * `operation` - Human-readable name for logging
601    ///
602    /// # Errors
603    ///
604    /// Returns error if sequence allocation or transaction building fails.
605    pub async fn prepare_transaction(
606        &self,
607        msgs: Vec<Any>,
608        operation: &str,
609    ) -> Result<PreparedTransaction, DydxError> {
610        let sequence = self.allocate_sequence().await?;
611        self.build_transaction(msgs, sequence, operation).await
612    }
613
614    /// Returns the wallet address.
615    #[must_use]
616    pub fn wallet_address(&self) -> &str {
617        &self.wallet_address
618    }
619
620    /// Returns the chain ID.
621    #[must_use]
622    pub fn chain_id(&self) -> &ChainId {
623        &self.chain_id
624    }
625}