// nautilus_persistence/backend/catalog.rs

use std::path::PathBuf;
use datafusion::{arrow::record_batch::RecordBatch, error::Result};
use heck::ToSnakeCase;
use itertools::Itertools;
use log::info;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, Data, GetTsInit, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
};
use nautilus_serialization::{
    arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch},
    parquet::write_batches_to_parquet,
};
use serde::Serialize;

use super::session::{self, DataBackendSession, QueryResult, build_query};
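
/// A catalog that persists Nautilus market data as Parquet (or JSON) files under
/// a common `base_path`, organized as `data/<type>[/<instrument_id>]/data.parquet`.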
pub struct ParquetDataCatalog {
    base_path: PathBuf,
    batch_size: usize,
    session: DataBackendSession,
}

impl ParquetDataCatalog {
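    /// Creates a new catalog rooted at `base_path`.
    ///
    /// `batch_size` controls how many records are encoded into each Arrow record
    /// batch when writing, and defaults to 5000.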
    #[must_use]
    pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
        let batch_size = batch_size.unwrap_or(5000);
        Self {
            base_path,
            batch_size,
            session: session::DataBackendSession::new(batch_size),
        }
    }
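
    /// Builds `<base_path>/data/<type_name>[/<instrument_id>]`, creates that
    /// directory on disk, and returns the path of the `data.parquet` file within it.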
    fn make_path(&self, type_name: &str, instrument_id: Option<&String>) -> PathBuf {
        let mut path = self.base_path.join("data").join(type_name);

        if let Some(id) = instrument_id {
            path = path.join(id);
        }

        std::fs::create_dir_all(&path).expect("Failed to create directory");
        let file_path = path.join("data.parquet");
        info!("Created directory, file path: {file_path:?}");
        file_path
    }
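
    /// Panics with `type_name` in the message if `data` is not sorted by `ts_init`
    /// in ascending order.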
    fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
        assert!(
            data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
            "{type_name} timestamps must be in ascending order"
        );
    }
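
    /// Chunks `data` into groups of `batch_size` and encodes each chunk as an
    /// Arrow `RecordBatch`, attaching the chunk's metadata (such as the instrument ID).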
    #[must_use]
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> Vec<RecordBatch>
    where
        T: GetTsInit + EncodeToRecordBatch,
    {
        data.into_iter()
            .chunks(self.batch_size)
            .into_iter()
            .map(|chunk| {
                let data = chunk.collect_vec();
                let metadata = EncodeToRecordBatch::chunk_metadata(&data);
                T::encode_batch(&metadata, &data).expect("Expected to encode batch")
            })
            .collect()
    }
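
    /// Serializes `data` to a pretty-printed JSON file and returns the path written,
    /// defaulting to `<base_path>/data/<prefix>/data.json`. When `write_metadata` is
    /// set, the chunk metadata is also written to a sibling `*.metadata.json` file.
    ///
    /// Panics if the data is not sorted by `ts_init` or if a file cannot be written.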
    #[must_use]
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> PathBuf
    where
        T: GetTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);

        let json_path = path.unwrap_or_else(|| {
            let path = self.make_path(T::path_prefix(), None);
            path.with_extension("json")
        });

        info!(
            "Writing {} records of {type_name} data to {json_path:?}",
            data.len(),
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            info!("Writing metadata to {metadata_path:?}");
            let metadata_file = std::fs::File::create(&metadata_path)
                .unwrap_or_else(|_| panic!("Failed to create metadata file at {metadata_path:?}"));
            serde_json::to_writer_pretty(metadata_file, &metadata)
                .unwrap_or_else(|_| panic!("Failed to write metadata to JSON"));
        }

        let file = std::fs::File::create(&json_path)
            .unwrap_or_else(|_| panic!("Failed to create JSON file at {json_path:?}"));

        serde_json::to_writer_pretty(file, &data)
            .unwrap_or_else(|_| panic!("Failed to write {type_name} to JSON"));

        json_path
    }
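
    /// Encodes `data` into record batches and writes them to a Parquet file,
    /// returning the path written, which defaults to
    /// `<base_path>/data/<prefix>[/<instrument_id>]/data.parquet`.
    ///
    /// Panics if `data` is empty or unsorted by `ts_init`, or if the write fails.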
    #[must_use]
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> PathBuf
    where
        T: GetTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        let type_name = std::any::type_name::<T>().to_snake_case();
        Self::check_ascending_timestamps(&data, &type_name);

        let batches = self.data_to_record_batches(data);
        let batch = batches.first().expect("Expected at least one batch");
        let schema = batch.schema();
        let instrument_id = schema.metadata.get("instrument_id");
        let path = path.unwrap_or_else(|| self.make_path(T::path_prefix(), instrument_id));

        info!(
            "Writing {} batches of {type_name} data to {path:?}",
            batches.len(),
        );

        write_batches_to_parquet(&batches, &path, compression, max_row_group_size)
            .unwrap_or_else(|_| panic!("Failed to write {type_name} to parquet"));

        path
    }
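
    /// Registers the Parquet file at `path` with the backing session under its file
    /// stem as the table name, applies the optional time range and `WHERE` clause,
    /// and returns the query result.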
    pub fn query_file<T>(
        &mut self,
        path: PathBuf,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let path_str = path.to_str().unwrap();
        let table_name = path.file_stem().unwrap().to_str().unwrap();
        let query = build_query(table_name, start, end, where_clause);
        self.session
            .add_file::<T>(table_name, path_str, Some(&query))?;
        Ok(self.session.get_query_result())
    }
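
    /// Queries the catalog's own directories for `T`: one per instrument ID, or the
    /// type's root directory when `instrument_ids` is empty.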
    pub fn query_directory<T>(
        &mut self,
        instrument_ids: Vec<String>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        where_clause: Option<&str>,
    ) -> Result<QueryResult>
    where
        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
    {
        let mut paths = Vec::new();
        for instrument_id in &instrument_ids {
            paths.push(self.make_path(T::path_prefix(), Some(instrument_id)));
        }

        if paths.is_empty() {
            paths.push(self.make_path(T::path_prefix(), None));
        }

        for path in &paths {
            let path = path.to_str().unwrap();
            let query = build_query(path, start, end, where_clause);
            self.session.add_file::<T>(path, path, Some(&query))?;
        }

        Ok(self.session.get_query_result())
    }
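
    /// Splits a heterogeneous stream of `Data` by variant and writes each non-empty
    /// group to its own Parquet file. `Data::Deltas` containers are skipped.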
    pub fn write_data_enum(&self, data: Vec<Data>) {
        let mut delta: Vec<OrderBookDelta> = Vec::new();
        let mut depth10: Vec<OrderBookDepth10> = Vec::new();
        let mut quote: Vec<QuoteTick> = Vec::new();
        let mut trade: Vec<TradeTick> = Vec::new();
        let mut bar: Vec<Bar> = Vec::new();

        // Consume `data` directly rather than iterating by reference and cloning
        for d in data {
            match d {
                Data::Delta(d) => delta.push(d),
                Data::Depth10(d) => depth10.push(*d),
                Data::Quote(d) => quote.push(d),
                Data::Trade(d) => trade.push(d),
                Data::Bar(d) => bar.push(d),
                Data::Deltas(_) => continue,
            }
        }

        // `write_to_parquet` panics on empty input (it expects at least one
        // batch), so only write the variants that are actually present
        if !delta.is_empty() {
            let _ = self.write_to_parquet(delta, None, None, None);
        }
        if !depth10.is_empty() {
            let _ = self.write_to_parquet(depth10, None, None, None);
        }
        if !quote.is_empty() {
            let _ = self.write_to_parquet(quote, None, None, None);
        }
        if !trade.is_empty() {
            let _ = self.write_to_parquet(trade, None, None, None);
        }
        if !bar.is_empty() {
            let _ = self.write_to_parquet(bar, None, None, None);
        }
    }
}
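
/// Associates a data type with the directory name it is stored under in the catalog.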
pub trait CatalogPathPrefix {
    fn path_prefix() -> &'static str;
}
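
/// Implements `CatalogPathPrefix` for `$type`, returning the literal `$path`.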
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}

impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
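
// A minimal usage sketch (not part of the original file): `quotes` stands in for
// a `Vec<QuoteTick>` already sorted by `ts_init`, which takes more setup to build
// than is shown here, so this is illustrative only.
//
//     let mut catalog = ParquetDataCatalog::new(PathBuf::from("./catalog"), None);
//     let path = catalog.write_to_parquet(quotes, None, None, None);
//     let result = catalog.query_file::<QuoteTick>(path, None, None, None)?;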