nautilus_persistence/
parquet.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use object_store::{ObjectStore, path::Path as ObjectPath};
20use parquet::{
21    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
22    file::{
23        properties::WriterProperties,
24        reader::{FileReader, SerializedFileReader},
25        statistics::Statistics,
26    },
27};
28
29/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
30///
31/// # Errors
32///
33/// Returns an error if writing to Parquet fails or any I/O operation fails.
34pub async fn write_batch_to_parquet(
35    batch: RecordBatch,
36    path: &str,
37    storage_options: Option<std::collections::HashMap<String, String>>,
38    compression: Option<parquet::basic::Compression>,
39    max_row_group_size: Option<usize>,
40) -> anyhow::Result<()> {
41    write_batches_to_parquet(
42        &[batch],
43        path,
44        storage_options,
45        compression,
46        max_row_group_size,
47    )
48    .await
49}
50
51/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
52///
53/// # Errors
54///
55/// Returns an error if writing to Parquet fails or any I/O operation fails.
56pub async fn write_batches_to_parquet(
57    batches: &[RecordBatch],
58    path: &str,
59    storage_options: Option<std::collections::HashMap<String, String>>,
60    compression: Option<parquet::basic::Compression>,
61    max_row_group_size: Option<usize>,
62) -> anyhow::Result<()> {
63    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
64    let object_path = if base_path.is_empty() {
65        ObjectPath::from(path)
66    } else {
67        ObjectPath::from(format!("{base_path}/{path}"))
68    };
69
70    write_batches_to_object_store(
71        batches,
72        object_store,
73        &object_path,
74        compression,
75        max_row_group_size,
76    )
77    .await
78}
79
80/// Writes multiple `RecordBatch` items to an object store URI, with optional compression and row group sizing.
81///
82/// # Errors
83///
84/// Returns an error if writing to Parquet fails or any I/O operation fails.
85pub async fn write_batches_to_object_store(
86    batches: &[RecordBatch],
87    object_store: Arc<dyn ObjectStore>,
88    path: &ObjectPath,
89    compression: Option<parquet::basic::Compression>,
90    max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92    // Create a temporary buffer to write the parquet data
93    let mut buffer = Vec::new();
94
95    let writer_props = WriterProperties::builder()
96        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
97        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
98        .build();
99
100    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
101    for batch in batches {
102        writer.write(batch)?;
103    }
104    writer.close()?;
105
106    // Upload the buffer to object store
107    object_store.put(path, buffer.into()).await?;
108
109    Ok(())
110}
111
112/// Combines multiple Parquet files using object store with storage options
113///
114/// # Errors
115///
116/// Returns an error if file reading or writing fails.
117pub async fn combine_parquet_files(
118    file_paths: Vec<&str>,
119    new_file_path: &str,
120    storage_options: Option<std::collections::HashMap<String, String>>,
121    compression: Option<parquet::basic::Compression>,
122    max_row_group_size: Option<usize>,
123) -> anyhow::Result<()> {
124    if file_paths.len() <= 1 {
125        return Ok(());
126    }
127
128    // Create object store from the first file path (assuming all files are in the same store)
129    let (object_store, base_path, _) =
130        create_object_store_from_path(file_paths[0], storage_options)?;
131
132    // Convert string paths to ObjectPath
133    let object_paths: Vec<ObjectPath> = file_paths
134        .iter()
135        .map(|path| {
136            if base_path.is_empty() {
137                ObjectPath::from(*path)
138            } else {
139                ObjectPath::from(format!("{base_path}/{path}"))
140            }
141        })
142        .collect();
143
144    let new_object_path = if base_path.is_empty() {
145        ObjectPath::from(new_file_path)
146    } else {
147        ObjectPath::from(format!("{base_path}/{new_file_path}"))
148    };
149
150    combine_parquet_files_from_object_store(
151        object_store,
152        object_paths,
153        &new_object_path,
154        compression,
155        max_row_group_size,
156    )
157    .await
158}
159
160/// Combines multiple Parquet files from object store
161///
162/// # Errors
163///
164/// Returns an error if file reading or writing fails.
165pub async fn combine_parquet_files_from_object_store(
166    object_store: Arc<dyn ObjectStore>,
167    file_paths: Vec<ObjectPath>,
168    new_file_path: &ObjectPath,
169    compression: Option<parquet::basic::Compression>,
170    max_row_group_size: Option<usize>,
171) -> anyhow::Result<()> {
172    if file_paths.len() <= 1 {
173        return Ok(());
174    }
175
176    let mut all_batches: Vec<RecordBatch> = Vec::new();
177
178    // Read all files from object store
179    for path in &file_paths {
180        let data = object_store.get(path).await?.bytes().await?;
181        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
182        let mut reader = builder.build()?;
183
184        for batch in reader.by_ref() {
185            all_batches.push(batch?);
186        }
187    }
188
189    // Write combined batches to new location
190    write_batches_to_object_store(
191        &all_batches,
192        object_store.clone(),
193        new_file_path,
194        compression,
195        max_row_group_size,
196    )
197    .await?;
198
199    // Remove the merged files
200    for path in &file_paths {
201        if path != new_file_path {
202            object_store.delete(path).await?;
203        }
204    }
205
206    Ok(())
207}
208
209/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata using object store with storage options.
210///
211/// # Errors
212///
213/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
214///
215/// # Panics
216///
217/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
218pub async fn min_max_from_parquet_metadata(
219    file_path: &str,
220    storage_options: Option<std::collections::HashMap<String, String>>,
221    column_name: &str,
222) -> anyhow::Result<(u64, u64)> {
223    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
224    let object_path = if base_path.is_empty() {
225        ObjectPath::from(file_path)
226    } else {
227        ObjectPath::from(format!("{base_path}/{file_path}"))
228    };
229
230    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
231}
232
233/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata in object store.
234///
235/// # Errors
236///
237/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
238///
239/// # Panics
240///
241/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
242pub async fn min_max_from_parquet_metadata_object_store(
243    object_store: Arc<dyn ObjectStore>,
244    file_path: &ObjectPath,
245    column_name: &str,
246) -> anyhow::Result<(u64, u64)> {
247    // Download the parquet file from object store
248    let data = object_store.get(file_path).await?.bytes().await?;
249    let reader = SerializedFileReader::new(data)?;
250
251    let metadata = reader.metadata();
252    let mut overall_min_value: Option<i64> = None;
253    let mut overall_max_value: Option<i64> = None;
254
255    // Iterate through all row groups
256    for i in 0..metadata.num_row_groups() {
257        let row_group = metadata.row_group(i);
258
259        // Iterate through all columns in this row group
260        for j in 0..row_group.num_columns() {
261            let col_metadata = row_group.column(j);
262
263            if col_metadata.column_path().string() == column_name {
264                if let Some(stats) = col_metadata.statistics() {
265                    // Check if we have Int64 statistics
266                    if let Statistics::Int64(int64_stats) = stats {
267                        // Extract min value if available
268                        if let Some(&min_value) = int64_stats.min_opt()
269                            && (overall_min_value.is_none()
270                                || min_value < overall_min_value.unwrap())
271                        {
272                            overall_min_value = Some(min_value);
273                        }
274
275                        // Extract max value if available
276                        if let Some(&max_value) = int64_stats.max_opt()
277                            && (overall_max_value.is_none()
278                                || max_value > overall_max_value.unwrap())
279                        {
280                            overall_max_value = Some(max_value);
281                        }
282                    } else {
283                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
284                    }
285                } else {
286                    anyhow::bail!(
287                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
288                    );
289                }
290            }
291        }
292    }
293
294    // Return the min/max pair if both are available
295    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
296        Ok((min as u64, max as u64))
297    } else {
298        anyhow::bail!(
299            "Column '{column_name}' not found or has no Int64 statistics in any row group."
300        )
301    }
302}
303
304/// Creates an object store from a URI string with optional storage options.
305///
306/// Supports multiple cloud storage providers:
307/// - AWS S3: `s3://bucket/path`
308/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
309/// - Azure Blob Storage: `az://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
310/// - HTTP/WebDAV: `http://` or `https://`
311/// - Local files: `file://path` or plain paths
312///
313/// # Parameters
314///
315/// - `path`: The URI string for the storage location.
316/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
317///   - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
318///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
319///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
320///
321/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`)
322pub fn create_object_store_from_path(
323    path: &str,
324    storage_options: Option<std::collections::HashMap<String, String>>,
325) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
326    let uri = normalize_path_to_uri(path);
327
328    match uri.as_str() {
329        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
330        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
331            create_gcs_store(&uri, storage_options)
332        }
333        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
334        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
335        s if s.starts_with("http://") || s.starts_with("https://") => {
336            create_http_store(&uri, storage_options)
337        }
338        s if s.starts_with("file://") => create_local_store(&uri, true),
339        _ => create_local_store(&uri, false), // Fallback: assume local path
340    }
341}
342
343/// Normalizes a path to URI format for consistent object store usage.
344///
345/// If the path is already a URI (contains "://"), returns it as-is.
346/// Otherwise, converts local paths to file:// URIs with proper cross-platform handling.
347///
348/// Supported URI schemes:
349/// - `s3://` for AWS S3
350/// - `gs://` or `gcs://` for Google Cloud Storage
351/// - `az://` or `abfs://` for Azure Blob Storage
352/// - `http://` or `https://` for HTTP/WebDAV
353/// - `file://` for local files
354///
355/// # Cross-platform Path Handling
356///
357/// - Unix absolute paths: `/path/to/file` → `file:///path/to/file`
358/// - Windows drive paths: `C:\path\to\file` → `file:///C:/path/to/file`
359/// - Windows UNC paths: `\\server\share\file` → `file://server/share/file`
360/// - Relative paths: converted to absolute using current directory
361#[must_use]
362pub fn normalize_path_to_uri(path: &str) -> String {
363    if path.contains("://") {
364        // Already a URI - return as-is
365        path.to_string()
366    } else {
367        // Convert local path to file:// URI with cross-platform support
368        if is_absolute_path(path) {
369            path_to_file_uri(path)
370        } else {
371            // Relative path - make it absolute first
372            let absolute_path = std::env::current_dir().unwrap().join(path);
373            path_to_file_uri(&absolute_path.to_string_lossy())
374        }
375    }
376}
377
/// Checks if a path is absolute on the current platform.
///
/// Recognizes Unix absolute paths (`/...`), Windows UNC paths (`\\server\...`),
/// and Windows drive paths with either separator (`C:\...` or `C:/...`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute path
    if path.starts_with('/') {
        return true;
    }
    // Windows UNC path, e.g. \\server\share
    if path.starts_with("\\\\") {
        return true;
    }
    // Windows drive path: any char, then ':', then a separator (C:\ or C:/).
    // The byte-length guard mirrors the char positions for ASCII drive letters.
    if path.len() >= 3 {
        let mut chars = path.chars();
        let first_three = (chars.next(), chars.next(), chars.next());
        return matches!(first_three, (Some(_), Some(':'), Some('\\' | '/')));
    }
    false
}
403
/// Converts an absolute path to a file:// URI with proper platform handling.
///
/// Unix paths keep their leading slash (`file:///tmp`), Windows drive paths
/// gain a triple-slash prefix with forward slashes (`file:///C:/...`), and UNC
/// paths map the server to the URI authority (`file://server/share`).
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    // Unix absolute paths take priority and pass through unchanged
    if !path.starts_with('/') {
        // Windows drive path like C:\ or C:/ - normalize separators
        if path.len() >= 3 && path.chars().nth(1) == Some(':') {
            return format!("file:///{}", path.replace('\\', "/"));
        }
        // Windows UNC path \\server\share -> file://server/share
        if let Some(rest) = path.strip_prefix("\\\\") {
            return format!("file://{}", rest.replace('\\', "/"));
        }
    }
    // Unix absolute path, or fallback - treat as relative to root
    format!("file://{path}")
}
423
424/// Helper function to create local file system object store
425fn create_local_store(
426    uri: &str,
427    is_file_uri: bool,
428) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
429    let path = if is_file_uri {
430        uri.strip_prefix("file://").unwrap_or(uri)
431    } else {
432        uri
433    };
434
435    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
436    Ok((Arc::new(local_store), String::new(), uri.to_string()))
437}
438
439/// Helper function to create S3 object store with options
440fn create_s3_store(
441    uri: &str,
442    storage_options: Option<std::collections::HashMap<String, String>>,
443) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
444    let (url, path) = parse_url_and_path(uri)?;
445    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
446
447    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
448
449    // Apply storage options if provided
450    if let Some(options) = storage_options {
451        for (key, value) in options {
452            match key.as_str() {
453                "endpoint_url" => {
454                    builder = builder.with_endpoint(&value);
455                }
456                "region" => {
457                    builder = builder.with_region(&value);
458                }
459                "access_key_id" | "key" => {
460                    builder = builder.with_access_key_id(&value);
461                }
462                "secret_access_key" | "secret" => {
463                    builder = builder.with_secret_access_key(&value);
464                }
465                "session_token" | "token" => {
466                    builder = builder.with_token(&value);
467                }
468                "allow_http" => {
469                    let allow_http = value.to_lowercase() == "true";
470                    builder = builder.with_allow_http(allow_http);
471                }
472                _ => {
473                    // Ignore unknown options for forward compatibility
474                    log::warn!("Unknown S3 storage option: {key}");
475                }
476            }
477        }
478    }
479
480    let s3_store = builder.build()?;
481    Ok((Arc::new(s3_store), path, uri.to_string()))
482}
483
484/// Helper function to create GCS object store with options
485fn create_gcs_store(
486    uri: &str,
487    storage_options: Option<std::collections::HashMap<String, String>>,
488) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
489    let (url, path) = parse_url_and_path(uri)?;
490    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;
491
492    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);
493
494    // Apply storage options if provided
495    if let Some(options) = storage_options {
496        for (key, value) in options {
497            match key.as_str() {
498                "service_account_path" => {
499                    builder = builder.with_service_account_path(&value);
500                }
501                "service_account_key" => {
502                    builder = builder.with_service_account_key(&value);
503                }
504                "project_id" => {
505                    // Note: GoogleCloudStorageBuilder doesn't have with_project_id method
506                    // This would need to be handled via environment variables or service account
507                    log::warn!(
508                        "project_id should be set via service account or environment variables"
509                    );
510                }
511                "application_credentials" => {
512                    // Set GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
513                    // SAFETY: std::env::set_var is marked unsafe because it mutates global state and
514                    // can break signal-safe code. We only call it during configuration before any
515                    // multi-threaded work starts, so it is considered safe in this context.
516                    unsafe {
517                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
518                    }
519                }
520                _ => {
521                    // Ignore unknown options for forward compatibility
522                    log::warn!("Unknown GCS storage option: {key}");
523                }
524            }
525        }
526    }
527
528    let gcs_store = builder.build()?;
529    Ok((Arc::new(gcs_store), path, uri.to_string()))
530}
531
532/// Helper function to create Azure object store with options
533fn create_azure_store(
534    uri: &str,
535    storage_options: Option<std::collections::HashMap<String, String>>,
536) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
537    let (url, _) = parse_url_and_path(uri)?;
538    let container = extract_host(&url, "Invalid Azure URI: missing container")?;
539
540    let path = url.path().trim_start_matches('/').to_string();
541
542    let mut builder =
543        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);
544
545    // Apply storage options if provided
546    if let Some(options) = storage_options {
547        for (key, value) in options {
548            match key.as_str() {
549                "account_name" => {
550                    builder = builder.with_account(&value);
551                }
552                "account_key" => {
553                    builder = builder.with_access_key(&value);
554                }
555                "sas_token" => {
556                    // Parse SAS token as query string parameters
557                    let query_pairs: Vec<(String, String)> = value
558                        .split('&')
559                        .filter_map(|pair| {
560                            let mut parts = pair.split('=');
561                            match (parts.next(), parts.next()) {
562                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
563                                _ => None,
564                            }
565                        })
566                        .collect();
567                    builder = builder.with_sas_authorization(query_pairs);
568                }
569                "client_id" => {
570                    builder = builder.with_client_id(&value);
571                }
572                "client_secret" => {
573                    builder = builder.with_client_secret(&value);
574                }
575                "tenant_id" => {
576                    builder = builder.with_tenant_id(&value);
577                }
578                _ => {
579                    // Ignore unknown options for forward compatibility
580                    log::warn!("Unknown Azure storage option: {key}");
581                }
582            }
583        }
584    }
585
586    let azure_store = builder.build()?;
587    Ok((Arc::new(azure_store), path, uri.to_string()))
588}
589
590/// Helper function to create Azure object store from abfs:// URI with options.
591fn create_abfs_store(
592    uri: &str,
593    storage_options: Option<std::collections::HashMap<String, String>>,
594) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
595    let (url, path) = parse_url_and_path(uri)?;
596    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
597
598    // Extract account from host (account.dfs.core.windows.net)
599    let account = host
600        .split('.')
601        .next()
602        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
603
604    // Extract container from username part
605    let container = url
606        .username()
607        .split('@')
608        .next()
609        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
610
611    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
612        .with_account(account)
613        .with_container_name(container);
614
615    // Apply storage options if provided (same as Azure store)
616    if let Some(options) = storage_options {
617        for (key, value) in options {
618            match key.as_str() {
619                "account_name" => {
620                    builder = builder.with_account(&value);
621                }
622                "account_key" => {
623                    builder = builder.with_access_key(&value);
624                }
625                "sas_token" => {
626                    // Parse SAS token as query string parameters
627                    let query_pairs: Vec<(String, String)> = value
628                        .split('&')
629                        .filter_map(|pair| {
630                            let mut parts = pair.split('=');
631                            match (parts.next(), parts.next()) {
632                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
633                                _ => None,
634                            }
635                        })
636                        .collect();
637                    builder = builder.with_sas_authorization(query_pairs);
638                }
639                "client_id" => {
640                    builder = builder.with_client_id(&value);
641                }
642                "client_secret" => {
643                    builder = builder.with_client_secret(&value);
644                }
645                "tenant_id" => {
646                    builder = builder.with_tenant_id(&value);
647                }
648                _ => {
649                    // Ignore unknown options for forward compatibility
650                    log::warn!("Unknown ABFS storage option: {key}");
651                }
652            }
653        }
654    }
655
656    let azure_store = builder.build()?;
657    Ok((Arc::new(azure_store), path, uri.to_string()))
658}
659
660/// Helper function to create HTTP object store with options.
661fn create_http_store(
662    uri: &str,
663    storage_options: Option<std::collections::HashMap<String, String>>,
664) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
665    let (url, path) = parse_url_and_path(uri)?;
666    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
667
668    let builder = object_store::http::HttpBuilder::new().with_url(base_url);
669
670    // Apply storage options if provided
671    if let Some(options) = storage_options {
672        for (key, _value) in options {
673            // HTTP builder has limited configuration options
674            // Most HTTP-specific options would be handled via client options
675            // Ignore unknown options for forward compatibility
676            log::warn!("Unknown HTTP storage option: {key}");
677        }
678    }
679
680    let http_store = builder.build()?;
681    Ok((Arc::new(http_store), path, uri.to_string()))
682}
683
684/// Helper function to parse URL and extract path component.
685fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
686    let url = url::Url::parse(uri)?;
687    let path = url.path().trim_start_matches('/').to_string();
688    Ok((url, path))
689}
690
691/// Helper function to extract host from URL with error handling.
692fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
693    url.host_str()
694        .map(ToString::to_string)
695        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
696}
697
698////////////////////////////////////////////////////////////////////////////////
699// Tests
700////////////////////////////////////////////////////////////////////////////////
701
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use rstest::rstest;

    use super::*;

    // Store construction from a plain local path: base_path should be empty
    // (the store is rooted at the path) and the URI normalized to file://.
    #[rstest]
    fn test_create_object_store_from_path_local() {
        // Create a temporary directory for testing
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        // The URI should be normalized to file:// format
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        // Clean up
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    // S3 builder accepts credentials/endpoint options offline; only URI
    // parsing is exercised here (no network call is made).
    #[rstest]
    fn test_create_object_store_from_path_s3() {
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_azure() {
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Use a valid base64 encoded key for testing
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    #[rstest]
    fn test_create_object_store_from_path_gcs() {
        // Test GCS without service account (will use default credentials or fail gracefully)
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS might fail due to missing credentials, but we're testing the path parsing
        // The function should at least parse the URI correctly before failing on auth
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                // Expected to fail due to missing credentials, but should contain bucket info
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    // Passing None options must behave the same as an empty map.
    #[rstest]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    // The returned path component has the leading '/' stripped.
    #[rstest]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute paths
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // Windows UNC paths
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Already URIs - should remain unchanged
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        // Unix absolute paths
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        // Windows drive paths
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        // Windows UNC paths
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        // Relative paths
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Edge cases
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        // Unix absolute paths
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        // Windows drive paths
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        // Windows UNC paths
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}
901}