1use std::sync::Arc;
17
18use ahash::AHashMap;
19use arrow::record_batch::RecordBatch;
20use object_store::{ObjectStore, path::Path as ObjectPath};
21use parquet::{
22 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
23 file::{
24 properties::WriterProperties,
25 reader::{FileReader, SerializedFileReader},
26 statistics::Statistics,
27 },
28};
29
30pub async fn write_batch_to_parquet(
36 batch: RecordBatch,
37 path: &str,
38 storage_options: Option<AHashMap<String, String>>,
39 compression: Option<parquet::basic::Compression>,
40 max_row_group_size: Option<usize>,
41) -> anyhow::Result<()> {
42 write_batches_to_parquet(
43 &[batch],
44 path,
45 storage_options,
46 compression,
47 max_row_group_size,
48 )
49 .await
50}
51
52pub async fn write_batches_to_parquet(
58 batches: &[RecordBatch],
59 path: &str,
60 storage_options: Option<AHashMap<String, String>>,
61 compression: Option<parquet::basic::Compression>,
62 max_row_group_size: Option<usize>,
63) -> anyhow::Result<()> {
64 let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
65 let object_path = if base_path.is_empty() {
66 ObjectPath::from(path)
67 } else {
68 ObjectPath::from(format!("{base_path}/{path}"))
69 };
70
71 write_batches_to_object_store(
72 batches,
73 object_store,
74 &object_path,
75 compression,
76 max_row_group_size,
77 )
78 .await
79}
80
81pub async fn write_batches_to_object_store(
87 batches: &[RecordBatch],
88 object_store: Arc<dyn ObjectStore>,
89 path: &ObjectPath,
90 compression: Option<parquet::basic::Compression>,
91 max_row_group_size: Option<usize>,
92) -> anyhow::Result<()> {
93 let mut buffer = Vec::new();
95
96 let writer_props = WriterProperties::builder()
97 .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
98 .set_max_row_group_size(max_row_group_size.unwrap_or(5000))
99 .build();
100
101 let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
102 for batch in batches {
103 writer.write(batch)?;
104 }
105 writer.close()?;
106
107 object_store.put(path, buffer.into()).await?;
109
110 Ok(())
111}
112
113pub async fn combine_parquet_files(
119 file_paths: Vec<&str>,
120 new_file_path: &str,
121 storage_options: Option<AHashMap<String, String>>,
122 compression: Option<parquet::basic::Compression>,
123 max_row_group_size: Option<usize>,
124) -> anyhow::Result<()> {
125 if file_paths.len() <= 1 {
126 return Ok(());
127 }
128
129 let (object_store, base_path, _) =
131 create_object_store_from_path(file_paths[0], storage_options)?;
132
133 let object_paths: Vec<ObjectPath> = file_paths
135 .iter()
136 .map(|path| {
137 if base_path.is_empty() {
138 ObjectPath::from(*path)
139 } else {
140 ObjectPath::from(format!("{base_path}/{path}"))
141 }
142 })
143 .collect();
144
145 let new_object_path = if base_path.is_empty() {
146 ObjectPath::from(new_file_path)
147 } else {
148 ObjectPath::from(format!("{base_path}/{new_file_path}"))
149 };
150
151 combine_parquet_files_from_object_store(
152 object_store,
153 object_paths,
154 &new_object_path,
155 compression,
156 max_row_group_size,
157 )
158 .await
159}
160
161pub async fn combine_parquet_files_from_object_store(
167 object_store: Arc<dyn ObjectStore>,
168 file_paths: Vec<ObjectPath>,
169 new_file_path: &ObjectPath,
170 compression: Option<parquet::basic::Compression>,
171 max_row_group_size: Option<usize>,
172) -> anyhow::Result<()> {
173 if file_paths.len() <= 1 {
174 return Ok(());
175 }
176
177 let mut all_batches: Vec<RecordBatch> = Vec::new();
178
179 for path in &file_paths {
181 let data = object_store.get(path).await?.bytes().await?;
182 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
183 let mut reader = builder.build()?;
184
185 for batch in reader.by_ref() {
186 all_batches.push(batch?);
187 }
188 }
189
190 write_batches_to_object_store(
192 &all_batches,
193 object_store.clone(),
194 new_file_path,
195 compression,
196 max_row_group_size,
197 )
198 .await?;
199
200 for path in &file_paths {
202 if path != new_file_path {
203 object_store.delete(path).await?;
204 }
205 }
206
207 Ok(())
208}
209
210pub async fn min_max_from_parquet_metadata(
220 file_path: &str,
221 storage_options: Option<AHashMap<String, String>>,
222 column_name: &str,
223) -> anyhow::Result<(u64, u64)> {
224 let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
225 let object_path = if base_path.is_empty() {
226 ObjectPath::from(file_path)
227 } else {
228 ObjectPath::from(format!("{base_path}/{file_path}"))
229 };
230
231 min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
232}
233
/// Extracts the overall minimum and maximum `i64` values for `column_name`
/// from the Parquet footer statistics of the file at `file_path`, returned as
/// `(min, max)` cast to `u64`.
///
/// # Errors
///
/// Fails if the column is present but not `Int64`, if statistics are missing
/// for a row group containing the column, or if the column is never found.
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    // Download the whole file; only the trailing footer metadata is parsed.
    let data = object_store.get(file_path).await?.bytes().await?;
    let reader = SerializedFileReader::new(data)?;

    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;

    // Scan every row group's column chunks for the target column and fold the
    // per-chunk min/max statistics into the overall range.
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);

        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);

            if col_metadata.column_path().string() == column_name {
                if let Some(stats) = col_metadata.statistics() {
                    if let Statistics::Int64(int64_stats) = stats {
                        // Keep the smallest min seen across all row groups.
                        if let Some(&min_value) = int64_stats.min_opt()
                            && (overall_min_value.is_none()
                                || min_value < overall_min_value.unwrap())
                        {
                            overall_min_value = Some(min_value);
                        }

                        // Keep the largest max seen across all row groups.
                        if let Some(&max_value) = int64_stats.max_opt()
                            && (overall_max_value.is_none()
                                || max_value > overall_max_value.unwrap())
                        {
                            overall_max_value = Some(max_value);
                        }
                    } else {
                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
                    }
                } else {
                    anyhow::bail!(
                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
                    );
                }
            }
        }
    }

    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        // NOTE(review): `as u64` wraps for negative i64 values — presumably
        // this column holds non-negative timestamps; confirm with callers.
        Ok((min as u64, max as u64))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group."
        )
    }
}
304
/// Creates an [`ObjectStore`] for `path`, which may be a local path or a
/// remote URI (`s3://`, `gs://`/`gcs://`, `az://`, `abfs://`, `http(s)://`).
///
/// Returns `(store, base_path, normalized_uri)`, where `base_path` is the key
/// prefix inside the store (empty for local stores).
///
/// # Errors
///
/// Fails when a cloud scheme is used without the `cloud` feature enabled, or
/// when the underlying store builder rejects the configuration.
#[allow(unused_variables)] // `storage_options` is unused without the `cloud` feature
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    // Convert bare filesystem paths into `file://` URIs first so the scheme
    // dispatch below is uniform.
    let uri = normalize_path_to_uri(path);

    match uri.as_str() {
        #[cfg(feature = "cloud")]
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        #[cfg(feature = "cloud")]
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        // Without the `cloud` feature, remote schemes are a hard error rather
        // than silently falling through to the local store.
        #[cfg(not(feature = "cloud"))]
        s if s.starts_with("s3://")
            || s.starts_with("gs://")
            || s.starts_with("gcs://")
            || s.starts_with("az://")
            || s.starts_with("abfs://")
            || s.starts_with("http://")
            || s.starts_with("https://") =>
        {
            anyhow::bail!("Cloud storage support requires the 'cloud' feature: {uri}")
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        // Anything unrecognized is treated as a plain local path.
        _ => create_local_store(&uri, false),
    }
}
360
361#[must_use]
380pub fn normalize_path_to_uri(path: &str) -> String {
381 if path.contains("://") {
382 path.to_string()
384 } else {
385 if is_absolute_path(path) {
387 path_to_file_uri(path)
388 } else {
389 let absolute_path = std::env::current_dir().unwrap().join(path);
391 path_to_file_uri(&absolute_path.to_string_lossy())
392 }
393 }
394}
395
/// Returns `true` when `path` is absolute on Unix (`/…`) or Windows
/// (`C:\…`, `C:/…`, or a UNC path `\\server\share`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute path.
    if path.starts_with('/') {
        return true;
    }
    // Windows UNC path (`\\server\share`).
    if path.starts_with("\\\\") {
        return true;
    }
    // Windows drive path: any char, then ':', then '\\' or '/'.
    // (Replaces the former `if … { true } else { false }` chain with a
    // direct boolean expression — clippy: needless_bool.)
    let mut chars = path.chars();
    let first = chars.next();
    let second = chars.next();
    let third = chars.next();
    first.is_some() && second == Some(':') && matches!(third, Some('\\' | '/'))
}
421
/// Converts a local filesystem path into a `file://` URI string, normalizing
/// Windows separators to forward slashes.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    // Windows UNC path: `\\server\share` -> `file://server/share`.
    if let Some(unc) = path.strip_prefix("\\\\") {
        return format!("file://{}", unc.replace('\\', "/"));
    }
    // Windows drive path: `C:\x` or `C:/x` -> `file:///C:/x`.
    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        return format!("file:///{}", path.replace('\\', "/"));
    }
    // Unix absolute paths keep their leading slash; everything else is
    // passed through with a bare `file://` prefix.
    format!("file://{path}")
}
441
442fn create_local_store(
444 uri: &str,
445 is_file_uri: bool,
446) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
447 let path = if is_file_uri {
448 uri.strip_prefix("file://").unwrap_or(uri)
449 } else {
450 uri
451 };
452
453 let local_store = object_store::local::LocalFileSystem::new_with_prefix(path)?;
454 Ok((Arc::new(local_store), String::new(), uri.to_string()))
455}
456
/// Builds an Amazon S3 object store from an `s3://bucket/path` URI.
///
/// Recognized `storage_options` keys: `endpoint_url`, `region`,
/// `access_key_id`/`key`, `secret_access_key`/`secret`,
/// `session_token`/`token`, `allow_http`; unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_s3_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut s3_builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    // Fold each option into the builder; a `None` map means no options.
    for (key, value) in storage_options.unwrap_or_default() {
        s3_builder = match key.as_str() {
            "endpoint_url" => s3_builder.with_endpoint(&value),
            "region" => s3_builder.with_region(&value),
            "access_key_id" | "key" => s3_builder.with_access_key_id(&value),
            "secret_access_key" | "secret" => s3_builder.with_secret_access_key(&value),
            "session_token" | "token" => s3_builder.with_token(&value),
            "allow_http" => s3_builder.with_allow_http(value.to_lowercase() == "true"),
            _ => {
                log::warn!("Unknown S3 storage option: {key}");
                s3_builder
            }
        };
    }

    let s3_store = s3_builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}
