Skip to content

Commit

Permalink
Merge pull request #24222 from bharathv/fix_configs
Browse files Browse the repository at this point in the history
datalake/configs: rework iceberg enablement configs
  • Loading branch information
bharathv authored Nov 23, 2024
2 parents bd4fe21 + 2d92351 commit e8155a4
Show file tree
Hide file tree
Showing 46 changed files with 557 additions and 257 deletions.
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,11 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
std::nullopt,
std::nullopt,
std::nullopt,
false,
model::iceberg_mode::disabled,
std::nullopt,
false,
std::nullopt,
tristate<std::chrono::milliseconds>{},
std::nullopt,
};

auto random_initial_revision_id
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ partition_state get_partition_state(ss::lw_shared_ptr<partition> partition) {
} else {
state.start_cloud_offset = state.next_cloud_offset = model::offset{-1};
}
state.iceberg_enabled = partition->get_ntp_config().iceberg_enabled();
state.iceberg_mode = fmt::format(
"{}", partition->get_ntp_config().iceberg_mode());
state.raft_state = get_partition_raft_state(partition->raft());
return state;
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/topic_properties_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ inline cluster::topic_properties random_topic_properties() {
[] { return tests::random_duration_ms(); });
properties.flush_bytes = tests::random_optional(
[] { return random_generators::get_int<size_t>(); });
properties.iceberg_enabled = false;
properties.iceberg_mode = model::iceberg_mode::disabled;

return properties;
}
4 changes: 1 addition & 3 deletions src/v/cluster/topic_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ storage::ntp_config topic_configuration::make_ntp_config(
.write_caching = properties.write_caching,
.flush_ms = properties.flush_ms,
.flush_bytes = properties.flush_bytes,
.iceberg_enabled = properties.iceberg_enabled,
.iceberg_mode = properties.iceberg_mode,
.cloud_topic_enabled = properties.cloud_topic_enabled,
.iceberg_translation_interval_ms
= properties.iceberg_translation_interval_ms,
.tombstone_retention_ms = properties.delete_retention_ms,
});
}
Expand Down
26 changes: 12 additions & 14 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"write_caching: {}, "
"flush_ms: {}, "
"flush_bytes: {}, "
"remote_label: {}, iceberg_enabled: {}, "
"remote_label: {}, iceberg_mode: {}, "
"leaders_preference: {}, "
"iceberg_translation_interval_ms: {}, "
"delete_retention_ms: {}",
"delete_retention_ms: {}, "
"iceberg_delete: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -76,10 +76,10 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.flush_ms,
properties.flush_bytes,
properties.remote_label,
properties.iceberg_enabled,
properties.iceberg_mode,
properties.leaders_preference,
properties.iceberg_translation_interval_ms,
properties.delete_retention_ms);
properties.delete_retention_ms,
properties.iceberg_delete);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -123,10 +123,9 @@ bool topic_properties::has_overrides() const {
|| initial_retention_local_target_ms.is_engaged()
|| write_caching.has_value() || flush_ms.has_value()
|| flush_bytes.has_value() || remote_label.has_value()
|| (iceberg_enabled != storage::ntp_config::default_iceberg_enabled)
|| leaders_preference.has_value()
|| iceberg_translation_interval_ms.has_value()
|| delete_retention_ms.is_engaged();
|| (iceberg_mode != storage::ntp_config::default_iceberg_mode)
|| leaders_preference.has_value() || delete_retention_ms.is_engaged()
|| iceberg_delete.has_value();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -166,9 +165,8 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.write_caching = write_caching;
ret.flush_ms = flush_ms;
ret.flush_bytes = flush_bytes;
ret.iceberg_enabled = iceberg_enabled;
ret.iceberg_mode = iceberg_mode;
ret.cloud_topic_enabled = cloud_topic_enabled;
ret.iceberg_translation_interval_ms = iceberg_translation_interval_ms;
ret.tombstone_retention_ms = delete_retention_ms;
return ret;
}
Expand Down Expand Up @@ -258,11 +256,11 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
std::nullopt,
std::nullopt,
false,
model::iceberg_mode::disabled,
std::nullopt,
false,
std::nullopt,
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt,
};
}

