nautilus_dydx/execution/
block_time.rs1use std::{
29 collections::VecDeque,
30 sync::{
31 RwLock,
32 atomic::{AtomicU64, Ordering},
33 },
34};
35
36use chrono::{DateTime, Utc};
37
38pub const DEFAULT_BLOCK_TIME_WINDOW_SIZE: usize = 100;
42
43pub const DEFAULT_BLOCK_TIME_MS: u64 = 500;
47
48pub const MIN_SAMPLES_FOR_ESTIMATE: usize = 5;
53
54pub const MIN_VALID_BLOCK_TIME_MS: f64 = 50.0;
60
61#[derive(Debug)]
66struct BlockTimeWindow {
67 samples: VecDeque<(u64, DateTime<Utc>)>,
69 capacity: usize,
71 last_height: Option<u64>,
73}
74
75impl BlockTimeWindow {
76 fn new(capacity: usize) -> Self {
78 Self {
79 samples: VecDeque::with_capacity(capacity),
80 capacity,
81 last_height: None,
82 }
83 }
84
85 fn record(&mut self, height: u64, time: DateTime<Utc>) {
90 if self.last_height == Some(height) {
92 return;
93 }
94 self.last_height = Some(height);
95
96 if self.samples.len() >= self.capacity {
98 self.samples.pop_front();
99 }
100 self.samples.push_back((height, time));
101 }
102
103 fn sample_count(&self) -> usize {
105 self.samples.len()
106 }
107
108 fn average_seconds_per_block(&self) -> Option<f64> {
112 let sample_count = self.sample_count();
113 if sample_count < MIN_SAMPLES_FOR_ESTIMATE {
114 return None;
115 }
116
117 let mut sorted: Vec<_> = self.samples.iter().copied().collect();
119 sorted.sort_by_key(|(height, _)| *height);
120
121 let mut total_delta_ms: i64 = 0;
122 let mut delta_count: usize = 0;
123
124 for window in sorted.windows(2) {
125 let (h1, t1) = &window[0];
126 let (h2, t2) = &window[1];
127
128 let height_diff = h2.saturating_sub(*h1);
130 if height_diff == 0 {
131 continue;
132 }
133
134 let time_diff_ms = (*t2 - *t1).num_milliseconds();
135 if time_diff_ms <= 0 {
136 continue; }
138
139 let ms_per_block = time_diff_ms / height_diff as i64;
141 total_delta_ms += ms_per_block;
142 delta_count += 1;
143 }
144
145 if delta_count == 0 {
146 return None;
147 }
148
149 let avg_ms = total_delta_ms as f64 / delta_count as f64;
150
151 if avg_ms < MIN_VALID_BLOCK_TIME_MS {
154 return None;
155 }
156
157 Some(avg_ms / 1000.0)
158 }
159}
160
161#[derive(Debug)]
167pub struct BlockTimeMonitor {
168 current_height: AtomicU64,
170 current_time: RwLock<Option<DateTime<Utc>>>,
172 window: RwLock<BlockTimeWindow>,
174}
175
176impl Default for BlockTimeMonitor {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl BlockTimeMonitor {
183 #[must_use]
185 pub fn new() -> Self {
186 Self::with_window_size(DEFAULT_BLOCK_TIME_WINDOW_SIZE)
187 }
188
189 #[must_use]
191 pub fn with_window_size(window_size: usize) -> Self {
192 Self {
193 current_height: AtomicU64::new(0),
194 current_time: RwLock::new(None),
195 window: RwLock::new(BlockTimeWindow::new(window_size)),
196 }
197 }
198
199 pub fn record_block(&self, height: u64, time: DateTime<Utc>) {
208 self.current_height.store(height, Ordering::Release);
210
211 *self.current_time.write().expect("RwLock poisoned") = Some(time);
213
214 self.window
216 .write()
217 .expect("RwLock poisoned")
218 .record(height, time);
219 }
220
221 #[must_use]
225 pub fn current_block_height(&self) -> u64 {
226 self.current_height.load(Ordering::Acquire)
227 }
228
229 #[must_use]
235 pub fn current_block_time(&self) -> Option<DateTime<Utc>> {
236 *self.current_time.read().expect("RwLock poisoned")
237 }
238
239 #[must_use]
247 pub fn estimated_seconds_per_block(&self) -> Option<f64> {
248 self.window
249 .read()
250 .expect("RwLock poisoned")
251 .average_seconds_per_block()
252 }
253
254 #[must_use]
258 pub fn seconds_per_block_or_default(&self) -> f64 {
259 self.estimated_seconds_per_block()
260 .unwrap_or(DEFAULT_BLOCK_TIME_MS as f64 / 1000.0)
261 }
262
263 #[must_use]
268 pub fn estimate_blocks_for_duration(&self, duration_secs: f64) -> u32 {
269 let secs_per_block = self.seconds_per_block_or_default();
270 let blocks = (duration_secs / secs_per_block).ceil();
271 blocks.min(f64::from(u32::MAX)) as u32
273 }
274
275 #[must_use]
282 pub fn estimate_expiry_time(&self, expiry_block: u64) -> Option<DateTime<Utc>> {
283 let current_height = self.current_block_height();
284 let current_time = self.current_block_time()?;
285 let secs_per_block = self.estimated_seconds_per_block()?;
286
287 if expiry_block <= current_height {
288 return None;
290 }
291
292 let blocks_remaining = expiry_block - current_height;
293 let seconds_remaining = blocks_remaining as f64 * secs_per_block;
294
295 Some(
296 current_time
297 + chrono::Duration::milliseconds((seconds_remaining * 1000.0).round() as i64),
298 )
299 }
300
301 #[must_use]
305 pub fn estimate_remaining_lifetime_secs(&self, expiry_block: u64) -> Option<f64> {
306 let current_height = self.current_block_height();
307
308 if expiry_block <= current_height {
309 return Some(0.0);
310 }
311
312 let blocks_remaining = expiry_block - current_height;
313 let secs_per_block = self.estimated_seconds_per_block()?;
314
315 Some(blocks_remaining as f64 * secs_per_block)
316 }
317
318 #[must_use]
324 pub fn is_ready(&self) -> bool {
325 self.window.read().expect("RwLock poisoned").sample_count() >= MIN_SAMPLES_FOR_ESTIMATE
326 }
327
328 #[must_use]
334 pub fn sample_count(&self) -> usize {
335 self.window.read().expect("RwLock poisoned").sample_count()
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use chrono::Duration;
342 use rstest::rstest;
343
344 use super::*;
345
346 #[rstest]
347 fn test_new_monitor_not_ready() {
348 let monitor = BlockTimeMonitor::new();
349 assert!(!monitor.is_ready());
350 assert_eq!(monitor.current_block_height(), 0);
351 assert!(monitor.estimated_seconds_per_block().is_none());
352 }
353
354 #[rstest]
355 fn test_record_updates_height() {
356 let monitor = BlockTimeMonitor::new();
357 let now = Utc::now();
358
359 monitor.record_block(100, now);
360 assert_eq!(monitor.current_block_height(), 100);
361
362 monitor.record_block(101, now + Duration::milliseconds(500));
363 assert_eq!(monitor.current_block_height(), 101);
364 }
365
366 #[rstest]
367 fn test_seconds_per_block_or_default_before_ready() {
368 let monitor = BlockTimeMonitor::new();
369 let default = DEFAULT_BLOCK_TIME_MS as f64 / 1000.0;
370 assert!((monitor.seconds_per_block_or_default() - default).abs() < 0.001);
371 }
372
373 #[rstest]
374 fn test_becomes_ready_after_min_samples() {
375 let monitor = BlockTimeMonitor::new();
376 let mut time = Utc::now();
377
378 for i in 0..MIN_SAMPLES_FOR_ESTIMATE {
379 monitor.record_block(100 + i as u64, time);
380 time += Duration::milliseconds(500);
381 }
382
383 assert!(monitor.is_ready());
384 }
385
386 #[rstest]
387 fn test_average_block_time_calculation() {
388 let monitor = BlockTimeMonitor::new();
389 let mut time = Utc::now();
390 let block_time_ms = 500;
391
392 for i in 0..10 {
394 monitor.record_block(100 + i as u64, time);
395 time += Duration::milliseconds(block_time_ms);
396 }
397
398 let estimated = monitor.estimated_seconds_per_block().unwrap();
399 assert!(
400 (estimated - 0.5).abs() < 0.1,
401 "Expected ~0.5s, was {estimated}"
402 );
403 }
404
405 #[rstest]
406 fn test_estimate_blocks_for_duration() {
407 let monitor = BlockTimeMonitor::new();
408 let mut time = Utc::now();
409
410 for i in 0..10 {
412 monitor.record_block(100 + i as u64, time);
413 time += Duration::milliseconds(500);
414 }
415
416 let blocks = monitor.estimate_blocks_for_duration(10.0);
418 assert!((18..=22).contains(&blocks), "Expected ~20, was {blocks}");
419 }
420
421 #[rstest]
422 fn test_estimate_expiry_time() {
423 let monitor = BlockTimeMonitor::new();
424 let start_time = Utc::now();
425 let mut time = start_time;
426
427 for i in 0..10 {
429 monitor.record_block(100 + i as u64, time);
430 time += Duration::milliseconds(500);
431 }
432
433 let expiry_time = monitor.estimate_expiry_time(129).unwrap();
436 let current_block_time = time - Duration::milliseconds(500);
438 let expected = current_block_time + Duration::milliseconds(20 * 500);
439
440 let diff_ms = (expiry_time - expected).num_milliseconds().abs();
441 assert!(diff_ms < 1000, "Expected ~{expected}, was {expiry_time}");
442 }
443
444 #[rstest]
445 fn test_estimate_expiry_time_past_block() {
446 let monitor = BlockTimeMonitor::new();
447 let time = Utc::now();
448
449 monitor.record_block(100, time);
450
451 assert!(monitor.estimate_expiry_time(50).is_none());
453 }
454
455 #[rstest]
456 fn test_estimate_remaining_lifetime() {
457 let monitor = BlockTimeMonitor::new();
458 let mut time = Utc::now();
459
460 for i in 0..10 {
462 monitor.record_block(100 + i as u64, time);
463 time += Duration::milliseconds(500);
464 }
465
466 let remaining = monitor.estimate_remaining_lifetime_secs(129).unwrap();
468 assert!(
469 (remaining - 10.0).abs() < 1.0,
470 "Expected ~10s, was {remaining}"
471 );
472 }
473
474 #[rstest]
475 fn test_circular_buffer_wraps() {
476 let monitor = BlockTimeMonitor::with_window_size(5);
477 let mut time = Utc::now();
478
479 for i in 0..10 {
481 monitor.record_block(100 + i as u64, time);
482 time += Duration::milliseconds(500);
483 }
484
485 assert_eq!(monitor.sample_count(), 5);
487 assert!(monitor.is_ready());
488 }
489
490 #[rstest]
491 fn test_handles_non_consecutive_blocks() {
492 let monitor = BlockTimeMonitor::new();
493 let mut time = Utc::now();
494
495 monitor.record_block(100, time);
497 time += Duration::milliseconds(500);
498 monitor.record_block(101, time);
499 time += Duration::milliseconds(500);
500 monitor.record_block(102, time);
501 time += Duration::milliseconds(1500); monitor.record_block(105, time);
503 time += Duration::milliseconds(500);
504 monitor.record_block(106, time);
505
506 assert!(monitor.is_ready());
508 let estimated = monitor.estimated_seconds_per_block().unwrap();
509 assert!(
511 (estimated - 0.5).abs() < 0.2,
512 "Expected ~0.5s, was {estimated}"
513 );
514 }
515
516 #[rstest]
517 fn test_deduplicates_same_block_height() {
518 let monitor = BlockTimeMonitor::with_window_size(10);
519 let time = Utc::now();
520
521 monitor.record_block(100, time);
523 monitor.record_block(100, time + Duration::milliseconds(10));
524 monitor.record_block(100, time + Duration::milliseconds(20));
525 monitor.record_block(100, time + Duration::milliseconds(30));
526
527 assert_eq!(monitor.sample_count(), 1);
529 }
530
531 #[rstest]
532 fn test_rapid_replay_bounded_memory() {
533 let monitor = BlockTimeMonitor::with_window_size(5);
534 let mut time = Utc::now();
535
536 for i in 0..1000 {
538 monitor.record_block(100 + i as u64, time);
539 time += Duration::milliseconds(500);
540 }
541
542 assert_eq!(monitor.sample_count(), 5);
544 assert!(monitor.is_ready());
545
546 let estimated = monitor.estimated_seconds_per_block().unwrap();
548 assert!(
549 (estimated - 0.5).abs() < 0.1,
550 "Expected ~0.5s, was {estimated}"
551 );
552 }
553
554 #[rstest]
555 fn test_rapid_replay_with_duplicate_heights() {
556 let monitor = BlockTimeMonitor::with_window_size(10);
557 let mut time = Utc::now();
558
559 for block in 100..110 {
561 for _ in 0..3 {
562 monitor.record_block(block, time);
563 time += Duration::milliseconds(100);
564 }
565 time += Duration::milliseconds(200); }
567
568 assert_eq!(monitor.sample_count(), 10);
570 }
571
572 #[rstest]
573 fn test_rejects_unrealistically_small_block_times() {
574 let monitor = BlockTimeMonitor::with_window_size(10);
575 let time = Utc::now();
576
577 for i in 0..10 {
580 monitor.record_block(100 + i as u64, time + Duration::milliseconds(i));
581 }
582
583 assert!(monitor.is_ready());
585 assert!(
586 monitor.estimated_seconds_per_block().is_none(),
587 "Expected None for unrealistically small block times"
588 );
589
590 let default = super::DEFAULT_BLOCK_TIME_MS as f64 / 1000.0;
592 assert!((monitor.seconds_per_block_or_default() - default).abs() < 0.001);
593 }
594
595 #[rstest]
596 fn test_estimate_blocks_handles_large_duration() {
597 let monitor = BlockTimeMonitor::new();
598 let blocks = monitor.estimate_blocks_for_duration(f64::MAX);
602 assert_eq!(blocks, u32::MAX);
603 }
604
605 #[rstest]
606 fn test_estimate_blocks_handles_zero_duration() {
607 let monitor = BlockTimeMonitor::new();
608
609 let blocks = monitor.estimate_blocks_for_duration(0.0);
610 assert_eq!(blocks, 0);
611 }
612}