// nautilus_serialization/parquet.rs
use std::{fs, fs::File, path::PathBuf};

use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

use crate::enums::ParquetWriteMode;

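// `ParquetWriteMode` is defined in `crate::enums`; from its usage below it is
// assumed to provide at least the variants `Overwrite`, `Append`, and `Prepend`.

/// Writes a single record batch to a Parquet file at `filepath`.
///
/// Convenience wrapper over [`write_batches_to_parquet`].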
pub fn write_batch_to_parquet(
    batch: RecordBatch,
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        filepath,
        compression,
        max_row_group_size,
        write_mode,
    )
}

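/// Writes `batches` to a Parquet file at `filepath`, creating any missing parent
/// directories. In `Append` or `Prepend` mode the existing file contents are read
/// back and combined with the new batches before the file is rewritten in full;
/// the default mode is `Overwrite`.
///
/// # Example
///
/// A minimal sketch, assuming this function is in scope; the `ts_init` column,
/// values, and file path are illustrative, not from the original source:
///
/// ```ignore
/// use std::{path::PathBuf, sync::Arc};
///
/// use arrow::{
///     array::Int64Array,
///     datatypes::{DataType, Field, Schema},
///     record_batch::RecordBatch,
/// };
///
/// fn main() -> anyhow::Result<()> {
///     // A single non-nullable Int64 column named "ts_init" (hypothetical).
///     let schema = Arc::new(Schema::new(vec![Field::new("ts_init", DataType::Int64, false)]));
///     let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
///
///     // Defaults apply: Snappy compression, 5000-row row groups, overwrite mode.
///     write_batches_to_parquet(&[batch], &PathBuf::from("data.parquet"), None, None, None)?;
///     Ok(())
/// }
/// ```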
pub fn write_batches_to_parquet(
    batches: &[RecordBatch],
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    write_mode: Option<ParquetWriteMode>,
) -> anyhow::Result<()> {
    let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);

    if let Some(parent) = filepath.parent() {
        fs::create_dir_all(parent)?;
    }

    // For append/prepend, read the existing file back and merge its batches with
    // the new ones, then rewrite the file in full.
    if (used_write_mode == ParquetWriteMode::Append || used_write_mode == ParquetWriteMode::Prepend)
        && filepath.exists()
    {
        let file = File::open(filepath)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
        let existing_batches: Vec<RecordBatch> = reader.build()?.collect::<Result<Vec<_>, _>>()?;

        if !existing_batches.is_empty() {
            let mut combined = Vec::with_capacity(existing_batches.len() + batches.len());
            let new_batches: Vec<RecordBatch> = batches.to_vec();

            let combined_batches = if used_write_mode == ParquetWriteMode::Append {
                combined.extend(existing_batches);
                combined.extend(new_batches);
                combined
            } else {
                combined.extend(new_batches);
                combined.extend(existing_batches);
                combined
            };

            return write_batches_to_file(
                &combined_batches,
                filepath,
                compression,
                max_row_group_size,
            );
        }
    }

    write_batches_to_file(batches, filepath, compression, max_row_group_size)
}

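/// Combines multiple Parquet files into a single file, ordering them by the
/// per-file min/max statistics of the Int64 column `column_name` (typically a
/// timestamp) and refusing to merge when the value ranges of any two files
/// overlap. No-op when fewer than two files are given.
///
/// # Example
///
/// A minimal sketch; the file names and `ts_init` column are illustrative:
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let files = vec![
///     PathBuf::from("part-0.parquet"),
///     PathBuf::from("part-1.parquet"),
/// ];
///
/// // Assuming `part-0.parquet` covers the earlier `ts_init` range, it ends up
/// // holding the combined data and `part-1.parquet` is removed.
/// combine_data_files(files, "ts_init", None, None)?;
/// ```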
pub fn combine_data_files(
    parquet_files: Vec<PathBuf>,
    column_name: &str,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let n_files = parquet_files.len();

    if n_files <= 1 {
        return Ok(());
    }

    // Read the (min, max) statistics of `column_name` from each file's metadata.
    let min_max_per_file = parquet_files
        .iter()
        .map(|file| min_max_from_parquet_metadata(file, column_name))
        .collect::<Result<Vec<_>, _>>()?;

    // Sort the files by their minimum value.
    let mut ordering: Vec<usize> = (0..n_files).collect();
    ordering.sort_by_key(|&i| min_max_per_file[i].0);

    // Each file must end strictly before the next one begins, otherwise simple
    // concatenation would break the ordering of the combined data.
    for i in 1..n_files {
        if min_max_per_file[ordering[i - 1]].1 >= min_max_per_file[ordering[i]].0 {
            anyhow::bail!(
                "Merging not safe due to intersection of timestamps between files. Aborting."
            );
        }
    }

    let sorted_parquet_files = ordering
        .into_iter()
        .map(|i| parquet_files[i].clone())
        .collect();

    combine_parquet_files(sorted_parquet_files, compression, max_row_group_size)
}

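/// Concatenates the record batches of every file in `file_list` (in the given
/// order) into the first file, then deletes the remaining files. No ordering or
/// overlap checks are performed here; callers such as [`combine_data_files`]
/// are responsible for sorting and validation.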
pub fn combine_parquet_files(
    file_list: Vec<PathBuf>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_list.len() <= 1 {
        return Ok(());
    }

    let mut readers = Vec::new();
    for file in &file_list {
        let file = File::open(file)?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        readers.push(builder.build()?);
    }

    // Read all record batches from all files into memory, in file-list order.
    let mut all_batches: Vec<RecordBatch> = Vec::new();
    for reader in &mut readers {
        for batch in reader.by_ref() {
            all_batches.push(batch?);
        }
    }

    // Rewrite the first file with the combined data, then remove the others.
    write_batches_to_file(&all_batches, &file_list[0], compression, max_row_group_size)?;

    for file_path in file_list.iter().skip(1) {
        fs::remove_file(file_path)?;
    }

    Ok(())
}

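/// Writes `batches` to `filepath` using the schema of the first batch, with
/// Snappy compression and a maximum row group size of 5000 rows by default.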
fn write_batches_to_file(
    batches: &[RecordBatch],
    filepath: &PathBuf,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // The schema is taken from the first batch, so at least one is required.
    anyhow::ensure!(!batches.is_empty(), "Expected at least one batch to write");

    let file = File::create(filepath)?;
    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    Ok(())
}

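/// Returns the overall `(min, max)` of the Int64 column `column_name` across all
/// row groups of the Parquet file at `file_path`, using only the file metadata
/// (no data pages are read). Fails if the column is missing, is not of type
/// Int64, or lacks statistics in a row group that contains it.
///
/// # Example
///
/// A minimal sketch; the file name and column are illustrative:
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let (min_ts, max_ts) =
///     min_max_from_parquet_metadata(&PathBuf::from("data.parquet"), "ts_init")?;
/// println!("ts_init range: {min_ts}..={max_ts}");
/// ```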
pub fn min_max_from_parquet_metadata(
    file_path: &PathBuf,
    column_name: &str,
) -> anyhow::Result<(i64, i64)> {
    let file = File::open(file_path)?;
    let reader = SerializedFileReader::new(file)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Scan the statistics of `column_name` in every row group, tracking the
    // overall minimum and maximum without reading any data pages.
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        if let Some(&min_value) = int64_stats.min_opt() {
                            if overall_min_value.is_none() || min_value < overall_min_value.unwrap()
                            {
                                overall_min_value = Some(min_value);
                            }
                        }

                        if let Some(&max_value) = int64_stats.max_opt() {
                            if overall_max_value.is_none() || max_value > overall_max_value.unwrap()
                            {
                                overall_max_value = Some(max_value);
                            }
                        }
                    } else {
                        anyhow::bail!("Column '{column_name}' is not of type i64");
                    }
                } else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min, max))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}