1use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use object_store::{ObjectStore, path::Path as ObjectPath};
20use parquet::{
21 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
22 file::{
23 properties::WriterProperties,
24 reader::{FileReader, SerializedFileReader},
25 statistics::Statistics,
26 },
27};
28
29pub async fn write_batch_to_parquet(
35 batch: RecordBatch,
36 path: &str,
37 storage_options: Option<std::collections::HashMap<String, String>>,
38 compression: Option<parquet::basic::Compression>,
39 max_row_group_size: Option<usize>,
40) -> anyhow::Result<()> {
41 write_batches_to_parquet(
42 &[batch],
43 path,
44 storage_options,
45 compression,
46 max_row_group_size,
47 )
48 .await
49}
50
51pub async fn write_batches_to_parquet(
57 batches: &[RecordBatch],
58 path: &str,
59 storage_options: Option<std::collections::HashMap<String, String>>,
60 compression: Option<parquet::basic::Compression>,
61 max_row_group_size: Option<usize>,
62) -> anyhow::Result<()> {
63 let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
64 let object_path = if base_path.is_empty() {
65 ObjectPath::from(path)
66 } else {
67 ObjectPath::from(format!("{base_path}/{path}"))
68 };
69
70 write_batches_to_object_store(
71 batches,
72 object_store,
73 &object_path,
74 compression,
75 max_row_group_size,
76 )
77 .await
78}
79
80pub async fn write_batches_to_object_store(
86 batches: &[RecordBatch],
87 object_store: Arc<dyn ObjectStore>,
88 path: &ObjectPath,
89 compression: Option<parquet::basic::Compression>,
90 max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92 let mut buffer = Vec::new();
94
95 let writer_props = WriterProperties::builder()
96 .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
97 .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
98 .build();
99
100 let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
101 for batch in batches {
102 writer.write(batch)?;
103 }
104 writer.close()?;
105
106 object_store.put(path, buffer.into()).await?;
108
109 Ok(())
110}
111
112pub async fn combine_parquet_files(
118 file_paths: Vec<&str>,
119 new_file_path: &str,
120 storage_options: Option<std::collections::HashMap<String, String>>,
121 compression: Option<parquet::basic::Compression>,
122 max_row_group_size: Option<usize>,
123) -> anyhow::Result<()> {
124 if file_paths.len() <= 1 {
125 return Ok(());
126 }
127
128 let (object_store, base_path, _) =
130 create_object_store_from_path(file_paths[0], storage_options)?;
131
132 let object_paths: Vec<ObjectPath> = file_paths
134 .iter()
135 .map(|path| {
136 if base_path.is_empty() {
137 ObjectPath::from(*path)
138 } else {
139 ObjectPath::from(format!("{base_path}/{path}"))
140 }
141 })
142 .collect();
143
144 let new_object_path = if base_path.is_empty() {
145 ObjectPath::from(new_file_path)
146 } else {
147 ObjectPath::from(format!("{base_path}/{new_file_path}"))
148 };
149
150 combine_parquet_files_from_object_store(
151 object_store,
152 object_paths,
153 &new_object_path,
154 compression,
155 max_row_group_size,
156 )
157 .await
158}
159
160pub async fn combine_parquet_files_from_object_store(
166 object_store: Arc<dyn ObjectStore>,
167 file_paths: Vec<ObjectPath>,
168 new_file_path: &ObjectPath,
169 compression: Option<parquet::basic::Compression>,
170 max_row_group_size: Option<usize>,
171) -> anyhow::Result<()> {
172 if file_paths.len() <= 1 {
173 return Ok(());
174 }
175
176 let mut all_batches: Vec<RecordBatch> = Vec::new();
177
178 for path in &file_paths {
180 let data = object_store.get(path).await?.bytes().await?;
181 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
182 let mut reader = builder.build()?;
183
184 for batch in reader.by_ref() {
185 all_batches.push(batch?);
186 }
187 }
188
189 write_batches_to_object_store(
191 &all_batches,
192 object_store.clone(),
193 new_file_path,
194 compression,
195 max_row_group_size,
196 )
197 .await?;
198
199 for path in &file_paths {
201 if path != new_file_path {
202 object_store.delete(path).await?;
203 }
204 }
205
206 Ok(())
207}
208
209pub async fn min_max_from_parquet_metadata(
219 file_path: &str,
220 storage_options: Option<std::collections::HashMap<String, String>>,
221 column_name: &str,
222) -> anyhow::Result<(u64, u64)> {
223 let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
224 let object_path = if base_path.is_empty() {
225 ObjectPath::from(file_path)
226 } else {
227 ObjectPath::from(format!("{base_path}/{file_path}"))
228 };
229
230 min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
231}
232
/// Reads the overall min/max of an `Int64` column from a Parquet file's
/// row-group statistics, without decoding any data pages.
///
/// The result is returned as `(u64, u64)` via an `as` cast, so negative
/// minima would wrap; NOTE(review): presumably callers only use this for
/// non-negative columns such as timestamps — confirm.
///
/// # Errors
///
/// Returns an error if:
/// - the file cannot be fetched or its footer parsed,
/// - the column exists but its statistics are not `Int64`,
/// - any row group containing the column lacks statistics, or
/// - the column is not found (or never yields statistics) in any row group.
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    // Fetch the whole file; only the footer metadata is actually decoded.
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Scan every row group and fold each one's column stats into the overall
    // min/max.
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            // Match on the dotted column path (e.g. "ts_event").
            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        // Track the smallest per-row-group minimum seen so far.
                        if let Some(&min_value) = int64_stats.min_opt()
                            && (overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap())
                        {
                            overall_min_value = Some(min_value);
                        }

                        // Track the largest per-row-group maximum seen so far.
                        if let Some(&max_value) = int64_stats.max_opt()
                            && (overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap())
                        {
                            overall_max_value = Some(max_value);
                        }
                    } else {
                        // Wrong physical type: fail fast rather than return
                        // misleading values.
                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
                    }
                } else {
                    // Missing stats in any row group makes the overall
                    // min/max unreliable, so bail immediately.
                    anyhow::bail!(
                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
                    );
                }
            }
        }
    }

    // Both bounds must have been observed at least once.
    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group."
        )
    }
}
303
304pub fn create_object_store_from_path(
323 path: &str,
324 storage_options: Option<std::collections::HashMap<String, String>>,
325) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
326 let uri = normalize_path_to_uri(path);
327
328 match uri.as_str() {
329 s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
330 s if s.starts_with("gs://") || s.starts_with("gcs://") => {
331 create_gcs_store(&uri, storage_options)
332 }
333 s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
334 s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
335 s if s.starts_with("http://") || s.starts_with("https://") => {
336 create_http_store(&uri, storage_options)
337 }
338 s if s.starts_with("file://") => create_local_store(&uri, true),
339 _ => create_local_store(&uri, false), }
341}
342
343#[must_use]
362pub fn normalize_path_to_uri(path: &str) -> String {
363 if path.contains("://") {
364 path.to_string()
366 } else {
367 if is_absolute_path(path) {
369 path_to_file_uri(path)
370 } else {
371 let absolute_path = std::env::current_dir().unwrap().join(path);
373 path_to_file_uri(&absolute_path.to_string_lossy())
374 }
375 }
376}
377
/// Returns `true` when `path` is absolute on Unix (`/...`), a Windows drive
/// path (`X:\...` or `X:/...`), or a Windows UNC path (`\\server\...`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute path or Windows UNC prefix.
    if path.starts_with('/') || path.starts_with("\\\\") {
        return true;
    }

    // Windows drive path: a drive letter, a colon, then a separator.
    let mut chars = path.chars();
    let _drive = chars.next();
    path.len() >= 3 && chars.next() == Some(':') && matches!(chars.next(), Some('/' | '\\'))
}
403
/// Converts an absolute filesystem path into a `file://` URI.
///
/// Windows drive paths gain a third slash (`file:///C:/...`) and have
/// backslashes normalized; for UNC paths the server name becomes the URI
/// authority (`file://server/share/...`).
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    if path.starts_with('/') {
        // Unix absolute path.
        return format!("file://{path}");
    }

    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        // Windows drive path: normalize separators, prepend the extra slash.
        return format!("file:///{}", path.replace('\\', "/"));
    }

    match path.strip_prefix("\\\\") {
        // UNC path: server becomes the authority component.
        Some(unc) => format!("file://{}", unc.replace('\\', "/")),
        None => format!("file://{path}"),
    }
}
423
424fn create_local_store(
426 uri: &str,
427 is_file_uri: bool,
428) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
429 let path = if is_file_uri {
430 uri.strip_prefix("file://").unwrap_or(uri)
431 } else {
432 uri
433 };
434
435 let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
436 Ok((Arc::new(local_store), String::new(), uri.to_string()))
437}
438
439fn create_s3_store(
441 uri: &str,
442 storage_options: Option<std::collections::HashMap<String, String>>,
443) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
444 let (url, path) = parse_url_and_path(uri)?;
445 let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
446
447 let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
448
449 if let Some(options) = storage_options {
451 for (key, value) in options {
452 match key.as_str() {
453 "endpoint_url" => {
454 builder = builder.with_endpoint(&value);
455 }
456 "region" => {
457 builder = builder.with_region(&value);
458 }
459 "access_key_id" | "key" => {
460 builder = builder.with_access_key_id(&value);
461 }
462 "secret_access_key" | "secret" => {
463 builder = builder.with_secret_access_key(&value);
464 }
465 "session_token" | "token" => {
466 builder = builder.with_token(&value);
467 }
468 "allow_http" => {
469 let allow_http = value.to_lowercase() == "true";
470 builder = builder.with_allow_http(allow_http);
471 }
472 _ => {
473 log::warn!("Unknown S3 storage option: {key}");
475 }
476 }
477 }
478 }
479
480 let s3_store = builder.build()?;
481 Ok((Arc::new(s3_store), path, uri.to_string()))
482}
483
/// Creates a Google Cloud Storage object store from a `gs://` or `gcs://` URI.
///
/// Recognized `storage_options` keys: `service_account_path`,
/// `service_account_key`, `application_credentials` (sets the
/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable); `project_id` only
/// produces a warning. Unknown keys are logged and ignored.
///
/// # Errors
///
/// Returns an error if the URI has no bucket host or the builder fails.
fn create_gcs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // The builder exposes no project-id setter, so this key
                    // cannot be honored here — warn instead of silently dropping it.
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // SAFETY: `set_var` mutates process-wide state and is not
                    // thread-safe; presumably stores are configured before any
                    // threads reading the environment are spawned — TODO confirm.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}
