// nautilus_tardis/replay.rs
1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    fs,
19    path::{Path, PathBuf},
20};
21
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, future::join_all, pin_mut};
26use heck::ToSnakeCase;
27use nautilus_core::{
28    UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable, parsing::precision_from_str,
29};
30use nautilus_model::{
31    data::{
32        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
33        TradeTick,
34    },
35    identifiers::InstrumentId,
36};
37use nautilus_serialization::arrow::{
38    bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
39    book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
40    trades_to_arrow_record_batch_bytes,
41};
42use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
43use ustr::Ustr;
44
45use super::{enums::TardisExchange, http::models::TardisInstrumentInfo};
46use crate::{
47    config::{BookSnapshotOutput, TardisReplayConfig},
48    http::TardisHttpClient,
49    machine::{TardisMachineClient, types::TardisInstrumentMiniInfo},
50    parse::{normalize_instrument_id, parse_instrument_id},
51};
52
/// Tracks the current UTC date for an output stream, so buffered data can be
/// flushed to one Parquet file per instrument per day.
struct DateCursor {
    /// Cursor date UTC.
    date_utc: NaiveDate,
    /// Cursor end timestamp UNIX nanoseconds (23:59:59.999999999 of `date_utc`).
    end_ns: UnixNanos,
}
59
60impl DateCursor {
61    /// Creates a new [`DateCursor`] instance.
62    fn new(current_ns: UnixNanos) -> Self {
63        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
64        let date_utc = current_utc.date_naive();
65
66        // Calculate end of the current UTC day
67        // SAFETY: Known safe input values
68        let end_utc =
69            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
70        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
71
72        Self { date_utc, end_ns }
73    }
74}
75
76async fn gather_instruments_info(
77    config: &TardisReplayConfig,
78    http_client: &TardisHttpClient,
79) -> HashMap<TardisExchange, Vec<TardisInstrumentInfo>> {
80    let futures = config.options.iter().map(|options| {
81        let exchange = options.exchange;
82        let client = &http_client;
83
84        tracing::info!("Requesting instruments for {exchange}");
85
86        async move {
87            match client.instruments_info(exchange, None, None).await {
88                Ok(instruments) => Some((exchange, instruments)),
89                Err(e) => {
90                    tracing::error!("Error fetching instruments for {exchange}: {e}");
91                    None
92                }
93            }
94        }
95    });
96
97    let results: HashMap<TardisExchange, Vec<TardisInstrumentInfo>> =
98        join_all(futures).await.into_iter().flatten().collect();
99
100    tracing::info!("Received all instruments");
101
102    results
103}
104
/// Run the Tardis Machine replay from a JSON configuration file.
///
/// # Errors
///
/// Returns an error if reading or parsing the config file fails,
/// or if any downstream replay operation fails.
///
/// # Panics
///
/// Panics if unable to determine the output path (current directory fallback fails).
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    tracing::info!("Starting replay");
    tracing::info!("Config filepath: {config_filepath:?}");

    // Load and parse the replay configuration
    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {config_filepath:?}"))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("failed to parse config JSON into TardisReplayConfig")?;

    // Output path resolution order: explicit config value, then
    // $NAUTILUS_PATH/catalog/data, then the current working directory.
    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    tracing::info!("Output path: {path:?}");

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    tracing::info!("normalize_symbols={normalize_symbols}");

    let book_snapshot_output = config
        .book_snapshot_output
        .clone()
        .unwrap_or(BookSnapshotOutput::Deltas);
    tracing::info!("book_snapshot_output={book_snapshot_output:?}");

    let http_client = TardisHttpClient::new(None, None, None, normalize_symbols)?;
    let mut machine_client = TardisMachineClient::new(
        config.tardis_ws_url.as_deref(),
        normalize_symbols,
        book_snapshot_output,
    )?;

    let info_map = gather_instruments_info(&config, &http_client).await;

    // Register per-instrument precision/identifier info with the machine client
    // so incoming messages can be parsed into Nautilus data types.
    for (exchange, instruments) in &info_map {
        for inst in instruments {
            let instrument_type = inst.instrument_type;
            let price_precision = precision_from_str(&inst.price_increment.to_string());
            let size_precision = precision_from_str(&inst.amount_increment.to_string());

            let instrument_id = if normalize_symbols {
                normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
            } else {
                parse_instrument_id(exchange, inst.id)
            };

            let info = TardisInstrumentMiniInfo::new(
                instrument_id,
                Some(Ustr::from(&inst.id)),
                *exchange,
                price_precision,
                size_precision,
            );
            machine_client.add_instrument_info(info);
        }
    }

    tracing::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await?;
    pin_mut!(stream);

    // Initialize date cursors (one per instrument/bar type, tracking the UTC day
    // currently being accumulated)
    let mut deltas_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut depths_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut quotes_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut trades_cursors: HashMap<InstrumentId, DateCursor> = HashMap::new();
    let mut bars_cursors: HashMap<BarType, DateCursor> = HashMap::new();

    // Initialize date collection maps (in-memory buffers flushed on day rollover)
    let mut deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>> = HashMap::new();
    let mut depths_map: HashMap<InstrumentId, Vec<OrderBookDepth10>> = HashMap::new();
    let mut quotes_map: HashMap<InstrumentId, Vec<QuoteTick>> = HashMap::new();
    let mut trades_map: HashMap<InstrumentId, Vec<TradeTick>> = HashMap::new();
    let mut bars_map: HashMap<BarType, Vec<Bar>> = HashMap::new();

    let mut msg_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                match msg {
                    Data::Deltas(msg) => {
                        handle_deltas_msg(msg, &mut deltas_map, &mut deltas_cursors, &path);
                    }
                    Data::Depth10(msg) => {
                        handle_depth10_msg(*msg, &mut depths_map, &mut depths_cursors, &path);
                    }
                    Data::Quote(msg) => {
                        handle_quote_msg(msg, &mut quotes_map, &mut quotes_cursors, &path);
                    }
                    Data::Trade(msg) => {
                        handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path);
                    }
                    Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
                    Data::Delta(delta) => {
                        tracing::warn!(
                            "Skipping individual delta message for {} (use Deltas batch instead)",
                            delta.instrument_id
                        );
                    }
                    Data::MarkPriceUpdate(_)
                    | Data::IndexPriceUpdate(_)
                    | Data::InstrumentClose(_) => {
                        tracing::debug!(
                            "Skipping unsupported data type for instrument {}",
                            msg.instrument_id()
                        );
                    }
                }

                msg_count += 1;
                if msg_count % 100_000 == 0 {
                    tracing::debug!("Processed {} messages", msg_count.separate_with_commas());
                }
            }
            Err(e) => {
                tracing::error!("Stream error: {e:?}");
                break;
            }
        }
    }

    // Iterate through every remaining type and instrument sequentially,
    // flushing whatever is still buffered for the final (incomplete) day

    for (instrument_id, deltas) in deltas_map {
        let cursor = deltas_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, depths) in depths_map {
        let cursor = depths_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, quotes) in quotes_map {
        let cursor = quotes_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, &instrument_id, cursor.date_utc, &path);
    }

    for (instrument_id, trades) in trades_map {
        let cursor = trades_cursors.get(&instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, &instrument_id, cursor.date_utc, &path);
    }

    for (bar_type, bars) in bars_map {
        let cursor = bars_cursors.get(&bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, &bar_type, cursor.date_utc, &path);
    }

    tracing::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}
