diff --git a/ci/scripts/rust_example.sh b/ci/scripts/rust_example.sh
index 18a7306b520d..ec14ab5685ad 100755
--- a/ci/scripts/rust_example.sh
+++ b/ci/scripts/rust_example.sh
@@ -26,10 +26,6 @@ files=$(ls .)
 for filename in $files
 do
   example_name=`basename $filename ".rs"`
-  # Skip tests that rely on external storage and flight
-  # todo: Currently, catalog.rs is placed in the external-dependence directory because there is a problem parsing
-  # the parquet file of the external parquet-test that it currently relies on.
-  # We will wait for this issue[https://github.com/apache/arrow-datafusion/issues/8041] to be resolved.
   if [ ! -d $filename ]; then
     cargo run --example $example_name
   fi
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index b1a9cbcad5f7..ff18ce6d47dc 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -41,10 +41,6 @@ path = "examples/flight/flight_server.rs"
 name = "flight_client"
 path = "examples/flight/flight_client.rs"
 
-[[example]]
-name = "catalog"
-path = "examples/external_dependency/catalog.rs"
-
 [[example]]
 name = "dataframe_to_s3"
 path = "examples/external_dependency/dataframe-to-s3.rs"
diff --git a/datafusion-examples/examples/external_dependency/catalog.rs b/datafusion-examples/examples/catalog.rs
similarity index 77%
rename from datafusion-examples/examples/external_dependency/catalog.rs
rename to datafusion-examples/examples/catalog.rs
index a623eafdf3d7..4706b2cabffb 100644
--- a/datafusion-examples/examples/external_dependency/catalog.rs
+++ b/datafusion-examples/examples/catalog.rs
@@ -16,9 +16,6 @@
 // under the License.
 
 //! Simple example of a catalog/schema implementation.
-//!
-//! Example requires git submodules to be initialized in repo as it uses data from
-//! the `parquet-testing` repo.
 use async_trait::async_trait;
 use datafusion::{
     arrow::util::pretty,
@@ -27,7 +24,7 @@ use datafusion::{
         {CatalogProvider, CatalogProviderList},
     },
     datasource::{
-        file_format::{csv::CsvFormat, parquet::ParquetFormat, FileFormat},
+        file_format::{csv::CsvFormat, FileFormat},
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
         TableProvider,
     },
@@ -36,51 +33,53 @@ use datafusion::{
     prelude::SessionContext,
 };
 use std::sync::RwLock;
-use std::{
-    any::Any,
-    collections::HashMap,
-    path::{Path, PathBuf},
-    sync::Arc,
-};
+use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
+use std::{fs::File, io::Write};
+use tempfile::TempDir;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let repo_dir = std::fs::canonicalize(
-        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-            // parent dir of datafusion-examples = repo root
-            .join(".."),
-    )
-    .unwrap();
+    env_logger::builder()
+        .filter_level(log::LevelFilter::Info)
+        .init();
+
+    // Prepare test directories containing multiple files
+    let dir_a = prepare_example_data()?;
+    let dir_b = prepare_example_data()?;
+
     let mut ctx = SessionContext::new();
     let state = ctx.state();
     let catlist = Arc::new(CustomCatalogProviderList::new());
+
     // use our custom catalog list for context. each context has a single catalog list.
     // context will by default have [`MemoryCatalogProviderList`]
     ctx.register_catalog_list(catlist.clone());
 
     // initialize our catalog and schemas
     let catalog = DirCatalog::new();
-    let parquet_schema = DirSchema::create(
+    let schema_a = DirSchema::create(
         &state,
         DirSchemaOpts {
-            format: Arc::new(ParquetFormat::default()),
-            dir: &repo_dir.join("parquet-testing").join("data"),
-            ext: "parquet",
+            format: Arc::new(CsvFormat::default()),
+            dir: dir_a.path(),
+            ext: "csv",
         },
     )
     .await?;
-    let csv_schema = DirSchema::create(
+    let schema_b = DirSchema::create(
         &state,
         DirSchemaOpts {
             format: Arc::new(CsvFormat::default()),
-            dir: &repo_dir.join("testing").join("data").join("csv"),
+            dir: dir_b.path(),
             ext: "csv",
         },
     )
     .await?;
+
     // register schemas into catalog
-    catalog.register_schema("parquet", parquet_schema.clone())?;
-    catalog.register_schema("csv", csv_schema.clone())?;
+    catalog.register_schema("schema_a", schema_a.clone())?;
+    catalog.register_schema("schema_b", schema_b.clone())?;
+
     // register our catalog in the context
     ctx.register_catalog("dircat", Arc::new(catalog));
     {
@@ -88,42 +87,48 @@ async fn main() -> Result<()> {
         let catalogs = catlist.catalogs.read().unwrap();
         assert!(catalogs.contains_key("dircat"));
     };
-    // take the first 5 (arbitrary amount) keys from our schema's hashmap.
+
+    // take the first 5 (arbitrary amount) keys from our schema's hashmap.
     // in our `DirSchema`, the table names are equivalent to their key in the hashmap,
     // so any key in the hashmap will now be a queryable in our datafusion context.
-    let parquet_tables = {
-        let tables = parquet_schema.tables.read().unwrap();
+    let tables = {
+        let tables = schema_a.tables.read().unwrap();
         tables.keys().take(5).cloned().collect::<Vec<_>>()
     };
-    for table in parquet_tables {
-        println!("querying table {table} from parquet schema");
+    for table in tables {
+        log::info!("querying table {table} from schema_a");
         let df = ctx
-            .sql(&format!("select * from dircat.parquet.\"{table}\" "))
+            .sql(&format!("select * from dircat.schema_a.\"{table}\" "))
             .await?
             .limit(0, Some(5))?;
         let result = df.collect().await;
         match result {
             Ok(batches) => {
+                log::info!("query completed");
                 pretty::print_batches(&batches).unwrap();
             }
             Err(e) => {
-                println!("table '{table}' query failed due to {e}");
+                log::error!("table '{table}' query failed due to {e}");
             }
         }
     }
+
+    // Select table to drop from registered tables
     let table_to_drop = {
-        let parquet_tables = parquet_schema.tables.read().unwrap();
-        parquet_tables.keys().next().unwrap().to_owned()
+        let tables = schema_a.tables.read().unwrap();
+        tables.keys().next().unwrap().to_owned()
     };
-    // DDL example
-    let df = ctx
-        .sql(&format!("DROP TABLE dircat.parquet.\"{table_to_drop}\""))
+
+    // Execute drop table
+    let df: datafusion::prelude::DataFrame = ctx
+        .sql(&format!("DROP TABLE dircat.schema_a.\"{table_to_drop}\""))
         .await?;
     df.collect().await?;
-    let parquet_tables = parquet_schema.tables.read().unwrap();
-    // datafusion has deregistered the table from our schema
+
+    // Ensure that datafusion has deregistered the table from our schema
     // (called our schema's deregister func)
-    assert!(!parquet_tables.contains_key(&table_to_drop));
+    let tables = schema_a.tables.read().unwrap();
+    assert!(!tables.contains_key(&table_to_drop));
 
     Ok(())
 }
@@ -154,8 +159,14 @@ impl DirSchema {
             let conf = ListingTableConfig::new(table_path)
                 .with_listing_options(opts)
                 .infer_schema(state)
-                .await?;
-            let table = ListingTable::try_new(conf)?;
+                .await;
+
+            if let Err(err) = conf {
+                log::error!("Error while inferring schema for {filename}: {err}");
+                continue;
+            }
+
+            let table = ListingTable::try_new(conf?)?;
             tables.insert(filename, Arc::new(table) as Arc<dyn TableProvider>);
         }
         Ok(Arc::new(Self {
@@ -195,7 +206,7 @@ impl SchemaProvider for DirSchema {
         table: Arc<dyn TableProvider>,
     ) -> Result<Option<Arc<dyn TableProvider>>> {
         let mut tables = self.tables.write().unwrap();
-        println!("adding table {name}");
+        log::info!("adding table {name}");
         tables.insert(name, table.clone());
         Ok(Some(table))
     }
@@ -205,7 +216,7 @@ impl SchemaProvider for DirSchema {
     #[allow(unused_variables)]
     fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
         let mut tables = self.tables.write().unwrap();
-        println!("dropping table {name}");
+        log::info!("dropping table {name}");
        Ok(tables.remove(name))
     }
 }
@@ -287,3 +298,20 @@ impl CatalogProviderList for CustomCatalogProviderList {
         cats.get(name).cloned()
     }
 }
+
+fn prepare_example_data() -> Result<TempDir> {
+    let dir = tempfile::tempdir()?;
+    let path = dir.path();
+
+    let content = r#"key,value
+1,foo
+2,bar
+3,baz"#;
+
+    for i in 0..5 {
+        let mut file = File::create(path.join(format!("{}.csv", i)))?;
+        file.write_all(content.as_bytes())?;
+    }
+
+    Ok(dir)
+}
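
Note for reviewers (not part of the patch): after this change the example runs with plain `cargo run --example catalog` from `datafusion-examples/`, with no git submodules required. The sketch below condenses the pattern that `DirSchema::create` applies to each file and that `prepare_example_data` feeds it: write a CSV fixture, infer its schema with `ListingTableConfig::infer_schema`, and register the resulting `ListingTable`. The `example` table name and `example.csv` file name are illustrative only; the assumed dependencies (`datafusion`, `tokio`, `tempfile`) are the ones the example already uses.

```rust
use std::{fs::File, io::Write, sync::Arc};

use datafusion::{
    datasource::{
        file_format::csv::CsvFormat,
        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
    },
    error::Result,
    prelude::SessionContext,
};

#[tokio::main]
async fn main() -> Result<()> {
    // Write a small CSV fixture, as prepare_example_data() does.
    let dir = tempfile::tempdir()?;
    let csv_path = dir.path().join("example.csv"); // illustrative file name
    File::create(&csv_path)?.write_all(b"key,value\n1,foo\n2,bar\n")?;

    let ctx = SessionContext::new();
    let state = ctx.state();

    // Infer the table's schema from the file, then register it,
    // mirroring the per-file loop in DirSchema::create.
    let opts = ListingOptions::new(Arc::new(CsvFormat::default()));
    let table_path = ListingTableUrl::parse(csv_path.to_str().unwrap())?;
    let conf = ListingTableConfig::new(table_path)
        .with_listing_options(opts)
        .infer_schema(&state)
        .await?;
    ctx.register_table("example", Arc::new(ListingTable::try_new(conf)?))?;

    ctx.sql("SELECT * FROM example").await?.show().await?;
    Ok(())
}
```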
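
A second sketch, also not part of the patch, of the invariant the example's final assert checks: `DROP TABLE` is planned as DDL and routed to `SchemaProvider::deregister_table` on the schema that owns the table, so the entry disappears from the provider. Here DataFusion's default in-memory schema stands in for `DirSchema`, and the table name `t` is illustrative.

```rust
use std::sync::Arc;

use datafusion::{
    arrow::{
        array::Int32Array,
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    },
    datasource::MemTable,
    error::Result,
    prelude::SessionContext,
};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a one-column in-memory table in the default catalog/schema.
    let schema = Arc::new(Schema::new(vec![Field::new("key", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;
    assert!(ctx.table_exist("t")?);

    // Executing the DDL calls deregister_table() on the owning schema,
    // which is exactly what the example asserts for its DirSchema.
    ctx.sql("DROP TABLE t").await?.collect().await?;
    assert!(!ctx.table_exist("t")?);
    Ok(())
}
```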