531
532fn create_azure_store(
534 uri: &str,
535 storage_options: Option<std::collections::HashMap<String, String>>,
536) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
537 let (url, _) = parse_url_and_path(uri)?;
538 let account = extract_host(&url, "Invalid Azure URI: missing account")?;
539
540 let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
541 if path_segments.is_empty() || path_segments[0].is_empty() {
542 anyhow::bail!("Invalid Azure URI: missing container");
543 }
544
545 let container = path_segments[0];
546 let path = if path_segments.len() > 1 {
547 path_segments[1..].join("/")
548 } else {
549 String::new()
550 };
551
552 let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
553 .with_account(&account)
554 .with_container_name(container);
555
556 if let Some(options) = storage_options {
558 for (key, value) in options {
559 match key.as_str() {
560 "account_name" => {
561 builder = builder.with_account(&value);
562 }
563 "account_key" => {
564 builder = builder.with_access_key(&value);
565 }
566 "sas_token" => {
567 let query_pairs: Vec<(String, String)> = value
569 .split('&')
570 .filter_map(|pair| {
571 let mut parts = pair.split('=');
572 match (parts.next(), parts.next()) {
573 (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
574 _ => None,
575 }
576 })
577 .collect();
578 builder = builder.with_sas_authorization(query_pairs);
579 }
580 "client_id" => {
581 builder = builder.with_client_id(&value);
582 }
583 "client_secret" => {
584 builder = builder.with_client_secret(&value);
585 }
586 "tenant_id" => {
587 builder = builder.with_tenant_id(&value);
588 }
589 _ => {
590 log::warn!("Unknown Azure storage option: {key}");
592 }
593 }
594 }
595 }
596
597 let azure_store = builder.build()?;
598 Ok((Arc::new(azure_store), path, uri.to_string()))
599}
600
601fn create_abfs_store(
603 uri: &str,
604 storage_options: Option<std::collections::HashMap<String, String>>,
605) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
606 let (url, path) = parse_url_and_path(uri)?;
607 let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
608
609 let account = host
611 .split('.')
612 .next()
613 .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
614
615 let container = url
617 .username()
618 .split('@')
619 .next()
620 .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
621
622 let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
623 .with_account(account)
624 .with_container_name(container);
625
626 if let Some(options) = storage_options {
628 for (key, value) in options {
629 match key.as_str() {
630 "account_name" => {
631 builder = builder.with_account(&value);
632 }
633 "account_key" => {
634 builder = builder.with_access_key(&value);
635 }
636 "sas_token" => {
637 let query_pairs: Vec<(String, String)> = value
639 .split('&')
640 .filter_map(|pair| {
641 let mut parts = pair.split('=');
642 match (parts.next(), parts.next()) {
643 (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
644 _ => None,
645 }
646 })
647 .collect();
648 builder = builder.with_sas_authorization(query_pairs);
649 }
650 "client_id" => {
651 builder = builder.with_client_id(&value);
652 }
653 "client_secret" => {
654 builder = builder.with_client_secret(&value);
655 }
656 "tenant_id" => {
657 builder = builder.with_tenant_id(&value);
658 }
659 _ => {
660 log::warn!("Unknown ABFS storage option: {key}");
662 }
663 }
664 }
665 }
666
667 let azure_store = builder.build()?;
668 Ok((Arc::new(azure_store), path, uri.to_string()))
669}
670
671fn create_http_store(
673 uri: &str,
674 storage_options: Option<std::collections::HashMap<String, String>>,
675) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
676 let (url, path) = parse_url_and_path(uri)?;
677 let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
678
679 let builder = object_store::http::HttpBuilder::new().with_url(base_url);
680
681 if let Some(options) = storage_options {
683 for (key, _value) in options {
684 log::warn!("Unknown HTTP storage option: {key}");
688 }
689 }
690
691 let http_store = builder.build()?;
692 Ok((Arc::new(http_store), path, uri.to_string()))
693}
694
695fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
697 let url = url::Url::parse(uri)?;
698 let path = url.path().trim_start_matches('/').to_string();
699 Ok((url, path))
700}
701
702fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
704 url.host_str()
705 .map(ToString::to_string)
706 .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
707}
708
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        // Use a throwaway directory so the local store prefix exists.
        let dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&dir).unwrap();

        let outcome = create_object_store_from_path(dir.to_str().unwrap(), None);
        if let Err(e) = &outcome {
            println!("Error: {e:?}");
        }
        assert!(outcome.is_ok());

        let (_, base_path, uri) = outcome.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", dir.to_str().unwrap()));

        std::fs::remove_dir_all(&dir).ok();
    }

    #[rstest]
    fn test_create_object_store_from_path_s3() {
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let outcome = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(outcome.is_ok());

        let (_, base_path, uri) = outcome.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_azure() {
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let outcome =
            create_object_store_from_path("azure://testaccount/container/path", Some(options));
        if let Err(e) = &outcome {
            println!("Azure Error: {e:?}");
        }
        assert!(outcome.is_ok());

        let (_, base_path, uri) = outcome.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "azure://testaccount/container/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_gcs() {
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        // GCS store creation may fail without credentials; accept either
        // outcome but validate whichever path is taken.
        match create_object_store_from_path("gs://test-bucket/path", Some(options)) {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    fn test_create_object_store_from_path_empty_options() {
        let outcome = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(outcome.is_ok());

        let (_, base_path, uri) = outcome.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_parse_url_and_path() {
        let outcome = parse_url_and_path("s3://bucket/path/to/file");
        assert!(outcome.is_ok());

        let (url, path) = outcome.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let host = extract_host(&url, "Test error");
        assert!(host.is_ok());
        assert_eq!(host.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute path.
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths.
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // UNC path.
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Existing URIs pass through unchanged.
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        // Unix.
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        // Windows drive letters.
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        // UNC.
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        // Relative paths.
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Degenerate inputs.
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        // Unix.
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        // Windows drive paths.
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        // UNC paths.
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}