nautilus_persistence/
parquet.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use object_store::{ObjectStore, path::Path as ObjectPath};
20use parquet::{
21    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
22    file::{
23        properties::WriterProperties,
24        reader::{FileReader, SerializedFileReader},
25        statistics::Statistics,
26    },
27};
28
29/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
30///
31/// # Errors
32///
33/// Returns an error if writing to Parquet fails or any I/O operation fails.
34pub async fn write_batch_to_parquet(
35    batch: RecordBatch,
36    path: &str,
37    storage_options: Option<std::collections::HashMap<String, String>>,
38    compression: Option<parquet::basic::Compression>,
39    max_row_group_size: Option<usize>,
40) -> anyhow::Result<()> {
41    write_batches_to_parquet(
42        &[batch],
43        path,
44        storage_options,
45        compression,
46        max_row_group_size,
47    )
48    .await
49}
50
51/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
52///
53/// # Errors
54///
55/// Returns an error if writing to Parquet fails or any I/O operation fails.
56pub async fn write_batches_to_parquet(
57    batches: &[RecordBatch],
58    path: &str,
59    storage_options: Option<std::collections::HashMap<String, String>>,
60    compression: Option<parquet::basic::Compression>,
61    max_row_group_size: Option<usize>,
62) -> anyhow::Result<()> {
63    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
64    let object_path = if base_path.is_empty() {
65        ObjectPath::from(path)
66    } else {
67        ObjectPath::from(format!("{base_path}/{path}"))
68    };
69
70    write_batches_to_object_store(
71        batches,
72        object_store,
73        &object_path,
74        compression,
75        max_row_group_size,
76    )
77    .await
78}
79
80/// Writes multiple `RecordBatch` items to an object store URI, with optional compression and row group sizing.
81///
82/// # Errors
83///
84/// Returns an error if writing to Parquet fails or any I/O operation fails.
85pub async fn write_batches_to_object_store(
86    batches: &[RecordBatch],
87    object_store: Arc<dyn ObjectStore>,
88    path: &ObjectPath,
89    compression: Option<parquet::basic::Compression>,
90    max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92    // Create a temporary buffer to write the parquet data
93    let mut buffer = Vec::new();
94
95    let writer_props = WriterProperties::builder()
96        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
97        .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
98        .build();
99
100    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
101    for batch in batches {
102        writer.write(batch)?;
103    }
104    writer.close()?;
105
106    // Upload the buffer to object store
107    object_store.put(path, buffer.into()).await?;
108
109    Ok(())
110}
111
112/// Combines multiple Parquet files using object store with storage options
113///
114/// # Errors
115///
116/// Returns an error if file reading or writing fails.
117pub async fn combine_parquet_files(
118    file_paths: Vec<&str>,
119    new_file_path: &str,
120    storage_options: Option<std::collections::HashMap<String, String>>,
121    compression: Option<parquet::basic::Compression>,
122    max_row_group_size: Option<usize>,
123) -> anyhow::Result<()> {
124    if file_paths.len() <= 1 {
125        return Ok(());
126    }
127
128    // Create object store from the first file path (assuming all files are in the same store)
129    let (object_store, base_path, _) =
130        create_object_store_from_path(file_paths[0], storage_options)?;
131
132    // Convert string paths to ObjectPath
133    let object_paths: Vec<ObjectPath> = file_paths
134        .iter()
135        .map(|path| {
136            if base_path.is_empty() {
137                ObjectPath::from(*path)
138            } else {
139                ObjectPath::from(format!("{base_path}/{path}"))
140            }
141        })
142        .collect();
143
144    let new_object_path = if base_path.is_empty() {
145        ObjectPath::from(new_file_path)
146    } else {
147        ObjectPath::from(format!("{base_path}/{new_file_path}"))
148    };
149
150    combine_parquet_files_from_object_store(
151        object_store,
152        object_paths,
153        &new_object_path,
154        compression,
155        max_row_group_size,
156    )
157    .await
158}
159
160/// Combines multiple Parquet files from object store
161///
162/// # Errors
163///
164/// Returns an error if file reading or writing fails.
165pub async fn combine_parquet_files_from_object_store(
166    object_store: Arc<dyn ObjectStore>,
167    file_paths: Vec<ObjectPath>,
168    new_file_path: &ObjectPath,
169    compression: Option<parquet::basic::Compression>,
170    max_row_group_size: Option<usize>,
171) -> anyhow::Result<()> {
172    if file_paths.len() <= 1 {
173        return Ok(());
174    }
175
176    let mut all_batches: Vec<RecordBatch> = Vec::new();
177
178    // Read all files from object store
179    for path in &file_paths {
180        let data = object_store.get(path).await?.bytes().await?;
181        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
182        let mut reader = builder.build()?;
183
184        for batch in reader.by_ref() {
185            all_batches.push(batch?);
186        }
187    }
188
189    // Write combined batches to new location
190    write_batches_to_object_store(
191        &all_batches,
192        object_store.clone(),
193        new_file_path,
194        compression,
195        max_row_group_size,
196    )
197    .await?;
198
199    // Remove the merged files
200    for path in &file_paths {
201        object_store.delete(path).await?;
202    }
203
204    Ok(())
205}
206
207/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata using object store with storage options.
208///
209/// # Errors
210///
211/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
212///
213/// # Panics
214///
215/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
216pub async fn min_max_from_parquet_metadata(
217    file_path: &str,
218    storage_options: Option<std::collections::HashMap<String, String>>,
219    column_name: &str,
220) -> anyhow::Result<(u64, u64)> {
221    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
222    let object_path = if base_path.is_empty() {
223        ObjectPath::from(file_path)
224    } else {
225        ObjectPath::from(format!("{base_path}/{file_path}"))
226    };
227
228    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
229}
230
231/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata in object store.
232///
233/// # Errors
234///
235/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
236///
237/// # Panics
238///
239/// Panics if the Parquet metadata's min/max unwrap operations fail unexpectedly.
240pub async fn min_max_from_parquet_metadata_object_store(
241    object_store: Arc<dyn ObjectStore>,
242    file_path: &ObjectPath,
243    column_name: &str,
244) -> anyhow::Result<(u64, u64)> {
245    // Download the parquet file from object store
246    let data = object_store.get(file_path).await?.bytes().await?;
247    let reader = SerializedFileReader::new(data)?;
248
249    let metadata = reader.metadata();
250    let mut overall_min_value: Option<i64> = None;
251    let mut overall_max_value: Option<i64> = None;
252
253    // Iterate through all row groups
254    for i in 0..metadata.num_row_groups() {
255        let row_group = metadata.row_group(i);
256
257        // Iterate through all columns in this row group
258        for j in 0..row_group.num_columns() {
259            let col_metadata = row_group.column(j);
260
261            if col_metadata.column_path().string() == column_name {
262                if let Some(stats) = col_metadata.statistics() {
263                    // Check if we have Int64 statistics
264                    if let Statistics::Int64(int64_stats) = stats {
265                        // Extract min value if available
266                        if let Some(&min_value) = int64_stats.min_opt()
267                            && (overall_min_value.is_none()
268                                || min_value < overall_min_value.unwrap())
269                        {
270                            overall_min_value = Some(min_value);
271                        }
272
273                        // Extract max value if available
274                        if let Some(&max_value) = int64_stats.max_opt()
275                            && (overall_max_value.is_none()
276                                || max_value > overall_max_value.unwrap())
277                        {
278                            overall_max_value = Some(max_value);
279                        }
280                    } else {
281                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
282                    }
283                } else {
284                    anyhow::bail!(
285                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
286                    );
287                }
288            }
289        }
290    }
291
292    // Return the min/max pair if both are available
293    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
294        Ok((min as u64, max as u64))
295    } else {
296        anyhow::bail!(
297            "Column '{column_name}' not found or has no Int64 statistics in any row group."
298        )
299    }
300}
301
302/// Creates an object store from a URI string with optional storage options.
303///
304/// Supports multiple cloud storage providers:
305/// - AWS S3: `s3://bucket/path`
306/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
307/// - Azure Blob Storage: `azure://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
308/// - HTTP/WebDAV: `http://` or `https://`
309/// - Local files: `file://path` or plain paths
310///
311/// # Parameters
312///
313/// - `path`: The URI string for the storage location.
314/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
315///   - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
316///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
317///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
318///
319/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`)
320pub fn create_object_store_from_path(
321    path: &str,
322    storage_options: Option<std::collections::HashMap<String, String>>,
323) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
324    let uri = normalize_path_to_uri(path);
325
326    match uri.as_str() {
327        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
328        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
329            create_gcs_store(&uri, storage_options)
330        }
331        s if s.starts_with("azure://") => create_azure_store(&uri, storage_options),
332        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
333        s if s.starts_with("http://") || s.starts_with("https://") => {
334            create_http_store(&uri, storage_options)
335        }
336        s if s.starts_with("file://") => create_local_store(&uri, true),
337        _ => create_local_store(&uri, false), // Fallback: assume local path
338    }
339}
340
341/// Normalizes a path to URI format for consistent object store usage.
342///
343/// If the path is already a URI (contains "://"), returns it as-is.
344/// Otherwise, converts local paths to file:// URIs with proper cross-platform handling.
345///
346/// Supported URI schemes:
347/// - `s3://` for AWS S3
348/// - `gs://` or `gcs://` for Google Cloud Storage
349/// - `azure://` or `abfs://` for Azure Blob Storage
350/// - `http://` or `https://` for HTTP/WebDAV
351/// - `file://` for local files
352///
353/// # Cross-platform Path Handling
354///
355/// - Unix absolute paths: `/path/to/file` → `file:///path/to/file`
356/// - Windows drive paths: `C:\path\to\file` → `file:///C:/path/to/file`
357/// - Windows UNC paths: `\\server\share\file` → `file://server/share/file`
358/// - Relative paths: converted to absolute using current directory
359#[must_use]
360pub fn normalize_path_to_uri(path: &str) -> String {
361    if path.contains("://") {
362        // Already a URI - return as-is
363        path.to_string()
364    } else {
365        // Convert local path to file:// URI with cross-platform support
366        if is_absolute_path(path) {
367            path_to_file_uri(path)
368        } else {
369            // Relative path - make it absolute first
370            let absolute_path = std::env::current_dir().unwrap().join(path);
371            path_to_file_uri(&absolute_path.to_string_lossy())
372        }
373    }
374}
375
/// Checks if a path is absolute on the current platform.
///
/// Recognizes Unix absolute paths (`/…`), Windows drive paths (`C:\…` or `C:/…`),
/// and Windows UNC paths (`\\server\…`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Windows drive path: letter, ':', then either separator (e.g. `C:\` or `C:/`)
    let has_drive_prefix = path.len() >= 3 && {
        let mut chars = path.chars();
        chars.next(); // drive letter
        chars.next() == Some(':') && matches!(chars.next(), Some('\\') | Some('/'))
    };

    path.starts_with('/') || path.starts_with("\\\\") || has_drive_prefix
}
401
/// Converts an absolute path to a file:// URI with proper platform handling.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    if path.starts_with('/') {
        // Unix absolute path
        return format!("file://{path}");
    }

    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        // Windows drive path - normalize separators and add proper prefix
        return format!("file:///{}", path.replace('\\', "/"));
    }

    match path.strip_prefix("\\\\") {
        // Windows UNC path \\server\share -> file://server/share
        Some(rest) => format!("file://{}", rest.replace('\\', "/")),
        // Fallback - treat as relative to root
        None => format!("file://{path}"),
    }
}
421
422/// Helper function to create local file system object store
423fn create_local_store(
424    uri: &str,
425    is_file_uri: bool,
426) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
427    let path = if is_file_uri {
428        uri.strip_prefix("file://").unwrap_or(uri)
429    } else {
430        uri
431    };
432
433    let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
434    Ok((Arc::new(local_store), String::new(), uri.to_string()))
435}
436
437/// Helper function to create S3 object store with options
438fn create_s3_store(
439    uri: &str,
440    storage_options: Option<std::collections::HashMap<String, String>>,
441) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
442    let (url, path) = parse_url_and_path(uri)?;
443    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
444
445    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
446
447    // Apply storage options if provided
448    if let Some(options) = storage_options {
449        for (key, value) in options {
450            match key.as_str() {
451                "endpoint_url" => {
452                    builder = builder.with_endpoint(&value);
453                }
454                "region" => {
455                    builder = builder.with_region(&value);
456                }
457                "access_key_id" | "key" => {
458                    builder = builder.with_access_key_id(&value);
459                }
460                "secret_access_key" | "secret" => {
461                    builder = builder.with_secret_access_key(&value);
462                }
463                "session_token" | "token" => {
464                    builder = builder.with_token(&value);
465                }
466                "allow_http" => {
467                    let allow_http = value.to_lowercase() == "true";
468                    builder = builder.with_allow_http(allow_http);
469                }
470                _ => {
471                    // Ignore unknown options for forward compatibility
472                    log::warn!("Unknown S3 storage option: {key}");
473                }
474            }
475        }
476    }
477
478    let s3_store = builder.build()?;
479    Ok((Arc::new(s3_store), path, uri.to_string()))
480}
481
482/// Helper function to create GCS object store with options
483fn create_gcs_store(
484    uri: &str,
485    storage_options: Option<std::collections::HashMap<String, String>>,
486) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
487    let (url, path) = parse_url_and_path(uri)?;
488    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;
489
490    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);
491
492    // Apply storage options if provided
493    if let Some(options) = storage_options {
494        for (key, value) in options {
495            match key.as_str() {
496                "service_account_path" => {
497                    builder = builder.with_service_account_path(&value);
498                }
499                "service_account_key" => {
500                    builder = builder.with_service_account_key(&value);
501                }
502                "project_id" => {
503                    // Note: GoogleCloudStorageBuilder doesn't have with_project_id method
504                    // This would need to be handled via environment variables or service account
505                    log::warn!(
506                        "project_id should be set via service account or environment variables"
507                    );
508                }
509                "application_credentials" => {
510                    // Set GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
511                    // SAFETY: std::env::set_var is marked unsafe because it mutates global state and
512                    // can break signal-safe code. We only call it during configuration before any
513                    // multi-threaded work starts, so it is considered safe in this context.
514                    unsafe {
515                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
516                    }
517                }
518                _ => {
519                    // Ignore unknown options for forward compatibility
520                    log::warn!("Unknown GCS storage option: {key}");
521                }
522            }
523        }
524    }
525
526    let gcs_store = builder.build()?;
527    Ok((Arc::new(gcs_store), path, uri.to_string()))
528}
529
530/// Helper function to create Azure object store with options
531fn create_azure_store(
532    uri: &str,
533    storage_options: Option<std::collections::HashMap<String, String>>,
534) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
535    let (url, _) = parse_url_and_path(uri)?;
536    let account = extract_host(&url, "Invalid Azure URI: missing account")?;
537
538    let path_segments: Vec<&str> = url.path().trim_start_matches('/').split('/').collect();
539    if path_segments.is_empty() || path_segments[0].is_empty() {
540        anyhow::bail!("Invalid Azure URI: missing container");
541    }
542
543    let container = path_segments[0];
544    let path = if path_segments.len() > 1 {
545        path_segments[1..].join("/")
546    } else {
547        String::new()
548    };
549
550    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
551        .with_account(&account)
552        .with_container_name(container);
553
554    // Apply storage options if provided
555    if let Some(options) = storage_options {
556        for (key, value) in options {
557            match key.as_str() {
558                "account_name" => {
559                    builder = builder.with_account(&value);
560                }
561                "account_key" => {
562                    builder = builder.with_access_key(&value);
563                }
564                "sas_token" => {
565                    // Parse SAS token as query string parameters
566                    let query_pairs: Vec<(String, String)> = value
567                        .split('&')
568                        .filter_map(|pair| {
569                            let mut parts = pair.split('=');
570                            match (parts.next(), parts.next()) {
571                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
572                                _ => None,
573                            }
574                        })
575                        .collect();
576                    builder = builder.with_sas_authorization(query_pairs);
577                }
578                "client_id" => {
579                    builder = builder.with_client_id(&value);
580                }
581                "client_secret" => {
582                    builder = builder.with_client_secret(&value);
583                }
584                "tenant_id" => {
585                    builder = builder.with_tenant_id(&value);
586                }
587                _ => {
588                    // Ignore unknown options for forward compatibility
589                    log::warn!("Unknown Azure storage option: {key}");
590                }
591            }
592        }
593    }
594
595    let azure_store = builder.build()?;
596    Ok((Arc::new(azure_store), path, uri.to_string()))
597}
598
599/// Helper function to create Azure object store from abfs:// URI with options.
600fn create_abfs_store(
601    uri: &str,
602    storage_options: Option<std::collections::HashMap<String, String>>,
603) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
604    let (url, path) = parse_url_and_path(uri)?;
605    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
606
607    // Extract account from host (account.dfs.core.windows.net)
608    let account = host
609        .split('.')
610        .next()
611        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
612
613    // Extract container from username part
614    let container = url
615        .username()
616        .split('@')
617        .next()
618        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
619
620    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
621        .with_account(account)
622        .with_container_name(container);
623
624    // Apply storage options if provided (same as Azure store)
625    if let Some(options) = storage_options {
626        for (key, value) in options {
627            match key.as_str() {
628                "account_name" => {
629                    builder = builder.with_account(&value);
630                }
631                "account_key" => {
632                    builder = builder.with_access_key(&value);
633                }
634                "sas_token" => {
635                    // Parse SAS token as query string parameters
636                    let query_pairs: Vec<(String, String)> = value
637                        .split('&')
638                        .filter_map(|pair| {
639                            let mut parts = pair.split('=');
640                            match (parts.next(), parts.next()) {
641                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
642                                _ => None,
643                            }
644                        })
645                        .collect();
646                    builder = builder.with_sas_authorization(query_pairs);
647                }
648                "client_id" => {
649                    builder = builder.with_client_id(&value);
650                }
651                "client_secret" => {
652                    builder = builder.with_client_secret(&value);
653                }
654                "tenant_id" => {
655                    builder = builder.with_tenant_id(&value);
656                }
657                _ => {
658                    // Ignore unknown options for forward compatibility
659                    log::warn!("Unknown ABFS storage option: {key}");
660                }
661            }
662        }
663    }
664
665    let azure_store = builder.build()?;
666    Ok((Arc::new(azure_store), path, uri.to_string()))
667}
668
669/// Helper function to create HTTP object store with options.
670fn create_http_store(
671    uri: &str,
672    storage_options: Option<std::collections::HashMap<String, String>>,
673) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
674    let (url, path) = parse_url_and_path(uri)?;
675    let base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
676
677    let builder = object_store::http::HttpBuilder::new().with_url(base_url);
678
679    // Apply storage options if provided
680    if let Some(options) = storage_options {
681        for (key, _value) in options {
682            // HTTP builder has limited configuration options
683            // Most HTTP-specific options would be handled via client options
684            // Ignore unknown options for forward compatibility
685            log::warn!("Unknown HTTP storage option: {key}");
686        }
687    }
688
689    let http_store = builder.build()?;
690    Ok((Arc::new(http_store), path, uri.to_string()))
691}
692
693/// Helper function to parse URL and extract path component.
694fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
695    let url = url::Url::parse(uri)?;
696    let path = url.path().trim_start_matches('/').to_string();
697    Ok((url, path))
698}
699
700/// Helper function to extract host from URL with error handling.
701fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
702    url.host_str()
703        .map(ToString::to_string)
704        .ok_or_else(|| anyhow::anyhow!("{}", error_msg))
705}
706
707////////////////////////////////////////////////////////////////////////////////
708// Tests
709////////////////////////////////////////////////////////////////////////////////
710
711#[cfg(test)]
712mod tests {
713    use std::collections::HashMap;
714
715    use rstest::rstest;
716
717    use super::*;
718
719    #[rstest]
720    fn test_create_object_store_from_path_local() {
721        // Create a temporary directory for testing
722        let temp_dir = std::env::temp_dir().join("nautilus_test");
723        std::fs::create_dir_all(&temp_dir).unwrap();
724
725        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
726        if let Err(e) = &result {
727            println!("Error: {e:?}");
728        }
729        assert!(result.is_ok());
730        let (_, base_path, uri) = result.unwrap();
731        assert_eq!(base_path, "");
732        // The URI should be normalized to file:// format
733        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));
734
735        // Clean up
736        std::fs::remove_dir_all(&temp_dir).ok();
737    }
738
739    #[rstest]
740    fn test_create_object_store_from_path_s3() {
741        let mut options = HashMap::new();
742        options.insert(
743            "endpoint_url".to_string(),
744            "https://test.endpoint.com".to_string(),
745        );
746        options.insert("region".to_string(), "us-west-2".to_string());
747        options.insert("access_key_id".to_string(), "test_key".to_string());
748        options.insert("secret_access_key".to_string(), "test_secret".to_string());
749
750        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
751        assert!(result.is_ok());
752        let (_, base_path, uri) = result.unwrap();
753        assert_eq!(base_path, "path");
754        assert_eq!(uri, "s3://test-bucket/path");
755    }
756
757    #[rstest]
758    fn test_create_object_store_from_path_azure() {
759        let mut options = HashMap::new();
760        options.insert("account_name".to_string(), "testaccount".to_string());
761        // Use a valid base64 encoded key for testing
762        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64
763
764        let result =
765            create_object_store_from_path("azure://testaccount/container/path", Some(options));
766        if let Err(e) = &result {
767            println!("Azure Error: {e:?}");
768        }
769        assert!(result.is_ok());
770        let (_, base_path, uri) = result.unwrap();
771        assert_eq!(base_path, "path");
772        assert_eq!(uri, "azure://testaccount/container/path");
773    }
774
775    #[rstest]
776    fn test_create_object_store_from_path_gcs() {
777        // Test GCS without service account (will use default credentials or fail gracefully)
778        let mut options = HashMap::new();
779        options.insert("project_id".to_string(), "test-project".to_string());
780
781        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
782        // GCS might fail due to missing credentials, but we're testing the path parsing
783        // The function should at least parse the URI correctly before failing on auth
784        match result {
785            Ok((_, base_path, uri)) => {
786                assert_eq!(base_path, "path");
787                assert_eq!(uri, "gs://test-bucket/path");
788            }
789            Err(e) => {
790                // Expected to fail due to missing credentials, but should contain bucket info
791                let error_msg = format!("{e:?}");
792                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
793            }
794        }
795    }
796
797    #[rstest]
798    fn test_create_object_store_from_path_empty_options() {
799        let result = create_object_store_from_path("s3://test-bucket/path", None);
800        assert!(result.is_ok());
801        let (_, base_path, uri) = result.unwrap();
802        assert_eq!(base_path, "path");
803        assert_eq!(uri, "s3://test-bucket/path");
804    }
805
806    #[rstest]
807    fn test_parse_url_and_path() {
808        let result = parse_url_and_path("s3://bucket/path/to/file");
809        assert!(result.is_ok());
810        let (url, path) = result.unwrap();
811        assert_eq!(url.scheme(), "s3");
812        assert_eq!(url.host_str().unwrap(), "bucket");
813        assert_eq!(path, "path/to/file");
814    }
815
816    #[rstest]
817    fn test_extract_host() {
818        let url = url::Url::parse("s3://test-bucket/path").unwrap();
819        let result = extract_host(&url, "Test error");
820        assert!(result.is_ok());
821        assert_eq!(result.unwrap(), "test-bucket");
822    }
823
824    #[rstest]
825    fn test_normalize_path_to_uri() {
826        // Unix absolute paths
827        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
828
829        // Windows drive paths
830        assert_eq!(
831            normalize_path_to_uri("C:\\tmp\\test"),
832            "file:///C:/tmp/test"
833        );
834        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
835        assert_eq!(
836            normalize_path_to_uri("D:\\data\\file.txt"),
837            "file:///D:/data/file.txt"
838        );
839
840        // Windows UNC paths
841        assert_eq!(
842            normalize_path_to_uri("\\\\server\\share\\file"),
843            "file://server/share/file"
844        );
845
846        // Already URIs - should remain unchanged
847        assert_eq!(
848            normalize_path_to_uri("s3://bucket/path"),
849            "s3://bucket/path"
850        );
851        assert_eq!(
852            normalize_path_to_uri("file:///tmp/test"),
853            "file:///tmp/test"
854        );
855        assert_eq!(
856            normalize_path_to_uri("https://example.com/path"),
857            "https://example.com/path"
858        );
859    }
860
861    #[rstest]
862    fn test_is_absolute_path() {
863        // Unix absolute paths
864        assert!(is_absolute_path("/tmp/test"));
865        assert!(is_absolute_path("/"));
866
867        // Windows drive paths
868        assert!(is_absolute_path("C:\\tmp\\test"));
869        assert!(is_absolute_path("C:/tmp/test"));
870        assert!(is_absolute_path("D:\\"));
871        assert!(is_absolute_path("Z:/"));
872
873        // Windows UNC paths
874        assert!(is_absolute_path("\\\\server\\share"));
875        assert!(is_absolute_path("\\\\localhost\\c$"));
876
877        // Relative paths
878        assert!(!is_absolute_path("tmp/test"));
879        assert!(!is_absolute_path("./test"));
880        assert!(!is_absolute_path("../test"));
881        assert!(!is_absolute_path("test.txt"));
882
883        // Edge cases
884        assert!(!is_absolute_path(""));
885        assert!(!is_absolute_path("C"));
886        assert!(!is_absolute_path("C:"));
887        assert!(!is_absolute_path("\\"));
888    }
889
890    #[rstest]
891    fn test_path_to_file_uri() {
892        // Unix absolute paths
893        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
894        assert_eq!(path_to_file_uri("/"), "file:///");
895
896        // Windows drive paths
897        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
898        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
899        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");
900
901        // Windows UNC paths
902        assert_eq!(
903            path_to_file_uri("\\\\server\\share\\file"),
904            "file://server/share/file"
905        );
906        assert_eq!(
907            path_to_file_uri("\\\\localhost\\c$\\test"),
908            "file://localhost/c$/test"
909        );
910    }
911}