use std::path::PathBuf;

use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
use nautilus_model::data::{to_variant, Bar, Data, OrderBookDelta, QuoteTick, TradeTick};
use nautilus_persistence::{
    backend::session::DataBackendSession, python::backend::session::NautilusDataType,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use serde_json::to_writer_pretty;

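/// Infers the Nautilus data type from the parquet file name.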
fn determine_data_type(file_name: &str) -> Option<NautilusDataType> {
    let file_name = file_name.to_lowercase();
    if file_name.contains("quotes") || file_name.contains("quote_tick") {
        Some(NautilusDataType::QuoteTick)
    } else if file_name.contains("trades") || file_name.contains("trade_tick") {
        Some(NautilusDataType::TradeTick)
    } else if file_name.contains("bars") {
        Some(NautilusDataType::Bar)
    } else if file_name.contains("deltas") || file_name.contains("order_book_delta") {
        Some(NautilusDataType::OrderBookDelta)
    } else {
        None
    }
}

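/// Entry point: converts a Nautilus parquet file into `<stem>.json` plus a
/// `<stem>.metadata.json` sidecar written next to the input file.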
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = std::env::args().collect();
    if args.len() != 2 {
        return Err("Usage: to_json <file>".into());
    }
    let file_path = PathBuf::from(&args[1]);

    if !file_path
        .extension()
        .is_some_and(|ext| ext.eq_ignore_ascii_case("parquet"))
    {
        return Err("Input file must be a parquet file".into());
    }

    let data_type = determine_data_type(file_path.to_str().unwrap())
        .ok_or("Could not determine data type from filename")?;

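    // Create a backend session for querying the parquet data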
    let mut session = DataBackendSession::new(5000);
    let file_str = file_path.to_str().unwrap();

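    // Dispatch to the generic processor for the detected data type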
    match data_type {
        NautilusDataType::QuoteTick => process_data::<QuoteTick>(file_str, &mut session)?,
        NautilusDataType::TradeTick => process_data::<TradeTick>(file_str, &mut session)?,
        NautilusDataType::Bar => process_data::<Bar>(file_str, &mut session)?,
        NautilusDataType::OrderBookDelta => process_data::<OrderBookDelta>(file_str, &mut session)?,
        _ => return Err("Unsupported data type".into()),
    }

    Ok(())
}

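/// Decodes all `T` records from the parquet file and writes them out as JSON,
/// along with a metadata JSON file.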
fn process_data<T>(
    file_path: &str,
    session: &mut DataBackendSession,
) -> Result<(), Box<dyn std::error::Error>>
where
    T: serde::Serialize + TryFrom<Data> + EncodeToRecordBatch + DecodeDataFromRecordBatch,
{
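    // Derive the output file paths next to the input file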
    let input_path = PathBuf::from(file_path);
    let stem = input_path.file_stem().unwrap().to_str().unwrap();
    let default = PathBuf::from(".");
    let parent = input_path.parent().unwrap_or(&default);
    let json_path = parent.join(format!("{stem}.json"));
    let metadata_path = parent.join(format!("{stem}.metadata.json"));

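    // Read the row count of the first row group from the parquet metadata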
    let parquet_file = std::fs::File::open(file_path)?;
    let reader = SerializedFileReader::new(parquet_file)?;
    let row_group_metadata = reader.metadata().row_group(0);
    let rows_per_group = row_group_metadata.num_rows();

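    // Register the file with the session and decode all records into `T`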
    session.add_file::<T>("data", file_path, None)?;
    let query_result = session.get_query_result();
    let data = query_result.collect::<Vec<_>>();
    let data: Vec<T> = to_variant(data);

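    // Capture chunk metadata and record the parquet row-group size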
    let mut metadata = T::chunk_metadata(&data);
    metadata.insert("rows_per_group".to_string(), rows_per_group.to_string());

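    // Write the decoded records as pretty-printed JSON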
    let json_file = std::fs::File::create(json_path)?;
    to_writer_pretty(json_file, &data)?;

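    // Write the accompanying metadata file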
    let metadata_file = std::fs::File::create(metadata_path)?;
    to_writer_pretty(metadata_file, &metadata)?;

    println!("Successfully processed {} records", data.len());
    Ok(())
}