279
280fn handle_deltas_msg(
281    deltas: OrderBookDeltas_API,
282    map: &mut HashMap<InstrumentId, Vec<OrderBookDelta>>,
283    cursors: &mut HashMap<InstrumentId, DateCursor>,
284    path: &Path,
285) {
286    let cursor = cursors
287        .entry(deltas.instrument_id)
288        .or_insert_with(|| DateCursor::new(deltas.ts_init));
289
290    if deltas.ts_init > cursor.end_ns {
291        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
292            batch_and_write_deltas(deltas_vec, &deltas.instrument_id, cursor.date_utc, path);
293        }
294        // Update cursor
295        *cursor = DateCursor::new(deltas.ts_init);
296    }
297
298    map.entry(deltas.instrument_id)
299        .or_insert_with(|| Vec::with_capacity(100_000))
300        .extend(&*deltas.deltas);
301}
302
303fn handle_depth10_msg(
304    depth10: OrderBookDepth10,
305    map: &mut HashMap<InstrumentId, Vec<OrderBookDepth10>>,
306    cursors: &mut HashMap<InstrumentId, DateCursor>,
307    path: &Path,
308) {
309    let cursor = cursors
310        .entry(depth10.instrument_id)
311        .or_insert_with(|| DateCursor::new(depth10.ts_init));
312
313    if depth10.ts_init > cursor.end_ns {
314        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
315            batch_and_write_depths(depths_vec, &depth10.instrument_id, cursor.date_utc, path);
316        }
317        // Update cursor
318        *cursor = DateCursor::new(depth10.ts_init);
319    }
320
321    map.entry(depth10.instrument_id)
322        .or_insert_with(|| Vec::with_capacity(100_000))
323        .push(depth10);
324}
325
326fn handle_quote_msg(
327    quote: QuoteTick,
328    map: &mut HashMap<InstrumentId, Vec<QuoteTick>>,
329    cursors: &mut HashMap<InstrumentId, DateCursor>,
330    path: &Path,
331) {
332    let cursor = cursors
333        .entry(quote.instrument_id)
334        .or_insert_with(|| DateCursor::new(quote.ts_init));
335
336    if quote.ts_init > cursor.end_ns {
337        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
338            batch_and_write_quotes(quotes_vec, &quote.instrument_id, cursor.date_utc, path);
339        }
340        // Update cursor
341        *cursor = DateCursor::new(quote.ts_init);
342    }
343
344    map.entry(quote.instrument_id)
345        .or_insert_with(|| Vec::with_capacity(100_000))
346        .push(quote);
347}
348
349fn handle_trade_msg(
350    trade: TradeTick,
351    map: &mut HashMap<InstrumentId, Vec<TradeTick>>,
352    cursors: &mut HashMap<InstrumentId, DateCursor>,
353    path: &Path,
354) {
355    let cursor = cursors
356        .entry(trade.instrument_id)
357        .or_insert_with(|| DateCursor::new(trade.ts_init));
358
359    if trade.ts_init > cursor.end_ns {
360        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
361            batch_and_write_trades(trades_vec, &trade.instrument_id, cursor.date_utc, path);
362        }
363        // Update cursor
364        *cursor = DateCursor::new(trade.ts_init);
365    }
366
367    map.entry(trade.instrument_id)
368        .or_insert_with(|| Vec::with_capacity(100_000))
369        .push(trade);
370}
371
372fn handle_bar_msg(
373    bar: Bar,
374    map: &mut HashMap<BarType, Vec<Bar>>,
375    cursors: &mut HashMap<BarType, DateCursor>,
376    path: &Path,
377) {
378    let cursor = cursors
379        .entry(bar.bar_type)
380        .or_insert_with(|| DateCursor::new(bar.ts_init));
381
382    if bar.ts_init > cursor.end_ns {
383        if let Some(bars_vec) = map.remove(&bar.bar_type) {
384            batch_and_write_bars(bars_vec, &bar.bar_type, cursor.date_utc, path);
385        }
386        // Update cursor
387        *cursor = DateCursor::new(bar.ts_init);
388    }
389
390    map.entry(bar.bar_type)
391        .or_insert_with(|| Vec::with_capacity(100_000))
392        .push(bar);
393}
394
395fn batch_and_write_deltas(
396    deltas: Vec<OrderBookDelta>,
397    instrument_id: &InstrumentId,
398    date: NaiveDate,
399    path: &Path,
400) {
401    let typename = stringify!(OrderBookDeltas);
402    match book_deltas_to_arrow_record_batch_bytes(deltas) {
403        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
404        Err(e) => {
405            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
406        }
407    }
408}
409
410fn batch_and_write_depths(
411    depths: Vec<OrderBookDepth10>,
412    instrument_id: &InstrumentId,
413    date: NaiveDate,
414    path: &Path,
415) {
416    // Use "order_book_depths" to match catalog path prefix
417    let typename = "order_book_depths";
418    match book_depth10_to_arrow_record_batch_bytes(depths) {
419        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
420        Err(e) => {
421            tracing::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
422        }
423    }
424}
425
426fn batch_and_write_quotes(
427    quotes: Vec<QuoteTick>,
428    instrument_id: &InstrumentId,
429    date: NaiveDate,
430    path: &Path,
431) {
432    let typename = stringify!(QuoteTick);
433    match quotes_to_arrow_record_batch_bytes(quotes) {
434        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
435        Err(e) => {
436            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
437        }
438    }
439}
440
441fn batch_and_write_trades(
442    trades: Vec<TradeTick>,
443    instrument_id: &InstrumentId,
444    date: NaiveDate,
445    path: &Path,
446) {
447    let typename = stringify!(TradeTick);
448    match trades_to_arrow_record_batch_bytes(trades) {
449        Ok(batch) => write_batch(batch, typename, instrument_id, date, path),
450        Err(e) => {
451            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
452        }
453    }
454}
455
456fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, path: &Path) {
457    let typename = stringify!(Bar);
458    let batch = match bars_to_arrow_record_batch_bytes(bars) {
459        Ok(batch) => batch,
460        Err(e) => {
461            tracing::error!("Error converting `{typename}` to Arrow: {e:?}");
462            return;
463        }
464    };
465
466    let filepath = path.join(parquet_filepath_bars(bar_type, date));
467    if let Err(e) = write_parquet_local(batch, &filepath) {
468        tracing::error!("Error writing {filepath:?}: {e:?}");
469    } else {
470        tracing::info!("File written: {filepath:?}");
471    }
472}
473
474/// Asserts that the given date is on or after the UNIX epoch (1970-01-01).
475///
476/// # Panics
477///
478/// Panics if the date is before 1970-01-01, as pre-epoch dates cannot be
479/// reliably represented as UnixNanos without overflow issues.
480fn assert_post_epoch(date: NaiveDate) {
481    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
482    if date < epoch {
483        panic!("Tardis replay filenames require dates on or after 1970-01-01; received {date}");
484    }
485}
486
/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// Colons and dots are mapped to hyphens so the timestamp is safe for use
/// in filenames across different filesystems.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if matches!(c, ':' | '.') { '-' } else { c })
        .collect()
}
494
495/// Converts timestamps to a filename using ISO 8601 format.
496///
497/// This function converts two Unix nanosecond timestamps to a filename that uses
498/// ISO 8601 format with filesystem-safe characters, matching the catalog convention.
499fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
500    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
501    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
502
503    format!("{datetime_1}_{datetime_2}.parquet")
504}
505
506fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
507    assert_post_epoch(date);
508
509    let typename = typename.to_snake_case();
510    let instrument_id_str = instrument_id.to_string().replace('/', "");
511
512    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
513    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
514
515    let start_nanos = start_utc
516        .timestamp_nanos_opt()
517        .expect("valid nanosecond timestamp");
518    let end_nanos = (end_utc.and_utc())
519        .timestamp_nanos_opt()
520        .expect("valid nanosecond timestamp");
521
522    let filename = timestamps_to_filename(
523        UnixNanos::from(start_nanos as u64),
524        UnixNanos::from(end_nanos as u64),
525    );
526
527    PathBuf::new()
528        .join(typename)
529        .join(instrument_id_str)
530        .join(filename)
531}
532
533fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
534    assert_post_epoch(date);
535
536    let bar_type_str = bar_type.to_string().replace('/', "");
537
538    // Calculate start and end timestamps for the day (UTC)
539    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
540    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
541
542    let start_nanos = start_utc
543        .timestamp_nanos_opt()
544        .expect("valid nanosecond timestamp");
545    let end_nanos = (end_utc.and_utc())
546        .timestamp_nanos_opt()
547        .expect("valid nanosecond timestamp");
548
549    let filename = timestamps_to_filename(
550        UnixNanos::from(start_nanos as u64),
551        UnixNanos::from(end_nanos as u64),
552    );
553
554    PathBuf::new().join("bar").join(bar_type_str).join(filename)
555}
556
557fn write_batch(
558    batch: RecordBatch,
559    typename: &str,
560    instrument_id: &InstrumentId,
561    date: NaiveDate,
562    path: &Path,
563) {
564    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
565    if let Err(e) = write_parquet_local(batch, &filepath) {
566        tracing::error!("Error writing {filepath:?}: {e:?}");
567    } else {
568        tracing::info!("File written: {filepath:?}");
569    }
570}
571
572fn write_parquet_local(batch: RecordBatch, file_path: &Path) -> anyhow::Result<()> {
573    if let Some(parent) = file_path.parent() {
574        std::fs::create_dir_all(parent)?;
575    }
576
577    let file = std::fs::File::create(file_path)?;
578    let props = WriterProperties::builder()
579        .set_compression(Compression::SNAPPY)
580        .build();
581
582    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
583    writer.write(&batch)?;
584    writer.close()?;
585    Ok(())
586}
587
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    /// `DateCursor` must resolve any timestamp within a UTC day to that day's date
    /// and to the day's final nanosecond (23:59:59.999999999) as `end_ns`.
    #[rstest]
    #[case(
    // Start of day: 2024-01-01 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Midday: 2024-01-01 12:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // End of day: 2024-01-01 23:59:59.999999999 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Start of new day: 2024-01-02 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}
631}