fix: test data sample for catalog example
korowa committed Feb 26, 2024
1 parent a26f583 commit 2a9665d
Showing 3 changed files with 71 additions and 51 deletions.
4 changes: 0 additions & 4 deletions ci/scripts/rust_example.sh
@@ -26,10 +26,6 @@ files=$(ls .)
for filename in $files
do
example_name=`basename $filename ".rs"`
# Skip tests that rely on external storage and flight
# todo: Currently, catalog.rs is placed in the external-dependence directory because there is a problem parsing
# the parquet file of the external parquet-test that it currently relies on.
# We will wait for this issue[https://github.com/apache/arrow-datafusion/issues/8041] to be resolved.
if [ ! -d $filename ]; then
cargo run --example $example_name
fi
4 changes: 0 additions & 4 deletions datafusion-examples/Cargo.toml
@@ -41,10 +41,6 @@ path = "examples/flight/flight_server.rs"
name = "flight_client"
path = "examples/flight/flight_client.rs"

[[example]]
name = "catalog"
path = "examples/external_dependency/catalog.rs"

[[example]]
name = "dataframe_to_s3"
path = "examples/external_dependency/dataframe-to-s3.rs"
@@ -16,9 +16,6 @@
// under the License.

//! Simple example of a catalog/schema implementation.
//!
//! Example requires git submodules to be initialized in repo as it uses data from
//! the `parquet-testing` repo.
use async_trait::async_trait;
use datafusion::{
arrow::util::pretty,
@@ -27,7 +24,7 @@ use datafusion::{
{CatalogProvider, CatalogProviderList},
},
datasource::{
file_format::{csv::CsvFormat, parquet::ParquetFormat, FileFormat},
file_format::{csv::CsvFormat, FileFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
TableProvider,
},
@@ -36,94 +33,102 @@ use datafusion::{
prelude::SessionContext,
};
use std::sync::RwLock;
use std::{
any::Any,
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
use std::{fs::File, io::Write};
use tempfile::TempDir;

#[tokio::main]
async fn main() -> Result<()> {
let repo_dir = std::fs::canonicalize(
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
// parent dir of datafusion-examples = repo root
.join(".."),
)
.unwrap();
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();

// Prepare test directories containing multiple files
let dir_a = prepare_example_data()?;
let dir_b = prepare_example_data()?;

let mut ctx = SessionContext::new();
let state = ctx.state();
let catlist = Arc::new(CustomCatalogProviderList::new());

// use our custom catalog list for context. each context has a single catalog list.
// context will by default have [`MemoryCatalogProviderList`]
ctx.register_catalog_list(catlist.clone());

// initialize our catalog and schemas
let catalog = DirCatalog::new();
let parquet_schema = DirSchema::create(
let schema_a = DirSchema::create(
&state,
DirSchemaOpts {
format: Arc::new(ParquetFormat::default()),
dir: &repo_dir.join("parquet-testing").join("data"),
ext: "parquet",
format: Arc::new(CsvFormat::default()),
dir: dir_a.path(),
ext: "csv",
},
)
.await?;
let csv_schema = DirSchema::create(
let schema_b = DirSchema::create(
&state,
DirSchemaOpts {
format: Arc::new(CsvFormat::default()),
dir: &repo_dir.join("testing").join("data").join("csv"),
dir: dir_b.path(),
ext: "csv",
},
)
.await?;

// register schemas into catalog
catalog.register_schema("parquet", parquet_schema.clone())?;
catalog.register_schema("csv", csv_schema.clone())?;
catalog.register_schema("schema_a", schema_a.clone())?;
catalog.register_schema("schema_b", schema_b.clone())?;

// register our catalog in the context
ctx.register_catalog("dircat", Arc::new(catalog));
{
// catalog was passed down into our custom catalog list since we override the ctx's default
let catalogs = catlist.catalogs.read().unwrap();
assert!(catalogs.contains_key("dircat"));
};
// take the first 5 (arbitrary amount) keys from our schema's hashmap.

// take the first 3 (arbitrary amount) keys from our schema's hashmap.
// in our `DirSchema`, the table names are equivalent to their key in the hashmap,
// so any key in the hashmap will now be a queryable in our datafusion context.
let parquet_tables = {
let tables = parquet_schema.tables.read().unwrap();
let tables = {
let tables = schema_a.tables.read().unwrap();
tables.keys().take(5).cloned().collect::<Vec<_>>()
};
for table in parquet_tables {
println!("querying table {table} from parquet schema");
for table in tables {
log::info!("querying table {table} from schema_a");
let df = ctx
.sql(&format!("select * from dircat.parquet.\"{table}\" "))
.sql(&format!("select * from dircat.schema_a.\"{table}\" "))
.await?
.limit(0, Some(5))?;
let result = df.collect().await;
match result {
Ok(batches) => {
log::info!("query completed");
pretty::print_batches(&batches).unwrap();
}
Err(e) => {
println!("table '{table}' query failed due to {e}");
log::error!("table '{table}' query failed due to {e}");
}
}
}

// Select table to drop from registered tables
let table_to_drop = {
let parquet_tables = parquet_schema.tables.read().unwrap();
parquet_tables.keys().next().unwrap().to_owned()
let tables = schema_a.tables.read().unwrap();
tables.keys().next().unwrap().to_owned()
};
// DDL example
let df = ctx
.sql(&format!("DROP TABLE dircat.parquet.\"{table_to_drop}\""))

// Execute drop table
let df: datafusion::prelude::DataFrame = ctx
.sql(&format!("DROP TABLE dircat.schema_a.\"{table_to_drop}\""))
.await?;
df.collect().await?;
let parquet_tables = parquet_schema.tables.read().unwrap();
// datafusion has deregistered the table from our schema

// Ensure that datafusion has deregistered the table from our schema
// (called our schema's deregister func)
assert!(!parquet_tables.contains_key(&table_to_drop));
let tables = schema_a.tables.read().unwrap();
assert!(!tables.contains_key(&table_to_drop));
Ok(())
}

@@ -154,8 +159,14 @@ impl DirSchema {
let conf = ListingTableConfig::new(table_path)
.with_listing_options(opts)
.infer_schema(state)
.await?;
let table = ListingTable::try_new(conf)?;
.await;

if let Err(err) = conf {
log::error!("Error while inferring schema for {filename}: {err}");
continue;
}

let table = ListingTable::try_new(conf?)?;
tables.insert(filename, Arc::new(table) as Arc<dyn TableProvider>);
}
Ok(Arc::new(Self {
Expand Down Expand Up @@ -195,7 +206,7 @@ impl SchemaProvider for DirSchema {
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
let mut tables = self.tables.write().unwrap();
println!("adding table {name}");
log::info!("adding table {name}");
tables.insert(name, table.clone());
Ok(Some(table))
}
@@ -205,7 +216,7 @@
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let mut tables = self.tables.write().unwrap();
println!("dropping table {name}");
log::info!("dropping table {name}");
Ok(tables.remove(name))
}
}
@@ -287,3 +298,20 @@ impl CatalogProviderList for CustomCatalogProviderList {
cats.get(name).cloned()
}
}

fn prepare_example_data() -> Result<TempDir> {
let dir = tempfile::tempdir()?;
let path = dir.path();

let content = r#"key,value
1,foo
2,bar
3,baz"#;

for i in 0..5 {
let mut file = File::create(path.join(format!("{}.csv", i)))?;
file.write_all(content.as_bytes())?;
}

Ok(dir)
}
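
For context, not part of the commit itself: a minimal, self-contained sketch of querying the kind of generated CSV sample shown above. It bypasses the example's custom CatalogProviderList/SchemaProvider setup and uses SessionContext::register_csv directly; the crate dependencies (datafusion, tokio, tempfile), the file name sample.csv, and the table name demo are assumptions for illustration.

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use std::{fs::File, io::Write};

#[tokio::main]
async fn main() -> Result<()> {
    // Write a small CSV sample into a temporary directory, mirroring
    // what `prepare_example_data` does above (file name is arbitrary).
    let dir = tempfile::tempdir()?;
    let csv_path = dir.path().join("sample.csv");
    File::create(&csv_path)?.write_all(b"key,value\n1,foo\n2,bar\n3,baz\n")?;

    // Register the file against the default catalog and query it with SQL.
    let ctx = SessionContext::new();
    ctx.register_csv("demo", csv_path.to_str().unwrap(), CsvReadOptions::new())
        .await?;
    ctx.sql("SELECT key, value FROM demo ORDER BY key")
        .await?
        .show()
        .await?;
    Ok(())
}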
