use std::{collections::HashMap, path::PathBuf};

use datafusion::parquet::{
    arrow::ArrowWriter,
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};
use nautilus_model::data::{Bar, OrderBookDelta, QuoteTick, TradeTick};
use nautilus_persistence::python::backend::session::NautilusDataType;
use nautilus_serialization::arrow::EncodeToRecordBatch;
use serde_json::from_reader;

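/// Infers the Nautilus data type from keywords in the file name.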
fn determine_data_type(file_name: &str) -> Option<NautilusDataType> {
    let file_name = file_name.to_lowercase();
    if file_name.contains("quotes") || file_name.contains("quote_tick") {
        Some(NautilusDataType::QuoteTick)
    } else if file_name.contains("trades") || file_name.contains("trade_tick") {
        Some(NautilusDataType::TradeTick)
    } else if file_name.contains("bars") {
        Some(NautilusDataType::Bar)
    } else if file_name.contains("deltas") || file_name.contains("order_book_delta") {
        Some(NautilusDataType::OrderBookDelta)
    } else {
        None
    }
}

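/// Converts a Nautilus JSON data file (with its `<stem>.metadata.json` sidecar) into a
/// Parquet file written next to the input, e.g. `to_parquet quotes.json` -> `quotes.parquet`.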
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() != 2 {
        return Err("Usage: to_parquet <json_file>".into());
    }
    let file_path = PathBuf::from(&args[1]);

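    // Validate the input file extension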
    if !file_path
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("json"))
    {
        return Err("Input file must be a json file".into());
    }

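    // Determine the data type from keywords in the file name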
    let data_type = determine_data_type(file_path.to_str().unwrap())
        .ok_or("Could not determine data type from filename")?;

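    // Dispatch to the generic processing routine for the detected type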
    match data_type {
        NautilusDataType::QuoteTick => process_data::<QuoteTick>(&file_path)?,
        NautilusDataType::TradeTick => process_data::<TradeTick>(&file_path)?,
        NautilusDataType::Bar => process_data::<Bar>(&file_path)?,
        NautilusDataType::OrderBookDelta => process_data::<OrderBookDelta>(&file_path)?,
        _ => return Err("Unsupported data type".into()),
    }

    Ok(())
}

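/// Deserializes records of type `T` from `json_path` and writes them to a Parquet file
/// alongside the input, using the accompanying `<stem>.metadata.json` for schema metadata.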
fn process_data<T>(json_path: &PathBuf) -> Result<(), Box<dyn std::error::Error>>
where
    T: serde::de::DeserializeOwned + EncodeToRecordBatch,
{
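    // Derive the metadata sidecar and output Parquet paths from the input file name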
    let stem = json_path.file_stem().unwrap().to_str().unwrap();
    let parent_path = PathBuf::from(".");
    let parent = json_path.parent().unwrap_or(&parent_path);
    let metadata_path = parent.join(format!("{stem}.metadata.json"));
    let parquet_path = parent.join(format!("{stem}.parquet"));

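    // Read and deserialize the JSON records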
    let json_data = std::fs::read_to_string(json_path)?;
    let data: Vec<T> = serde_json::from_str(&json_data)?;

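    // Load the metadata sidecar (string key/value pairs)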
    let metadata_file = std::fs::File::open(metadata_path)?;
    let metadata: HashMap<String, String> = from_reader(metadata_file)?;

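    // Row group size from metadata, defaulting to 5000 rows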
    let rows_per_group: usize = metadata
        .get("rows_per_group")
        .and_then(|s| s.parse().ok())
        .unwrap_or(5000);

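    // Build the Arrow schema for `T` from the metadata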
    let schema = T::get_schema(Some(metadata.clone()));

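    // Write the Parquet file with ZSTD compression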
    let mut output_file = std::fs::File::create(&parquet_path)?;
    {
        let writer_props = WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_max_row_group_size(rows_per_group)
            .build();

        let mut writer = ArrowWriter::try_new(&mut output_file, schema.into(), Some(writer_props))?;

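        // Encode and write one record batch per row group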
        for chunk in data.chunks(rows_per_group) {
            let batch = T::encode_batch(&metadata, chunk)?;
            writer.write(&batch)?;
        }
        writer.close()?;
    }

    println!(
        "Successfully wrote {} records to {}",
        data.len(),
        parquet_path.display()
    );
    Ok(())
}