Expand Down
24 changes: 12 additions & 12 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ struct topic_properties
std::optional<model::write_caching_mode> write_caching,
std::optional<std::chrono::milliseconds> flush_ms,
std::optional<size_t> flush_bytes,
bool iceberg_enabled,
model::iceberg_mode iceberg_mode,
std::optional<config::leaders_preference> leaders_preference,
bool cloud_topic_enabled,
std::optional<std::chrono::milliseconds> iceberg_translation_interval,
tristate<std::chrono::milliseconds> delete_retention_ms)
tristate<std::chrono::milliseconds> delete_retention_ms,
std::optional<bool> iceberg_delete)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -114,11 +114,11 @@ struct topic_properties
, write_caching(write_caching)
, flush_ms(flush_ms)
, flush_bytes(flush_bytes)
, iceberg_enabled(iceberg_enabled)
, iceberg_mode(iceberg_mode)
, leaders_preference(std::move(leaders_preference))
, cloud_topic_enabled(cloud_topic_enabled)
, iceberg_translation_interval_ms(iceberg_translation_interval)
, delete_retention_ms(delete_retention_ms) {}
, delete_retention_ms(delete_retention_ms)
, iceberg_delete(iceberg_delete) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -172,7 +172,7 @@ struct topic_properties
std::optional<model::write_caching_mode> write_caching;
std::optional<std::chrono::milliseconds> flush_ms;
std::optional<size_t> flush_bytes;
bool iceberg_enabled{storage::ntp_config::default_iceberg_enabled};
model::iceberg_mode iceberg_mode{storage::ntp_config::default_iceberg_mode};

// Label to be used when generating paths of remote objects (manifests,
// segments, etc) of this topic.
Expand All @@ -191,9 +191,9 @@ struct topic_properties

bool cloud_topic_enabled{storage::ntp_config::default_cloud_topic_enabled};

std::optional<std::chrono::milliseconds> iceberg_translation_interval_ms;

tristate<std::chrono::milliseconds> delete_retention_ms{disable_tristate};
// Should we delete the corresponding iceberg table when deleting the topic.
std::optional<bool> iceberg_delete;

bool is_compacted() const;
bool has_overrides() const;
Expand Down Expand Up @@ -237,11 +237,11 @@ struct topic_properties
flush_bytes,
remote_label,
remote_topic_namespace_override,
iceberg_enabled,
iceberg_mode,
leaders_preference,
cloud_topic_enabled,
iceberg_translation_interval_ms,
delete_retention_ms);
delete_retention_ms,
iceberg_delete);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1026,16 +1026,15 @@ topic_properties topic_table::update_topic_properties(
incremental_update(updated_properties.flush_ms, overrides.flush_ms);
incremental_update(updated_properties.flush_bytes, overrides.flush_bytes);
incremental_update(
updated_properties.iceberg_enabled,
overrides.iceberg_enabled,
storage::ntp_config::default_iceberg_enabled);
updated_properties.iceberg_mode,
overrides.iceberg_mode,
storage::ntp_config::default_iceberg_mode);
incremental_update(
updated_properties.leaders_preference, overrides.leaders_preference);
incremental_update(
updated_properties.iceberg_translation_interval_ms,
overrides.iceberg_translation_interval_ms);
incremental_update(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);
incremental_update(
updated_properties.iceberg_delete, overrides.iceberg_delete);
return updated_properties;
}

Expand Down
7 changes: 3 additions & 4 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, "
"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
"remote_read: {}, remote_write: {}",
"iceberg_translation_interval_ms: {}",
"remote_read: {}, remote_write: {}, iceberg_delete: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand All @@ -402,11 +401,11 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.write_caching,
i.flush_ms,
i.flush_bytes,
i.iceberg_enabled,
i.iceberg_mode,
i.leaders_preference,
i.remote_read,
i.remote_write,
i.iceberg_translation_interval_ms);
i.iceberg_delete);
return o;
}

Expand Down
17 changes: 8 additions & 9 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,13 @@ struct incremental_topic_updates
property_update<std::optional<model::write_caching_mode>> write_caching;
property_update<std::optional<std::chrono::milliseconds>> flush_ms;
property_update<std::optional<size_t>> flush_bytes;
property_update<bool> iceberg_enabled{
storage::ntp_config::default_iceberg_enabled,
property_update<model::iceberg_mode> iceberg_mode{
storage::ntp_config::default_iceberg_mode,
incremental_update_operation::none};
property_update<std::optional<config::leaders_preference>>
leaders_preference;
property_update<std::optional<std::chrono::milliseconds>>
iceberg_translation_interval_ms;
property_update<tristate<std::chrono::milliseconds>> delete_retention_ms;
property_update<std::optional<bool>> iceberg_delete;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -671,12 +670,12 @@ struct incremental_topic_updates
write_caching,
flush_ms,
flush_bytes,
iceberg_enabled,
iceberg_mode,
leaders_preference,
remote_read,
remote_write,
iceberg_translation_interval_ms,
delete_retention_ms);
delete_retention_ms,
iceberg_delete);
}

