nautilus_serialization/
parquet.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fs, fs::File, path::PathBuf};
17
18use arrow::record_batch::RecordBatch;
19use parquet::{
20    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
21    file::{
22        properties::WriterProperties,
23        reader::{FileReader, SerializedFileReader},
24        statistics::Statistics,
25    },
26};
27
28use crate::enums::ParquetWriteMode;
29
30/// Writes a `RecordBatch` to a Parquet file at the specified `filepath`, with optional compression.
31pub fn write_batch_to_parquet(
32    batch: RecordBatch,
33    filepath: &PathBuf,
34    compression: Option<parquet::basic::Compression>,
35    max_row_group_size: Option<usize>,
36    write_mode: Option<ParquetWriteMode>,
37) -> anyhow::Result<()> {
38    write_batches_to_parquet(
39        &[batch],
40        filepath,
41        compression,
42        max_row_group_size,
43        write_mode,
44    )
45}
46
47pub fn write_batches_to_parquet(
48    batches: &[RecordBatch],
49    filepath: &PathBuf,
50    compression: Option<parquet::basic::Compression>,
51    max_row_group_size: Option<usize>,
52    write_mode: Option<ParquetWriteMode>,
53) -> anyhow::Result<()> {
54    let used_write_mode = write_mode.unwrap_or(ParquetWriteMode::Overwrite);
55
56    // Ensure the parent directory exists
57    if let Some(parent) = filepath.parent() {
58        std::fs::create_dir_all(parent)?;
59    }
60
61    if (used_write_mode == ParquetWriteMode::Append || used_write_mode == ParquetWriteMode::Prepend)
62        && filepath.exists()
63    {
64        // Read existing parquet file
65        let file = File::open(filepath)?;
66        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
67        let existing_batches: Vec<RecordBatch> = reader.build()?.collect::<Result<Vec<_>, _>>()?;
68
69        if !existing_batches.is_empty() {
70            let mut combined = Vec::with_capacity(existing_batches.len() + batches.len());
71            let batches: Vec<RecordBatch> = batches.to_vec();
72
73            // Combine batches in the appropriate order
74            let combined_batches = if used_write_mode == ParquetWriteMode::Append {
75                combined.extend(existing_batches);
76                combined.extend(batches);
77                combined
78            } else {
79                // Prepend mode
80                combined.extend(batches.clone());
81                combined.extend(existing_batches);
82                combined
83            };
84
85            return write_batches_to_file(
86                &combined_batches,
87                filepath,
88                compression,
89                max_row_group_size,
90            );
91        }
92    }
93
94    // Default case: create new file or overwrite existing
95    write_batches_to_file(batches, filepath, compression, max_row_group_size)
96}
97
98pub fn combine_data_files(
99    parquet_files: Vec<PathBuf>,
100    column_name: &str,
101    compression: Option<parquet::basic::Compression>,
102    max_row_group_size: Option<usize>,
103) -> anyhow::Result<()> {
104    let n_files = parquet_files.len();
105
106    if n_files <= 1 {
107        return Ok(());
108    }
109
110    // Get min/max for each file
111    let min_max_per_file = parquet_files
112        .iter()
113        .map(|file| min_max_from_parquet_metadata(file, column_name))
114        .collect::<Result<Vec<_>, _>>()?;
115
116    // Create ordering by first timestamp
117    let mut ordering: Vec<usize> = (0..n_files).collect();
118    ordering.sort_by_key(|&i| min_max_per_file[i].0);
119
120    // Check for timestamp intersection
121    for i in 1..n_files {
122        if min_max_per_file[ordering[i - 1]].1 >= min_max_per_file[ordering[i]].0 {
123            anyhow::bail!(
124                "Merging not safe due to intersection of timestamps between files. Aborting."
125            );
126        }
127    }
128
129    // Create sorted list of files
130    let sorted_parquet_files = ordering
131        .into_iter()
132        .map(|i| parquet_files[i].clone())
133        .collect();
134
135    // Combine the files
136    combine_parquet_files(sorted_parquet_files, compression, max_row_group_size)
137}
138
139pub fn combine_parquet_files(
140    file_list: Vec<PathBuf>,
141    compression: Option<parquet::basic::Compression>,
142    max_row_group_size: Option<usize>,
143) -> anyhow::Result<()> {
144    if file_list.len() <= 1 {
145        return Ok(());
146    }
147
148    // Create readers and immediately build them.  Store the *readers*, not the builders.
149    let mut readers = Vec::new();
150    for file in &file_list {
151        let file = File::open(file)?;
152        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
153        readers.push(builder.build()?); // Build immediately and store the reader.
154    }
155
156    // Collect all batches into a single vector
157    let mut all_batches: Vec<RecordBatch> = Vec::new();
158    for reader in &mut readers {
159        for batch in reader.by_ref() {
160            all_batches.push(batch?);
161        }
162    }
163
164    // Use write_batches_to_file to write the combined batches
165    write_batches_to_file(&all_batches, &file_list[0], compression, max_row_group_size)?;
166
167    // Remove the merged files.
168    for file_path in file_list.iter().skip(1) {
169        fs::remove_file(file_path)?;
170    }
171
172    Ok(())
173}
174
175fn write_batches_to_file(
176    batches: &[RecordBatch],
177    filepath: &PathBuf,
178    compression: Option<parquet::basic::Compression>,
179    max_row_group_size: Option<usize>,
180) -> anyhow::Result<()> {
181    let file = File::create(filepath)?;
182    let writer_props = WriterProperties::builder()
183        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
184        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
185        .build();
186
187    let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(writer_props))?;
188    for batch in batches {
189        writer.write(batch)?;
190    }
191    writer.close()?;
192
193    Ok(())
194}
195
196pub fn min_max_from_parquet_metadata(
197    file_path: &PathBuf,
198    column_name: &str,
199) -> anyhow::Result<(i64, i64)> {
200    // Open the parquet file
201    let file = File::open(file_path)?;
202    let reader = SerializedFileReader::new(file)?;
203
204    let metadata = reader.metadata();
205    let mut overall_min_value: Option<i64> = None;
206    let mut overall_max_value: Option<i64> = None;
207
208    // Iterate through all row groups
209    for i in 0..metadata.num_row_groups() {
210        let row_group = metadata.row_group(i);
211
212        // Iterate through all columns in this row group
213        for j in 0..row_group.num_columns() {
214            let col_metadata = row_group.column(j);
215
216            if col_metadata.column_path().string() == column_name {
217                if let Some(stats) = col_metadata.statistics() {
218                    // Check if we have Int64 statistics
219                    if let Statistics::Int64(int64_stats) = stats {
220                        // Extract min value if available
221                        if let Some(&min_value) = int64_stats.min_opt() {
222                            if overall_min_value.is_none() || min_value < overall_min_value.unwrap()
223                            {
224                                overall_min_value = Some(min_value);
225                            }
226                        }
227
228                        // Extract max value if available
229                        if let Some(&max_value) = int64_stats.max_opt() {
230                            if overall_max_value.is_none() || max_value > overall_max_value.unwrap()
231                            {
232                                overall_max_value = Some(max_value);
233                            }
234                        }
235                    } else {
236                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
237                    }
238                } else {
239                    anyhow::bail!(
240                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
241                    );
242                }
243            }
244        }
245    }
246
247    // Return the min/max pair if both are available
248    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
249        Ok((min, max))
250    } else {
251        anyhow::bail!(
252            "Column '{column_name}' not found or has no Int64 statistics in any row group."
253        )
254    }
255}