nautilus_persistence/backend/
catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::path::PathBuf;

use datafusion::{arrow::record_batch::RecordBatch, error::Result};
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    parquet::write_batches_to_parquet,
};
use serde::Serialize;

use super::session::{self, DataBackendSession, QueryResult, build_query};

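/// A catalog that writes Nautilus market data to parquet (or JSON) files under a
/// `data/<type>/<instrument_id>` layout rooted at `base_path`, and queries it back
/// through a DataFusion-backed session.
///
/// A minimal usage sketch (`load_quotes` is a hypothetical helper standing in for
/// however quote data is obtained; the catalog path is illustrative):
///
/// ```rust,no_run
/// use std::path::PathBuf;
/// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # use nautilus_model::data::QuoteTick;
/// # fn load_quotes() -> Vec<QuoteTick> { unimplemented!() }
/// let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
/// let quotes: Vec<QuoteTick> = load_quotes(); // must be sorted ascending by `ts_init`
/// let path = catalog.write_to_parquet(quotes, None, None, None);
/// println!("wrote quotes to {path:?}");
/// ```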
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl ParquetDataCatalog {
    /// Creates a new catalog rooted at `base_path`, with a default batch size of 5000.
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: session::DataBackendSession::new(batch_size),
        }
    }

    fn make_path(&self, type_name: &str, instrument_id: Option<&String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            path = path.join(id);
        }

        std::fs::create_dir_all(&path).expect("Failed to create directory");
        let file_path = path.join("data.parquet");
        info!("Created directory, file path: {file_path:?}");
        file_path
    }

    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }

    /// Chunks `data` by `batch_size` and encodes each chunk to an Arrow record batch.
    #[must_use]
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> Vec<RecordBatch>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        data.into_iter()
            .chunks(self.batch_size)
            .into_iter()
            .map(|chunk| {
                // Extract the metadata from the (non-empty) chunk, then encode it
                let data = chunk.collect_vec();
                let metadata = EncodeToRecordBatch::chunk_metadata(&data);
                T::encode_batch(&metadata, &data).expect("Expected to encode batch")
            })
            .collect()
    }

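    /// Serializes `data` to a pretty-printed JSON file, optionally writing the chunk
    /// metadata to a sibling `.metadata.json` file, and returns the JSON path.
    ///
    /// A hedged sketch of a call (the catalog path is illustrative, and `quotes` is
    /// assumed to be sorted ascending by `ts_init`):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # use nautilus_model::data::QuoteTick;
    /// # let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// # let quotes: Vec<QuoteTick> = vec![];
    /// let json_path = catalog.write_to_json(quotes, None, true);
    /// ```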
    #[must_use]
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> PathBuf
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);

        let json_path = path.unwrap_or_else(|| {
            let path = self.make_path(T::path_prefix(), None);
            path.with_extension("json")
        });

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len(),
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)
                .unwrap_or_else(|_| panic!("Failed to create metadata file at {metadata_path:?}"));
            serde_json::to_writer_pretty(metadata_file, &metadata)
                .unwrap_or_else(|_| panic!("Failed to write metadata to JSON"));
        }

        let file = std::fs::File::create(&json_path)
            .unwrap_or_else(|_| panic!("Failed to create JSON file at {json_path:?}"));

        // Serialize directly to the writer (avoids an intermediate `serde_json::Value`)
        serde_json::to_writer_pretty(file, &data)
            .unwrap_or_else(|_| panic!("Failed to write {type_name} to JSON"));

        json_path
    }

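    /// Encodes `data` into record batches and writes them to a parquet file, returning
    /// the path written. Panics if `data` is empty or not sorted ascending by `ts_init`.
    ///
    /// A hedged sketch with explicit options (the values are illustrative):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # use nautilus_model::data::TradeTick;
    /// # let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// # let trades: Vec<TradeTick> = vec![];
    /// let path = catalog.write_to_parquet(
    ///     trades,
    ///     None,
    ///     Some(parquet::basic::Compression::SNAPPY),
    ///     Some(5000),
    /// );
    /// ```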
    #[must_use]
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> PathBuf
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);

        let batches = self.data_to_record_batches(data);
        let batch = batches.first().expect("Expected at least one batch");
        let schema = batch.schema();
        let instrument_id = schema.metadata.get("instrument_id");
        let path = path.unwrap_or_else(|| self.make_path(T::path_prefix(), instrument_id));

        // Write all batches to the parquet file
        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len(),
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size)
            .unwrap_or_else(|_| panic!("Failed to write {type_name} to parquet"));

        path
    }

    /// Queries data from a single file, registering it as a table in the backend session.
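    ///
    /// A hedged sketch (the file path is illustrative):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # use nautilus_model::data::QuoteTick;
    /// # let mut catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// let result = catalog
    ///     .query_file::<QuoteTick>(
    ///         PathBuf::from("/tmp/catalog/data/quotes/data.parquet"),
    ///         None, // start (UnixNanos)
    ///         None, // end (UnixNanos)
    ///         None, // extra WHERE clause
    ///     )
    ///     .expect("Failed to query file");
    /// ```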
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().unwrap();
        let table_name = path.file_stem().unwrap().to_str().unwrap();
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;
        Ok(self.session.get_query_result())
    }

    /// Queries data for the given instrument IDs, one file per instrument directory.
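    ///
    /// A hedged sketch (the instrument ID is illustrative):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # use nautilus_model::data::TradeTick;
    /// # let mut catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// let result = catalog
    ///     .query_directory::<TradeTick>(vec!["BTCUSDT.BINANCE".to_string()], None, None, None)
    ///     .expect("Failed to query directory");
    /// ```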
    pub fn query_directory<T>(
        &mut self,
        // Use `instrument_ids` (or bar types) to query a specific subset of the data
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();
        for instrument_id in &instrument_ids {
            paths.push(self.make_path(T::path_prefix(), Some(instrument_id)));
        }

        // If no specific instrument ID is given, query all files for the data type
        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None));
        }

        for path in &paths {
            let path = path.to_str().unwrap();
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

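    /// Splits a heterogeneous [`Data`] stream into per-type vectors and writes each to
    /// its own parquet file; `Data::Deltas` batches are skipped.
    ///
    /// A hedged sketch (`data` is assumed to come from a feed or backtest run):
    ///
    /// ```rust,no_run
    /// # use std::path::PathBuf;
    /// # use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// # use nautilus_model::data::Data;
    /// # let catalog = ParquetDataCatalog::new(PathBuf::from("/tmp/catalog"), None);
    /// # let data: Vec<Data> = vec![];
    /// catalog.write_data_enum(data);
    /// ```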
    pub fn write_data_enum(&self, data: Vec<Data>) {
        let mut delta: Vec<OrderBookDelta> = Vec::new();
        let mut depth10: Vec<OrderBookDepth10> = Vec::new();
        let mut quote: Vec<QuoteTick> = Vec::new();
        let mut trade: Vec<TradeTick> = Vec::new();
        let mut bar: Vec<Bar> = Vec::new();

        for d in data {
            match d {
                Data::Delta(d) => delta.push(d),
                Data::Depth10(d) => depth10.push(*d),
                Data::Quote(d) => quote.push(d),
                Data::Trade(d) => trade.push(d),
                Data::Bar(d) => bar.push(d),
                Data::Deltas(_) => continue,
            }
        }

        // Only write non-empty vectors: `write_to_parquet` expects at least one batch
        if !delta.is_empty() {
            let _ = self.write_to_parquet(delta, None, None, None);
        }
        if !depth10.is_empty() {
            let _ = self.write_to_parquet(depth10, None, None, None);
        }
        if !quote.is_empty() {
            let _ = self.write_to_parquet(quote, None, None, None);
        }
        if !trade.is_empty() {
            let _ = self.write_to_parquet(trade, None, None, None);
        }
        if !bar.is_empty() {
            let _ = self.write_to_parquet(bar, None, None, None);
        }
    }
}

/// Maps a data type to its directory name within the catalog's `data/` tree.
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}

macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
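
// A minimal sanity-check sketch: verifies that the `CatalogPathPrefix` mappings
// above match the expected directory names.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn path_prefixes_match_expected_directories() {
        assert_eq!(QuoteTick::path_prefix(), "quotes");
        assert_eq!(TradeTick::path_prefix(), "trades");
        assert_eq!(OrderBookDelta::path_prefix(), "order_book_deltas");
        assert_eq!(OrderBookDepth10::path_prefix(), "order_book_depths");
        assert_eq!(Bar::path_prefix(), "bars");
    }
}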