nautilus_tardis/replay.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{
    collections::HashMap,
    fs,
    path::{Path, PathBuf},
};

use anyhow::Context;
use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Duration, NaiveDate};
use futures_util::{StreamExt, future::join_all, pin_mut};
use heck::ToSnakeCase;
use nautilus_core::{UnixNanos, datetime::unix_nanos_to_iso8601, parsing::precision_from_str};
use nautilus_model::{
    data::{
        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
        TradeTick,
    },
    identifiers::InstrumentId,
};
use nautilus_serialization::arrow::{
    bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
    book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
    trades_to_arrow_record_batch_bytes,
};
use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
use thousands::Separable;
use ustr::Ustr;

use super::{enums::TardisExchange, http::models::TardisInstrumentInfo};
use crate::{
    config::{BookSnapshotOutput, TardisReplayConfig},
    http::TardisHttpClient,
    machine::{TardisMachineClient, types::TardisInstrumentMiniInfo},
    parse::{normalize_instrument_id, parse_instrument_id},
};

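/// Tracks the current UTC trading date for one output stream so that incoming
/// data can be partitioned into a single Parquet file per day.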
struct DateCursor {
    /// Cursor date UTC.
    date_utc: NaiveDate,
    /// Cursor end timestamp UNIX nanoseconds.
    end_ns: UnixNanos,
}

impl DateCursor {
    /// Creates a new [`DateCursor`] instance.
    fn new(current_ns: UnixNanos) -> Self {
        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
        let date_utc = current_utc.date_naive();

        // Calculate end of the current UTC day
        // SAFETY: Known safe input values
        let end_utc =
            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);

        Self { date_utc, end_ns }
    }
}

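/// Requests instrument definitions for all configured exchanges concurrently,
/// logging and skipping any exchange whose request fails.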
async fn gather_instruments_info(
    config: &TardisReplayConfig,
    http_client: &TardisHttpClient,
) -> HashMap<TardisExchange, Vec<TardisInstrumentInfo>> {
    let futures = config.options.iter().map(|options| {
        let exchange = options.exchange;
        let client = &http_client;

        tracing::info!("Requesting instruments for {exchange}");

        async move {
            match client.instruments_info(exchange, None, None).await {
                Ok(instruments) => Some((exchange, instruments)),
                Err(e) => {
                    tracing::error!("Error fetching instruments for {exchange}: {e}");
                    None
                }
            }
        }
    });

    let results: HashMap<TardisExchange, Vec<TardisInstrumentInfo>> =
        join_all(futures).await.into_iter().flatten().collect();

    tracing::info!("Received all instruments");

    results
}

/// Runs the Tardis Machine replay from a JSON configuration file.
///
/// The output path resolves, in order of precedence, to the configured
/// `output_path`, then `$NAUTILUS_PATH/catalog/data`, then the current
/// working directory.
///
/// # Errors
///
/// Returns an error if reading or parsing the config file fails,
/// or if any downstream replay operation fails.
///
/// # Panics
///
/// Panics if the output path cannot be determined (the current-directory fallback fails).
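///
/// A minimal config sketch, for illustration only (the authoritative schema is
/// [`TardisReplayConfig`]; the values here are placeholders and the `options`
/// entries are elided):
///
/// ```json
/// {
///   "tardis_ws_url": "ws://localhost:8001",
///   "normalize_symbols": true,
///   "output_path": "./catalog/data",
///   "options": []
/// }
/// ```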
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    // Load and parse the replay configuration
    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("failed to parse config JSON into TardisReplayConfig")?;

    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let book_snapshot_output = config
        .book_snapshot_output
        .clone()
        .unwrap_or(BookSnapshotOutput::Deltas);
    tracing::info!("book_snapshot_output={book_snapshot_output:?}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client = TardisMachineClient::new(
        config.tardis_ws_url.as_deref(),
        normalize_symbols,
        book_snapshot_output,
    )?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type;
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = TardisInstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                *exchange,
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await?;
    pin_mut!(stream);

    // Initialize date cursors
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

    // Initialize data collection maps
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                match msg {
                    Data::Deltas(msg) => {
                        handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
                    }
                    Data::Depth10(msg) => {
                        handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
                    }
                    Data::Quote(msg) => {
                        handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path);
                    }
                    Data::Trade(msg) => {
                        handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path);
                    }
                    Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
                    Data::Delta(delta) => {
                        tracing::warn!(
                            "Skipping individual delta message for {} (use Deltas batch instead)",
                            delta.instrument_id
                        );
                    }
                    Data::MarkPriceUpdate(_)
                    | Data::IndexPriceUpdate(_)
                    | Data::InstrumentClose(_) => {
                        tracing::debug!(
                            "Skipping unsupported data type for instrument {}",
                            msg.instrument_id()
                        );
                    }
                }

                msg_count += 1;
                if msg_count % 100_000 == 0 {
                    tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
                }
            }
            Err(e) => {
                tracing::error!("Stream error: {e:?}");
                break;
            }
        }
    }

    // Flush any remaining buffered data for each data type and instrument

    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}

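/// Buffers a batch of book deltas for its instrument, first flushing any
/// previously buffered day to Parquet when `ts_init` crosses the cursor's
/// end-of-day boundary.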
fn handle_deltas_msg(
    deltas: OrderBookDeltas_API,
    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(deltas.instrument_id)
        .or_insert_with(|| DateCursor::new(deltas.ts_init));

    if deltas.ts_init > cursor.end_ns {
        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
        }
        // Update cursor
        *cursor = DateCursor::new(deltas.ts_init);
    }

    map.entry(deltas.instrument_id)
        .or_insert_with(|| Vec::with_capacity(100_000))
        .extend(&*deltas.deltas);
}

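/// As [`handle_deltas_msg`], but for depth-10 snapshots.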
fn handle_depth10_msg(
    depth10: OrderBookDepth10,
    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(depth10.instrument_id)
        .or_insert_with(|| DateCursor::new(depth10.ts_init));

    if depth10.ts_init > cursor.end_ns {
        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
        }
        // Update cursor
        *cursor = DateCursor::new(depth10.ts_init);
    }

    map.entry(depth10.instrument_id)
        .or_insert_with(|| Vec::with_capacity(100_000))
        .push(depth10);
}

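/// As [`handle_deltas_msg`], but for quotes.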
fn handle_quote_msg(
    quote: QuoteTick,
    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(quote.instrument_id)
        .or_insert_with(|| DateCursor::new(quote.ts_init));

    if quote.ts_init > cursor.end_ns {
        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
        }
        // Update cursor
        *cursor = DateCursor::new(quote.ts_init);
    }

    map.entry(quote.instrument_id)
        .or_insert_with(|| Vec::with_capacity(100_000))
        .push(quote);
}

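/// As [`handle_deltas_msg`], but for trades.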
fn handle_trade_msg(
    trade: TradeTick,
    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
    cursors: &mut HashMap<InstrumentId, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(trade.instrument_id)
        .or_insert_with(|| DateCursor::new(trade.ts_init));

    if trade.ts_init > cursor.end_ns {
        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
        }
        // Update cursor
        *cursor = DateCursor::new(trade.ts_init);
    }

    map.entry(trade.instrument_id)
        .or_insert_with(|| Vec::with_capacity(100_000))
        .push(trade);
}

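/// As [`handle_deltas_msg`], but for bars, keyed by [`BarType`] rather than
/// instrument ID.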
fn handle_bar_msg(
    bar: Bar,
    map: &mut HashMap<BarType, Vec<Bar>>,
    cursors: &mut HashMap<BarType, DateCursor>,
    path: &Path,
) {
    let cursor = cursors
        .entry(bar.bar_type)
        .or_insert_with(|| DateCursor::new(bar.ts_init));

    if bar.ts_init > cursor.end_ns {
        if let Some(bars_vec) = map.remove(&bar.bar_type) {
            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
        }
        // Update cursor
        *cursor = DateCursor::new(bar.ts_init);
    }

    map.entry(bar.bar_type)
        .or_insert_with(|| Vec::with_capacity(100_000))
        .push(bar);
}

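/// Converts buffered deltas to an Arrow record batch and writes the daily
/// Parquet file, logging (rather than propagating) any conversion error.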
fn batch_and_write_deltas(
    deltas: Vec<OrderBookDelta>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(OrderBookDeltas);
    match book_deltas_to_arrow_record_batch_bytes(deltas) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

fn batch_and_write_depths(
    depths: Vec<OrderBookDepth10>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    // Use "order_book_depths" to match catalog path prefix
    let typename = "order_book_depths";
    match book_depth10_to_arrow_record_batch_bytes(depths) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
        }
    }
}

fn batch_and_write_quotes(
    quotes: Vec<QuoteTick>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(QuoteTick);
    match quotes_to_arrow_record_batch_bytes(quotes) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

fn batch_and_write_trades(
    trades: Vec<TradeTick>,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let typename = stringify!(TradeTick);
    match trades_to_arrow_record_batch_bytes(trades) {
        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
        }
    }
}

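/// As the other `batch_and_write_*` helpers, but bars are written under a
/// `bar/{bar_type}` path rather than a per-instrument path.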
fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
    let typename = stringify!(Bar);
    let batch = match bars_to_arrow_record_batch_bytes(bars) {
        Ok(batch) => batch,
        Err(e) => {
            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
            return;
        }
    };

    let filepath = path.join(parquet_filepath_bars(bar_type, date));
    if let Err(e) = write_parquet_local(batch, &filepath) {
        tracing::error!("Error writing {filepath:?}: {e:?}");
    } else {
        tracing::info!("File written: {filepath:?}");
    }
}

/// Asserts that the given date is on or after the UNIX epoch (1970-01-01).
///
/// # Panics
///
/// Panics if the date is before 1970-01-01, as pre-epoch dates would produce
/// negative timestamps that the unsigned `UnixNanos` type cannot represent.
fn assert_post_epoch(date: NaiveDate) {
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
    if date < epoch {
        panic!("Tardis replay filenames require dates on or after 1970-01-01; received {date}");
    }
}

/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// This function replaces colons and dots with hyphens to make the timestamp
/// safe for use in filenames across different filesystems.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp.replace([':', '.'], "-")
}

/// Converts timestamps to a filename using ISO 8601 format.
///
/// This function converts two Unix nanosecond timestamps to a filename that uses
/// ISO 8601 format with filesystem-safe characters, matching the catalog convention.
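///
/// For example (assuming `unix_nanos_to_iso8601` emits nanosecond-precision
/// RFC 3339, e.g. `2024-01-01T00:00:00.000000000Z`), a full UTC day becomes
/// `2024-01-01T00-00-00-000000000Z_2024-01-01T23-59-59-999999999Z.parquet`.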
fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));

    format!("{datetime_1}_{datetime_2}.parquet")
}

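/// Builds the relative catalog path for one UTC day of data:
/// `{typename}/{instrument_id}/{start}_{end}.parquet`, where the timestamps
/// span the full day.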
fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
    assert_post_epoch(date);

    let typename = typename.to_snake_case();
    let instrument_id_str = instrument_id.to_string().replace('/', "");

    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);

    let start_nanos = start_utc
        .timestamp_nanos_opt()
        .expect("valid nanosecond timestamp");
    let end_nanos = (end_utc.and_utc())
        .timestamp_nanos_opt()
        .expect("valid nanosecond timestamp");

    let filename = timestamps_to_filename(
        UnixNanos::from(start_nanos as u64),
        UnixNanos::from(end_nanos as u64),
    );

    PathBuf::new()
        .join(typename)
        .join(instrument_id_str)
        .join(filename)
}

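/// As [`parquet_filepath`], but bars are laid out as `bar/{bar_type}/{filename}`.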
fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
    assert_post_epoch(date);

    let bar_type_str = bar_type.to_string().replace('/', "");

    // Calculate start and end timestamps for the day (UTC)
    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);

    let start_nanos = start_utc
        .timestamp_nanos_opt()
        .expect("valid nanosecond timestamp");
    let end_nanos = (end_utc.and_utc())
        .timestamp_nanos_opt()
        .expect("valid nanosecond timestamp");

    let filename = timestamps_to_filename(
        UnixNanos::from(start_nanos as u64),
        UnixNanos::from(end_nanos as u64),
    );

    PathBuf::new().join("bar").join(bar_type_str).join(filename)
}

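/// Writes a record batch to its daily Parquet file under `path`, logging the
/// outcome rather than returning an error.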
fn write_batch(
    batch: RecordBatch,
    typename: &str,
    instrument_id: &InstrumentId,
    date: NaiveDate,
    path: &Path,
) {
    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
    if let Err(e) = write_parquet_local(batch, &filepath) {
        tracing::error!("Error writing {filepath:?}: {e:?}");
    } else {
        tracing::info!("File written: {filepath:?}");
    }
}

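/// Writes a record batch to a local Parquet file with Snappy compression,
/// creating any missing parent directories.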
fn write_parquet_local(batch: RecordBatch, file_path: &Path) -> anyhow::Result<()> {
    if let Some(parent) = file_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    let file = std::fs::File::create(file_path)?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case(
        // Start of day: 2024-01-01 00:00:00 UTC
        Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        // Midday: 2024-01-01 12:00:00 UTC
        Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        // End of day: 2024-01-01 23:59:59.999999999 UTC
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    #[case(
        // Start of new day: 2024-01-02 00:00:00 UTC
        Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}