use std::path::PathBuf;

use datafusion::arrow::record_batch::RecordBatch;
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
    QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    enums::ParquetWriteMode,
    parquet::{combine_data_files, min_max_from_parquet_metadata, write_batches_to_parquet},
};
use serde::Serialize;

use super::session::{self, DataBackendSession, QueryResult, build_query};

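/// A file-backed catalog that persists Nautilus market data as Parquet files,
/// laid out as `<base_path>/data/<type>/<instrument_id>/data-<n>.parquet`.
///
/// A minimal usage sketch, assuming `quotes` is a `Vec<QuoteTick>` sorted by
/// `ts_init` (the catalog root below is illustrative):
///
/// ```rust,ignore
/// let catalog = ParquetDataCatalog::new(PathBuf::from("./catalog"), None);
/// let path = catalog.write_to_parquet(quotes, None, None, None, None)?;
/// ```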
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl ParquetDataCatalog {
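    /// Creates a new catalog rooted at `base_path`.
    ///
    /// `batch_size` controls how many records are encoded per Arrow record
    /// batch and defaults to 5000 when `None`.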
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: session::DataBackendSession::new(batch_size),
        }
    }

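    /// Splits a mixed stream of [`Data`] by variant and writes each variant to
    /// its own Parquet location. Errors from the individual writes are
    /// discarded.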
    pub fn write_data_enum(&self, data: Vec<Data>, write_mode: Option<ParquetWriteMode>) {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();

        for d in data {
            match d {
                Data::Deltas(_) => continue,
                Data::Delta(d) => deltas.push(d),
                Data::Depth10(d) => depth10s.push(*d),
                Data::Quote(d) => quotes.push(d),
                Data::Trade(d) => trades.push(d),
                Data::Bar(d) => bars.push(d),
                Data::MarkPriceUpdate(p) => mark_prices.push(p),
                Data::IndexPriceUpdate(p) => index_prices.push(p),
                Data::InstrumentClose(c) => closes.push(c),
            }
        }

        let _ = self.write_to_parquet(deltas, None, None, None, write_mode);
        let _ = self.write_to_parquet(depth10s, None, None, None, write_mode);
        let _ = self.write_to_parquet(quotes, None, None, None, write_mode);
        let _ = self.write_to_parquet(trades, None, None, None, write_mode);
        let _ = self.write_to_parquet(bars, None, None, None, write_mode);
        let _ = self.write_to_parquet(mark_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(index_prices, None, None, None, write_mode);
        let _ = self.write_to_parquet(closes, None, None, None, write_mode);
    }

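    /// Encodes `data` into Arrow record batches and writes them to a Parquet
    /// file, returning the path that was written.
    ///
    /// The target path is derived from the type's [`CatalogPathPrefix`] and
    /// the `instrument_id` found in the batch schema metadata, unless an
    /// explicit `path` is given. Returns an error when `data` produces no
    /// record batches.
    ///
    /// # Panics
    ///
    /// Panics if the `data` timestamps are not in ascending order.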
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let batches = self.data_to_record_batches(data)?;
        let schema = batches
            .first()
            .ok_or_else(|| anyhow::anyhow!("No record batches to write for {type_name}"))?
            .schema();
        let instrument_id = schema.metadata.get("instrument_id").cloned();
        let new_path = self.make_path(T::path_prefix(), instrument_id, write_mode)?;
        let path = path.unwrap_or(new_path);

        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len()
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size, write_mode)?;

        Ok(path)
    }

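    /// Asserts that `data` is sorted ascending by `ts_init`, panicking
    /// otherwise.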
    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }

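    /// Chunks `data` into groups of at most `batch_size` records and encodes
    /// each chunk into a [`RecordBatch`] with its chunk metadata.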
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

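    /// Resolves the target Parquet file path for `type_name`, creating the
    /// directory if needed. Files are numbered `data-0.parquet`,
    /// `data-1.parquet`, and so on; `ParquetWriteMode::NewFile` picks the next
    /// free name, while the other modes (defaulting to `Overwrite`) require at
    /// most one existing file in the directory.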
    fn make_path(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        write_mode: Option<ParquetWriteMode>,
    ) -> anyhow::Result<PathBuf> {
        let path = self.make_directory_path(type_name, instrument_id);
        std::fs::create_dir_all(&path)?;
        let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
        let mut file_path = path.join("data-0.parquet");
        let mut empty_path = file_path.clone();
        let mut i = 0;

        // Find the first file name `data-{i}.parquet` that does not exist yet
        while empty_path.exists() {
            i += 1;
            let name = format!("data-{i}.parquet");
            empty_path = path.join(name);
        }

        if i > 1 && used_write_mode != ParquetWriteMode::NewFile {
            anyhow::bail!(
                "Only ParquetWriteMode::NewFile is allowed for a directory containing several parquet files."
            );
        } else if used_write_mode == ParquetWriteMode::NewFile {
            file_path = empty_path;
        }

        info!("Resolved file path: {file_path:?}");

        Ok(file_path)
    }

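    /// Builds the directory path
    /// `<base_path>/data/<type_name>[/<instrument_id>]` without touching the
    /// filesystem.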
    fn make_directory_path(&self, type_name: &str, instrument_id: Option<String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            // Strip path separators so the instrument ID maps to a single directory
            path = path.join(id.replace('/', ""));
        }

        path
    }

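    /// Serializes `data` to a pretty-printed JSON file alongside the catalog's
    /// Parquet layout, optionally writing the chunk metadata to a sibling
    /// `.metadata.json` file. Returns the JSON path that was written.
    ///
    /// # Panics
    ///
    /// Panics if the `data` timestamps are not in ascending order.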
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);
        let new_path = self.make_path(T::path_prefix(), None, None)?;
        let json_path = path.unwrap_or(new_path.with_extension("json"));

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len()
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)?;
            serde_json::to_writer_pretty(metadata_file, &metadata)?;
        }

        let file = std::fs::File::create(&json_path)?;
        serde_json::to_writer_pretty(file, &serde_json::to_value(data)?)?;

        Ok(json_path)
    }

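    /// Combines the Parquet files for `type_name` (and optionally a single
    /// instrument) into consolidated output, keyed on `ts_init`.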
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.query_parquet_files(type_name, instrument_id)?;

        if !parquet_files.is_empty() {
            combine_data_files(parquet_files, "ts_init", None, None)?;
        }

        Ok(())
    }

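    /// Consolidates every leaf data directory in the catalog, combining the
    /// Parquet files found in each directory, keyed on `ts_init`.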
    pub fn consolidate_catalog(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let parquet_files: Vec<PathBuf> = std::fs::read_dir(directory)?
                .filter_map(|entry| {
                    let path = entry.ok()?.path();

                    if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                        Some(path)
                    } else {
                        None
                    }
                })
                .collect();

            if !parquet_files.is_empty() {
                combine_data_files(parquet_files, "ts_init", None, None)?;
            }
        }

        Ok(())
    }

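    /// Walks `<base_path>/data` and returns the directories that contain
    /// files but no subdirectories.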
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<PathBuf>> {
        let mut all_paths: Vec<PathBuf> = Vec::new();
        let data_dir = self.base_path.join("data");

        for entry in walkdir::WalkDir::new(data_dir) {
            all_paths.push(entry?.path().to_path_buf());
        }

        let all_dirs = all_paths
            .iter()
            .filter(|p| p.is_dir())
            .cloned()
            .collect::<Vec<PathBuf>>();
        let mut leaf_dirs = Vec::new();

        for directory in all_dirs {
            // Skip unreadable entries rather than panicking on them
            let has_subdirs = std::fs::read_dir(&directory)?
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_dir());
            let has_files = std::fs::read_dir(&directory)?
                .filter_map(Result::ok)
                .any(|entry| entry.path().is_file());

            if has_files && !has_subdirs {
                leaf_dirs.push(directory);
            }
        }

        Ok(leaf_dirs)
    }

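    /// Registers a single Parquet file with the query session and returns the
    /// query result. The file stem is used as the table name; `start`, `end`,
    /// and `where_clause` narrow the query.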
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().expect("Failed to convert path to string");
        let table_name = path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .expect("Failed to extract table name from path");
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;

        Ok(self.session.get_query_result())
    }

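    /// Registers all Parquet files for the given instrument IDs with the
    /// query session and returns the query result, falling back to the type's
    /// default path when no matching files are found.
    ///
    /// A sketch of a typical call (the instrument ID is illustrative):
    ///
    /// ```rust,ignore
    /// let result = catalog.query_directory::<QuoteTick>(
    ///     vec!["EURUSD.SIM".to_string()],
    ///     None,
    ///     None,
    ///     None,
    /// )?;
    /// ```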
    pub fn query_directory<T>(
        &mut self,
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> anyhow::Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();

        for instrument_id in instrument_ids {
            paths.extend(self.query_parquet_files(T::path_prefix(), Some(instrument_id))?);
        }

        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None, None)?);
        }

        for path in &paths {
            let path = path.to_str().expect("Failed to convert path to string");
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }

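    /// Returns the maximum (default) or minimum `ts_init` across all Parquet
    /// files for `data_cls`, read from the Parquet metadata, or `None` when no
    /// files exist.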
    #[allow(dead_code)]
    pub fn query_timestamp_bound(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        is_last: Option<bool>,
    ) -> anyhow::Result<Option<i64>> {
        let is_last = is_last.unwrap_or(true);
        let parquet_files = self.query_parquet_files(data_cls, instrument_id)?;

        if parquet_files.is_empty() {
            return Ok(None);
        }

        let min_max_per_file: Vec<(i64, i64)> = parquet_files
            .iter()
            .map(|file| min_max_from_parquet_metadata(file, "ts_init"))
            .collect::<Result<Vec<_>, _>>()?;

        // Take the relevant bound from each file, then reduce to the global bound
        let timestamps: Vec<i64> = min_max_per_file
            .into_iter()
            .map(|(min, max)| if is_last { max } else { min })
            .collect();

        if is_last {
            Ok(timestamps.iter().max().copied())
        } else {
            Ok(timestamps.iter().min().copied())
        }
    }

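    /// Lists the Parquet files under the directory for `type_name` (and
    /// optionally one instrument), returning an empty list if the directory
    /// does not exist.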
    pub fn query_parquet_files(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
    ) -> anyhow::Result<Vec<PathBuf>> {
        let path = self.make_directory_path(type_name, instrument_id);
        let mut files = Vec::new();

        if path.exists() {
            for entry in std::fs::read_dir(path)? {
                let path = entry?.path();
                if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("parquet") {
                    files.push(path);
                }
            }
        }

        Ok(files)
    }
}

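/// Maps a data type to its subdirectory name under the catalog's `data/` root.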
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}

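/// Implements [`CatalogPathPrefix`] for `$type` with the fixed prefix `$path`.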
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");