nautilus_persistence/
parquet.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use object_store::{ObjectStore, path::Path as ObjectPath};
20use parquet::{
21    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
22    file::{
23        properties::WriterProperties,
24        reader::{FileReader, SerializedFileReader},
25        statistics::Statistics,
26    },
27};
28
29/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
30///
31/// # Errors
32///
33/// Returns an error if writing to Parquet fails or any I/O operation fails.
34pub async fn write_batch_to_parquet(
35    batch: RecordBatch,
36    path: &str,
37    storage_options: Option<std::collections::HashMap<String, String>>,
38    compression: Option<parquet::basic::Compression>,
39    max_row_group_size: Option<usize>,
40) -> anyhow::Result<()> {
41    write_batches_to_parquet(
42        &[batch],
43        path,
44        storage_options,
45        compression,
46        max_row_group_size,
47    )
48    .await
49}
50
51/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
52///
53/// # Errors
54///
55/// Returns an error if writing to Parquet fails or any I/O operation fails.
56pub async fn write_batches_to_parquet(
57    batches: &[RecordBatch],
58    path: &str,
59    storage_options: Option<std::collections::HashMap<String, String>>,
60    compression: Option<parquet::basic::Compression>,
61    max_row_group_size: Option<usize>,
62) -> anyhow::Result<()> {
63    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
64    let object_path = if base_path.is_empty() {
65        ObjectPath::from(path)
66    } else {
67        ObjectPath::from(format!("{base_path}/{path}"))
68    };
69
70    write_batches_to_object_store(
71        batches,
72        object_store,
73        &object_path,
74        compression,
75        max_row_group_size,
76    )
77    .await
78}
79
80/// Writes multiple `RecordBatch` items to an object store URI, with optional compression and row group sizing.
81///
82/// # Errors
83///
84/// Returns an error if writing to Parquet fails or any I/O operation fails.
85pub async fn write_batches_to_object_store(
86    batches: &[RecordBatch],
87    object_store: Arc<dyn ObjectStore>,
88    path: &ObjectPath,
89    compression: Option<parquet::basic::Compression>,
90    max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92    // Create a temporary buffer to write the parquet data
93    let mut buffer = Vec::new();
94
95    let writer_props = WriterProperties::builder()
96        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
97        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
98        .build();
99
100    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
101    for batch in batches {
102        writer.write(batch)?;
103    }
104    writer.close()?;
105
106    // Upload the buffer to object store
107    object_store.put(path, buffer.into()).await?;
108
109    Ok(())
110}
111
112/// Combines multiple Parquet files using object store with storage options
113///
114/// # Errors
115///
116/// Returns an error if file reading or writing fails.
117pub async fn combine_parquet_files(
118    file_paths: Vec<&str>,
119    new_file_path: &str,
120    storage_options: Option<std::collections::HashMap<String, String>>,
121    compression: Option<parquet::basic::Compression>,
122    max_row_group_size: Option<usize>,
123) -> anyhow::Result<()> {
124    if file_paths.len() <= 1 {
125        return Ok(());
126    }
127
128    // Create object store from the first file path (assuming all files are in the same store)
129    let (object_store, base_path, _) =
130        create_object_store_from_path(file_paths[0], storage_options)?;
131
132    // Convert string paths to ObjectPath
133    let object_paths: Vec<ObjectPath> = file_paths
134        .iter()
135        .map(|path| {
136            if base_path.is_empty() {
137                ObjectPath::from(*path)
138            } else {
139                ObjectPath::from(format!("{base_path}/{path}"))
140            }
141        })
142        .collect();
143
144    let new_object_path = if base_path.is_empty() {
145        ObjectPath::from(new_file_path)
146    } else {
147        ObjectPath::from(format!("{base_path}/{new_file_path}"))
148    };
149
150    combine_parquet_files_from_object_store(
151        object_store,
152        object_paths,
153        &new_object_path,
154        compression,
155        max_row_group_size,
156    )
157    .await
158}
159
160/// Combines multiple Parquet files from object store
161///
162/// # Errors
163///
164/// Returns an error if file reading or writing fails.
165pub async fn combine_parquet_files_from_object_store(
166    object_store: Arc<dyn ObjectStore>,
167    file_paths: Vec<ObjectPath>,
168    new_file_path: &ObjectPath,
169    compression: Option<parquet::basic::Compression>,
170    max_row_group_size: Option<usize>,
171) -> anyhow::Result<()> {
172    if file_paths.len() <= 1 {
173        return Ok(());
174    }
175
176    let mut all_batches: Vec<RecordBatch> = Vec::new();
177
178    // Read all files from object store
179    for path in &file_paths {
180        let data = object_store.get(path).await?.bytes().await?;
181        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
182        let mut reader = builder.build()?;
183
184        for batch in reader.by_ref() {
185            all_batches.push(batch?);
186        }
187    }
188
189    // Write combined batches to new location
190    write_batches_to_object_store(
191        &all_batches,
192        object_store.clone(),
193        new_file_path,
194        compression,
195        max_row_group_size,
196    )
197    .await?;
198
199    // Remove the merged files
200    for path in &file_paths {
201        if path != new_file_path {
202            object_store.delete(path).await?;
203        }
204    }
205
206    Ok(())
207}
208
209/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata using object store with storage options.
210///
211/// # Errors
212///
213/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
214///
215/// # Panics
216///
217/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
218pub async fn min_max_from_parquet_metadata(
219    file_path: &str,
220    storage_options: Option<std::collections::HashMap<String, String>>,
221    column_name: &str,
222) -> anyhow::Result<(u64, u64)> {
223    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
224    let object_path = if base_path.is_empty() {
225        ObjectPath::from(file_path)
226    } else {
227        ObjectPath::from(format!("{base_path}/{file_path}"))
228    };
229
230    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
231}
232
233/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata in object store.
234///
235/// # Errors
236///
237/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
238///
239/// # Panics
240///
241/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
242pub async fn min_max_from_parquet_metadata_object_store(
243    object_store: Arc<dyn ObjectStore>,
244    file_path: &ObjectPath,
245    column_name: &str,
246) -> anyhow::Result<(u64, u64)> {
247    // Download the parquet file from object store
248    let data = object_store.get(file_path).await?.bytes().await?;
249    let reader = SerializedFileReader::new(data)?;
250
251    let metadata = reader.metadata();
252    let mut overall_min_value: Option<i64> = None;
253    let mut overall_max_value: Option<i64> = None;
254
255    // Iterate through all row groups
256    for i in 0..metadata.num_row_groups() {
257        let row_group = metadata.row_group(i);
258
259        // Iterate through all columns in this row group
260        for j in 0..row_group.num_columns() {
261            let col_metadata = row_group.column(j);
262
263            if col_metadata.column_path().string() == column_name {
264                if let Some(stats) = col_metadata.statistics() {
265                    // Check if we have Int64 statistics
266                    if let Statistics::Int64(int64_stats) = stats {
267                        // Extract min value if available
268                        if let Some(&min_value) = int64_stats.min_opt()
269                            && (overall_min_value.is_none()
270                                || min_value < overall_min_value.unwrap())
271                        {
272                            overall_min_value = Some(min_value);
273                        }
274
275                        // Extract max value if available
276                        if let Some(&max_value) = int64_stats.max_opt()
277                            && (overall_max_value.is_none()
278                                || max_value > overall_max_value.unwrap())
279                        {
280                            overall_max_value = Some(max_value);
281                        }
282                    } else {
283                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
284                    }
285                } else {
286                    anyhow::bail!(
287                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
288                    );
289                }
290            }
291        }
292    }
293
294    // Return the min/max pair if both are available
295    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
296        Ok((min as u64, max as u64))
297    } else {
298        anyhow::bail!(
299            "Column '{column_name}' not found or has no Int64 statistics in any row group."
300        )
301    }
302}
303
304/// Creates an object store from a URI string with optional storage options.
305///
306/// Supports multiple cloud storage providers:
307/// - AWS S3: `s3://bucket/path`
308/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
309/// - Azure Blob Storage: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
310/// - HTTP/WebDAV: `http://` or `https://`
311/// - Local files: `file://path` or plain paths
312///
313/// # Parameters
314///
315/// - `path`: The URI string for the storage location.
316/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
317///   - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
318///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
319///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
320///
321/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`)
322pub fn create_object_store_from_path(
323    path: &str,
324    storage_options: Option<std::collections::HashMap<String, String>>,
325) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
326    let uri = normalize_path_to_uri(path);
327
328    match uri.as_str() {
329        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
330        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
331            create_gcs_store(&uri, storage_options)
332        }
333        s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
334        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
335        s if s.starts_with("http://") || s.starts_with("https://") => {
336            create_http_store(&uri, storage_options)
337        }
338        s if s.starts_with("file://") => create_local_store(&uri, true),
339        _ => create_local_store(&uri, false), // Fallback: assume local path
340    }
341}
342
343/// Normalizes a path to URI format for consistent object store usage.
344///
345/// If the path is already a URI (contains "://"), returns it as-is.
346/// Otherwise, converts local paths to file:// URIs with proper cross-platform handling.
347///
348/// Supported URI schemes:
349/// - `s3://` for AWS S3
350/// - `gs://` or `gcs://` for Google Cloud Storage
351/// - `azure://` or `abfs://` for Azure Blob Storage
352/// - `http://` or `https://` for HTTP/WebDAV
353/// - `file://` for local files
354///
355/// # Cross-platform Path Handling
356///
357/// - Unix absolute paths: `/path/to/file` → `file:///path/to/file`
358/// - Windows drive paths: `C:\path\to\file` → `file:///C:/path/to/file`
359/// - Windows UNC paths: `\\server\share\file` → `file://server/share/file`
360/// - Relative paths: converted to absolute using current directory
361#[must_use]
362pub fn normalize_path_to_uri(path: &str) -> String {
363    if path.contains("://") {
364        // Already a URI - return as-is
365        path.to_string()
366    } else {
367        // Convert local path to file:// URI with cross-platform support
368        if is_absolute_path(path) {
369            path_to_file_uri(path)
370        } else {
371            // Relative path - make it absolute first
372            let absolute_path = std::env::current_dir().unwrap().join(path);
373            path_to_file_uri(&absolute_path.to_string_lossy())
374        }
375    }
376}
377
/// Checks if a path is absolute on the current platform.
///
/// Recognizes Unix absolute paths (`/...`), Windows drive paths with either
/// separator (`C:\` or `C:/`), and Windows UNC paths (`\\server\...`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute path or Windows UNC prefix.
    if path.starts_with('/') || path.starts_with("\\\\") {
        return true;
    }

    // Windows drive path: `<char>:` followed by a separator of either style.
    let mut chars = path.chars();
    let has_first = chars.next().is_some();
    has_first && chars.next() == Some(':') && matches!(chars.next(), Some('/') | Some('\\'))
}
403
/// Converts an absolute path to a file:// URI with proper platform handling.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    if let Some(unc) = path.strip_prefix("\\\\") {
        // Windows UNC path \\server\share -> file://server/share
        format!("file://{}", unc.replace('\\', "/"))
    } else if !path.starts_with('/') && path.len() >= 3 && path.chars().nth(1) == Some(':') {
        // Windows drive path - normalize separators and add proper prefix
        format!("file:///{}", path.replace('\\', "/"))
    } else {
        // Unix absolute path, or fallback for anything else
        format!("file://{path}")
    }
}
423
/// Helper function to create local file system object store
///
/// Returns the store rooted at the given path, an empty base path, and the
/// original URI unchanged.
///
/// NOTE(review): for Windows file URIs like `file:///C:/x`, stripping the
/// scheme yields `/C:/x` — confirm Windows handling before relying on it.
/// NOTE(review): the store prefix is the full input path while callers also
/// build object paths from that same path — verify paths aren't doubled.
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    // For file:// URIs, drop the scheme to recover the filesystem path;
    // plain local paths are used as-is.
    let path = if is_file_uri {
        uri.strip_prefix("file://").unwrap_or(uri)
    } else {
        uri
    };

    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
    Ok((Arc::new(local_store), String::new(), uri.to_string()))
}
438
439/// Helper function to create S3 object store with options
440fn create_s3_store(
441    uri: &str,
442    storage_options: Option<std::collections::HashMap<String, String>>,
443) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
444    let (url, path) = parse_url_and_path(uri)?;
445    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
446
447    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
448
449    // Apply storage options if provided
450    if let Some(options) = storage_options {
451        for (key, value) in options {
452            match key.as_str() {
453                "endpoint_url" => {
454                    builder = builder.with_endpoint(&value);
455                }
456                "region" => {
457                    builder = builder.with_region(&value);
458                }
459                "access_key_id" | "key" => {
460                    builder = builder.with_access_key_id(&value);
461                }
462                "secret_access_key" | "secret" => {
463                    builder = builder.with_secret_access_key(&value);
464                }
465                "session_token" | "token" => {
466                    builder = builder.with_token(&value);
467                }
468                "allow_http" => {
469                    let allow_http = value.to_lowercase() == "true";
470                    builder = builder.with_allow_http(allow_http);
471                }
472                _ => {
473                    // Ignore unknown options for forward compatibility
474                    log::warn!("Unknown S3 storage option: {key}");
475                }
476            }
477        }
478    }
479
480    let s3_store = builder.build()?;
481    Ok((Arc::new(s3_store), path, uri.to_string()))
482}
483
/// Helper function to create GCS object store with options
///
/// Recognized options: `service_account_path`, `service_account_key`, and
/// `application_credentials` (which sets the `GOOGLE_APPLICATION_CREDENTIALS`
/// environment variable). `project_id` is accepted but only logged; unknown
/// keys are logged and ignored for forward compatibility.
fn create_gcs_store(
    uri: &str,
    storage_options: Option<std::collections::HashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    // Apply storage options if provided
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // Note: GoogleCloudStorageBuilder doesn't have with_project_id method
                    // This would need to be handled via environment variables or service account
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // Set GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
                    // SAFETY: std::env::set_var is marked unsafe because it mutates global state and
                    // can break signal-safe code. We only call it during configuration before any
                    // multi-threaded work starts, so it is considered safe in this context.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}
531
532/// Helper function to create Azure object store with options
533fn create_azure_store(
534    uri: &str,
535    storage_options: Option<std::collections::HashMap<String, String>>,
536) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
537    let (url, _) = parse_url_and_path(uri)?;
538    let account = extract_host(&url, "Invalid Azure URI: missing account")?;
539
540    let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
541    if path_segments.is_empty() || path_segments[0].is_empty() {
542        anyhow::bail!("Invalid Azure URI: missing container");
543    }
544
545    let container = path_segments[0];
546    let path = if path_segments.len() > 1 {
547        path_segments[1..].join("/")
548    } else {
549        String::new()
550    };
551
552    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
553        .with_account(&account)
554        .with_container_name(container);
555
556    // Apply storage options if provided
557    if let Some(options) = storage_options {
558        for (key, value) in options {
559            match key.as_str() {
560                "account_name" => {
561                    builder = builder.with_account(&value);
562                }
563                "account_key" => {
564                    builder = builder.with_access_key(&value);
565                }
566                "sas_token" => {
567                    // Parse SAS token as query string parameters
568                    let query_pairs: Vec<(String, String)> = value
569                        .split('&')
570                        .filter_map(|pair| {
571                            let mut parts = pair.split('=');
572                            match (parts.next(), parts.next()) {
573                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
574                                _ => None,
575                            }
576                        })
577                        .collect();
578                    builder = builder.with_sas_authorization(query_pairs);
579                }
580                "client_id" => {
581                    builder = builder.with_client_id(&value);
582                }
583                "client_secret" => {
584                    builder = builder.with_client_secret(&value);
585                }
586                "tenant_id" => {
587                    builder = builder.with_tenant_id(&value);
588                }
589                _ => {
590                    // Ignore unknown options for forward compatibility
591                    log::warn!("Unknown Azure storage option: {key}");
592                }
593            }
594        }
595    }
596
597    let azure_store = builder.build()?;
598    Ok((Arc::new(azure_store), path, uri.to_string()))
599}
600
601/// Helper function to create Azure object store from abfs:// URI with options.
602fn create_abfs_store(
603    uri: &str,
604    storage_options: Option<std::collections::HashMap<String, String>>,
605) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
606    let (url, path) = parse_url_and_path(uri)?;
607    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
608
609    // Extract account from host (account.dfs.core.windows.net)
610    let account = host
611        .split('.')
612        .next()
613        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
614
615    // Extract container from username part
616    let container = url
617        .username()
618        .split('@')
619        .next()
620        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
621
622    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
623        .with_account(account)
624        .with_container_name(container);
625
626    // Apply storage options if provided (same as Azure store)
627    if let Some(options) = storage_options {
628        for (key, value) in options {
629            match key.as_str() {
630                "account_name" => {
631                    builder = builder.with_account(&value);
632                }
633                "account_key" => {
634                    builder = builder.with_access_key(&value);
635                }
636                "sas_token" => {
637                    // Parse SAS token as query string parameters
638                    let query_pairs: Vec<(String, String)> = value
639                        .split('&')
640                        .filter_map(|pair| {
641                            let mut parts = pair.split('=');
642                            match (parts.next(), parts.next()) {
643                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
644                                _ => None,
645                            }
646                        })
647                        .collect();
648                    builder = builder.with_sas_authorization(query_pairs);
649                }
650                "client_id" => {
651                    builder = builder.with_client_id(&value);
652                }
653                "client_secret" => {
654                    builder = builder.with_client_secret(&value);
655                }
656                "tenant_id" => {
657                    builder = builder.with_tenant_id(&value);
658                }
659                _ => {
660                    // Ignore unknown options for forward compatibility
661                    log::warn!("Unknown ABFS storage option: {key}");
662                }
663            }
664        }
665    }
666
667    let azure_store = builder.build()?;
668    Ok((Arc::new(azure_store), path, uri.to_string()))
669}
670
671/// Helper function to create HTTP object store with options.
672fn create_http_store(
673    uri: &str,
674    storage_options: Option<std::collections::HashMap<String, String>>,
675) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
676    let (url, path) = parse_url_and_path(uri)?;
677    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
678
679    let builder = object_store::http::HttpBuilder::new().with_url(base_url);
680
681    // Apply storage options if provided
682    if let Some(options) = storage_options {
683        for (key, _value) in options {
684            // HTTP builder has limited configuration options
685            // Most HTTP-specific options would be handled via client options
686            // Ignore unknown options for forward compatibility
687            log::warn!("Unknown HTTP storage option: {key}");
688        }
689    }
690
691    let http_store = builder.build()?;
692    Ok((Arc::new(http_store), path, uri.to_string()))
693}
694
695/// Helper function to parse URL and extract path component.
696fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
697    let url = url::Url::parse(uri)?;
698    let path = url.path().trim_start_matches('/').to_string();
699    Ok((url, path))
700}
701
702/// Helper function to extract host from URL with error handling.
703fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
704    url.host_str()
705        .map(ToString::to_string)
706        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
707}
708
709////////////////////////////////////////////////////////////////////////////////
710// Tests
711////////////////////////////////////////////////////////////////////////////////
712
713#[cfg(test)]
714mod tests {
715    use std::collections::HashMap;
716
717    use rstest::rstest;
718
719    use super::*;
720
    #[rstest]
    fn test_create_object_store_from_path_local() {
        // A plain local directory should resolve to a local store with an
        // empty base path and a file://-normalized URI.
        // Create a temporary directory for testing
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        // The URI should be normalized to file:// format
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        // Clean up
        std::fs::remove_dir_all(&temp_dir).ok();
    }
740
    #[rstest]
    fn test_create_object_store_from_path_s3() {
        // S3 URI with full credential options should build, returning the
        // bucket-relative base path and the unchanged URI.
        let mut options = HashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }
758
    #[rstest]
    fn test_create_object_store_from_path_azure() {
        // azure:// URI: first path segment is the container, the remainder
        // becomes the base path.
        let mut options = HashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Use a valid base64 encoded key for testing
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64

        let result =
            create_object_store_from_path("azure://testaccount/container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "azure://testaccount/container/path");
    }
776
    #[rstest]
    fn test_create_object_store_from_path_gcs() {
        // GCS store creation may legitimately fail without credentials, so
        // this only checks URI parsing on success or a sensible error message.
        // Test GCS without service account (will use default credentials or fail gracefully)
        let mut options = HashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS might fail due to missing credentials, but we're testing the path parsing
        // The function should at least parse the URI correctly before failing on auth
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                // Expected to fail due to missing credentials, but should contain bucket info
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }
798
799    #[rstest]
800    fn test_create_object_store_from_path_empty_options() {
801        let result = create_object_store_from_path("s3://test-bucket/path", None);
802        assert!(result.is_ok());
803        let (_, base_path, uri) = result.unwrap();
804        assert_eq!(base_path, "path");
805        assert_eq!(uri, "s3://test-bucket/path");
806    }
807
808    #[rstest]
809    fn test_parse_url_and_path() {
810        let result = parse_url_and_path("s3://bucket/path/to/file");
811        assert!(result.is_ok());
812        let (url, path) = result.unwrap();
813        assert_eq!(url.scheme(), "s3");
814        assert_eq!(url.host_str().unwrap(), "bucket");
815        assert_eq!(path, "path/to/file");
816    }
817
818    #[rstest]
819    fn test_extract_host() {
820        let url = url::Url::parse("s3://test-bucket/path").unwrap();
821        let result = extract_host(&url, "Test error");
822        assert!(result.is_ok());
823        assert_eq!(result.unwrap(), "test-bucket");
824    }
825
826    #[rstest]
827    fn test_normalize_path_to_uri() {
828        // Unix absolute paths
829        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
830
831        // Windows drive paths
832        assert_eq!(
833            normalize_path_to_uri("C:\\tmp\\test"),
834            "file:///C:/tmp/test"
835        );
836        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
837        assert_eq!(
838            normalize_path_to_uri("D:\\data\\file.txt"),
839            "file:///D:/data/file.txt"
840        );
841
842        // Windows UNC paths
843        assert_eq!(
844            normalize_path_to_uri("\\\\server\\share\\file"),
845            "file://server/share/file"
846        );
847
848        // Already URIs - should remain unchanged
849        assert_eq!(
850            normalize_path_to_uri("s3://bucket/path"),
851            "s3://bucket/path"
852        );
853        assert_eq!(
854            normalize_path_to_uri("file:///tmp/test"),
855            "file:///tmp/test"
856        );
857        assert_eq!(
858            normalize_path_to_uri("https://example.com/path"),
859            "https://example.com/path"
860        );
861    }
862
863    #[rstest]
864    fn test_is_absolute_path() {
865        // Unix absolute paths
866        assert!(is_absolute_path("/tmp/test"));
867        assert!(is_absolute_path("/"));
868
869        // Windows drive paths
870        assert!(is_absolute_path("C:\\tmp\\test"));
871        assert!(is_absolute_path("C:/tmp/test"));
872        assert!(is_absolute_path("D:\\"));
873        assert!(is_absolute_path("Z:/"));
874
875        // Windows UNC paths
876        assert!(is_absolute_path("\\\\server\\share"));
877        assert!(is_absolute_path("\\\\localhost\\c$"));
878
879        // Relative paths
880        assert!(!is_absolute_path("tmp/test"));
881        assert!(!is_absolute_path("./test"));
882        assert!(!is_absolute_path("../test"));
883        assert!(!is_absolute_path("test.txt"));
884
885        // Edge cases
886        assert!(!is_absolute_path(""));
887        assert!(!is_absolute_path("C"));
888        assert!(!is_absolute_path("C:"));
889        assert!(!is_absolute_path("\\"));
890    }
891
892    #[rstest]
893    fn test_path_to_file_uri() {
894        // Unix absolute paths
895        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
896        assert_eq!(path_to_file_uri("/"), "file:///");
897
898        // Windows drive paths
899        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
900        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
901        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");
902
903        // Windows UNC paths
904        assert_eq!(
905            path_to_file_uri("\\\\server\\share\\file"),
906            "file://server/share/file"
907        );
908        assert_eq!(
909            path_to_file_uri("\\\\localhost\\c$\\test"),
910            "file://localhost/c$/test"
911        );
912    }
913}