// to_json/to_json.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::path::PathBuf;
17
18use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
19use nautilus_model::data::{to_variant, Bar, Data, OrderBookDelta, QuoteTick, TradeTick};
20use nautilus_persistence::{
21    backend::session::DataBackendSession, python::backend::session::NautilusDataType,
22};
23use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
24use serde_json::to_writer_pretty;
25
26fn determine_data_type(file_name: &str) -> Option<NautilusDataType> {
27    let file_name = file_name.to_lowercase();
28    if file_name.contains("quotes") || file_name.contains("quote_tick") {
29        Some(NautilusDataType::QuoteTick)
30    } else if file_name.contains("trades") || file_name.contains("trade_tick") {
31        Some(NautilusDataType::TradeTick)
32    } else if file_name.contains("bars") {
33        Some(NautilusDataType::Bar)
34    } else if file_name.contains("deltas") || file_name.contains("order_book_delta") {
35        Some(NautilusDataType::OrderBookDelta)
36    } else {
37        None
38    }
39}
40
41fn main() -> Result<(), Box<dyn std::error::Error>> {
42    let args: Vec<String> = std::env::args().collect();
43    if args.len() != 2 {
44        return Err("Usage: to_json <file>".into());
45    }
46    let file_path = PathBuf::from(&args[1]);
47
48    // Validate file extension
49    if !file_path
50        .extension()
51        .is_some_and(|ext| ext.eq_ignore_ascii_case("parquet"))
52    {
53        return Err("Input file must be a parquet file".into());
54    }
55
56    // Determine data type from filename
57    let data_type = determine_data_type(file_path.to_str().unwrap())
58        .ok_or("Could not determine data type from filename")?;
59
60    // Setup session and read data
61    let mut session = DataBackendSession::new(5000);
62    let file_str = file_path.to_str().unwrap();
63
64    // Process based on data type
65    match data_type {
66        NautilusDataType::QuoteTick => process_data::<QuoteTick>(file_str, &mut session)?,
67        NautilusDataType::TradeTick => process_data::<TradeTick>(file_str, &mut session)?,
68        NautilusDataType::Bar => process_data::<Bar>(file_str, &mut session)?,
69        NautilusDataType::OrderBookDelta => process_data::<OrderBookDelta>(file_str, &mut session)?,
70        _ => return Err("Unsupported data type".into()),
71    }
72
73    Ok(())
74}
75
76fn process_data<T>(
77    file_path: &str,
78    session: &mut DataBackendSession,
79) -> Result<(), Box<dyn std::error::Error>>
80where
81    T: serde::Serialize + TryFrom<Data> + EncodeToRecordBatch + DecodeDataFromRecordBatch,
82{
83    // Setup output paths
84    let input_path = PathBuf::from(file_path);
85    let stem = input_path.file_stem().unwrap().to_str().unwrap();
86    let default = PathBuf::from(".");
87    let parent = input_path.parent().unwrap_or(&default);
88    let json_path = parent.join(format!("{stem}.json"));
89    let metadata_path = parent.join(format!("{stem}.metadata.json"));
90
91    // Read parquet metadata
92    let parquet_file = std::fs::File::open(file_path)?;
93    let reader = SerializedFileReader::new(parquet_file)?;
94    let row_group_metadata = reader.metadata().row_group(0);
95    let rows_per_group = row_group_metadata.num_rows();
96
97    // Read data
98    session.add_file::<T>("data", file_path, None)?;
99    let query_result = session.get_query_result();
100    let data = query_result.collect::<Vec<_>>();
101    let data: Vec<T> = to_variant(data);
102
103    // Extract metadata and add row group info
104    let mut metadata = T::chunk_metadata(&data);
105    metadata.insert("rows_per_group".to_string(), rows_per_group.to_string());
106
107    // Write data to JSON
108    let json_file = std::fs::File::create(json_path)?;
109    to_writer_pretty(json_file, &data)?;
110
111    // Write metadata to JSON
112    let metadata_file = std::fs::File::create(metadata_path)?;
113    to_writer_pretty(metadata_file, &metadata)?;
114
115    println!("Successfully processed {} records", data.len());
116    Ok(())
117}