502
/// Builds a Google Cloud Storage object store from a `gs://`/`gcs://` URI.
///
/// Recognized `storage_options` keys: `service_account_path`,
/// `service_account_key`, `application_credentials`; `project_id` is
/// acknowledged but not forwarded, and unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_gcs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // Not forwarded to the builder; the project is expected to
                    // come from the service account or environment instead.
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // NOTE(review): mutating the process environment is not
                    // thread-safe (hence the `unsafe`); confirm this runs
                    // before any threads that read the environment are spawned.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }

    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}
551
/// Builds an Azure Blob Storage object store from an `az://container/path` URI.
///
/// Recognized `storage_options` keys: `account_name`, `account_key`,
/// `sas_token`, `client_id`, `client_secret`, `tenant_id`; unknown keys are
/// logged and ignored.
#[cfg(feature = "cloud")]
fn create_azure_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    // `parse_url_and_path` already returns the path with the leading '/'
    // stripped; the previous code discarded it and recomputed the identical
    // value by hand.
    let (url, path) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;

    let mut builder =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);

    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Split "k1=v1&k2=v2" into pairs; entries without '=' are
                    // dropped. NOTE(review): only the first two '='-separated
                    // segments of each entry are kept — values containing '='
                    // would be truncated; confirm tokens are percent-encoded.
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            let mut parts = pair.split('=');
                            match (parts.next(), parts.next()) {
                                (Some(key), Some(val)) => Some((key.to_string(), val.to_string())),
                                _ => None,
                            }
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}
610
/// Builds an Azure store from an ABFS URI of the form
/// `abfs://container@account.dfs.core.windows.net/path`.
///
/// Recognized `storage_options` keys mirror [`create_azure_store`]:
/// `account_name`, `account_key`, `sas_token`, `client_id`, `client_secret`,
/// `tenant_id`; unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_abfs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // The storage account is the first dotted component of the host.
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // The container comes from the user-info portion before the '@'.
    let container = url
        .username()
        .split('@')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;

    let mut azure_builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    if let Some(opts) = storage_options {
        for (key, value) in opts {
            match key.as_str() {
                "account_name" => azure_builder = azure_builder.with_account(&value),
                "account_key" => azure_builder = azure_builder.with_access_key(&value),
                "sas_token" => {
                    // Split "k1=v1&k2=v2" into pairs; entries without '=' are
                    // dropped and only the first two segments of each count.
                    let pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|entry| {
                            let mut segments = entry.split('=');
                            let k = segments.next()?;
                            let v = segments.next()?;
                            Some((k.to_string(), v.to_string()))
                        })
                        .collect();
                    azure_builder = azure_builder.with_sas_authorization(pairs);
                }
                "client_id" => azure_builder = azure_builder.with_client_id(&value),
                "client_secret" => azure_builder = azure_builder.with_client_secret(&value),
                "tenant_id" => azure_builder = azure_builder.with_tenant_id(&value),
                _ => {
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = azure_builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}
681
/// Builds an HTTP(S) object store from an `http://`/`https://` URI.
///
/// No `storage_options` keys are currently supported; any that are passed
/// are logged and ignored.
#[cfg(feature = "cloud")]
fn create_http_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;

    // Rebuild the base URL from scheme + host, keeping any explicit port.
    // (The previous code used only `host_str`, silently dropping the port
    // and breaking endpoints like `http://localhost:9000`.)
    let host = url.host_str().unwrap_or("");
    let base_url = match url.port() {
        Some(port) => format!("{}://{host}:{port}", url.scheme()),
        None => format!("{}://{host}", url.scheme()),
    };

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    if let Some(options) = storage_options {
        for (key, _value) in options {
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}
706
/// Parses `uri` into a [`url::Url`] plus its path with the leading '/'
/// stripped (the object-store key form).
#[cfg(feature = "cloud")]
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let parsed = url::Url::parse(uri)?;
    let object_path = parsed.path().trim_start_matches('/').to_owned();
    Ok((parsed, object_path))
}
714
/// Returns the host component of `url` as an owned string, or an error built
/// from `error_msg` when the URL has no host.
#[cfg(feature = "cloud")]
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    match url.host_str() {
        Some(host) => Ok(host.to_string()),
        None => Err(anyhow::anyhow!("{error_msg}")),
    }
}
722
#[cfg(test)]
mod tests {
    #[cfg(feature = "cloud")]
    use ahash::AHashMap;
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_s3() {
        let mut options = AHashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_azure() {
        let mut options = AHashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Formatting fix: the insert below and the call that follows were
        // previously fused onto a single source line.
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_gcs() {
        let mut options = AHashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS store construction may fail without credentials; accept either
        // outcome but pin the success shape and the failure message content.
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute path.
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths.
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // Windows UNC path.
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // URIs with a scheme pass through unchanged.
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}