Skip to content

Commit

Permalink
Merge pull request #24339 from ztlpn/iceberg-fix-concurrent
Browse files Browse the repository at this point in the history
datalake: make purging table idempotent
  • Loading branch information
ztlpn authored Nov 28, 2024
2 parents 949b11e + d611809 commit c88f087
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
6 changes: 6 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ topic_table::apply(topic_lifecycle_transition soft_del, model::offset offset) {
model::revision_id purged_revision{
soft_del.topic.initial_revision_id};
if (tombstone_it->second.last_deleted_revision > purged_revision) {
vlog(
clusterlog.info,
"[{}] unexpected iceberg tombstone revision {} (expected {})",
soft_del.topic.nt,
tombstone_it->second.last_deleted_revision,
purged_revision);
return ss::make_ready_future<std::error_code>(
errc::concurrent_modification_error);
}
Expand Down
6 changes: 2 additions & 4 deletions src/v/datalake/coordinator/iceberg_file_committer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ ss::future<
iceberg_file_committer::commit_topic_files_to_catalog(
model::topic topic, const topics_state& state) const {
auto tp_it = state.topic_to_state.find(topic);
if (tp_it == state.topic_to_state.end()) {
co_return chunked_vector<mark_files_committed_update>{};
}
if (
tp_it->second.lifecycle_state == topic_state::lifecycle_state_t::purged) {
tp_it == state.topic_to_state.end()
|| !tp_it->second.has_pending_entries()) {
co_return chunked_vector<mark_files_committed_update>{};
}
auto topic_revision = tp_it->second.revision;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ TEST_F(FileCommitterTest, TestMissingTable) {
topics_state state;
state.topic_to_state[topic] = make_topic_state({});

// Requires a table (which is not created yet)
// If there are no files to commit, this should be a no-op even if the table
// is not there yet.
auto res = committer.commit_topic_files_to_catalog(topic, state).get();
ASSERT_TRUE(res.has_error());
ASSERT_FALSE(res.has_error());
ASSERT_EQ(res.value().size(), 0);

create_table();

Expand Down

0 comments on commit c88f087

Please sign in to comment.