use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, path::Path as ObjectPath};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

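/// Writes a single Arrow record batch to a Parquet file at `path`.
///
/// Convenience wrapper around [`write_batches_to_parquet`] for the single-batch case.
/// The path may be a local filesystem path or a supported object-store URI.
///
/// Usage sketch (not compiled; assumes an existing `batch`):
///
/// ```ignore
/// write_batch_to_parquet(batch, "/tmp/data.parquet", None, None, None).await?;
/// ```
///
/// # Errors
///
/// Returns an error if the object store cannot be created or the write fails.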
pub async fn write_batch_to_parquet(
    batch: RecordBatch,
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        path,
        storage_options,
        compression,
        max_row_group_size,
    )
    .await
}

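/// Writes multiple record batches to a single Parquet file at `path`.
///
/// The object store backend is resolved from the path/URI scheme via
/// [`create_object_store_from_path`]. Compression defaults to Snappy and the
/// maximum row group size defaults to 5000 rows when not specified.
///
/// # Errors
///
/// Returns an error if the object store cannot be created or the write fails.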
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(path)
    } else {
        ObjectPath::from(format!("{base_path}/{path}"))
    };

    write_batches_to_object_store(
        batches,
        object_store,
        &object_path,
        compression,
        max_row_group_size,
    )
    .await
}

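/// Writes record batches to the given object store at `path`.
///
/// All batches are encoded into an in-memory buffer using the schema of the
/// first batch, then uploaded with a single `put` call.
///
/// # Errors
///
/// Returns an error if `batches` is empty, encoding fails, or the upload fails.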
pub async fn write_batches_to_object_store(
    batches: &[RecordBatch],
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    // Guard against an empty slice so taking the first batch's schema below cannot panic
    if batches.is_empty() {
        anyhow::bail!("Cannot write Parquet file: no record batches provided");
    }

    let mut buffer = Vec::new();

    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    object_store.put(path, buffer.into()).await?;

    Ok(())
}

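/// Combines multiple Parquet files into a single file at `new_file_path`.
///
/// Returns early without error if fewer than two paths are given. All input
/// files are accessed through the object store resolved from the first path.
///
/// # Errors
///
/// Returns an error if the object store cannot be created or combining fails.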
pub async fn combine_parquet_files(
    file_paths: Vec<&str>,
    new_file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    let (object_store, base_path, _) =
        create_object_store_from_path(file_paths[0], storage_options)?;

    let object_paths: Vec<ObjectPath> = file_paths
        .iter()
        .map(|path| {
            if base_path.is_empty() {
                ObjectPath::from(*path)
            } else {
                ObjectPath::from(format!("{base_path}/{path}"))
            }
        })
        .collect();

    let new_object_path = if base_path.is_empty() {
        ObjectPath::from(new_file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{new_file_path}"))
    };

    combine_parquet_files_from_object_store(
        object_store,
        object_paths,
        &new_object_path,
        compression,
        max_row_group_size,
    )
    .await
}

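/// Combines Parquet files from an object store into a single file at `new_file_path`.
///
/// Reads every record batch from each input file into memory, writes them all to
/// the new file, then deletes the original files (skipping the target path itself).
///
/// # Errors
///
/// Returns an error if reading, writing, or deleting any file fails.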
pub async fn combine_parquet_files_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_paths: Vec<ObjectPath>,
    new_file_path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    let mut all_batches: Vec<RecordBatch> = Vec::new();

    for path in &file_paths {
        let data = object_store.get(path).await?.bytes().await?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        let mut reader = builder.build()?;

        for batch in reader.by_ref() {
            all_batches.push(batch?);
        }
    }

    write_batches_to_object_store(
        &all_batches,
        object_store.clone(),
        new_file_path,
        compression,
        max_row_group_size,
    )
    .await?;

    for path in &file_paths {
        if path != new_file_path {
            object_store.delete(path).await?;
        }
    }

    Ok(())
}

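/// Returns the minimum and maximum values of an Int64 column from Parquet
/// row-group metadata, without reading the row data itself.
///
/// # Errors
///
/// Returns an error if the column is missing, is not Int64, or has no statistics.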
pub async fn min_max_from_parquet_metadata(
    file_path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{file_path}"))
    };

    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
}

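/// Scans the row-group statistics of a Parquet file in an object store and
/// returns the overall `(min, max)` for the named Int64 column.
///
/// # Errors
///
/// Returns an error if statistics are missing for the column in any row group,
/// the column is not Int64, or the column is not found at all.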
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        if let Some(&min_value) = int64_stats.min_opt()
                            && (overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap())
                        {
                            overall_min_value = Some(min_value);
                        }

                        if let Some(&max_value) = int64_stats.max_opt()
                            && (overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap())
                        {
                            overall_max_value = Some(max_value);
                        }
                    } else {
                        anyhow::bail!("Column '{column_name}' is not of type i64.");
                    }
                } else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}."
                    );
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group."
        )
    }
}

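/// Creates an object store from a path or URI, returning the store, the base
/// path within it, and the normalized URI.
///
/// Supported schemes: `s3://`, `gs://`/`gcs://`, `az://`, `abfs://`,
/// `http://`/`https://`, and `file://`. Bare paths are treated as local
/// filesystem paths. Provider-specific credentials and settings can be passed
/// via `storage_options`.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the backing store cannot be built.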
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let uri = normalize_path_to_uri(path);

    match uri.as_str() {
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false),
    }
}

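/// Normalizes a path into a URI, converting bare local paths (absolute or
/// relative) into `file://` URIs and leaving existing URIs untouched.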
#[must_use]
pub fn normalize_path_to_uri(path: &str) -> String {
    if path.contains("://") {
        path.to_string()
    } else if is_absolute_path(path) {
        path_to_file_uri(path)
    } else {
        let absolute_path = std::env::current_dir().unwrap().join(path);
        path_to_file_uri(&absolute_path.to_string_lossy())
    }
}

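/// Returns `true` if the path is absolute on Unix (`/...`), a Windows drive
/// path (`C:\...` or `C:/...`), or a Windows UNC path (`\\server\...`).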
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute path (`/...`)
    path.starts_with('/')
        // Windows drive path (`C:\...` or `C:/...`)
        || (path.len() >= 3
            && path.chars().nth(1) == Some(':')
            && matches!(path.chars().nth(2), Some('\\') | Some('/')))
        // Windows UNC path (`\\server\...`)
        || path.starts_with("\\\\")
}

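/// Converts an absolute local path into a `file://` URI, normalizing Windows
/// backslashes to forward slashes.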
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    if path.starts_with('/') {
        format!("file://{path}")
    } else if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        let normalized = path.replace('\\', "/");
        format!("file:///{normalized}")
    } else if let Some(without_prefix) = path.strip_prefix("\\\\") {
        let normalized = without_prefix.replace('\\', "/");
        format!("file://{normalized}")
    } else {
        format!("file://{path}")
    }
}

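/// Creates a local filesystem store rooted at the given path (with any
/// `file://` prefix stripped).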
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let path = if is_file_uri {
        uri.strip_prefix("file://").unwrap_or(uri)
    } else {
        uri
    };

    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
    Ok((Arc::new(local_store), String::new(), uri.to_string()))
}

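/// Creates an Amazon S3 store from an `s3://bucket/path` URI, applying any
/// recognized `storage_options` (endpoint, region, credentials, `allow_http`).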
fn create_s3_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "endpoint_url" => {
                    builder = builder.with_endpoint(&value);
                }
                "region" => {
                    builder = builder.with_region(&value);
                }
                "access_key_id" | "key" => {
                    builder = builder.with_access_key_id(&value);
                }
                "secret_access_key" | "secret" => {
                    builder = builder.with_secret_access_key(&value);
                }
                "session_token" | "token" => {
                    builder = builder.with_token(&value);
                }
                "allow_http" => {
                    let allow_http = value.to_lowercase() == "true";
                    builder = builder.with_allow_http(allow_http);
                }
                _ => {
                    log::warn!("Unknown S3 storage option: {key}");
                }
            }
        }
    }

    let s3_store = builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}

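/// Creates a Google Cloud Storage store from a `gs://`/`gcs://` URI, applying
/// any recognized `storage_options` (service account path/key, credentials).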
fn create_gcs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}

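/// Creates an Azure Blob Storage store from an `az://container/path` URI,
/// applying any recognized `storage_options` (account, key, SAS token, AAD).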
fn create_azure_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;

    let path = url.path().trim_start_matches('/').to_string();

    let mut builder =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

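/// Creates an Azure store from an ABFS URI (expected to be shaped like
/// `abfs://container@account.host/path`), extracting the account from the host
/// and the container from the user-info part of the URI.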
fn create_abfs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    let container = url
        .username()
        .split('@')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

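/// Creates a generic HTTP(S) store rooted at the URI's scheme and host; any
/// provided storage options are ignored with a warning.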
fn create_http_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    if let Some(options) = storage_options {
        for (key, _value) in options {
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}

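/// Parses a URI and returns the parsed [`url::Url`] together with its path
/// component with the leading `/` removed.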
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let url = url::Url::parse(uri)?;
    let path = url.path().trim_start_matches('/').to_string();
    Ok((url, path))
}

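/// Returns the host component of a URL, or the provided error message if the
/// URL has no host.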
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    url.host_str()
        .map(ToString::to_string)
        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[rstest]
    fn test_create_object_store_from_path_s3() {
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_azure() {
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_gcs() {
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}