From 6de941ec4d47f02c2cb5ff2d20e543387720d538 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 1 Aug 2024 11:56:58 +0200 Subject: [PATCH] fix blob type --- scripts/generate_test_data.py | 23 ++++++++++++++--------- src/delta_utils.cpp | 26 +------------------------- src/functions/delta_scan.cpp | 10 +++++++--- test/sql/dat/all.test | 2 +- test/sql/generated/blob.test | 23 +++++++++++++++++++++++ 5 files changed, 46 insertions(+), 38 deletions(-) create mode 100644 test/sql/generated/blob.test diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py index eaf9d30..7bb6bcf 100644 --- a/scripts/generate_test_data.py +++ b/scripts/generate_test_data.py @@ -13,7 +13,7 @@ def delete_old_files(): if (os.path.isdir(BASE_PATH)): shutil.rmtree(BASE_PATH) -def generate_test_data_delta_rs(path, query, part_column=False): +def generate_test_data_delta_rs(path, query, part_column=False, add_golden_table=True): """ generate_test_data_delta_rs generates some test data using delta-rs and duckdb @@ -38,12 +38,13 @@ def generate_test_data_delta_rs(path, query, part_column=False): else: write_deltalake(f"{generated_path}/delta_lake", test_table_df) - # Write DuckDB's reference data - os.mkdir(f'{generated_path}/duckdb') - if (part_column): - con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})") - else: - con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)") + if add_golden_table: + # Write DuckDB's reference data + os.mkdir(f'{generated_path}/duckdb') + if (part_column): + con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})") + else: + con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)") def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False): """ @@ -112,14 +113,18 @@ def generate_test_data_pyspark(name, current_path, input_path, delete_predicate query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;" generate_test_data_delta_rs("lineitem_sf1_10part", query, "part") +## Simple table with a blob as a value +query = "create table test_table as SELECT encode('ABCDE') as blob, encode('ABCDE') as blob_part, 'ABCDE' as string UNION ALL SELECT encode('😈') as blob, encode('😈') as blob_part, '😈' as string" +generate_test_data_delta_rs("simple_blob_table", query, "blob_part", add_golden_table=False) + ## Simple partitioned table with structs query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);" -generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part"); +generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part") ## Partitioned table with all types we can file skip on for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]: query = f"CREATE table test_table as select i::{type} as value, i::{type} as part from range(0,2) tbl(i)" - generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part"); + generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part") ## Simple table with deletion vector con = duckdb.connect() diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 9648481..32db255 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -25,7 +25,7 @@ unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::Sha visitor.visit_float = VisitSimpleType(); visitor.visit_double = VisitSimpleType(); visitor.visit_boolean = VisitSimpleType(); - visitor.visit_binary = VisitSimpleType(); + visitor.visit_binary = VisitSimpleType(); visitor.visit_date = VisitSimpleType(); visitor.visit_timestamp = VisitSimpleType(); visitor.visit_timestamp_ntz = VisitSimpleType(); @@ -194,30 +194,6 @@ ffi::EngineIterator EngineIteratorFromCallable(Callable& callable) { return {&callable, (const void *(*)(void*)) get_next}; }; -// Helper function to prevent pushing down filters kernel cant handle -// TODO: remove once kernel handles this properly? -static bool CanHandleFilter(TableFilter *filter) { - switch (filter->filter_type) { - case TableFilterType::CONSTANT_COMPARISON: - return true; - case TableFilterType::IS_NULL: - return true; - case TableFilterType::IS_NOT_NULL: - return true; - case TableFilterType::CONJUNCTION_AND: { - auto &conjunction = static_cast(*filter); - bool can_handle = true; - for (const auto& child : conjunction.child_filters) { - can_handle = can_handle && CanHandleFilter(child.get()); - } - return can_handle; - } - - default: - return false; - } -} - uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) { auto &filters = predicate->column_filters; diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 28ea597..abd400a 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -562,9 +562,13 @@ void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_optio } auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]); if (col_partition_entry != file_metadata->partition_map.end()) { - // Todo: use https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization - auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(global_types[col_id]); - reader_data.constant_map.emplace_back(i, maybe_value); + auto ¤t_type = global_types[col_id]; + if (current_type == LogicalType::BLOB) { + reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); + } else { + auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type); + reader_data.constant_map.emplace_back(i, maybe_value); + } } } } diff --git a/test/sql/dat/all.test b/test/sql/dat/all.test index 96f3870..ecd06c9 100644 --- a/test/sql/dat/all.test +++ b/test/sql/dat/all.test @@ -112,7 +112,7 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/delta' ---- query I rowsort multi_partitioned -SELECT letter, date, decode(data) as data, number +SELECT * FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/expected/latest/**/*.parquet') ---- diff --git a/test/sql/generated/blob.test b/test/sql/generated/blob.test new file mode 100644 index 0000000..ec40ead --- /dev/null +++ b/test/sql/generated/blob.test @@ -0,0 +1,23 @@ +# name: test/sql/generated/blob.test +# description: Test the BLOB type +# group: [delta_generated] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +query IIIIII +describe select * +from delta_scan('./data/generated/simple_blob_table/delta_lake'); +---- +blob BLOB YES NULL NULL NULL +blob_part BLOB YES NULL NULL NULL +string VARCHAR YES NULL NULL NULL + +query III +from delta_scan('./data/generated/simple_blob_table/delta_lake') order by string; +---- +ABCDE ABCDE ABCDE +\xF0\x9F\x98\x88 \xF0\x9F\x98\x88 😈