diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index ddc33b43d134..d3af3d5596dc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1261,6 +1261,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-plan", + "parking_lot", ] [[package]] diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index c4636f1ce0e0..9b9afc1c2420 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -21,10 +21,9 @@ use std::sync::{Arc, Weak}; use crate::object_storage::{get_object_store, AwsOptions, GcpOptions}; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; + use datafusion::common::plan_datafusion_err; -use datafusion::datasource::listing::{ - ListingTable, ListingTableConfig, ListingTableUrl, -}; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::context::SessionState; @@ -34,14 +33,13 @@ use async_trait::async_trait; use dirs::home_dir; use parking_lot::RwLock; -/// Wraps another catalog, automatically creating table providers -/// for local files if needed -pub struct DynamicFileCatalog { +/// Wraps another catalog, automatically register require object stores for the file locations +pub struct DynamicObjectStoreCatalog { inner: Arc, state: Weak>, } -impl DynamicFileCatalog { +impl DynamicObjectStoreCatalog { pub fn new( inner: Arc, state: Weak>, @@ -50,7 +48,7 @@ impl DynamicFileCatalog { } } -impl CatalogProviderList for DynamicFileCatalog { +impl CatalogProviderList for DynamicObjectStoreCatalog { fn as_any(&self) -> &dyn Any { self } @@ -69,19 +67,19 @@ impl CatalogProviderList for DynamicFileCatalog { fn catalog(&self, name: &str) -> Option> { let state = self.state.clone(); - self.inner - .catalog(name) - .map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _) + self.inner.catalog(name).map(|catalog| { + Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _ + }) } } /// Wraps another catalog provider -struct DynamicFileCatalogProvider { +struct DynamicObjectStoreCatalogProvider { inner: Arc, state: Weak>, } -impl DynamicFileCatalogProvider { +impl DynamicObjectStoreCatalogProvider { pub fn new( inner: Arc, state: Weak>, @@ -90,7 +88,7 @@ impl DynamicFileCatalogProvider { } } -impl CatalogProvider for DynamicFileCatalogProvider { +impl CatalogProvider for DynamicObjectStoreCatalogProvider { fn as_any(&self) -> &dyn Any { self } @@ -101,9 +99,9 @@ impl CatalogProvider for DynamicFileCatalogProvider { fn schema(&self, name: &str) -> Option> { let state = self.state.clone(); - self.inner - .schema(name) - .map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _) + self.inner.schema(name).map(|schema| { + Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _ + }) } fn register_schema( @@ -115,13 +113,14 @@ impl CatalogProvider for DynamicFileCatalogProvider { } } -/// Wraps another schema provider -struct DynamicFileSchemaProvider { +/// Wraps another schema provider. [DynamicObjectStoreSchemaProvider] is responsible for registering the required +/// object stores for the file locations. +struct DynamicObjectStoreSchemaProvider { inner: Arc, state: Weak>, } -impl DynamicFileSchemaProvider { +impl DynamicObjectStoreSchemaProvider { pub fn new( inner: Arc, state: Weak>, @@ -131,7 +130,7 @@ impl DynamicFileSchemaProvider { } #[async_trait] -impl SchemaProvider for DynamicFileSchemaProvider { +impl SchemaProvider for DynamicObjectStoreSchemaProvider { fn as_any(&self) -> &dyn Any { self } @@ -149,9 +148,11 @@ impl SchemaProvider for DynamicFileSchemaProvider { } async fn table(&self, name: &str) -> Result>> { - let inner_table = self.inner.table(name).await?; - if inner_table.is_some() { - return Ok(inner_table); + let inner_table = self.inner.table(name).await; + if inner_table.is_ok() { + if let Some(inner_table) = inner_table? { + return Ok(Some(inner_table)); + } } // if the inner schema provider didn't have a table by @@ -201,16 +202,7 @@ impl SchemaProvider for DynamicFileSchemaProvider { state.runtime_env().register_object_store(url, store); } } - - let config = match ListingTableConfig::new(table_url).infer(&state).await { - Ok(cfg) => cfg, - Err(_) => { - // treat as non-existing - return Ok(None); - } - }; - - Ok(Some(Arc::new(ListingTable::try_new(config)?))) + self.inner.table(name).await } fn deregister_table(&self, name: &str) -> Result>> { @@ -221,7 +213,8 @@ impl SchemaProvider for DynamicFileSchemaProvider { self.inner.table_exist(name) } } -fn substitute_tilde(cur: String) -> String { + +pub fn substitute_tilde(cur: String) -> String { if let Some(usr_dir_path) = home_dir() { if let Some(usr_dir) = usr_dir_path.to_str() { if cur.starts_with('~') && !usr_dir.is_empty() { @@ -231,9 +224,9 @@ fn substitute_tilde(cur: String) -> String { } cur } - #[cfg(test)] mod tests { + use super::*; use datafusion::catalog::SchemaProvider; @@ -241,12 +234,12 @@ mod tests { fn setup_context() -> (SessionContext, Arc) { let ctx = SessionContext::new(); - ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new( + ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( ctx.state().catalog_list().clone(), ctx.state_weak_ref(), ))); - let provider = &DynamicFileCatalog::new( + let provider = &DynamicObjectStoreCatalog::new( ctx.state().catalog_list().clone(), ctx.state_weak_ref(), ) as &dyn CatalogProviderList; @@ -269,7 +262,7 @@ mod tests { let (ctx, schema) = setup_context(); // That's a non registered table so expecting None here - let table = schema.table(&location).await.unwrap(); + let table = schema.table(&location).await?; assert!(table.is_none()); // It should still create an object store for the location in the SessionState @@ -293,7 +286,7 @@ mod tests { let (ctx, schema) = setup_context(); - let table = schema.table(&location).await.unwrap(); + let table = schema.table(&location).await?; assert!(table.is_none()); let store = ctx @@ -315,7 +308,7 @@ mod tests { let (ctx, schema) = setup_context(); - let table = schema.table(&location).await.unwrap(); + let table = schema.table(&location).await?; assert!(table.is_none()); let store = ctx @@ -337,6 +330,7 @@ mod tests { assert!(schema.table(location).await.is_err()); } + #[cfg(not(target_os = "windows"))] #[test] fn test_substitute_tilde() { diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6e94e6ea4186..cdefada5e24a 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -26,7 +26,7 @@ use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::prelude::SessionContext; -use datafusion_cli::catalog::DynamicFileCatalog; +use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::ParquetMetadataFunc; use datafusion_cli::{ exec, @@ -173,11 +173,13 @@ async fn main_inner() -> Result<()> { let runtime_env = create_runtime_env(rt_config.clone())?; + // enable dynamic file query let ctx = - SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)); + SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)) + .enable_url_table(); ctx.refresh_catalogs().await?; - // install dynamic catalog provider that knows how to open files - ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new( + // install dynamic catalog provider that can register required object stores + ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( ctx.state().catalog_list().clone(), ctx.state_weak_ref(), ))); diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index ea01c53b1c62..d7e0068ef88f 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -64,6 +64,12 @@ async fn main() -> Result<()> { .await?; parquet_df.describe().await.unwrap().show().await?; + let dyn_ctx = ctx.enable_url_table(); + let df = dyn_ctx + .sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap())) + .await?; + df.show().await?; + Ok(()) } diff --git a/datafusion-examples/examples/external_dependency/query-aws-s3.rs b/datafusion-examples/examples/external_dependency/query-aws-s3.rs index e32286e30e4f..9c4d76703c9c 100644 --- a/datafusion-examples/examples/external_dependency/query-aws-s3.rs +++ b/datafusion-examples/examples/external_dependency/query-aws-s3.rs @@ -63,5 +63,14 @@ async fn main() -> Result<()> { // print the results df.show().await?; + // dynamic query by the file path + ctx.enable_url_table(); + let df = ctx + .sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) + .await?; + + // print the results + df.show().await?; + Ok(()) } diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index 533bd1eeba08..f9801352087d 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -34,6 +34,7 @@ datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +parking_lot = { workspace = true } [lints] workspace = true diff --git a/datafusion/catalog/src/dynamic_file/catalog.rs b/datafusion/catalog/src/dynamic_file/catalog.rs new file mode 100644 index 000000000000..cd586446f82c --- /dev/null +++ b/datafusion/catalog/src/dynamic_file/catalog.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`DynamicFileCatalog`] that creates tables from file paths + +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; +use async_trait::async_trait; +use std::any::Any; +use std::sync::Arc; + +/// Wrap another catalog provider list +pub struct DynamicFileCatalog { + /// The inner catalog provider list + inner: Arc, + /// The factory that can create a table provider from the file path + factory: Arc, +} + +impl DynamicFileCatalog { + pub fn new( + inner: Arc, + factory: Arc, + ) -> Self { + Self { inner, factory } + } +} + +impl CatalogProviderList for DynamicFileCatalog { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + self.inner.register_catalog(name, catalog) + } + + fn catalog_names(&self) -> Vec { + self.inner.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + self.inner.catalog(name).map(|catalog| { + Arc::new(DynamicFileCatalogProvider::new( + catalog, + Arc::clone(&self.factory), + )) as _ + }) + } +} + +/// Wraps another catalog provider +struct DynamicFileCatalogProvider { + /// The inner catalog provider + inner: Arc, + /// The factory that can create a table provider from the file path + factory: Arc, +} + +impl DynamicFileCatalogProvider { + pub fn new( + inner: Arc, + factory: Arc, + ) -> Self { + Self { inner, factory } + } +} + +impl CatalogProvider for DynamicFileCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.inner.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.inner.schema(name).map(|schema| { + Arc::new(DynamicFileSchemaProvider::new( + schema, + Arc::clone(&self.factory), + )) as _ + }) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> datafusion_common::Result>> { + self.inner.register_schema(name, schema) + } +} + +/// Implements the [DynamicFileSchemaProvider] that can create tables provider from the file path. +/// +/// The provider will try to create a table provider from the file path if the table provider +/// isn't exist in the inner schema provider. +pub struct DynamicFileSchemaProvider { + /// The inner schema provider + inner: Arc, + /// The factory that can create a table provider from the file path + factory: Arc, +} + +impl DynamicFileSchemaProvider { + /// Create a new [DynamicFileSchemaProvider] with the given inner schema provider. + pub fn new( + inner: Arc, + factory: Arc, + ) -> Self { + Self { inner, factory } + } +} + +#[async_trait] +impl SchemaProvider for DynamicFileSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.inner.table_names() + } + + async fn table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + if let Some(table) = self.inner.table(name).await? { + return Ok(Some(table)); + }; + + self.factory.try_new(name).await + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> datafusion_common::Result>> { + self.inner.register_table(name, table) + } + + fn deregister_table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + self.inner.deregister_table(name) + } + + fn table_exist(&self, name: &str) -> bool { + self.inner.table_exist(name) + } +} + +/// [UrlTableFactory] is a factory that can create a table provider from the given url. +#[async_trait] +pub trait UrlTableFactory: Sync + Send { + /// create a new table provider from the provided url + async fn try_new( + &self, + url: &str, + ) -> datafusion_common::Result>>; +} diff --git a/datafusion/catalog/src/dynamic_file/mod.rs b/datafusion/catalog/src/dynamic_file/mod.rs new file mode 100644 index 000000000000..59142333dd54 --- /dev/null +++ b/datafusion/catalog/src/dynamic_file/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod catalog; diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index fe76b5dc9c64..21630f267d2c 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -16,11 +16,13 @@ // under the License. mod catalog; +mod dynamic_file; mod schema; mod session; mod table; pub use catalog::*; +pub use dynamic_file::catalog::*; pub use schema::*; pub use session::*; pub use table::*; diff --git a/datafusion/catalog/src/session.rs b/datafusion/catalog/src/session.rs index 05d2684ed3e0..61d9c2d8a71e 100644 --- a/datafusion/catalog/src/session.rs +++ b/datafusion/catalog/src/session.rs @@ -24,9 +24,10 @@ use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr}; +use parking_lot::{Mutex, RwLock}; use std::any::Any; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Weak}; /// Interface for accessing [`SessionState`] from the catalog. /// @@ -136,3 +137,34 @@ impl From<&dyn Session> for TaskContext { ) } } +type SessionRefLock = Arc>>>>; +/// The state store that stores the reference of the runtime session state. +pub struct SessionStore { + session: SessionRefLock, +} + +impl SessionStore { + /// Create a new [SessionStore] + pub fn new() -> Self { + Self { + session: Arc::new(Mutex::new(None)), + } + } + + /// Set the session state of the store + pub fn with_state(&self, state: Weak>) { + let mut lock = self.session.lock(); + *lock = Some(state); + } + + /// Get the current session of the store + pub fn get_session(&self) -> Weak> { + self.session.lock().clone().unwrap() + } +} + +impl Default for SessionStore { + fn default() -> Self { + Self::new() + } +} diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs new file mode 100644 index 000000000000..a95f3abb939b --- /dev/null +++ b/datafusion/core/src/datasource/dynamic_file.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! dynamic_file_schema contains an [`UrlTableFactory`] implementation that +//! can create a [`ListingTable`] from the given url. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_catalog::{SessionStore, UrlTableFactory}; +use datafusion_common::plan_datafusion_err; + +use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; +use crate::datasource::TableProvider; +use crate::error::Result; +use crate::execution::context::SessionState; + +/// [DynamicListTableFactory] is a factory that can create a [ListingTable] from the given url. +#[derive(Default)] +pub struct DynamicListTableFactory { + /// The session store that contains the current session. + session_store: SessionStore, +} + +impl DynamicListTableFactory { + /// Create a new [DynamicListTableFactory] with the given state store. + pub fn new(session_store: SessionStore) -> Self { + Self { session_store } + } + + /// Get the session store. + pub fn session_store(&self) -> &SessionStore { + &self.session_store + } +} + +#[async_trait] +impl UrlTableFactory for DynamicListTableFactory { + async fn try_new(&self, url: &str) -> Result>> { + let Ok(table_url) = ListingTableUrl::parse(url) else { + return Ok(None); + }; + + let state = &self + .session_store() + .get_session() + .upgrade() + .and_then(|session| { + session + .read() + .as_any() + .downcast_ref::() + .cloned() + }) + .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?; + + match ListingTableConfig::new(table_url.clone()) + .infer(state) + .await + { + Ok(cfg) => ListingTable::try_new(cfg) + .map(|table| Some(Arc::new(table) as Arc)), + Err(_) => Ok(None), + } + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 529bb799e23d..0ed53418fe32 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -22,6 +22,7 @@ pub mod avro_to_arrow; pub mod cte_worktable; pub mod default_table_source; +pub mod dynamic_file; pub mod empty; pub mod file_format; pub mod function; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 06dc797ae27a..621b214818e9 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -69,17 +69,18 @@ use datafusion_expr::{ // backwards compatibility pub use crate::execution::session_state::SessionState; +use crate::datasource::dynamic_file::DynamicListTableFactory; +use crate::execution::session_state::SessionStateBuilder; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use object_store::ObjectStore; -use parking_lot::RwLock; -use url::Url; - -use crate::execution::session_state::SessionStateBuilder; +use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory}; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; use datafusion_optimizer::{AnalyzerRule, OptimizerRule}; +use object_store::ObjectStore; +use parking_lot::RwLock; +use url::Url; mod avro; mod csv; @@ -356,6 +357,53 @@ impl SessionContext { } } + /// Enable dynamic file querying for the current session. + /// + /// This allows queries to directly access arbitrary file names via SQL like + /// `SELECT * from 'my_file.parquet'` + /// so it should only be enabled for systems that such access is not a security risk + /// + /// See [DynamicFileCatalog] for more details + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::{error::Result, assert_batches_eq}; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new().enable_url_table(); + /// let results = ctx + /// .sql("SELECT a, MIN(b) FROM 'tests/data/example.csv' as example GROUP BY a LIMIT 100") + /// .await? + /// .collect() + /// .await?; + /// assert_batches_eq!( + /// &[ + /// "+---+----------------+", + /// "| a | min(example.b) |", + /// "+---+----------------+", + /// "| 1 | 2 |", + /// "+---+----------------+", + /// ], + /// &results + /// ); + /// # Ok(()) + /// # } + /// ``` + pub fn enable_url_table(&self) -> Self { + let state_ref = self.state(); + let factory = Arc::new(DynamicListTableFactory::new(SessionStore::new())); + let catalog_list = Arc::new(DynamicFileCatalog::new( + Arc::clone(state_ref.catalog_list()), + Arc::clone(&factory) as Arc, + )); + let new_state = SessionStateBuilder::new_from_existing(self.state()) + .with_catalog_list(catalog_list) + .build(); + let ctx = SessionContext::new_with_state(new_state); + factory.session_store().with_state(ctx.state_weak_ref()); + ctx + } + /// Creates a new `SessionContext` using the provided [`SessionState`] #[deprecated(since = "32.0.0", note = "Use SessionContext::new_with_state")] pub fn with_state(state: SessionState) -> Self { @@ -1790,6 +1838,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_dynamic_file_query() -> Result<()> { + let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let path = path.join("tests/tpch-csv/customer.csv"); + let url = format!("file://{}", path.display()); + let cfg = SessionConfig::new(); + let session_state = SessionStateBuilder::new() + .with_default_features() + .with_config(cfg) + .build(); + let ctx = SessionContext::new_with_state(session_state).enable_url_table(); + let result = plan_and_collect( + &ctx, + format!("select c_name from '{}' limit 3;", &url).as_str(), + ) + .await?; + + let actual = arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + let expected = r#"+--------------------+ +| c_name | ++--------------------+ +| Customer#000000002 | +| Customer#000000003 | +| Customer#000000004 | ++--------------------+"#; + assert_eq!(actual, expected); + + Ok(()) + } + #[tokio::test] async fn custom_query_planner() -> Result<()> { let runtime = Arc::new(RuntimeEnv::default()); diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 224a0e18eac4..ef2fa863e6b0 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -98,6 +98,9 @@ impl TestContext { return None; } } + "dynamic_file.slt" => { + test_ctx.ctx = test_ctx.ctx.enable_url_table(); + } "joins.slt" => { info!("Registering partition table tables"); let example_udf = create_example_udf(); diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index 8cf3550fdb25..e66ba7477fc4 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -43,6 +43,11 @@ SELECT * FROM arrow_simple 3 baz false 4 NULL true +# Ensure that local files can not be read by default (a potential security issue) +# (url table is only supported when DynamicFileCatalog is enabled) +statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/example.arrow' not found +SELECT * FROM '../core/tests/data/example.arrow'; + # ARROW partitioned table statement ok CREATE EXTERNAL TABLE arrow_partitioned ( diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index d6600e06dc1c..01d0f4ac39bd 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -50,6 +50,11 @@ id7 value7 id8 value8 id9 value9 +# Ensure that local files can not be read by default (a potential security issue) +# (url table is only supported when DynamicFileCatalog is enabled) +statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/quote.csv' not found +select * from '../core/tests/data/quote.csv'; + query TT select * from csv_with_escape; ---- diff --git a/datafusion/sqllogictest/test_files/describe.slt b/datafusion/sqllogictest/test_files/describe.slt index a15c3a109cab..077e8e6474d1 100644 --- a/datafusion/sqllogictest/test_files/describe.slt +++ b/datafusion/sqllogictest/test_files/describe.slt @@ -57,7 +57,7 @@ statement ok DROP TABLE aggregate_simple; ########## -# Describe file (currently we can only describe file in datafusion-cli, fix this after issue (#4850) has been done) +# Describe file (we can only describe file if the default catalog is `DynamicFileCatalog`) ########## statement error Error during planning: table 'datafusion.public.../core/tests/data/aggregate_simple.csv' not found diff --git a/datafusion/sqllogictest/test_files/dynamic_file.slt b/datafusion/sqllogictest/test_files/dynamic_file.slt new file mode 100644 index 000000000000..e177fd3de243 --- /dev/null +++ b/datafusion/sqllogictest/test_files/dynamic_file.slt @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# Note: This file runs with a SessionContext that has the `enable_url_table` flag set +# +# dynamic select arrow file in the folder +query ITB +SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0; +---- +1 foo true +2 bar false + +# dynamic file query doesn't support partitioned table +statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/partitioned_table_arrow' not found +SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0; + +# read avro file +query IT +SELECT id, CAST(string_col AS varchar) FROM '../../testing/data/avro/alltypes_plain.avro' +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# dynamic query snappy avro file +query IT +SELECT id, CAST(string_col AS varchar) FROM '../../testing/data/avro/alltypes_plain.snappy.avro' +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# query the csv file dynamically with the config of current session +query TT +select * from '../core/tests/data/quote.csv'; +---- +~id0~ ~value0~ +~id1~ ~value1~ +~id2~ ~value2~ +~id3~ ~value3~ +~id4~ ~value4~ +~id5~ ~value5~ +~id6~ ~value6~ +~id7~ ~value7~ +~id8~ ~value8~ +~id9~ ~value9~ + +query TTT +DESCRIBE '../core/tests/data/aggregate_simple.csv'; +---- +c1 Float64 YES +c2 Float64 YES +c3 Boolean YES + +query IR rowsort +SELECT a, b FROM '../core/tests/data/2.json' +---- +-10 -3.5 +1 -3.5 +1 0.6 +1 0.6 +1 2 +1 2 +1 2 +1 2 +100000000000000 0.6 +2 0.6 +5 -3.5 +7 -3.5 + +query IT +SELECT id, CAST(string_col AS varchar) FROM '../../parquet-testing/data/alltypes_plain.parquet'; +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index 0b9508310b00..0903c2427649 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -45,6 +45,11 @@ SELECT a, b FROM json_test 5 -3.5 7 -3.5 +# Ensure that local files can not be read by default (a potential security issue) +# (url table is only supported when DynamicFileCatalog is enabled) +statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/2.json' not found +SELECT a, b FROM '../core/tests/data/2.json' + query TT EXPLAIN SELECT count(*) from json_test ---- diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 9a7b085312bb..f8b163adc796 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -202,6 +202,11 @@ SELECT id, CAST(string_col AS varchar) FROM alltypes_plain 0 0 1 1 +# Ensure that local files can not be read by default (a potential security issue) +# (url table is only supported when DynamicFileCatalog is enabled) +statement error DataFusion error: Error during planning: table 'datafusion.public.../../parquet-testing/data/alltypes_plain.parquet' not found +SELECT id, CAST(string_col AS varchar) FROM '../../parquet-testing/data/alltypes_plain.parquet'; + # Clean up statement ok DROP TABLE alltypes_plain;