Skip to main content

nautilus_dydx/execution/
block_time.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Block time monitoring for dYdX short-term order expiration estimation.
17//!
18//! This module provides [`BlockTimeMonitor`], a component that tracks rolling average
19//! block times from WebSocket data to enable accurate estimation of short-term order
20//! expiration in wall-clock time.
21//!
22//! # Overview
23//!
24//! dYdX short-term orders expire by block height (typically 20 blocks). Without knowing
25//! the actual block time, it's impossible to estimate when an order will expire in
26//! wall-clock time. This monitor captures block timestamps from WebSocket updates and
27
28use std::{
29    collections::VecDeque,
30    sync::{
31        RwLock,
32        atomic::{AtomicU64, Ordering},
33    },
34};
35
36use chrono::{DateTime, Utc};
37
38/// Default rolling window size for block time averaging.
39///
40/// 100 blocks at ~500ms/block = ~50 seconds of data.
41pub const DEFAULT_BLOCK_TIME_WINDOW_SIZE: usize = 100;
42
43/// Default block time in milliseconds (dYdX mainnet ~500ms).
44///
45/// Used as fallback when insufficient samples are available.
46pub const DEFAULT_BLOCK_TIME_MS: u64 = 500;
47
48/// Minimum number of samples required before trusting the rolling average.
49///
50/// Below this threshold, [`BlockTimeMonitor::estimated_seconds_per_block`] returns `None`
51/// and [`BlockTimeMonitor::seconds_per_block_or_default`] uses the default value.
52pub const MIN_SAMPLES_FOR_ESTIMATE: usize = 5;
53
54/// Minimum valid block time in milliseconds.
55///
56/// Any calculated block time below this threshold is considered invalid
57/// (likely due to clock skew, data corruption, or integer division truncation).
58/// When detected, the monitor falls back to the default block time.
59pub const MIN_VALID_BLOCK_TIME_MS: f64 = 50.0;
60
61/// Internal rolling window buffer for block samples.
62///
63/// Uses a `VecDeque` for O(1) push/pop operations with bounded memory.
64/// Includes deduplication to skip repeated block heights during rapid replays.
65#[derive(Debug)]
66struct BlockTimeWindow {
67    /// Circular buffer of (height, timestamp) samples.
68    samples: VecDeque<(u64, DateTime<Utc>)>,
69    /// Maximum capacity of the window.
70    capacity: usize,
71    /// Last recorded block height for deduplication.
72    last_height: Option<u64>,
73}
74
75impl BlockTimeWindow {
76    /// Creates a new window with specified capacity.
77    fn new(capacity: usize) -> Self {
78        Self {
79            samples: VecDeque::with_capacity(capacity),
80            capacity,
81            last_height: None,
82        }
83    }
84
85    /// Records a new block sample.
86    ///
87    /// Skips duplicate block heights to prevent redundant entries during rapid
88    /// block replays where the same height may be reported multiple times.
89    fn record(&mut self, height: u64, time: DateTime<Utc>) {
90        // Skip duplicate heights (rapid replays often repeat same block)
91        if self.last_height == Some(height) {
92            return;
93        }
94        self.last_height = Some(height);
95
96        // Maintain bounded size: remove oldest when at capacity
97        if self.samples.len() >= self.capacity {
98            self.samples.pop_front();
99        }
100        self.samples.push_back((height, time));
101    }
102
103    /// Returns the number of samples in the window.
104    fn sample_count(&self) -> usize {
105        self.samples.len()
106    }
107
108    /// Computes the average seconds per block from the rolling window.
109    ///
110    /// Returns `None` if fewer than [`MIN_SAMPLES_FOR_ESTIMATE`] samples are available.
111    fn average_seconds_per_block(&self) -> Option<f64> {
112        let sample_count = self.sample_count();
113        if sample_count < MIN_SAMPLES_FOR_ESTIMATE {
114            return None;
115        }
116
117        // Sort samples by height to compute deltas between consecutive blocks
118        let mut sorted: Vec<_> = self.samples.iter().copied().collect();
119        sorted.sort_by_key(|(height, _)| *height);
120
121        let mut total_delta_ms: i64 = 0;
122        let mut delta_count: usize = 0;
123
124        for window in sorted.windows(2) {
125            let (h1, t1) = &window[0];
126            let (h2, t2) = &window[1];
127
128            // Skip duplicate heights (shouldn't happen with deduplication, but be safe)
129            let height_diff = h2.saturating_sub(*h1);
130            if height_diff == 0 {
131                continue;
132            }
133
134            let time_diff_ms = (*t2 - *t1).num_milliseconds();
135            if time_diff_ms <= 0 {
136                continue; // Invalid time difference (clock skew or reorg)
137            }
138
139            // Normalize time difference by height difference for multi-block gaps
140            let ms_per_block = time_diff_ms / height_diff as i64;
141            total_delta_ms += ms_per_block;
142            delta_count += 1;
143        }
144
145        if delta_count == 0 {
146            return None;
147        }
148
149        let avg_ms = total_delta_ms as f64 / delta_count as f64;
150
151        // Validate: block time must be at least MIN_VALID_BLOCK_TIME_MS
152        // to avoid division issues with unrealistically small values
153        if avg_ms < MIN_VALID_BLOCK_TIME_MS {
154            return None;
155        }
156
157        Some(avg_ms / 1000.0)
158    }
159}
160
161/// Monitors block times and provides estimation utilities for order expiration.
162///
163/// Thread-safe component that tracks rolling average block times from WebSocket data.
164/// Uses atomic operations for the hot path (height reads) and a read-write lock for
165/// less frequent operations (window updates, time estimation).
166#[derive(Debug)]
167pub struct BlockTimeMonitor {
168    /// Current block height (atomic for fast reads on hot path).
169    current_height: AtomicU64,
170    /// Current block timestamp.
171    current_time: RwLock<Option<DateTime<Utc>>>,
172    /// Rolling window for block time averaging.
173    window: RwLock<BlockTimeWindow>,
174}
175
176impl Default for BlockTimeMonitor {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182impl BlockTimeMonitor {
183    /// Creates a new [`BlockTimeMonitor`] with default window size.
184    #[must_use]
185    pub fn new() -> Self {
186        Self::with_window_size(DEFAULT_BLOCK_TIME_WINDOW_SIZE)
187    }
188
189    /// Creates a new [`BlockTimeMonitor`] with custom window size.
190    #[must_use]
191    pub fn with_window_size(window_size: usize) -> Self {
192        Self {
193            current_height: AtomicU64::new(0),
194            current_time: RwLock::new(None),
195            window: RwLock::new(BlockTimeWindow::new(window_size)),
196        }
197    }
198
199    /// Records a new block from WebSocket data.
200    ///
201    /// Should be called whenever a block height update is received.
202    /// Updates the current height atomically and adds the sample to the rolling window.
203    ///
204    /// # Panics
205    ///
206    /// Panics if the RwLock is poisoned (should never happen in practice).
207    pub fn record_block(&self, height: u64, time: DateTime<Utc>) {
208        // Update current height atomically (hot path)
209        self.current_height.store(height, Ordering::Release);
210
211        // Update current time
212        *self.current_time.write().expect("RwLock poisoned") = Some(time);
213
214        // Add to rolling window
215        self.window
216            .write()
217            .expect("RwLock poisoned")
218            .record(height, time);
219    }
220
221    /// Returns the current block height.
222    ///
223    /// This is a fast, lock-free read suitable for hot paths.
224    #[must_use]
225    pub fn current_block_height(&self) -> u64 {
226        self.current_height.load(Ordering::Acquire)
227    }
228
229    /// Returns the timestamp of the most recent block.
230    ///
231    /// # Panics
232    ///
233    /// Panics if the RwLock is poisoned (should never happen in practice).
234    #[must_use]
235    pub fn current_block_time(&self) -> Option<DateTime<Utc>> {
236        *self.current_time.read().expect("RwLock poisoned")
237    }
238
239    /// Returns the estimated seconds per block based on rolling average.
240    ///
241    /// Returns `None` if fewer than [`MIN_SAMPLES_FOR_ESTIMATE`] samples are available.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the RwLock is poisoned (should never happen in practice).
246    #[must_use]
247    pub fn estimated_seconds_per_block(&self) -> Option<f64> {
248        self.window
249            .read()
250            .expect("RwLock poisoned")
251            .average_seconds_per_block()
252    }
253
254    /// Returns estimated seconds per block, falling back to default if unavailable.
255    ///
256    /// Uses [`DEFAULT_BLOCK_TIME_MS`] (500ms) when insufficient samples.
257    #[must_use]
258    pub fn seconds_per_block_or_default(&self) -> f64 {
259        self.estimated_seconds_per_block()
260            .unwrap_or(DEFAULT_BLOCK_TIME_MS as f64 / 1000.0)
261    }
262
263    /// Estimates how many blocks will occur in the given duration.
264    ///
265    /// Uses the rolling average if available, otherwise falls back to default block time.
266    /// Result is capped at `u32::MAX` to prevent overflow from edge cases.
267    #[must_use]
268    pub fn estimate_blocks_for_duration(&self, duration_secs: f64) -> u32 {
269        let secs_per_block = self.seconds_per_block_or_default();
270        let blocks = (duration_secs / secs_per_block).ceil();
271        // Cap at u32::MAX to prevent overflow from infinity or very large values
272        blocks.min(f64::from(u32::MAX)) as u32
273    }
274
275    /// Estimates the wall-clock time when a specific block height will be reached.
276    ///
277    /// Returns `None` if:
278    /// - Insufficient samples for reliable estimation
279    /// - No current block time available
280    /// - Target block is in the past
281    #[must_use]
282    pub fn estimate_expiry_time(&self, expiry_block: u64) -> Option<DateTime<Utc>> {
283        let current_height = self.current_block_height();
284        let current_time = self.current_block_time()?;
285        let secs_per_block = self.estimated_seconds_per_block()?;
286
287        if expiry_block <= current_height {
288            // Block already passed
289            return None;
290        }
291
292        let blocks_remaining = expiry_block - current_height;
293        let seconds_remaining = blocks_remaining as f64 * secs_per_block;
294
295        Some(
296            current_time
297                + chrono::Duration::milliseconds((seconds_remaining * 1000.0).round() as i64),
298        )
299    }
300
301    /// Estimates remaining lifetime in seconds for an order expiring at the given block.
302    ///
303    /// Returns `None` if insufficient data or block already passed.
304    #[must_use]
305    pub fn estimate_remaining_lifetime_secs(&self, expiry_block: u64) -> Option<f64> {
306        let current_height = self.current_block_height();
307
308        if expiry_block <= current_height {
309            return Some(0.0);
310        }
311
312        let blocks_remaining = expiry_block - current_height;
313        let secs_per_block = self.estimated_seconds_per_block()?;
314
315        Some(blocks_remaining as f64 * secs_per_block)
316    }
317
318    /// Returns `true` if the monitor has enough samples for reliable estimation.
319    ///
320    /// # Panics
321    ///
322    /// Panics if the RwLock is poisoned (should never happen in practice).
323    #[must_use]
324    pub fn is_ready(&self) -> bool {
325        self.window.read().expect("RwLock poisoned").sample_count() >= MIN_SAMPLES_FOR_ESTIMATE
326    }
327
328    /// Returns the number of samples collected in the rolling window.
329    ///
330    /// # Panics
331    ///
332    /// Panics if the RwLock is poisoned (should never happen in practice).
333    #[must_use]
334    pub fn sample_count(&self) -> usize {
335        self.window.read().expect("RwLock poisoned").sample_count()
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use chrono::Duration;
342    use rstest::rstest;
343
344    use super::*;
345
346    #[rstest]
347    fn test_new_monitor_not_ready() {
348        let monitor = BlockTimeMonitor::new();
349        assert!(!monitor.is_ready());
350        assert_eq!(monitor.current_block_height(), 0);
351        assert!(monitor.estimated_seconds_per_block().is_none());
352    }
353
354    #[rstest]
355    fn test_record_updates_height() {
356        let monitor = BlockTimeMonitor::new();
357        let now = Utc::now();
358
359        monitor.record_block(100, now);
360        assert_eq!(monitor.current_block_height(), 100);
361
362        monitor.record_block(101, now + Duration::milliseconds(500));
363        assert_eq!(monitor.current_block_height(), 101);
364    }
365
366    #[rstest]
367    fn test_seconds_per_block_or_default_before_ready() {
368        let monitor = BlockTimeMonitor::new();
369        let default = DEFAULT_BLOCK_TIME_MS as f64 / 1000.0;
370        assert!((monitor.seconds_per_block_or_default() - default).abs() < 0.001);
371    }
372
373    #[rstest]
374    fn test_becomes_ready_after_min_samples() {
375        let monitor = BlockTimeMonitor::new();
376        let mut time = Utc::now();
377
378        for i in 0..MIN_SAMPLES_FOR_ESTIMATE {
379            monitor.record_block(100 + i as u64, time);
380            time += Duration::milliseconds(500);
381        }
382
383        assert!(monitor.is_ready());
384    }
385
386    #[rstest]
387    fn test_average_block_time_calculation() {
388        let monitor = BlockTimeMonitor::new();
389        let mut time = Utc::now();
390        let block_time_ms = 500;
391
392        // Record enough samples with consistent 500ms block time
393        for i in 0..10 {
394            monitor.record_block(100 + i as u64, time);
395            time += Duration::milliseconds(block_time_ms);
396        }
397
398        let estimated = monitor.estimated_seconds_per_block().unwrap();
399        assert!(
400            (estimated - 0.5).abs() < 0.1,
401            "Expected ~0.5s, was {estimated}"
402        );
403    }
404
405    #[rstest]
406    fn test_estimate_blocks_for_duration() {
407        let monitor = BlockTimeMonitor::new();
408        let mut time = Utc::now();
409
410        // Set up with 500ms block time
411        for i in 0..10 {
412            monitor.record_block(100 + i as u64, time);
413            time += Duration::milliseconds(500);
414        }
415
416        // 10 seconds should be ~20 blocks at 500ms/block
417        let blocks = monitor.estimate_blocks_for_duration(10.0);
418        assert!((18..=22).contains(&blocks), "Expected ~20, was {blocks}");
419    }
420
421    #[rstest]
422    fn test_estimate_expiry_time() {
423        let monitor = BlockTimeMonitor::new();
424        let start_time = Utc::now();
425        let mut time = start_time;
426
427        // Set up with 500ms block time, ending at block 109
428        for i in 0..10 {
429            monitor.record_block(100 + i as u64, time);
430            time += Duration::milliseconds(500);
431        }
432
433        // After loop: current block is 109, current_block_time = time - 500ms
434        // Expiry at block 129 = 20 blocks from 109
435        let expiry_time = monitor.estimate_expiry_time(129).unwrap();
436        // Expected: current_block_time + (20 blocks * 500ms)
437        let current_block_time = time - Duration::milliseconds(500);
438        let expected = current_block_time + Duration::milliseconds(20 * 500);
439
440        let diff_ms = (expiry_time - expected).num_milliseconds().abs();
441        assert!(diff_ms < 1000, "Expected ~{expected}, was {expiry_time}");
442    }
443
444    #[rstest]
445    fn test_estimate_expiry_time_past_block() {
446        let monitor = BlockTimeMonitor::new();
447        let time = Utc::now();
448
449        monitor.record_block(100, time);
450
451        // Block 50 is in the past
452        assert!(monitor.estimate_expiry_time(50).is_none());
453    }
454
455    #[rstest]
456    fn test_estimate_remaining_lifetime() {
457        let monitor = BlockTimeMonitor::new();
458        let mut time = Utc::now();
459
460        // Set up with 500ms block time
461        for i in 0..10 {
462            monitor.record_block(100 + i as u64, time);
463            time += Duration::milliseconds(500);
464        }
465
466        // Current height is 109, expiry at 129 (20 blocks)
467        let remaining = monitor.estimate_remaining_lifetime_secs(129).unwrap();
468        assert!(
469            (remaining - 10.0).abs() < 1.0,
470            "Expected ~10s, was {remaining}"
471        );
472    }
473
474    #[rstest]
475    fn test_circular_buffer_wraps() {
476        let monitor = BlockTimeMonitor::with_window_size(5);
477        let mut time = Utc::now();
478
479        // Record more samples than window size
480        for i in 0..10 {
481            monitor.record_block(100 + i as u64, time);
482            time += Duration::milliseconds(500);
483        }
484
485        // Should still have only 5 samples
486        assert_eq!(monitor.sample_count(), 5);
487        assert!(monitor.is_ready());
488    }
489
490    #[rstest]
491    fn test_handles_non_consecutive_blocks() {
492        let monitor = BlockTimeMonitor::new();
493        let mut time = Utc::now();
494
495        // Record blocks with a gap (100, 101, 102, 105, 106)
496        monitor.record_block(100, time);
497        time += Duration::milliseconds(500);
498        monitor.record_block(101, time);
499        time += Duration::milliseconds(500);
500        monitor.record_block(102, time);
501        time += Duration::milliseconds(1500); // Skip 3 blocks
502        monitor.record_block(105, time);
503        time += Duration::milliseconds(500);
504        monitor.record_block(106, time);
505
506        // Should still calculate a reasonable estimate
507        assert!(monitor.is_ready());
508        let estimated = monitor.estimated_seconds_per_block().unwrap();
509        // Expect ~500ms per block even with the gap
510        assert!(
511            (estimated - 0.5).abs() < 0.2,
512            "Expected ~0.5s, was {estimated}"
513        );
514    }
515
516    #[rstest]
517    fn test_deduplicates_same_block_height() {
518        let monitor = BlockTimeMonitor::with_window_size(10);
519        let time = Utc::now();
520
521        // Record same block height multiple times (rapid replay scenario)
522        monitor.record_block(100, time);
523        monitor.record_block(100, time + Duration::milliseconds(10));
524        monitor.record_block(100, time + Duration::milliseconds(20));
525        monitor.record_block(100, time + Duration::milliseconds(30));
526
527        // Should only have 1 sample due to deduplication
528        assert_eq!(monitor.sample_count(), 1);
529    }
530
531    #[rstest]
532    fn test_rapid_replay_bounded_memory() {
533        let monitor = BlockTimeMonitor::with_window_size(5);
534        let mut time = Utc::now();
535
536        // Simulate rapid replay: 1000 block updates
537        for i in 0..1000 {
538            monitor.record_block(100 + i as u64, time);
539            time += Duration::milliseconds(500);
540        }
541
542        // Buffer should never exceed capacity
543        assert_eq!(monitor.sample_count(), 5);
544        assert!(monitor.is_ready());
545
546        // Estimate should still be valid
547        let estimated = monitor.estimated_seconds_per_block().unwrap();
548        assert!(
549            (estimated - 0.5).abs() < 0.1,
550            "Expected ~0.5s, was {estimated}"
551        );
552    }
553
554    #[rstest]
555    fn test_rapid_replay_with_duplicate_heights() {
556        let monitor = BlockTimeMonitor::with_window_size(10);
557        let mut time = Utc::now();
558
559        // Simulate rapid replay with duplicates: each block reported 3 times
560        for block in 100..110 {
561            for _ in 0..3 {
562                monitor.record_block(block, time);
563                time += Duration::milliseconds(100);
564            }
565            time += Duration::milliseconds(200); // Actual block time ~500ms
566        }
567
568        // Should have exactly 10 samples (one per unique block)
569        assert_eq!(monitor.sample_count(), 10);
570    }
571
572    #[rstest]
573    fn test_rejects_unrealistically_small_block_times() {
574        let monitor = BlockTimeMonitor::with_window_size(10);
575        let time = Utc::now();
576
577        // Record blocks with extremely small time differences (1ms per block)
578        // This is unrealistic and should be rejected
579        for i in 0..10 {
580            monitor.record_block(100 + i as u64, time + Duration::milliseconds(i));
581        }
582
583        // Should have samples but estimated time should be None (below threshold)
584        assert!(monitor.is_ready());
585        assert!(
586            monitor.estimated_seconds_per_block().is_none(),
587            "Expected None for unrealistically small block times"
588        );
589
590        // Should fall back to default
591        let default = super::DEFAULT_BLOCK_TIME_MS as f64 / 1000.0;
592        assert!((monitor.seconds_per_block_or_default() - default).abs() < 0.001);
593    }
594
595    #[rstest]
596    fn test_estimate_blocks_handles_large_duration() {
597        let monitor = BlockTimeMonitor::new();
598        // Without samples, uses default (0.5s per block)
599
600        // Very large duration should not overflow
601        let blocks = monitor.estimate_blocks_for_duration(f64::MAX);
602        assert_eq!(blocks, u32::MAX);
603    }
604
605    #[rstest]
606    fn test_estimate_blocks_handles_zero_duration() {
607        let monitor = BlockTimeMonitor::new();
608
609        let blocks = monitor.estimate_blocks_for_duration(0.0);
610        assert_eq!(blocks, 0);
611    }
612}