Skip to content

Commit

Permalink
Update vendored DuckDB sources to 44e807a
Browse files Browse the repository at this point in the history
  • Loading branch information
duckdblabs-bot committed Jan 18, 2025
1 parent 44e807a commit e7008ef
Show file tree
Hide file tree
Showing 50 changed files with 815 additions and 210 deletions.
11 changes: 9 additions & 2 deletions src/duckdb/src/common/arrow/arrow_type_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ ArrowTypeExtension::ArrowTypeExtension(string extension_name, string arrow_forma
: extension_metadata(std::move(extension_name), {}, {}, std::move(arrow_format)), type_extension(std::move(type)) {
}

ArrowTypeExtension::ArrowTypeExtension(ArrowExtensionMetadata &extension_metadata, unique_ptr<ArrowType> type)
: extension_metadata(extension_metadata) {
type_extension = make_shared_ptr<ArrowTypeExtensionData>(type->GetDuckType());
}

ArrowExtensionMetadata::ArrowExtensionMetadata(string extension_name, string vendor_name, string type_name,
string arrow_format)
: extension_name(std::move(extension_name)), vendor_name(std::move(vendor_name)), type_name(std::move(type_name)),
Expand Down Expand Up @@ -197,10 +202,12 @@ ArrowTypeExtension GetArrowExtensionInternal(
unordered_map<ArrowExtensionMetadata, ArrowTypeExtension, HashArrowTypeExtension> &type_extensions,
ArrowExtensionMetadata info) {
if (type_extensions.find(info) == type_extensions.end()) {
auto og_info = info;
info.SetArrowFormat("");
if (type_extensions.find(info) == type_extensions.end()) {
throw NotImplementedException("Arrow Extension with configuration:\n%s not yet registered",
info.ToString());
auto format = og_info.GetArrowFormat();
auto type = ArrowType::GetTypeFromFormat(format);
return ArrowTypeExtension(og_info, std::move(type));
}
}
return type_extensions[info];
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/arrow/schema_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ ArrowSchemaMetadata ArrowSchemaMetadata::NonCanonicalType(const string &type_nam

bool ArrowSchemaMetadata::HasExtension() const {
auto arrow_extension = GetOption(ArrowSchemaMetadata::ARROW_EXTENSION_NAME);
return !arrow_extension.empty() && !StringUtil::StartsWith(arrow_extension, "ogc");
return !arrow_extension.empty();
}

ArrowExtensionMetadata ArrowSchemaMetadata::GetExtensionInfo(string format) {
Expand Down
9 changes: 6 additions & 3 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4283,19 +4283,22 @@ const StringUtil::EnumStringLiteral *GetWindowBoundaryValues() {
{ static_cast<uint32_t>(WindowBoundary::EXPR_PRECEDING_ROWS), "EXPR_PRECEDING_ROWS" },
{ static_cast<uint32_t>(WindowBoundary::EXPR_FOLLOWING_ROWS), "EXPR_FOLLOWING_ROWS" },
{ static_cast<uint32_t>(WindowBoundary::EXPR_PRECEDING_RANGE), "EXPR_PRECEDING_RANGE" },
{ static_cast<uint32_t>(WindowBoundary::EXPR_FOLLOWING_RANGE), "EXPR_FOLLOWING_RANGE" }
{ static_cast<uint32_t>(WindowBoundary::EXPR_FOLLOWING_RANGE), "EXPR_FOLLOWING_RANGE" },
{ static_cast<uint32_t>(WindowBoundary::CURRENT_ROW_GROUPS), "CURRENT_ROW_GROUPS" },
{ static_cast<uint32_t>(WindowBoundary::EXPR_PRECEDING_GROUPS), "EXPR_PRECEDING_GROUPS" },
{ static_cast<uint32_t>(WindowBoundary::EXPR_FOLLOWING_GROUPS), "EXPR_FOLLOWING_GROUPS" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<WindowBoundary>(WindowBoundary value) {
return StringUtil::EnumToString(GetWindowBoundaryValues(), 9, "WindowBoundary", static_cast<uint32_t>(value));
return StringUtil::EnumToString(GetWindowBoundaryValues(), 12, "WindowBoundary", static_cast<uint32_t>(value));
}

template<>
WindowBoundary EnumUtil::FromString<WindowBoundary>(const char *value) {
return static_cast<WindowBoundary>(StringUtil::StringToEnum(GetWindowBoundaryValues(), 9, "WindowBoundary", value));
return static_cast<WindowBoundary>(StringUtil::StringToEnum(GetWindowBoundaryValues(), 12, "WindowBoundary", value));
}

const StringUtil::EnumStringLiteral *GetWindowExcludeModeValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ void CSVSniffer::DetectHeader() {
auto &sniffer_state_machine = best_candidate->GetStateMachine();
names = DetectHeaderInternal(buffer_manager->context, best_header_row, sniffer_state_machine, set_columns,
best_sql_types_candidates_per_column_idx, options, *error_handler);
if (single_row_file && sniffer_state_machine.dialect_options.header.GetValue()) {
if (EmptyOrOnlyHeader()) {
// This file only contains a header, lets default to the lowest type of all.
detected_types.clear();
for (idx_t i = 0; i < names.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ idx_t CSVSniffer::LinesSniffed() const {
return lines_sniffed;
}

bool CSVSniffer::EmptyOrOnlyHeader() const {
return (single_row_file && best_candidate->state_machine->dialect_options.header.GetValue()) || lines_sniffed == 0;
}

bool CSVSniffer::CanYouCastIt(ClientContext &context, const string_t value, const LogicalType &type,
const DialectOptions &dialect_options, const bool is_null, const char decimal_separator) {
if (is_null) {
Expand Down
65 changes: 56 additions & 9 deletions src/duckdb/src/execution/operator/persistent/physical_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class DeleteLocalState : public LocalSinkState {
public:
DeleteLocalState(ClientContext &context, TableCatalogEntry &table,
const vector<unique_ptr<BoundConstraint>> &bound_constraints) {
delete_chunk.Initialize(Allocator::Get(context), table.GetTypes());
const auto &types = table.GetTypes();
auto initialize = vector<bool>(types.size(), false);
delete_chunk.Initialize(Allocator::Get(context), types, initialize);

auto &storage = table.GetStorage();
delete_state = storage.InitializeDelete(table, context, bound_constraints);
}
Expand All @@ -64,22 +67,66 @@ SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk,
auto &transaction = DuckTransaction::Get(context.client, table.db);
auto &row_ids = chunk.data[row_id_index];

vector<StorageIndex> column_ids;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
column_ids.emplace_back(i);
};
auto fetch_state = ColumnFetchState();

lock_guard<mutex> delete_guard(g_state.delete_lock);
if (!return_chunk && !g_state.has_unique_indexes) {
g_state.deleted_count += table.Delete(*l_state.delete_state, context.client, row_ids, chunk.size());
return SinkResultType::NEED_MORE_INPUT;
}

// Fetch the to-be-deleted chunk.
auto types = table.GetTypes();
auto to_be_fetched = vector<bool>(types.size(), return_chunk);
vector<StorageIndex> column_ids;
vector<LogicalType> column_types;
if (return_chunk) {
// Fetch all columns.
column_types = types;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
column_ids.emplace_back(i);
}

} else {
// Fetch only the required columns for updating the delete indexes.
auto &local_storage = LocalStorage::Get(context.client, table.db);
auto storage = local_storage.GetStorage(table);
unordered_set<column_t> indexed_column_id_set;
storage->delete_indexes.Scan([&](Index &index) {
if (!index.IsBound() || !index.IsUnique()) {
return false;
}
auto &set = index.GetColumnIdSet();
indexed_column_id_set.insert(set.begin(), set.end());
return false;
});
for (auto &col : indexed_column_id_set) {
column_ids.emplace_back(col);
}
sort(column_ids.begin(), column_ids.end());
for (auto &col : column_ids) {
auto i = col.GetPrimaryIndex();
to_be_fetched[i] = true;
column_types.push_back(types[i]);
}
}

l_state.delete_chunk.Reset();
row_ids.Flatten(chunk.size());
table.Fetch(transaction, l_state.delete_chunk, column_ids, row_ids, chunk.size(), fetch_state);

// Fetch the to-be-deleted chunk.
DataChunk fetch_chunk;
fetch_chunk.Initialize(Allocator::Get(context.client), column_types, chunk.size());
auto fetch_state = ColumnFetchState();
table.Fetch(transaction, fetch_chunk, column_ids, row_ids, chunk.size(), fetch_state);

// Reference the necessary columns of the fetch_chunk.
idx_t fetch_idx = 0;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
if (to_be_fetched[i]) {
l_state.delete_chunk.data[i].Reference(fetch_chunk.data[fetch_idx++]);
continue;
}
l_state.delete_chunk.data[i].Reference(Value(types[i]));
}
l_state.delete_chunk.SetCardinality(fetch_chunk);

// Append the deleted row IDs to the delete indexes.
// If we only delete local row IDs, then the delete_chunk is empty.
Expand Down
63 changes: 46 additions & 17 deletions src/duckdb/src/execution/operator/persistent/physical_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,17 @@ InsertGlobalState::InsertGlobalState(ClientContext &context, const vector<Logica
: table(table), insert_count(0), initialized(false), return_collection(context, return_types) {
}

InsertLocalState::InsertLocalState(ClientContext &context, const vector<LogicalType> &types,
InsertLocalState::InsertLocalState(ClientContext &context, const vector<LogicalType> &types_p,
const vector<unique_ptr<Expression>> &bound_defaults,
const vector<unique_ptr<BoundConstraint>> &bound_constraints)
: default_executor(context, bound_defaults), bound_constraints(bound_constraints) {

auto &allocator = Allocator::Get(context);
insert_chunk.Initialize(allocator, types);
update_chunk.Initialize(allocator, types);
append_chunk.Initialize(allocator, types);

types = types_p;
auto initialize = vector<bool>(types.size(), false);
update_chunk.Initialize(allocator, types, initialize);
append_chunk.Initialize(allocator, types, initialize);
}

ConstraintState &InsertLocalState::GetConstraintState(DataTable &table, TableCatalogEntry &table_ref) {
Expand Down Expand Up @@ -185,8 +187,10 @@ static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_ch
auto &insert_types = op.insert_types;

if (types_to_fetch.empty()) {
// We have not scanned the initial table, so we can just duplicate the initial chunk
result.Initialize(client, input_chunk.GetTypes());
// We have not scanned the initial table, so we duplicate the initial chunk.
const auto &types = input_chunk.GetTypes();
auto initialize = vector<bool>(types.size(), false);
result.Initialize(client, types, initialize, input_chunk.size());
result.Reference(input_chunk);
result.SetCardinality(input_chunk);
return;
Expand All @@ -196,7 +200,7 @@ static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_ch
combined_types.insert(combined_types.end(), insert_types.begin(), insert_types.end());
combined_types.insert(combined_types.end(), types_to_fetch.begin(), types_to_fetch.end());

result.Initialize(client, combined_types);
result.Initialize(client, combined_types, input_chunk.size());
result.Reset();
// Add the VALUES list
for (idx_t i = 0; i < insert_types.size(); i++) {
Expand All @@ -223,12 +227,13 @@ static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_ch
result.SetCardinality(input_chunk.size());
}

static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, Vector &row_ids,
DataChunk &update_chunk, const PhysicalInsert &op) {
static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, Vector &row_ids, DataChunk &update_chunk,
const PhysicalInsert &op) {

auto &do_update_condition = op.do_update_condition;
auto &set_types = op.set_types;
auto &set_expressions = op.set_expressions;

// Check the optional condition for the DO UPDATE clause, to filter which rows will be updated
if (do_update_condition) {
DataChunk do_update_filter_result;
Expand All @@ -255,8 +260,15 @@ static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, Table
}
}

// Execute the SET expressions
update_chunk.Initialize(context.client, set_types);
if (chunk.size() == 0) {
auto initialize = vector<bool>(set_types.size(), false);
update_chunk.Initialize(context.client, set_types, initialize, chunk.size());
update_chunk.SetCardinality(chunk);
return;
}

// Execute the SET expressions.
update_chunk.Initialize(context.client, set_types, chunk.size());
ExpressionExecutor executor(context.client, set_expressions);
executor.Execute(chunk, update_chunk);
update_chunk.SetCardinality(chunk);
Expand All @@ -272,7 +284,7 @@ static idx_t PerformOnConflictAction(InsertLocalState &lstate, ExecutionContext

auto &set_columns = op.set_columns;
DataChunk update_chunk;
CreateUpdateChunk(context, chunk, table, row_ids, update_chunk, op);
CreateUpdateChunk(context, chunk, row_ids, update_chunk, op);
auto &data_table = table.GetStorage();

// Perform the UPDATE on the (global) storage.
Expand Down Expand Up @@ -476,7 +488,9 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
DataChunk combined_chunk; // contains conflict_chunk + scan_chunk (wide)

// Filter out everything but the conflicting rows
conflict_chunk.Initialize(context.client, tuples.GetTypes());
const auto &types = tuples.GetTypes();
auto initialize = vector<bool>(types.size(), false);
conflict_chunk.Initialize(context.client, types, initialize, tuples.size());
conflict_chunk.Reference(tuples);
conflict_chunk.Slice(conflicts.Selection(), conflicts.Count());
conflict_chunk.SetCardinality(conflicts.Count());
Expand All @@ -487,7 +501,7 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
D_ASSERT(scan_chunk.size() == 0);
// When these values are required for the conditions or the SET expressions,
// then we scan the existing table for the conflicting tuples, using the rowids
scan_chunk.Initialize(context.client, types_to_fetch);
scan_chunk.Initialize(context.client, types_to_fetch, conflicts.Count());
fetch_state = make_uniq<ColumnFetchState>();
if (GLOBAL) {
auto &transaction = DuckTransaction::Get(context.client, table.catalog);
Expand Down Expand Up @@ -520,7 +534,7 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
return affected_tuples;
}

idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionContext &context, InsertGlobalState &gstate,
idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionContext &context,
InsertLocalState &lstate) const {
auto &data_table = table.GetStorage();
auto &local_storage = LocalStorage::Get(context.client, data_table.db);
Expand Down Expand Up @@ -620,6 +634,21 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk,

auto &table = gstate.table;
auto &storage = table.GetStorage();
if (lstate.init_insert_chunk) {
auto initialize = vector<bool>(lstate.types.size(), false);
if (!column_index_map.empty()) {
for (auto &col : table.GetColumns().Physical()) {
auto storage_idx = col.StorageOid();
auto mapped_index = column_index_map[col.Physical()];
if (mapped_index == DConstants::INVALID_INDEX) {
initialize[storage_idx] = true;
}
}
}
auto &allocator = Allocator::Get(context.client);
lstate.insert_chunk.Initialize(allocator, lstate.types, initialize, chunk.size());
lstate.init_insert_chunk = false;
}
PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, lstate.default_executor, lstate.insert_chunk);

if (!parallel) {
Expand All @@ -634,7 +663,7 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk,
// so it should not be added to the RETURNING chunk
gstate.return_collection.Append(lstate.insert_chunk);
}
idx_t updated_tuples = OnConflictHandling(table, context, gstate, lstate);
idx_t updated_tuples = OnConflictHandling(table, context, lstate);
if (action_type == OnConflictAction::NOTHING && return_chunk) {
// Because we didn't add to the RETURNING chunk yet
// we add the tuples that did not get filtered out now
Expand Down Expand Up @@ -665,7 +694,7 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk,
lstate.local_collection->InitializeAppend(lstate.local_append_state);
lstate.writer = &gstate.table.GetStorage().CreateOptimisticWriter(context.client);
}
OnConflictHandling(table, context, gstate, lstate);
OnConflictHandling(table, context, lstate);
D_ASSERT(action_type != OnConflictAction::UPDATE);

auto new_row_group = lstate.local_collection->Append(lstate.insert_chunk, lstate.local_append_state);
Expand Down
4 changes: 4 additions & 0 deletions src/duckdb/src/function/compression_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ static optional_ptr<CompressionFunction> LoadCompressionFunction(CompressionFunc

static void TryLoadCompression(DBConfig &config, vector<reference<CompressionFunction>> &result, CompressionType type,
const PhysicalType physical_type) {
if (config.options.disabled_compression_methods.find(type) != config.options.disabled_compression_methods.end()) {
// explicitly disabled
return;
}
auto function = config.GetCompressionFunction(type, physical_type);
if (!function) {
return;
Expand Down
10 changes: 9 additions & 1 deletion src/duckdb/src/function/table/arrow/arrow_duck_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void ArrowType::ThrowIfInvalid() const {
}
}

unique_ptr<ArrowType> ArrowType::GetTypeFromFormat(DBConfig &config, ArrowSchema &schema, string &format) {
unique_ptr<ArrowType> ArrowType::GetTypeFromFormat(string &format) {
if (format == "n") {
return make_uniq<ArrowType>(LogicalType::SQLNULL);
} else if (format == "b") {
Expand Down Expand Up @@ -179,6 +179,14 @@ unique_ptr<ArrowType> ArrowType::GetTypeFromFormat(DBConfig &config, ArrowSchema
}
return make_uniq<ArrowType>(LogicalType::TIMESTAMP_TZ, std::move(type_info));
}
return nullptr;
}

unique_ptr<ArrowType> ArrowType::GetTypeFromFormat(DBConfig &config, ArrowSchema &schema, string &format) {
auto type = GetTypeFromFormat(format);
if (type) {
return type;
}
if (format == "+l") {
return CreateListType(config, *schema.children[0], ArrowVariableSizeType::NORMAL, false);
} else if (format == "+L") {
Expand Down
Loading

0 comments on commit e7008ef

Please sign in to comment.