nautilus_persistence/
parquet.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::sync::Arc;

use ahash::AHashMap;
use arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, path::Path as ObjectPath};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    file::{
        properties::WriterProperties,
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    },
};

/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
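///
/// # Example
///
/// A minimal sketch (illustrative only; the batch contents and output path here are
/// assumptions, not part of this API):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use arrow::array::Int64Array;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::record_batch::RecordBatch;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("ts_init", DataType::Int64, false)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
///
/// // Defaults: SNAPPY compression and 5000-row groups when the options are `None`
/// write_batch_to_parquet(batch, "/tmp/example.parquet", None, None, None).await?;
/// ```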
pub async fn write_batch_to_parquet(
    batch: RecordBatch,
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    write_batches_to_parquet(
        &[batch],
        path,
        storage_options,
        compression,
        max_row_group_size,
    )
    .await
}

/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
///
/// # Errors
///
/// Returns an error if writing to Parquet fails or any I/O operation fails.
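///
/// # Example
///
/// A minimal sketch (values are illustrative; `batches` is assumed to share one schema):
///
/// ```ignore
/// use parquet::basic::Compression;
///
/// write_batches_to_parquet(
///     &batches,
///     "/tmp/example.parquet",
///     None,                      // storage_options
///     Some(Compression::SNAPPY), // defaults to SNAPPY when `None`
///     Some(5000),                // defaults to 5000 rows per group when `None`
/// )
/// .await?;
/// ```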
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(path)
    } else {
        ObjectPath::from(format!("{base_path}/{path}"))
    };

    write_batches_to_object_store(
        batches,
        object_store,
        &object_path,
        compression,
        max_row_group_size,
    )
    .await
}

/// Writes multiple `RecordBatch` items to an object store URI, with optional compression and row group sizing.
///
/// # Errors
///
/// Returns an error if `batches` is empty, writing to Parquet fails, or any I/O operation fails.
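///
/// # Example
///
/// A minimal sketch against an in-memory store (the store and path are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use object_store::{ObjectStore, memory::InMemory, path::Path as ObjectPath};
///
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let path = ObjectPath::from("data/example.parquet");
///
/// write_batches_to_object_store(&batches, store, &path, None, None).await?;
/// ```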
pub async fn write_batches_to_object_store(
    batches: &[RecordBatch],
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    anyhow::ensure!(!batches.is_empty(), "`batches` must not be empty");

    // Write the parquet data into an in-memory buffer
    let mut buffer = Vec::new();

    let writer_props = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
        .build();

    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.close()?;

    // Upload the buffer to object store
    object_store.put(path, buffer.into()).await?;

    Ok(())
}

/// Combines multiple Parquet files into a single file using object store, with optional storage options.
///
/// # Errors
///
/// Returns an error if file reading or writing fails.
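///
/// # Example
///
/// A minimal sketch (file names are illustrative):
///
/// ```ignore
/// combine_parquet_files(
///     vec!["/tmp/part-0.parquet", "/tmp/part-1.parquet"],
///     "/tmp/combined.parquet",
///     None, // storage_options
///     None, // compression
///     None, // max_row_group_size
/// )
/// .await?;
/// ```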
pub async fn combine_parquet_files(
    file_paths: Vec<&str>,
    new_file_path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    // Create object store from the first file path (assuming all files are in the same store)
    let (object_store, base_path, _) =
        create_object_store_from_path(file_paths[0], storage_options)?;

    // Convert string paths to ObjectPath
    let object_paths: Vec<ObjectPath> = file_paths
        .iter()
        .map(|path| {
            if base_path.is_empty() {
                ObjectPath::from(*path)
            } else {
                ObjectPath::from(format!("{base_path}/{path}"))
            }
        })
        .collect();

    let new_object_path = if base_path.is_empty() {
        ObjectPath::from(new_file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{new_file_path}"))
    };

    combine_parquet_files_from_object_store(
        object_store,
        object_paths,
        &new_object_path,
        compression,
        max_row_group_size,
    )
    .await
}

/// Combines multiple Parquet files from object store into a single file.
///
/// # Errors
///
/// Returns an error if file reading or writing fails.
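///
/// # Example
///
/// A minimal sketch (the store and paths are illustrative). Note that the source
/// files are deleted after the combined file is written:
///
/// ```ignore
/// use object_store::path::Path as ObjectPath;
///
/// let paths = vec![
///     ObjectPath::from("data/part-0.parquet"),
///     ObjectPath::from("data/part-1.parquet"),
/// ];
/// let target = ObjectPath::from("data/combined.parquet");
///
/// combine_parquet_files_from_object_store(store, paths, &target, None, None).await?;
/// ```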
pub async fn combine_parquet_files_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_paths: Vec<ObjectPath>,
    new_file_path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }

    let mut all_batches: Vec<RecordBatch> = Vec::new();

    // Read all files from object store
    for path in &file_paths {
        let data = object_store.get(path).await?.bytes().await?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        let mut reader = builder.build()?;

        for batch in reader.by_ref() {
            all_batches.push(batch?);
        }
    }

    // Write combined batches to new location
    write_batches_to_object_store(
        &all_batches,
        object_store.clone(),
        new_file_path,
        compression,
        max_row_group_size,
    )
    .await?;

    // Remove the merged files
    for path in &file_paths {
        if path != new_file_path {
            object_store.delete(path).await?;
        }
    }

    Ok(())
}

/// Extracts the minimum and maximum `i64` values for the specified `column_name` from a Parquet file's metadata, returned as a `(u64, u64)` pair, using object store with storage options.
///
/// # Errors
///
/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no `Int64` statistics.
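///
/// # Example
///
/// A minimal sketch (the file and column names are illustrative; the column must
/// carry `Int64` statistics, e.g. a nanosecond timestamp):
///
/// ```ignore
/// let (min_ts, max_ts) =
///     min_max_from_parquet_metadata("/tmp/example.parquet", None, "ts_init").await?;
/// ```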
pub async fn min_max_from_parquet_metadata(
    file_path: &str,
    storage_options: Option<AHashMap<String, String>>,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
    let object_path = if base_path.is_empty() {
        ObjectPath::from(file_path)
    } else {
        ObjectPath::from(format!("{base_path}/{file_path}"))
    };

    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
}

/// Extracts the minimum and maximum `i64` values for the specified `column_name` from a Parquet file's metadata in object store, returned as a `(u64, u64)` pair.
///
/// # Errors
///
/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no `Int64` statistics.
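///
/// # Example
///
/// A minimal sketch (the store, path, and column name are illustrative):
///
/// ```ignore
/// use object_store::path::Path as ObjectPath;
///
/// let path = ObjectPath::from("data/example.parquet");
/// let (min_ts, max_ts) =
///     min_max_from_parquet_metadata_object_store(store, &path, "ts_init").await?;
/// ```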
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    // Download the parquet file from object store
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Iterate through all row groups
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        // Iterate through all columns in this row group
        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    // Check if we have Int64 statistics
                    if let Statistics::Int64(int64_stats) = stats {
                        // Extract min value if available
                        if let Some(&min_value) = int64_stats.min_opt()
                            && (overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap())
                        {
                            overall_min_value = Some(min_value);
                        }

                        // Extract max value if available
                        if let Some(&max_value) = int64_stats.max_opt()
                            && (overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap())
                        {
                            overall_max_value = Some(max_value);
                        }
                    } else {
                        anyhow::bail!("Column '{column_name}' is not of type i64");
                    }
                } else {
                    anyhow::bail!(
                        "Statistics not available for column '{column_name}' in row group {i}"
                    );
                }
            }
        }
    }

    // Return the min/max pair if both are available
    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group"
        )
    }
}

/// Creates an object store from a URI string with optional storage options.
///
/// Supports multiple cloud storage providers:
/// - AWS S3: `s3://bucket/path`
/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
/// - Azure Blob Storage: `az://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
/// - HTTP/WebDAV: `http://` or `https://`
/// - Local files: `file://path` or plain paths
///
/// # Parameters
///
/// - `path`: The URI string for the storage location.
/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
///
/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`).
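///
/// # Example
///
/// A minimal sketch (bucket and credentials are illustrative placeholders):
///
/// ```ignore
/// use ahash::AHashMap;
///
/// let mut options = AHashMap::new();
/// options.insert("region".to_string(), "us-east-1".to_string());
/// options.insert("access_key_id".to_string(), "<access-key>".to_string());
/// options.insert("secret_access_key".to_string(), "<secret-key>".to_string());
///
/// let (store, base_path, uri) =
///     create_object_store_from_path("s3://my-bucket/data", Some(options))?;
/// ```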
#[allow(unused_variables)]
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let uri = normalize_path_to_uri(path);

    match uri.as_str() {
        #[cfg(feature = "cloud")]
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        #[cfg(feature = "cloud")]
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        #[cfg(not(feature = "cloud"))]
        s if s.starts_with("s3://")
            || s.starts_with("gs://")
            || s.starts_with("gcs://")
            || s.starts_with("az://")
            || s.starts_with("abfs://")
            || s.starts_with("http://")
            || s.starts_with("https://") =>
        {
            anyhow::bail!("Cloud storage support requires the 'cloud' feature: {uri}")
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false), // Fallback: assume local path
    }
}

/// Normalizes a path to URI format for consistent object store usage.
///
/// If the path is already a URI (contains "://"), returns it as-is.
/// Otherwise, converts local paths to file:// URIs with proper cross-platform handling.
///
/// Supported URI schemes:
/// - `s3://` for AWS S3
/// - `gs://` or `gcs://` for Google Cloud Storage
/// - `az://` or `abfs://` for Azure Blob Storage
/// - `http://` or `https://` for HTTP/WebDAV
/// - `file://` for local files
///
/// # Cross-platform Path Handling
///
/// - Unix absolute paths: `/path/to/file` → `file:///path/to/file`
/// - Windows drive paths: `C:\path\to\file` → `file:///C:/path/to/file`
/// - Windows UNC paths: `\\server\share\file` → `file://server/share/file`
/// - Relative paths: converted to absolute using the current working directory
///
/// # Panics
///
/// Panics if the path is relative and the current working directory cannot be determined.
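///
/// # Example
///
/// Mirrors the unit tests below:
///
/// ```ignore
/// assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
/// assert_eq!(normalize_path_to_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
/// assert_eq!(normalize_path_to_uri("s3://bucket/path"), "s3://bucket/path");
/// ```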
#[must_use]
pub fn normalize_path_to_uri(path: &str) -> String {
    if path.contains("://") {
        // Already a URI - return as-is
        path.to_string()
    } else {
        // Convert local path to file:// URI with cross-platform support
        if is_absolute_path(path) {
            path_to_file_uri(path)
        } else {
            // Relative path - make it absolute first
            let absolute_path = std::env::current_dir().unwrap().join(path);
            path_to_file_uri(&absolute_path.to_string_lossy())
        }
    }
}

/// Checks if a path is absolute on the current platform.
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    if path.starts_with('/') {
        // Unix absolute path
        true
    } else if path.len() >= 3
        && path.chars().nth(1) == Some(':')
        && path.chars().nth(2) == Some('\\')
    {
        // Windows drive path like C:\
        true
    } else if path.len() >= 3
        && path.chars().nth(1) == Some(':')
        && path.chars().nth(2) == Some('/')
    {
        // Windows drive path with forward slashes like C:/
        true
    } else if path.starts_with("\\\\") {
        // Windows UNC path
        true
    } else {
        false
    }
}

/// Converts an absolute path to a file:// URI with proper platform handling.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    if path.starts_with('/') {
        // Unix absolute path
        format!("file://{path}")
    } else if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        // Windows drive path - normalize separators and add proper prefix
        let normalized = path.replace('\\', "/");
        format!("file:///{normalized}")
    } else if let Some(without_prefix) = path.strip_prefix("\\\\") {
        // Windows UNC path \\server\share -> file://server/share
        let normalized = without_prefix.replace('\\', "/");
        format!("file://{normalized}")
    } else {
        // Fallback - treat as relative to root
        format!("file://{path}")
    }
}

/// Helper function to create a local file system object store.
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let path = if is_file_uri {
        uri.strip_prefix("file://").unwrap_or(uri)
    } else {
        uri
    };

    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
    Ok((Arc::new(local_store), String::new(), uri.to_string()))
}

/// Helper function to create S3 object store with options.
#[cfg(feature = "cloud")]
fn create_s3_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "endpoint_url" => {
                    builder = builder.with_endpoint(&value);
                }
                "region" => {
                    builder = builder.with_region(&value);
                }
                "access_key_id" | "key" => {
                    builder = builder.with_access_key_id(&value);
                }
                "secret_access_key" | "secret" => {
                    builder = builder.with_secret_access_key(&value);
                }
                "session_token" | "token" => {
                    builder = builder.with_token(&value);
                }
                "allow_http" => {
                    let allow_http = value.to_lowercase() == "true";
                    builder = builder.with_allow_http(allow_http);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown S3 storage option: {key}");
                }
            }
        }
    }

    let s3_store = builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}

/// Helper function to create GCS object store with options.
#[cfg(feature = "cloud")]
fn create_gcs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // Note: GoogleCloudStorageBuilder doesn't have with_project_id method
                    // This would need to be handled via environment variables or service account
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // Set GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
                    // SAFETY: std::env::set_var is marked unsafe because it mutates global state and
                    // can break signal-safe code. We only call it during configuration before any
                    // multi-threaded work starts, so it is considered safe in this context.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}

/// Helper function to create Azure object store with options.
#[cfg(feature = "cloud")]
fn create_azure_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;

    let path = url.path().trim_start_matches('/').to_string();

    let mut builder =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse SAS token as query string parameters
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

/// Helper function to create Azure object store from abfs:// URI with options.
#[cfg(feature = "cloud")]
fn create_abfs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // Extract account from host (account.dfs.core.windows.net)
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // Extract container from username part
    let container = url
        .username()
        .split('@')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    // Apply storage options if provided (same as Azure store)
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse SAS token as query string parameters
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}

/// Helper function to create HTTP object store with options.
#[cfg(feature = "cloud")]
fn create_http_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, _value) in options {
            // HTTP builder has limited configuration options
            // Most HTTP-specific options would be handled via client options
            // Ignore unknown options for forward compatibility
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}

/// Helper function to parse URL and extract path component.
#[cfg(feature = "cloud")]
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let url = url::Url::parse(uri)?;
    let path = url.path().trim_start_matches('/').to_string();
    Ok((url, path))
}

/// Helper function to extract host from URL with error handling.
#[cfg(feature = "cloud")]
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    url.host_str()
        .map(ToString::to_string)
        .ok_or_else(|| anyhow::anyhow!("{error_msg}"))
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "cloud")]
    use ahash::AHashMap;
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        // Create a temporary directory for testing
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        // The URI should be normalized to file:// format
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        // Clean up
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_s3() {
        let mut options = AHashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_azure() {
        let mut options = AHashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Use a valid base64 encoded key for testing
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_gcs() {
        // Test GCS without service account (will use default credentials or fail gracefully)
        let mut options = AHashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS might fail due to missing credentials, but we're testing the path parsing
        // The function should at least parse the URI correctly before failing on auth
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                // Expected to fail due to missing credentials, but should contain bucket info
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute paths
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // Windows UNC paths
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Already URIs - should remain unchanged
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        // Unix absolute paths
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        // Windows drive paths
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        // Windows UNC paths
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        // Relative paths
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Edge cases
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        // Unix absolute paths
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        // Windows drive paths
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        // Windows UNC paths
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}