friend std::ostream&
Expand Down Expand Up @@ -2875,7 +2874,7 @@ struct partition_state
bool is_remote_fetch_enabled;
bool is_cloud_data_available;
ss::sstring read_replica_bucket;
bool iceberg_enabled;
ss::sstring iceberg_mode;
partition_raft_state raft_state;

auto serde_fields() {
Expand All @@ -2896,7 +2895,7 @@ struct partition_state
is_cloud_data_available,
read_replica_bucket,
raft_state,
iceberg_enabled);
iceberg_mode);
}

friend bool operator==(const partition_state&, const partition_state&)
Expand Down
6 changes: 3 additions & 3 deletions src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -651,11 +651,11 @@ struct instance_generator<cluster::topic_properties> {
tests::random_optional([] { return tests::random_duration_ms(); }),
tests::random_optional(
[] { return random_generators::get_int<size_t>(); }),
false,
model::iceberg_mode::disabled,
std::nullopt,
false,
std::nullopt,
tristate<std::chrono::milliseconds>{disable_tristate}};
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
12 changes: 2 additions & 10 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,7 @@ inline void rjson_serialize(
write_exceptional_member_type(w, "write_caching", tps.write_caching);
write_member(w, "flush_bytes", tps.flush_bytes);
write_member(w, "flush_ms", tps.flush_ms);
write_member(w, "iceberg_enabled", tps.iceberg_enabled);
write_member(
w,
"iceberg_translation_interval_ms",
tps.iceberg_translation_interval_ms);
write_member(w, "iceberg_mode", tps.iceberg_mode);
write_member(w, "delete_retention_ms", tps.delete_retention_ms);
w.EndObject();
}
Expand Down Expand Up @@ -701,11 +697,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) {
read_member(rd, "write_caching", obj.write_caching);
read_member(rd, "flush_bytes", obj.flush_bytes);
read_member(rd, "flush_ms", obj.flush_ms);
read_member(rd, "iceberg_enabled", obj.iceberg_enabled);
read_member(
rd,
"iceberg_translation_interval_ms",
obj.iceberg_translation_interval_ms);
read_member(rd, "iceberg_mode", obj.iceberg_mode);
read_member(rd, "delete_retention_ms", obj.delete_retention_ms);
}

Expand Down
20 changes: 11 additions & 9 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3680,15 +3680,6 @@ configuration::configuration()
.visibility = visibility::user,
},
false)
, iceberg_translation_interval_ms_default(
*this,
"iceberg_translation_interval_ms_default",
"How often an Iceberg enabled topic is checked for new data to "
"translate. You can override this value at topic level using "
"redpanda.iceberg.translation.interval.ms.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::chrono::milliseconds(1min),
{.min = 10ms})
, iceberg_catalog_commit_interval_ms(
*this,
"iceberg_catalog_commit_interval_ms",
Expand Down Expand Up @@ -3790,6 +3781,17 @@ configuration::configuration()
{.visibility = visibility::user},
std::nullopt,
&validate_non_empty_string_opt)
, iceberg_delete(
*this,
"iceberg_delete",
"Default value for the redpanda.iceberg.delete topic property that "
"determines if the corresponding Iceberg table is deleted upon deleting "
"the topic.",
meta{
.needs_restart = needs_restart::no,
.visibility = visibility::user,
},
true)
, development_enable_cloud_topics(
*this,
"development_enable_cloud_topics",
Expand Down
4 changes: 2 additions & 2 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,6 @@ struct configuration final : public config_store {

// datalake configurations
enterprise<property<bool>> iceberg_enabled;
bounded_property<std::chrono::milliseconds>
iceberg_translation_interval_ms_default;
bounded_property<std::chrono::milliseconds>
iceberg_catalog_commit_interval_ms;
property<ss::sstring> iceberg_catalog_base_location;
Expand All @@ -717,6 +715,8 @@ struct configuration final : public config_store {
property<std::optional<ss::sstring>> iceberg_rest_catalog_crl_file;
property<std::optional<ss::sstring>> iceberg_rest_catalog_prefix;

property<bool> iceberg_delete;

configuration();

error_map_t load(const YAML::Node& root_node);
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ redpanda_cc_library(
":catalog_schema_manager",
":cloud_data_io",
":logger",
":record_translator",
":types",
"//src/v/base",
"//src/v/cluster",
Expand Down
Loading

0 comments on commit e8155a4

Please sign in to comment.