diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index c90b1e48584..6fa39f9f68b 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -55,6 +55,7 @@ pub use header::parse_prefixed_headers; mod uri; pub use uri::percent_decode_path; pub use uri::percent_encode_path; +pub use uri::query_pairs; mod error; pub use error::new_request_build_error; diff --git a/core/src/raw/http_util/uri.rs b/core/src/raw/http_util/uri.rs index 1f3b893e035..1339494f8fb 100644 --- a/core/src/raw/http_util/uri.rs +++ b/core/src/raw/http_util/uri.rs @@ -58,6 +58,25 @@ pub fn percent_decode_path(path: &str) -> String { } } +/// query_pairs will parse a query string encoded as key-value pairs separated by `&` to a vector of key-value pairs. +/// It also does percent decoding for both key and value. +/// +/// Note that `?` is not allowed in the query string, and it will be treated as a part of the first key if included. +pub fn query_pairs(query: &str) -> Vec<(String, String)> { + query + .split('&') + .filter_map(|pair| { + let mut iter = pair.splitn(2, '='); + // TODO: should we silently ignore invalid pairs and filter them out without the user noticing? + // or should we return an error in the whole `query_pairs` function so the caller knows it failed? + let key = iter.next()?; + let value = iter.next().unwrap_or(""); + Some((key, value)) + }) + .map(|(key, value)| (percent_decode_path(key), percent_decode_path(value))) + .collect() +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/types/builder.rs b/core/src/types/builder.rs index 43b05e582dc..a57ea715239 100644 --- a/core/src/types/builder.rs +++ b/core/src/types/builder.rs @@ -17,11 +17,13 @@ use std::fmt::Debug; +use http::Uri; use serde::de::DeserializeOwned; use serde::Serialize; use crate::raw::*; use crate::*; +use types::operator::{OperatorFactory, OperatorRegistry}; /// Builder is used to set up underlying services. 
/// @@ -56,6 +58,15 @@ pub trait Builder: Default + 'static { /// Consume the accessor builder to build a service. fn build(self) -> Result; + + /// Register this builder in the given registry. + fn register_in_registry(registry: &mut OperatorRegistry) { + let operator_factory: OperatorFactory = |uri, options| { + let builder = Self::Config::from_uri(uri, options)?.into_builder(); + Ok(Operator::new(builder)?.finish()) + }; + registry.register(Self::SCHEME.into_static(), operator_factory) + } } /// Dummy implementation of builder @@ -137,6 +148,32 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static { }) } + /// Build this configuration from `uri`: its percent-decoded query pairs are chained with `options` (query pairs first) and passed to `from_iter`. + fn from_uri(uri: &str, options: impl IntoIterator) -> Result { + let parsed_uri = uri.parse::().map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "uri is invalid") + .with_context("uri", uri) + .set_source(err) + })?; + + // TODO: I have some doubts about this default implementation + // It was inspired by https://github.com/apache/opendal/blob/52c96bb8e8cb4d024ccab1f415c4756447c726dd/bin/ofs/src/main.rs#L60 + // Parameters should be specified in uri's query param. Example: 'fs://?root=' + // this is very similar to https://github.com/apache/opendal/blob/52c96bb8e8cb4d024ccab1f415c4756447c726dd/bin/ofs/README.md?plain=1#L45 + let query_pairs = parsed_uri.query().map(query_pairs).unwrap_or_default(); + + // TODO: should we log a warning if the query_pairs vector is empty? + + // TODO: we are not interpreting the host or path params + // the `let op = Operator::from_uri("fs:///tmp/test", vec![])?;` statement from the RFC won't work. + // instead we should use `let op = Operator::from_uri("fs://?root=/tmp/test", vec![])?;` as done + // in `ofs`. The `fs` service should override this default implementation if it wants to use the host or path params? + + let merged_options = query_pairs.into_iter().chain(options); + + Self::from_iter(merged_options) + } + /// Convert this configuration into a service builder. 
fn into_builder(self) -> Self::Builder; } diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 4393cd5e020..bc54b61ef2b 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -21,6 +21,8 @@ use std::sync::Arc; use crate::layers::*; use crate::raw::*; use crate::*; +// TODO: is this import path idiomatic to the project? +use super::registry::GLOBAL_OPERATOR_REGISTRY; /// # Operator build API /// @@ -95,6 +97,27 @@ impl Operator { Ok(OperatorBuilder::new(acc)) } + /// Create a new operator from `uri` and extra `options` by asking the global operator registry to parse them. + /// + /// TODO: improve these examples + /// TODO: this doctest does not exercise anything: `test()` is defined but never called, so it always passes + /// # Examples + /// ``` + /// # use anyhow::Result; + /// use opendal::Operator; + /// + /// fn test() -> Result<()> { + /// Operator::from_uri("fs://?root=/tmp/test", vec![])?; + /// Ok(()) + /// } + /// ``` + pub fn from_uri( + uri: &str, + options: impl IntoIterator, + ) -> Result { + GLOBAL_OPERATOR_REGISTRY.with(|registry| registry.parse(uri, options)) + } + /// Create a new operator from given iterator in static dispatch. /// /// # Notes diff --git a/core/src/types/operator/mod.rs b/core/src/types/operator/mod.rs index 879f113008c..5b2ea2f3115 100644 --- a/core/src/types/operator/mod.rs +++ b/core/src/types/operator/mod.rs @@ -32,3 +32,11 @@ pub use metadata::OperatorInfo; pub mod operator_functions; pub mod operator_futures; + +// TODO: should we make the registry module public or export the OperatorFactory and OperatorRegistry +// types directly? +mod registry; +// TODO: these re-exports are reported as unused. How can we expose them as public API? 
+pub use registry::OperatorFactory; +pub use registry::OperatorRegistry; +pub use registry::GLOBAL_OPERATOR_REGISTRY; diff --git a/core/src/types/operator/registry.rs b/core/src/types/operator/registry.rs new file mode 100644 index 00000000000..28deadf3f1c --- /dev/null +++ b/core/src/types/operator/registry.rs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::LazyCell; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use http::Uri; + +use crate::services::*; +use crate::*; + +// TODO: thread local or use LazyLock instead? this way the access is lock-free +// TODO: should we expose the `GLOBAL_OPERATOR_REGISTRY` as public API at `crate::types::operator::GLOBAL_OPERATOR_REGISTRY`? +thread_local! { + pub static GLOBAL_OPERATOR_REGISTRY: LazyCell = LazyCell::new(OperatorRegistry::new); +} + +// In order to reduce boilerplate, we should return in this function a `Builder` instead of operator?. +pub type OperatorFactory = fn(&str, HashMap) -> Result; + +// TODO: the default implementation should return an empty registry? Or it should return the initialized +// registry with all the services that are enabled? 
If so, should we include an `OperatorRegistry::empty` method +// that allows users to create an empty registry? +#[derive(Clone, Debug, Default)] +pub struct OperatorRegistry { + registry: Arc>>, +} + +impl OperatorRegistry { + pub fn new() -> Self { + let mut registry = Self::default(); + // TODO: is this correct? have a `Builder::enabled()` method that returns the set of enabled service builders? + // Similar to `Scheme::Enabled()` + // or have a `Scheme::associated_builder` that given a scheme returns the associated builder? The problem with this + // is that `Scheme` variants are not gated behind a feature gate and the associated builder is. As a workaround + + // TODO: it seems too error-prone to have this list manually updated, we should have a macro that generates this list? + // it could be something like: + // + // ```rust + // apply_for_all_services!{ + // $service::register_in_registry(&mut registry); + // } + // ``` + // and the apply_for_all_services macro would gate every statement behind the corresponding feature gate + // This does not seem to be the place where we should have a "list of enabled services". 
+ #[cfg(feature = "services-aliyun-drive")] + AliyunDrive::register_in_registry(&mut registry); + #[cfg(feature = "services-atomicserver")] + Atomicserver::register_in_registry(&mut registry); + #[cfg(feature = "services-alluxio")] + Alluxio::register_in_registry(&mut registry); + #[cfg(feature = "services-azblob")] + Azblob::register_in_registry(&mut registry); + #[cfg(feature = "services-azdls")] + Azdls::register_in_registry(&mut registry); + #[cfg(feature = "services-azfile")] + Azfile::register_in_registry(&mut registry); + #[cfg(feature = "services-b2")] + B2::register_in_registry(&mut registry); + #[cfg(feature = "services-cacache")] + Cacache::register_in_registry(&mut registry); + #[cfg(feature = "services-cos")] + Cos::register_in_registry(&mut registry); + #[cfg(feature = "services-compfs")] + Compfs::register_in_registry(&mut registry); + #[cfg(feature = "services-dashmap")] + Dashmap::register_in_registry(&mut registry); + #[cfg(feature = "services-dropbox")] + Dropbox::register_in_registry(&mut registry); + #[cfg(feature = "services-etcd")] + Etcd::register_in_registry(&mut registry); + #[cfg(feature = "services-foundationdb")] + Foundationdb::register_in_registry(&mut registry); + #[cfg(feature = "services-fs")] + Fs::register_in_registry(&mut registry); + #[cfg(feature = "services-ftp")] + Ftp::register_in_registry(&mut registry); + #[cfg(feature = "services-gcs")] + Gcs::register_in_registry(&mut registry); + #[cfg(feature = "services-ghac")] + Ghac::register_in_registry(&mut registry); + #[cfg(feature = "services-hdfs")] + Hdfs::register_in_registry(&mut registry); + #[cfg(feature = "services-http")] + Http::register_in_registry(&mut registry); + #[cfg(feature = "services-huggingface")] + Huggingface::register_in_registry(&mut registry); + #[cfg(feature = "services-ipfs")] + Ipfs::register_in_registry(&mut registry); + #[cfg(feature = "services-ipmfs")] + Ipmfs::register_in_registry(&mut registry); + #[cfg(feature = "services-icloud")] + 
Icloud::register_in_registry(&mut registry); + #[cfg(feature = "services-libsql")] + Libsql::register_in_registry(&mut registry); + #[cfg(feature = "services-memcached")] + Memcached::register_in_registry(&mut registry); + #[cfg(feature = "services-memory")] + Memory::register_in_registry(&mut registry); + #[cfg(feature = "services-mini-moka")] + MiniMoka::register_in_registry(&mut registry); + #[cfg(feature = "services-moka")] + Moka::register_in_registry(&mut registry); + #[cfg(feature = "services-monoiofs")] + Monoiofs::register_in_registry(&mut registry); + #[cfg(feature = "services-mysql")] + Mysql::register_in_registry(&mut registry); + #[cfg(feature = "services-obs")] + Obs::register_in_registry(&mut registry); + #[cfg(feature = "services-onedrive")] + Onedrive::register_in_registry(&mut registry); + #[cfg(feature = "services-postgresql")] + Postgresql::register_in_registry(&mut registry); + #[cfg(feature = "services-gdrive")] + Gdrive::register_in_registry(&mut registry); + #[cfg(feature = "services-oss")] + Oss::register_in_registry(&mut registry); + #[cfg(feature = "services-persy")] + Persy::register_in_registry(&mut registry); + #[cfg(feature = "services-redis")] + Redis::register_in_registry(&mut registry); + #[cfg(feature = "services-rocksdb")] + Rocksdb::register_in_registry(&mut registry); + #[cfg(feature = "services-s3")] + S3::register_in_registry(&mut registry); + #[cfg(feature = "services-seafile")] + Seafile::register_in_registry(&mut registry); + #[cfg(feature = "services-upyun")] + Upyun::register_in_registry(&mut registry); + #[cfg(feature = "services-yandex-disk")] + YandexDisk::register_in_registry(&mut registry); + #[cfg(feature = "services-pcloud")] + Pcloud::register_in_registry(&mut registry); + #[cfg(feature = "services-sftp")] + Sftp::register_in_registry(&mut registry); + #[cfg(feature = "services-sled")] + Sled::register_in_registry(&mut registry); + #[cfg(feature = "services-sqlite")] + Sqlite::register_in_registry(&mut registry); 
+ #[cfg(feature = "services-supabase")] + Supabase::register_in_registry(&mut registry); + #[cfg(feature = "services-swift")] + Swift::register_in_registry(&mut registry); + #[cfg(feature = "services-tikv")] + Tikv::register_in_registry(&mut registry); + #[cfg(feature = "services-vercel-artifacts")] + VercelArtifacts::register_in_registry(&mut registry); + #[cfg(feature = "services-vercel-blob")] + VercelBlob::register_in_registry(&mut registry); + #[cfg(feature = "services-webdav")] + Webdav::register_in_registry(&mut registry); + #[cfg(feature = "services-webhdfs")] + Webhdfs::register_in_registry(&mut registry); + #[cfg(feature = "services-redb")] + Redb::register_in_registry(&mut registry); + #[cfg(feature = "services-mongodb")] + Mongodb::register_in_registry(&mut registry); + #[cfg(feature = "services-hdfs-native")] + HdfsNative::register_in_registry(&mut registry); + #[cfg(feature = "services-surrealdb")] + Surrealdb::register_in_registry(&mut registry); + #[cfg(feature = "services-lakefs")] + Lakefs::register_in_registry(&mut registry); + #[cfg(feature = "services-nebula-graph")] + NebulaGraph::register_in_registry(&mut registry); + + registry + } + + pub fn register(&mut self, scheme: &str, factory: OperatorFactory) { + // TODO: should we receive a `&str` or a `String`? We are cloning it anyway. + self.registry + .lock() + .expect("poisoned lock") + .insert(scheme.to_string(), factory); + } + + pub fn parse( + &self, + uri: &str, + options: impl IntoIterator, + ) -> Result { + // TODO: the URI is parsed with `http::Uri` here, not `url::Url`, because + // that is also what the `Configurator::from_uri` method uses; confirm this is the intended type. 
+ let parsed_uri = uri.parse::().map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "uri is invalid") + .with_context("uri", uri) + .set_source(err) + })?; + + let scheme = parsed_uri.scheme_str().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "uri is missing scheme").with_context("uri", uri) + })?; + + let registry_lock = self.registry.lock().expect("poisoned lock"); + let factory = registry_lock.get(scheme).ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "could not find any operator factory for the given scheme", + ) + .with_context("uri", uri) + .with_context("scheme", scheme) + })?; + + // TODO: should `OperatorFactory` receive an `IntoIterator` instead of a `HashMap`? + // However, `impl Trait` in type aliases is unstable and is also not allowed in fn pointer types. + let options = options.into_iter().collect(); + + factory(uri, options) + } +}