nautilus_persistence/backend/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::path::PathBuf;

use datafusion::arrow::record_batch::RecordBatch;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    enums::ParquetWriteMode,
    parquet::{combine_data_files, min_max_from_parquet_metadata, write_batches_to_parquet},
};
use serde::Serialize;

use super::session::{self, DataBackendSession, QueryResult, build_query};

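/// A Parquet-backed catalog for writing and querying Nautilus market data.
///
/// Data is laid out on disk as `<base_path>/data/<type>/<instrument_id>/data-<n>.parquet`,
/// and queries are executed through an embedded [`DataBackendSession`].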
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl ParquetDataCatalog {
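    /// Creates a new [`ParquetDataCatalog`] rooted at `base_path`.
    ///
    /// `batch_size` sets how many records are encoded per Arrow record batch and is also
    /// used to configure the backing [`DataBackendSession`] (defaults to 5000).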
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: session::DataBackendSession::new(batch_size),
        }
    }

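    /// Splits a heterogeneous [`Data`] vector by variant and writes each group to its own
    /// Parquet file via [`Self::write_to_parquet`]. `Data::Deltas` containers are skipped,
    /// and any write errors are currently discarded.
    ///
    /// A minimal usage sketch (the catalog path and `trade` value are hypothetical):
    ///
    /// ```ignore
    /// let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// catalog.write_data_enum(vec![Data::Trade(trade)], None);
    /// ```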
    pub fn write_data_enum(&self, data: Vec<Data>, write_mode: Option<ParquetWriteMode>) {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data.iter().cloned() {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
            }
        }

        let _ = self.write_to_parquet(deltas, None, None, None, write_mode);
        let _ = self.write_to_parquet(depth10s, None, None, None, write_mode);
        let _ = self.write_to_parquet(quotes, None, None, None, write_mode);
        let _ = self.write_to_parquet(trades, None, None, None, write_mode);
        let _ = self.write_to_parquet(bars, None, None, None, write_mode);
        let _ = self.write_to_parquet(mark_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(index_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(closes, None, None, None, write_mode);
    }

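    /// Encodes `data` into Arrow record batches and writes them to a Parquet file, returning
    /// the path written to.
    ///
    /// Unless an explicit `path` is given, the target file is derived from the type's
    /// [`CatalogPathPrefix`] and the `instrument_id` batch metadata. Panics if timestamps
    /// are not ascending by `ts_init` or if `data` produces no batches.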
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();
        let new_path = self.make_path(T::path_prefix(), instrument_id, write_mode)?;
        let path = path.unwrap_or(new_path);

        // Write all batches to parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)?;

        Ok(path)
    }

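    /// Asserts that `data` is sorted ascending by `ts_init`, panicking with `type_name` otherwise.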
    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }

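    /// Encodes `data` into record batches of at most `batch_size` rows each, attaching the
    /// per-chunk metadata produced by [`EncodeToRecordBatch::chunk_metadata`].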
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

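    /// Resolves (and creates) the target file path for `type_name` and `instrument_id`.
    ///
    /// Existing `data-<n>.parquet` files are scanned to find the next free index; when more
    /// than one file already exists, only [`ParquetWriteMode::NewFile`] is accepted.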
    fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf> {
        let path = self.make_directory_path(type_name, instrument_id);
        std::fs::create_dir_all(&path)?;
        let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
        let mut file_path = path.join("data-0.parquet");
        let mut empty_path = file_path.clone();
        let mut i = 0;

        while empty_path.exists() {
            i += 1;
            let name = format!("data-{i}.parquet");
            empty_path = path.join(name);
        }

        if i > 1 && used_write_mode != ParquetWriteMode::NewFile {
            anyhow::bail!(
                "Only ParquetWriteMode::NewFile is allowed for a directory containing several parquet files."
            );
        } else if used_write_mode == ParquetWriteMode::NewFile {
            file_path = empty_path;
        }

        info!("Created directory path: {file_path:?}");

        Ok(file_path)
    }

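    /// Builds the directory path `<base_path>/data/<type_name>[/<instrument_id>]`, stripping
    /// any `/` from the instrument ID (e.g. FX symbols such as `EUR/USD`).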
    fn make_directory_path(&self, type_name: &str, instrument_id: Option<String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            path = path.join(id.replace('/', "")); // for FX symbols like EUR/USD
        }

        path
    }

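    /// Serializes `data` to a pretty-printed JSON file alongside the catalog layout, returning
    /// the path written to. When `write_metadata` is set, the chunk metadata is written to a
    /// sibling `*.metadata.json` file.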
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let new_path = self.make_path(T::path_prefix(), None, None)?;
        let json_path = path.unwrap_or(new_path.with_extension("json"));

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len(),
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)?;
            serde_json::to_writer_pretty(metadata_file, &metadata)?;
        }

        let file = std::fs::File::create(&json_path)?;
        serde_json::to_writer_pretty(file, &serde_json::to_value(data)?)?;

        Ok(json_path)
    }

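    /// Combines all Parquet files for `type_name` (optionally narrowed to one instrument)
    /// into a single file, ordered by `ts_init`.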
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.query_parquet_files(type_name, instrument_id)?;

        if !parquet_files.is_empty() {
            combine_data_files(parquet_files, "ts_init", None, None)?;
        }

        Ok(())
    }

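    /// Walks every leaf data directory in the catalog and combines the Parquet files found
    /// in each one into a single file, ordered by `ts_init`.
    ///
    /// A minimal sketch (the catalog path is hypothetical):
    ///
    /// ```ignore
    /// let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// catalog.consolidate_catalog()?;
    /// ```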
    pub fn consolidate_catalog(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let parquet_files: Vec<PathBuf> = std::fs::read_dir(directory)?
                .filter_map(|entry| {
                    let path = entry.ok()?.path();

                    if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                        Some(path)
                    } else {
                        None
                    }
                })
                .collect();

            if !parquet_files.is_empty() {
                combine_data_files(parquet_files, "ts_init", None, None)?;
            }
        }

        Ok(())
    }

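    /// Returns every directory under `<base_path>/data` that contains files but no
    /// subdirectories (i.e. the per-instrument leaf directories).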
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<PathBuf>> {
        let mut all_paths: Vec<PathBuf> = Vec::new();
        let data_dir = self.base_path.join("data");

        for entry in walkdir::WalkDir::new(data_dir) {
            all_paths.push(entry?.path().to_path_buf());
        }

        let all_dirs = all_paths
            .iter()
            .filter(|p| p.is_dir())
            .cloned()
            .collect::<Vec<PathBuf>>();
        let mut leaf_dirs = Vec::new();

        for directory in all_dirs {
            let items = std::fs::read_dir(&directory)?;
            let has_subdirs = items.into_iter().any(|entry| {
                let entry = entry.unwrap();
                entry.path().is_dir()
            });
            let has_files = std::fs::read_dir(&directory)?.any(|entry| {
                let entry = entry.unwrap();
                entry.path().is_file()
            });

            if has_files && !has_subdirs {
                leaf_dirs.push(directory);
            }
        }

        Ok(leaf_dirs)
    }

    /// Queries data of type `T` from a single Parquet file, registering it with the backend
    /// session under a table named after the file stem.
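    ///
    /// A minimal sketch (the file path is hypothetical):
    ///
    /// ```ignore
    /// let result = catalog.query_file::<QuoteTick>(
    ///     PathBuf::from("/tmp/catalog/data/quotes/EURUSD.SIM/data-0.parquet"),
    ///     None,
    ///     None,
    ///     None,
    /// )?;
    /// ```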
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().expect("Failed to convert path to string");
        let table_name = path
            .file_stem()
            .unwrap()
            .to_str()
            .expect("Failed to convert path to string");
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;

        Ok(self.session.get_query_result())
    }

    /// Queries data of type `T` for the given instrument IDs, registering every matching
    /// Parquet file with the backend session. If no per-instrument files are found, the
    /// type's default file path is queried instead.
    pub fn query_directory<T>(
        &mut self,
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();

        for instrument_id in instrument_ids {
            paths.extend(self.query_parquet_files(T::path_prefix(), Some(instrument_id))?);
        }

        // If no specific instrument_id is selected query all files for the data type
        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None, None)?);
        }

        for path in &paths {
            let path = path.to_str().expect("Failed to convert path to string");
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

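    /// Returns the minimum or maximum `ts_init` across all Parquet files for `data_cls`
    /// (optionally narrowed to one instrument), read from the Parquet metadata.
    ///
    /// Returns the maximum when `is_last` is `true` (the default) and the minimum otherwise;
    /// `None` if no files exist.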
    #[allow(dead_code)]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> anyhow::Result<Option<i64>> {
        let is_last = is_last.unwrap_or(true);
        let parquet_files = self.query_parquet_files(data_cls, instrument_id)?;

        if parquet_files.is_empty() {
            return Ok(None);
        }

        let min_max_per_file: Vec<(i64, i64)> = parquet_files
            .iter()
            .map(|file| min_max_from_parquet_metadata(file, "ts_init"))
            .collect::<Result<Vec<_>, _>>()?;
        let mut timestamps: Vec<i64> = Vec::new();

        for min_max in min_max_per_file {
            let (min, max) = min_max;

            if is_last {
                timestamps.push(max);
            } else {
                timestamps.push(min);
            }
        }

        if timestamps.is_empty() {
            return Ok(None);
        }

        if is_last {
            Ok(timestamps.iter().max().copied())
        } else {
            Ok(timestamps.iter().min().copied())
        }
    }

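    /// Lists the Parquet files under the directory for `type_name` (optionally narrowed to
    /// one instrument), returning an empty vector if the directory does not exist.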
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<PathBuf>> {
        let path = self.make_directory_path(type_name, instrument_id);
        let mut files = Vec::new();

        if path.exists() {
            for entry in std::fs::read_dir(path)? {
                let path = entry?.path();
                if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                    files.push(path);
                }
            }
        }

        Ok(files)
    }
}

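/// Maps a data type to the catalog sub-directory its Parquet files are stored under.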
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}

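/// Implements [`CatalogPathPrefix`] for `$type`, returning the literal `$path`.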
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");