1use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use object_store::{ObjectStore, path::Path as ObjectPath};
20use parquet::{
21 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
22 file::{
23 properties::WriterProperties,
24 reader::{FileReader, SerializedFileReader},
25 statistics::Statistics,
26 },
27};
28
29pub async fn write_batch_to_parquet(
35 batch: RecordBatch,
36 path: &str,
37 storage_options: Option<std::collections::HashMap<String, String>>,
38 compression: Option<parquet::basic::Compression>,
39 max_row_group_size: Option<usize>,
40) -> anyhow::Result<()> {
41 write_batches_to_parquet(
42 &[batch],
43 path,
44 storage_options,
45 compression,
46 max_row_group_size,
47 )
48 .await
49}
50
51pub async fn write_batches_to_parquet(
57 batches: &[RecordBatch],
58 path: &str,
59 storage_options: Option<std::collections::HashMap<String, String>>,
60 compression: Option<parquet::basic::Compression>,
61 max_row_group_size: Option<usize>,
62) -> anyhow::Result<()> {
63 let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
64 let object_path = if base_path.is_empty() {
65 ObjectPath::from(path)
66 } else {
67 ObjectPath::from(format!("{base_path}/{path}"))
68 };
69
70 write_batches_to_object_store(
71 batches,
72 object_store,
73 &object_path,
74 compression,
75 max_row_group_size,
76 )
77 .await
78}
79
80pub async fn write_batches_to_object_store(
86 batches: &[RecordBatch],
87 object_store: Arc<dyn ObjectStore>,
88 path: &ObjectPath,
89 compression: Option<parquet::basic::Compression>,
90 max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92 let mut buffer = Vec::new();
94
95 let writer_props = WriterProperties::builder()
96 .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
97 .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
98 .build();
99
100 let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
101 for batch in batches {
102 writer.write(batch)?;
103 }
104 writer.close()?;
105
106 object_store.put(path, buffer.into()).await?;
108
109 Ok(())
110}
111
112pub async fn combine_parquet_files(
118 file_paths: Vec<&str>,
119 new_file_path: &str,
120 storage_options: Option<std::collections::HashMap<String, String>>,
121 compression: Option<parquet::basic::Compression>,
122 max_row_group_size: Option<usize>,
123) -> anyhow::Result<()> {
124 if file_paths.len() <= 1 {
125 return Ok(());
126 }
127
128 let (object_store, base_path, _) =
130 create_object_store_from_path(file_paths[0], storage_options)?;
131
132 let object_paths: Vec<ObjectPath> = file_paths
134 .iter()
135 .map(|path| {
136 if base_path.is_empty() {
137 ObjectPath::from(*path)
138 } else {
139 ObjectPath::from(format!("{base_path}/{path}"))
140 }
141 })
142 .collect();
143
144 let new_object_path = if base_path.is_empty() {
145 ObjectPath::from(new_file_path)
146 } else {
147 ObjectPath::from(format!("{base_path}/{new_file_path}"))
148 };
149
150 combine_parquet_files_from_object_store(
151 object_store,
152 object_paths,
153 &new_object_path,
154 compression,
155 max_row_group_size,
156 )
157 .await
158}
159
160pub async fn combine_parquet_files_from_object_store(
166 object_store: Arc<dyn ObjectStore>,
167 file_paths: Vec<ObjectPath>,
168 new_file_path: &ObjectPath,
169 compression: Option<parquet::basic::Compression>,
170 max_row_group_size: Option<usize>,
171) -> anyhow::Result<()> {
172 if file_paths.len() <= 1 {
173 return Ok(());
174 }
175
176 let mut all_batches: Vec<RecordBatch> = Vec::new();
177
178 for path in &file_paths {
180 let data = object_store.get(path).await?.bytes().await?;
181 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
182 let mut reader = builder.build()?;
183
184 for batch in reader.by_ref() {
185 all_batches.push(batch?);
186 }
187 }
188
189 write_batches_to_object_store(
191 &all_batches,
192 object_store.clone(),
193 new_file_path,
194 compression,
195 max_row_group_size,
196 )
197 .await?;
198
199 for path in &file_paths {
201 object_store.delete(path).await?;
202 }
203
204 Ok(())
205}
206
207pub async fn min_max_from_parquet_metadata(
217 file_path: &str,
218 storage_options: Option<std::collections::HashMap<String, String>>,
219 column_name: &str,
220) -> anyhow::Result<(u64, u64)> {
221 let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
222 let object_path = if base_path.is_empty() {
223 ObjectPath::from(file_path)
224 } else {
225 ObjectPath::from(format!("{base_path}/{file_path}"))
226 };
227
228 min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
229}
230
/// Scans the Parquet footer metadata of `file_path` and returns the overall
/// `(min, max)` of the Int64 column statistics for `column_name` across all
/// row groups.
///
/// # Errors
///
/// Fails if statistics are missing for the column in any row group, if the
/// column's statistics are not Int64, or if the column is never found.
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    // Downloads the whole object; only the footer metadata is inspected.
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        // Keep the smallest per-row-group min seen so far.
                        if let Some(&min_value) = int64_stats.min_opt()
                            && (overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap())
                        {
                            overall_min_value = Some(min_value);
                        }

                        // Keep the largest per-row-group max seen so far.
                        if let Some(&max_value) = int64_stats.max_opt()
                            && (overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap())
                        {
                            overall_max_value = Some(max_value);
                        }
                    } else {
                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
                    }
                } else {
                    anyhow::bail!(
                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
                    );
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        // NOTE(review): `as u64` wraps negative Int64 stats to huge values —
        // assumes this column holds non-negative values (e.g. timestamps);
        // confirm with callers.
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group."
        )
    }
}
301
302pub fn create_object_store_from_path(
321 path: &str,
322 storage_options: Option<std::collections::HashMap<String, String>>,
323) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
324 let uri = normalize_path_to_uri(path);
325
326 match uri.as_str() {
327 s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
328 s if s.starts_with("gs://") || s.starts_with("gcs://") => {
329 create_gcs_store(&uri, storage_options)
330 }
331 s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
332 s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
333 s if s.starts_with("http://") || s.starts_with("https://") => {
334 create_http_store(&uri, storage_options)
335 }
336 s if s.starts_with("file://") => create_local_store(&uri, true),
337 _ => create_local_store(&uri, false), }
339}
340
341#[must_use]
360pub fn normalize_path_to_uri(path: &str) -> String {
361 if path.contains("://") {
362 path.to_string()
364 } else {
365 if is_absolute_path(path) {
367 path_to_file_uri(path)
368 } else {
369 let absolute_path = std::env::current_dir().unwrap().join(path);
371 path_to_file_uri(&absolute_path.to_string_lossy())
372 }
373 }
374}
375
/// Returns `true` when `path` is absolute.
///
/// Recognizes Unix absolute paths (`/...`), Windows drive paths (`C:\...` or
/// `C:/...`), and UNC paths (`\\server\share`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix-style absolute path.
    if path.starts_with('/') {
        return true;
    }
    // UNC path (\\server\share).
    if path.starts_with("\\\\") {
        return true;
    }
    // Windows drive path: any drive character, then ':' and a separator.
    let mut chars = path.chars();
    matches!(
        (chars.next(), chars.next(), chars.next()),
        (Some(_), Some(':'), Some('\\' | '/'))
    )
}
401
/// Converts an absolute filesystem path into a `file://` URI.
///
/// Backslashes in Windows drive and UNC paths are normalized to `/`.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    // Unix absolute path: /tmp/test -> file:///tmp/test
    if path.starts_with('/') {
        return format!("file://{path}");
    }
    // Windows drive path: C:\tmp -> file:///C:/tmp
    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        return format!("file:///{}", path.replace('\\', "/"));
    }
    // UNC path: \\server\share -> file://server/share
    if let Some(unc) = path.strip_prefix("\\\\") {
        return format!("file://{}", unc.replace('\\', "/"));
    }
    // Fallback: prefix the scheme verbatim.
    format!("file://{path}")
}
421
422fn create_local_store(
424 uri: &str,
425 is_file_uri: bool,
426) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
427 let path = if is_file_uri {
428 uri.strip_prefix("file://").unwrap_or(uri)
429 } else {
430 uri
431 };
432
433 let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
434 Ok((Arc::new(local_store), String::new(), uri.to_string()))
435}
436
437fn create_s3_store(
439 uri: &str,
440 storage_options: Option<std::collections::HashMap<String, String>>,
441) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
442 let (url, path) = parse_url_and_path(uri)?;
443 let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
444
445 let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
446
447 if let Some(options) = storage_options {
449 for (key, value) in options {
450 match key.as_str() {
451 "endpoint_url" => {
452 builder = builder.with_endpoint(&value);
453 }
454 "region" => {
455 builder = builder.with_region(&value);
456 }
457 "access_key_id" | "key" => {
458 builder = builder.with_access_key_id(&value);
459 }
460 "secret_access_key" | "secret" => {
461 builder = builder.with_secret_access_key(&value);
462 }
463 "session_token" | "token" => {
464 builder = builder.with_token(&value);
465 }
466 "allow_http" => {
467 let allow_http = value.to_lowercase() == "true";
468 builder = builder.with_allow_http(allow_http);
469 }
470 _ => {
471 log::warn!("Unknown S3 storage option: {key}");
473 }
474 }
475 }
476 }
477
478 let s3_store = builder.build()?;
479 Ok((Arc::new(s3_store), path, uri.to_string()))
480}
481
482fn create_gcs_store(
484 uri: &str,
485 storage_options: Option<std::collections::HashMap<String, String>>,
486) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
487 let (url, path) = parse_url_and_path(uri)?;
488 let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;
489
490 let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);
491
492 if let Some(options) = storage_options {
494 for (key, value) in options {
495 match key.as_str() {
496 "service_account_path" => {
497 builder = builder.with_service_account_path(&value);
498 }
499 "service_account_key" => {
500 builder = builder.with_service_account_key(&value);
501 }
502 "project_id" => {
503 log::warn!(
506 "project_id should be set via service account or environment variables"
507 );
508 }
509 "application_credentials" => {
510 unsafe {
515 std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
516 }
517 }
518 _ => {
519 log::warn!("Unknown GCS storage option: {key}");
521 }
522 }
523 }
524 }
525
526 let gcs_store = builder.build()?;
527 Ok((Arc::new(gcs_store), path, uri.to_string()))
528}
529
530fn create_azure_store(
532 uri: &str,
533 storage_options: Option<std::collections::HashMap<String, String>>,
534) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
535 let (url, _) = parse_url_and_path(uri)?;
536 let account = extract_host(&url, "Invalid Azure URI: missing account")?;
537
538 let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
539 if path_segments.is_empty() || path_segments[0].is_empty() {
540 anyhow::bail!("Invalid Azure URI: missing container");
541 }
542
543 let container = path_segments[0];
544 let path = if path_segments.len() > 1 {
545 path_segments[1..].join("/")
546 } else {
547 String::new()
548 };
549
550 let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
551 .with_account(&account)
552 .with_container_name(container);
553
554 if let Some(options) = storage_options {
556 for (key, value) in options {
557 match key.as_str() {
558 "account_name" => {
559 builder = builder.with_account(&value);
560 }
561 "account_key" => {
562 builder = builder.with_access_key(&value);
563 }
564 "sas_token" => {
565 let query_pairs: Vec<(String, String)> = value
567 .split('&')
568 .filter_map(|pair| {
569 let mut parts = pair.split('=');
570 match (parts.next(), parts.next()) {
571 (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
572 _ => None,
573 }
574 })
575 .collect();
576 builder = builder.with_sas_authorization(query_pairs);
577 }
578 "client_id" => {
579 builder = builder.with_client_id(&value);
580 }
581 "client_secret" => {
582 builder = builder.with_client_secret(&value);
583 }
584 "tenant_id" => {
585 builder = builder.with_tenant_id(&value);
586 }
587 _ => {
588 log::warn!("Unknown Azure storage option: {key}");
590 }
591 }
592 }
593 }
594
595 let azure_store = builder.build()?;
596 Ok((Arc::new(azure_store), path, uri.to_string()))
597}
598
599fn create_abfs_store(
601 uri: &str,
602 storage_options: Option<std::collections::HashMap<String, String>>,
603) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
604 let (url, path) = parse_url_and_path(uri)?;
605 let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
606
607 let account = host
609 .split('.')
610 .next()
611 .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
612
613 let container = url
615 .username()
616 .split('@')
617 .next()
618 .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
619
620 let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
621 .with_account(account)
622 .with_container_name(container);
623
624 if let Some(options) = storage_options {
626 for (key, value) in options {
627 match key.as_str() {
628 "account_name" => {
629 builder = builder.with_account(&value);
630 }
631 "account_key" => {
632 builder = builder.with_access_key(&value);
633 }
634 "sas_token" => {
635 let query_pairs: Vec<(String, String)> = value
637 .split('&')
638 .filter_map(|pair| {
639 let mut parts = pair.split('=');
640 match (parts.next(), parts.next()) {
641 (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
642 _ => None,
643 }
644 })
645 .collect();
646 builder = builder.with_sas_authorization(query_pairs);
647 }
648 "client_id" => {
649 builder = builder.with_client_id(&value);
650 }
651 "client_secret" => {
652 builder = builder.with_client_secret(&value);
653 }
654 "tenant_id" => {
655 builder = builder.with_tenant_id(&value);
656 }
657 _ => {
658 log::warn!("Unknown ABFS storage option: {key}");
660 }
661 }
662 }
663 }
664
665 let azure_store = builder.build()?;
666 Ok((Arc::new(azure_store), path, uri.to_string()))
667}
668
669fn create_http_store(
671 uri: &str,
672 storage_options: Option<std::collections::HashMap<String, String>>,
673) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
674 let (url, path) = parse_url_and_path(uri)?;
675 let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
676
677 let builder = object_store::http::HttpBuilder::new().with_url(base_url);
678
679 if let Some(options) = storage_options {
681 for (key, _value) in options {
682 log::warn!("Unknown HTTP storage option: {key}");
686 }
687 }
688
689 let http_store = builder.build()?;
690 Ok((Arc::new(http_store), path, uri.to_string()))
691}
692
693fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
695 let url = url::Url::parse(uri)?;
696 let path = url.path().trim_start_matches('/').to_string();
697 Ok((url, path))
698}
699
700fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
702 url.host_str()
703 .map(ToString::to_string)
704 .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
705}
706
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        // Use a scratch directory so the local store prefix exists.
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let res = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &res {
            println!("Error: {e:?}");
        }
        assert!(res.is_ok());

        let (_, base_path, uri) = res.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[rstest]
    fn test_create_object_store_from_path_s3() {
        let mut opts = HashMap::new();
        opts.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        opts.insert("region".to_string(), "us-west-2".to_string());
        opts.insert("access_key_id".to_string(), "test_key".to_string());
        opts.insert("secret_access_key".to_string(), "test_secret".to_string());

        let res = create_object_store_from_path("s3://test-bucket/path", Some(opts));
        assert!(res.is_ok());

        let (_, base_path, uri) = res.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_azure() {
        let mut opts = HashMap::new();
        opts.insert("account_name".to_string(), "testaccount".to_string());
        opts.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let res = create_object_store_from_path("azure://testaccount/container/path", Some(opts));
        if let Err(e) = &res {
            println!("Azure Error: {e:?}");
        }
        assert!(res.is_ok());

        let (_, base_path, uri) = res.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "azure://testaccount/container/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_gcs() {
        let mut opts = HashMap::new();
        opts.insert("project_id".to_string(), "test-project".to_string());

        // GCS store creation may fail without credentials; accept either outcome.
        match create_object_store_from_path("gs://test-bucket/path", Some(opts)) {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    fn test_create_object_store_from_path_empty_options() {
        let res = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(res.is_ok());

        let (_, base_path, uri) = res.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_parse_url_and_path() {
        let res = parse_url_and_path("s3://bucket/path/to/file");
        assert!(res.is_ok());

        let (url, path) = res.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let host = extract_host(&url, "Test error");
        assert!(host.is_ok());
        assert_eq!(host.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute path.
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths, both separator styles.
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // UNC path.
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Already-URI inputs pass through unchanged.
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        // Unix.
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        // Windows drive paths.
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        // UNC paths.
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        // Relative paths.
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Degenerate inputs.
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        // Unix.
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        // Windows drive paths.
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        // UNC paths.
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}