to_parquet/
to_parquet.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

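//! Converts a JSON file of Nautilus market data (quotes, trades, bars, or
//! order book deltas) into a zstd-compressed Parquet file written alongside
//! the input.
//!
//! A sibling `<stem>.metadata.json` file must exist, holding the string
//! key/value metadata required by the target type's Arrow schema; the
//! optional `rows_per_group` key sets the Parquet row group size (default
//! 5000). A hypothetical metadata file for quotes (the exact keys are
//! defined by the target type's schema implementation; only
//! `rows_per_group` is read directly here) might look like:
//!
//! ```json
//! { "instrument_id": "EUR/USD.SIM", "price_precision": "5", "rows_per_group": "5000" }
//! ```
//!
//! Example invocation (binary name assumed from the file name):
//!
//! ```text
//! to_parquet quotes.json
//! ```
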
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
};

use datafusion::parquet::{
    arrow::ArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};
use nautilus_model::data::{Bar, OrderBookDelta, QuoteTick, TradeTick};
use nautilus_persistence::python::backend::session::NautilusDataType;
use nautilus_serialization::arrow::EncodeToRecordBatch;
use serde_json::from_reader;

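/// Infers the [`NautilusDataType`] from substrings of the (lowercased) file
/// name, e.g. "quotes" or "quote_tick" selects `QuoteTick`; returns `None`
/// if no known pattern matches.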
fn determine_data_type(file_name: &str) -> Option<NautilusDataType> {
    let file_name = file_name.to_lowercase();
    if file_name.contains("quotes") || file_name.contains("quote_tick") {
        Some(NautilusDataType::QuoteTick)
    } else if file_name.contains("trades") || file_name.contains("trade_tick") {
        Some(NautilusDataType::TradeTick)
    } else if file_name.contains("bars") {
        Some(NautilusDataType::Bar)
    } else if file_name.contains("deltas") || file_name.contains("order_book_delta") {
        Some(NautilusDataType::OrderBookDelta)
    } else {
        None
    }
}

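/// Entry point: validates that the single argument is a JSON file, infers
/// the data type from its name, and dispatches to [`process_data`] for the
/// matching Nautilus type.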
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() != 2 {
        return Err("Usage: to_parquet <json_file>".into());
    }
    let file_path = PathBuf::from(&args[1]);

    // Validate file extension
    if !file_path
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("json"))
    {
        return Err("Input file must be a JSON file".into());
    }

    // Determine data type from the file name component only, so that
    // directory names cannot influence the match
    let file_name = file_path
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or("Invalid file name")?;
    let data_type =
        determine_data_type(file_name).ok_or("Could not determine data type from filename")?;

    // Dispatch to the generic processor for the detected type
    match data_type {
        NautilusDataType::QuoteTick => process_data::<QuoteTick>(&file_path)?,
        NautilusDataType::TradeTick => process_data::<TradeTick>(&file_path)?,
        NautilusDataType::Bar => process_data::<Bar>(&file_path)?,
        NautilusDataType::OrderBookDelta => process_data::<OrderBookDelta>(&file_path)?,
        _ => return Err("Unsupported data type".into()),
    }

    Ok(())
}

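/// Deserializes records of type `T` from `json_path`, encodes them into
/// Arrow record batches in row-group-sized chunks, and writes a
/// `<stem>.parquet` file next to the input, using metadata loaded from
/// `<stem>.metadata.json`.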
fn process_data<T>(json_path: &Path) -> Result<(), Box<dyn std::error::Error>>
where
    T: serde::de::DeserializeOwned + EncodeToRecordBatch,
{
    // Setup paths: metadata and output live next to the input file
    let stem = json_path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .ok_or("Invalid file stem")?;
    let parent = json_path.parent().unwrap_or(Path::new("."));
    let metadata_path = parent.join(format!("{stem}.metadata.json"));
    let parquet_path = parent.join(format!("{stem}.parquet"));

    // Read JSON data
    let json_data = std::fs::read_to_string(json_path)?;
    let data: Vec<T> = serde_json::from_str(&json_data)?;

    // Read metadata
    let metadata_file = std::fs::File::open(metadata_path)?;
    let metadata: HashMap<String, String> = from_reader(metadata_file)?;

    // Get row group size from metadata, defaulting to 5000 rows
    let rows_per_group: usize = metadata
        .get("rows_per_group")
        .and_then(|s| s.parse().ok())
        .unwrap_or(5000);

    // Get schema from data type
    let schema = T::get_schema(Some(metadata.clone()));

    // Write to parquet
    let mut output_file = std::fs::File::create(&parquet_path)?;
    {
        let writer_props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_max_row_group_size(rows_per_group)
            .build();

        let mut writer =
            ArrowWriter::try_new(&mut output_file, schema.into(), Some(writer_props))?;

        // Write data in chunks of one row group each
        for chunk in data.chunks(rows_per_group) {
            let batch = T::encode_batch(&metadata, chunk)?;
            writer.write(&batch)?;
        }
        writer.close()?;
    }

    println!(
        "Successfully wrote {} records to {}",
        data.len(),
        parquet_path.display()
    );
    Ok(())
}