From aa17e5e6c67dfd48e5947cfccadf3ff31e54f873 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 12 Nov 2024 20:00:57 +0100 Subject: [PATCH 01/12] config: introduce iceberg_delete cluster config property --- src/v/config/configuration.cc | 11 +++++++++++ src/v/config/configuration.h | 2 ++ 2 files changed, 13 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 5078d9c992964..862f340236672 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3790,6 +3790,17 @@ configuration::configuration() {.visibility = visibility::user}, std::nullopt, &validate_non_empty_string_opt) + , iceberg_delete( + *this, + "iceberg_delete", + "Default value for the redpanda.iceberg.delete topic property that " + "determines if the corresponding Iceberg table is deleted upon deleting " + "the topic.", + meta{ + .needs_restart = needs_restart::no, + .visibility = visibility::user, + }, + true) , development_enable_cloud_topics( *this, "development_enable_cloud_topics", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index e359a8b0677d5..6a91807e419bf 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -717,6 +717,8 @@ struct configuration final : public config_store { property> iceberg_rest_catalog_crl_file; property> iceberg_rest_catalog_prefix; + property iceberg_delete; + configuration(); error_map_t load(const YAML::Node& root_node); From 5cbd42abf864babfdd64bdd84b562643762945ad Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Nov 2024 13:14:47 +0100 Subject: [PATCH 02/12] cluster: add iceberg_delete topic property --- src/v/cloud_storage/tests/topic_manifest_test.cc | 1 + src/v/cluster/topic_properties.cc | 9 ++++++--- src/v/cluster/topic_properties.h | 11 ++++++++--- src/v/compat/cluster_generator.h | 3 ++- tools/offline_log_viewer/controller.py | 3 ++- 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 613d9ac23897d..6a06c305719aa 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { false, std::nullopt, tristate{}, + std::nullopt, }; auto random_initial_revision_id diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index ed548b21101d6..d9707f2622b58 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "remote_label: {}, iceberg_enabled: {}, " "leaders_preference: {}, " "iceberg_translation_interval_ms: {}, " - "delete_retention_ms: {}", + "delete_retention_ms: {}, " + "iceberg_delete: {}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.iceberg_enabled, properties.leaders_preference, properties.iceberg_translation_interval_ms, - properties.delete_retention_ms); + properties.delete_retention_ms, + properties.iceberg_delete); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print( @@ -126,7 +128,7 @@ bool topic_properties::has_overrides() const { || (iceberg_enabled != storage::ntp_config::default_iceberg_enabled) || leaders_preference.has_value() || 
iceberg_translation_interval_ms.has_value() - || delete_retention_ms.is_engaged(); + || delete_retention_ms.is_engaged() || iceberg_delete.has_value(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -263,6 +265,7 @@ adl::from(iobuf_parser& parser) { false, std::nullopt, tristate{disable_tristate}, + std::nullopt, }; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 088541e635ba5..498b32bdbc9bf 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -76,7 +76,8 @@ struct topic_properties std::optional leaders_preference, bool cloud_topic_enabled, std::optional iceberg_translation_interval, - tristate delete_retention_ms) + tristate delete_retention_ms, + std::optional iceberg_delete) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -118,7 +119,8 @@ struct topic_properties , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) , iceberg_translation_interval_ms(iceberg_translation_interval) - , delete_retention_ms(delete_retention_ms) {} + , delete_retention_ms(delete_retention_ms) + , iceberg_delete(iceberg_delete) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -194,6 +196,8 @@ struct topic_properties std::optional iceberg_translation_interval_ms; tristate delete_retention_ms{disable_tristate}; + // Should we delete the corresponding iceberg table when deleting the topic. + std::optional iceberg_delete; bool is_compacted() const; bool has_overrides() const; @@ -241,7 +245,8 @@ struct topic_properties leaders_preference, cloud_topic_enabled, iceberg_translation_interval_ms, - delete_retention_ms); + delete_retention_ms, + iceberg_delete); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 51a8b3fce46ab..938971813b8e6 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -655,7 +655,8 @@ struct instance_generator { std::nullopt, false, std::nullopt, - tristate{disable_tristate}}; + tristate{disable_tristate}, + std::nullopt}; } static std::vector limits() { return {}; } diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 6d909245c41d8..8a52ee9cd147f 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -143,7 +143,8 @@ def read_topic_properties_serde(rdr: Reader, version): 'cloud_topic_enabled': rdr.read_bool(), 'iceberg_translation_interval_ms': rdr.read_optional(Reader.read_int64), - 'delete_retention_ms': rdr.read_tristate(Reader.read_int64) + 'delete_retention_ms': rdr.read_tristate(Reader.read_int64), + 'iceberg_delete': rdr.read_optional(Reader.read_bool), } return topic_properties From 882af4db57d9e597af4e67d2ded26ed01f2e05da Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Nov 2024 13:21:38 +0100 Subject: [PATCH 03/12] k/create_topics: added handling for iceberg_delete property --- src/v/kafka/server/handlers/create_topics.cc | 3 ++- src/v/kafka/server/handlers/topics/types.cc | 3 +++ src/v/kafka/server/handlers/topics/types.h | 3 +++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 990562c871f0b..50c2845dbcc73 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ 
b/src/v/kafka/server/handlers/create_topics.cc @@ -76,7 +76,8 @@ bool is_supported(std::string_view name) { topic_property_iceberg_enabled, topic_property_leaders_preference, topic_property_iceberg_translation_interval_ms, - topic_property_delete_retention_ms}); + topic_property_delete_retention_ms, + topic_property_iceberg_delete}); if (std::any_of( supported_configs.begin(), diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index d797433283966..ad29efdca8385 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -272,6 +272,9 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.delete_retention_ms = get_delete_retention_ms( config_entries); + cfg.properties.iceberg_delete = get_bool_value( + config_entries, topic_property_iceberg_delete); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 4de19db450731..e3897689d3d67 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -108,6 +108,9 @@ inline constexpr std::string_view topic_property_cloud_topic_enabled inline constexpr std::string_view topic_property_iceberg_translation_interval_ms = "redpanda.iceberg.translation.interval.ms"; +inline constexpr std::string_view topic_property_iceberg_delete + = "redpanda.iceberg.delete"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler inline constexpr std::array allowlist_topic_noop_confs = { From e0fabee56041c1bbdb95a18b69b62992278eaadc Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Nov 2024 14:09:01 +0100 Subject: [PATCH 04/12] k/describe_configs: add handling for iceberg_delete topic config --- .../handlers/configs/config_response_utils.cc | 14 ++++++++++++++ tests/rptest/tests/describe_topics_test.py | 9 ++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index ead5260ae21ac..2a31b8ac70644 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -991,6 +991,20 @@ config_response_container_t make_topic_configs( "Preferred location (e.g. rack) for partition leaders of this topic."), &describe_as_string); + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().iceberg_delete.name(), + config::shard_local_cfg().iceberg_delete(), + topic_property_iceberg_delete, + topic_properties.iceberg_delete, + include_synonyms, + maybe_make_documentation( + include_documentation, + "If true, delete the corresponding Iceberg table when deleting the " + "topic."), + &describe_as_string); + return result; } diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index a0411e961d179..dc86a33debbad 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -301,7 +301,14 @@ def test_describe_topics_with_documentation_and_types(self): value="-1", doc_string= "The retention time for tombstone records in a compacted topic. Cannot be enabled at the same time as any of `cloud_storage_enabled`, `cloud_storage_enable_remote_read`, or `cloud_storage_enable_remote_write`." 
- ) + ), + "redpanda.iceberg.delete": + ConfigProperty( + config_type="BOOLEAN", + value="true", + doc_string= + "If true, delete the corresponding Iceberg table when deleting the topic." + ), } tp_spec = TopicSpec() From 54c42df9d1277c9346dd1243197e25fa8158f8d4 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 13 Nov 2024 14:28:20 +0100 Subject: [PATCH 05/12] k/alter_configs: add handling for iceberg_delete topic property --- src/v/cluster/topic_table.cc | 2 ++ src/v/cluster/types.cc | 5 +++-- src/v/cluster/types.h | 4 +++- src/v/kafka/server/handlers/alter_configs.cc | 9 ++++++++- src/v/kafka/server/handlers/incremental_alter_configs.cc | 5 +++++ src/v/kafka/server/tests/alter_config_test.cc | 1 + tools/offline_log_viewer/controller.py | 1 + 7 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 3186f43eec41b..2d4b92d30efa2 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1036,6 +1036,8 @@ topic_properties topic_table::update_topic_properties( overrides.iceberg_translation_interval_ms); incremental_update( updated_properties.delete_retention_ms, overrides.delete_retention_ms); + incremental_update( + updated_properties.iceberg_delete, overrides.iceberg_delete); return updated_properties; } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 7945c41f2b8e7..22517190c72bb 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -375,7 +375,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " "remote_read: {}, remote_write: {}", - "iceberg_translation_interval_ms: {}", + "iceberg_translation_interval_ms: {}, iceberg_delete: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -406,7 +406,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.leaders_preference, i.remote_read, i.remote_write, - i.iceberg_translation_interval_ms); + i.iceberg_translation_interval_ms, + i.iceberg_delete); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 24dffd2d5d9ae..9bba1c6c14831 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -637,6 +637,7 @@ struct incremental_topic_updates property_update> iceberg_translation_interval_ms; property_update> delete_retention_ms; + property_update> iceberg_delete; // To allow us to better control use of the deprecated shadow_indexing // field, use getters and setters instead. 
@@ -676,7 +677,8 @@ struct incremental_topic_updates remote_read, remote_write, iceberg_translation_interval_ms, - delete_retention_ms); + delete_retention_ms, + iceberg_delete); } friend std::ostream& diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 511afa4656b2e..14cba271407c8 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 32, + std::tuple_size_v == 33, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -372,6 +372,13 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } + if (cfg.name == topic_property_iceberg_delete) { + parse_and_set_optional_bool_alpha( + update.properties.iceberg_delete, + cfg.value, + kafka::config_resource_operation::set); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index bf174e587b5e2..76f5418726065 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -377,6 +377,11 @@ create_topic_properties_update( delete_retention_ms_validator{}); continue; } + if (cfg.name == topic_property_iceberg_delete) { + parse_and_set_optional_bool_alpha( + update.properties.iceberg_delete, cfg.value, op); + continue; + } } catch (const validation_error& e) { vlog( diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 4dedd765a9fec..83313d19a5e01 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -379,6 +379,7 @@ FIXTURE_TEST( "redpanda.leaders.preference", "redpanda.iceberg.translation.interval.ms", "delete.retention.ms", + "redpanda.iceberg.delete", }; // All properties_request diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 8a52ee9cd147f..ff1a194ff4945 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -308,6 +308,7 @@ def incr_topic_upd(rdr: Reader, version): 'iceberg_enabled': rdr.read_bool(), 'leaders_preference': rdr.read_optional(read_leaders_preference), + 'iceberg_delete': rdr.read_optional(Reader.read_bool), } return incr_obj From 8eca1847662765d851b0fbe5fc759e078524a607 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 20 Nov 2024 14:09:28 -0800 Subject: [PATCH 06/12] topic/properties: introduce iceberg_mode enum --- src/v/model/metadata.h | 19 +++++++++++++++++++ src/v/model/model.cc | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/src/v/model/metadata.h b/src/v/model/metadata.h index 20b301e4d7cad..ef67f09b67f0f 100644 --- a/src/v/model/metadata.h +++ b/src/v/model/metadata.h @@ -588,6 +588,25 @@ enum class recovery_validation_mode : std::uint16_t { std::ostream& operator<<(std::ostream&, recovery_validation_mode); std::istream& operator>>(std::istream&, recovery_validation_mode&); +// Iceberg enablement options for a topic +enum class iceberg_mode : uint8_t { + // Iceberg is disabled + disabled = 0, + // Iceberg translation interprets record 
key and value as binary + // types and uses default Iceberg table schema. + key_value = 1, + // Iceberg translation interprets the record value using the schema + // id embedded in value. Kafka serializers embed a magic byte as the + // first byte of the value to indicate the presence of a schema id + // which is then resolved with the schema registry. The value bytes + // are then interpreted using the schema and the resulting columns are + // mapped to appropriate Iceberg types and corresponding table columns. + value_schema_id_prefix = 2 +}; + +std::ostream& operator<<(std::ostream&, const iceberg_mode&); +std::istream& operator>>(std::istream&, iceberg_mode&); + } // namespace model template<> diff --git a/src/v/model/model.cc b/src/v/model/model.cc index 8df26b8661b16..34ec7d03c0e2b 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -582,6 +582,32 @@ std::istream& operator>>(std::istream& is, recovery_validation_mode& vm) { return is; } +std::ostream& operator<<(std::ostream& os, const iceberg_mode& mode) { switch (mode) { case iceberg_mode::disabled: return os << "disabled"; case iceberg_mode::key_value: return os << "key_value"; case iceberg_mode::value_schema_id_prefix: return os << "value_schema_id_prefix"; } } + +std::istream& operator>>(std::istream& is, iceberg_mode& mode) { using enum iceberg_mode; ss::sstring s; is >> s; try { mode = string_switch(s) .match("disabled", disabled) .match("key_value", key_value) .match("value_schema_id_prefix", value_schema_id_prefix); } catch (const std::runtime_error&) { is.setstate(std::ios::failbit); } return is; } + std::ostream& operator<<(std::ostream& os, const fips_mode_flag& f) { return os << to_string_view(f); } From 8f0fc0494fefadb25a17575ea39f6ec13f7659f1 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 20 Nov 2024 15:40:52 -0800 Subject: [PATCH 07/12] datalake/config: switch iceberg_enabled to iceberg_mode Actual integration with schema_less/schemaful mode will be a separate PR.
--- .../tests/topic_manifest_test.cc | 2 +- src/v/cluster/cluster_utils.cc | 3 +- .../tests/topic_properties_generator.h | 2 +- src/v/cluster/topic_configuration.cc | 2 +- src/v/cluster/topic_properties.cc | 10 ++-- src/v/cluster/topic_properties.h | 8 +-- src/v/cluster/topic_table.cc | 6 +- src/v/cluster/types.cc | 2 +- src/v/cluster/types.h | 10 ++-- src/v/compat/cluster_generator.h | 2 +- src/v/compat/cluster_json.h | 4 +- src/v/datalake/datalake_manager.cc | 6 +- src/v/datalake/tests/fixture.h | 2 +- .../translation/tests/state_machine_test.cc | 15 +++-- src/v/kafka/server/handlers/alter_configs.cc | 7 +-- .../handlers/configs/config_response_utils.cc | 25 ++++---- .../server/handlers/configs/config_utils.h | 60 ++++++++++++++++++- src/v/kafka/server/handlers/create_topics.cc | 2 +- .../handlers/incremental_alter_configs.cc | 7 +-- src/v/kafka/server/handlers/topics/types.cc | 7 ++- src/v/kafka/server/handlers/topics/types.h | 4 +- .../kafka/server/handlers/topics/validators.h | 11 ++-- src/v/kafka/server/tests/alter_config_test.cc | 52 ++++++++-------- src/v/redpanda/admin/api-doc/debug.json | 6 +- src/v/redpanda/admin/debug.cc | 2 +- src/v/storage/ntp_config.h | 16 +++-- src/v/storage/types.cc | 4 +- tests/rptest/clients/types.py | 2 +- .../tests/datalake/datalake_e2e_test.py | 3 +- .../tests/datalake/datalake_services.py | 3 +- .../datalake/rest_catalog_connection_test.py | 4 +- tests/rptest/tests/describe_topics_test.py | 9 ++- .../tests/polaris_catalog_smoke_test.py | 4 +- tools/offline_log_viewer/controller.py | 4 +- 34 files changed, 188 insertions(+), 118 deletions(-) diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 6a06c305719aa..81ff0e8da2339 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -479,7 +479,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { std::nullopt, std::nullopt, std::nullopt, - false, + model::iceberg_mode::disabled, std::nullopt, false, std::nullopt, diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 611c45f0bce61..40d9ddfaa2a7f 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -287,7 +287,8 @@ partition_state get_partition_state(ss::lw_shared_ptr partition) { } else { state.start_cloud_offset = state.next_cloud_offset = model::offset{-1}; } - state.iceberg_enabled = partition->get_ntp_config().iceberg_enabled(); + state.iceberg_mode = fmt::format( + "{}", partition->get_ntp_config().iceberg_mode()); state.raft_state = get_partition_raft_state(partition->raft()); return state; } diff --git a/src/v/cluster/tests/topic_properties_generator.h b/src/v/cluster/tests/topic_properties_generator.h index fe62df4d9dd37..494c550470837 100644 --- a/src/v/cluster/tests/topic_properties_generator.h +++ b/src/v/cluster/tests/topic_properties_generator.h @@ -73,7 +73,7 @@ inline cluster::topic_properties random_topic_properties() { [] { return tests::random_duration_ms(); }); properties.flush_bytes = tests::random_optional( [] { return random_generators::get_int(); }); - properties.iceberg_enabled = false; + properties.iceberg_mode = model::iceberg_mode::disabled; return properties; } diff --git a/src/v/cluster/topic_configuration.cc b/src/v/cluster/topic_configuration.cc index 71062d13cace9..b0f17d21a9316 100644 --- a/src/v/cluster/topic_configuration.cc +++ b/src/v/cluster/topic_configuration.cc @@ -55,7 +55,7 @@ storage::ntp_config 
topic_configuration::make_ntp_config( .write_caching = properties.write_caching, .flush_ms = properties.flush_ms, .flush_bytes = properties.flush_bytes, - .iceberg_enabled = properties.iceberg_enabled, + .iceberg_mode = properties.iceberg_mode, .cloud_topic_enabled = properties.cloud_topic_enabled, .iceberg_translation_interval_ms = properties.iceberg_translation_interval_ms, diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index d9707f2622b58..236227baf6458 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -39,7 +39,7 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "write_caching: {}, " "flush_ms: {}, " "flush_bytes: {}, " - "remote_label: {}, iceberg_enabled: {}, " + "remote_label: {}, iceberg_mode: {}, " "leaders_preference: {}, " "iceberg_translation_interval_ms: {}, " "delete_retention_ms: {}, " @@ -77,7 +77,7 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.flush_ms, properties.flush_bytes, properties.remote_label, - properties.iceberg_enabled, + properties.iceberg_mode, properties.leaders_preference, properties.iceberg_translation_interval_ms, properties.delete_retention_ms, @@ -125,7 +125,7 @@ bool topic_properties::has_overrides() const { || initial_retention_local_target_ms.is_engaged() || write_caching.has_value() || flush_ms.has_value() || flush_bytes.has_value() || remote_label.has_value() - || (iceberg_enabled != storage::ntp_config::default_iceberg_enabled) + || (iceberg_mode != storage::ntp_config::default_iceberg_mode) || leaders_preference.has_value() || iceberg_translation_interval_ms.has_value() || delete_retention_ms.is_engaged() || iceberg_delete.has_value(); @@ -168,7 +168,7 @@ topic_properties::get_ntp_cfg_overrides() const { ret.write_caching = write_caching; ret.flush_ms = flush_ms; ret.flush_bytes = flush_bytes; - ret.iceberg_enabled = iceberg_enabled; + ret.iceberg_mode = iceberg_mode; ret.cloud_topic_enabled = cloud_topic_enabled; ret.iceberg_translation_interval_ms = iceberg_translation_interval_ms; ret.tombstone_retention_ms = delete_retention_ms; @@ -260,7 +260,7 @@ adl::from(iobuf_parser& parser) { std::nullopt, std::nullopt, std::nullopt, - false, + model::iceberg_mode::disabled, std::nullopt, false, std::nullopt, diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 498b32bdbc9bf..f78978683bba3 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -72,7 +72,7 @@ struct topic_properties std::optional write_caching, std::optional flush_ms, std::optional flush_bytes, - bool iceberg_enabled, + model::iceberg_mode iceberg_mode, std::optional leaders_preference, bool cloud_topic_enabled, std::optional iceberg_translation_interval, @@ -115,7 +115,7 @@ struct topic_properties , write_caching(write_caching) , flush_ms(flush_ms) , flush_bytes(flush_bytes) - , iceberg_enabled(iceberg_enabled) + , iceberg_mode(iceberg_mode) , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) , iceberg_translation_interval_ms(iceberg_translation_interval) @@ -174,7 +174,7 @@ struct topic_properties std::optional write_caching; std::optional flush_ms; std::optional flush_bytes; - bool iceberg_enabled{storage::ntp_config::default_iceberg_enabled}; + model::iceberg_mode iceberg_mode{storage::ntp_config::default_iceberg_mode}; // Label to be used when generating paths of remote objects (manifests, // segments, etc) of this topic. 
@@ -241,7 +241,7 @@ struct topic_properties flush_bytes, remote_label, remote_topic_namespace_override, - iceberg_enabled, + iceberg_mode, leaders_preference, cloud_topic_enabled, iceberg_translation_interval_ms, diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 2d4b92d30efa2..f965e5c7a9e52 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1026,9 +1026,9 @@ topic_properties topic_table::update_topic_properties( incremental_update(updated_properties.flush_ms, overrides.flush_ms); incremental_update(updated_properties.flush_bytes, overrides.flush_bytes); incremental_update( - updated_properties.iceberg_enabled, - overrides.iceberg_enabled, - storage::ntp_config::default_iceberg_enabled); + updated_properties.iceberg_mode, + overrides.iceberg_mode, + storage::ntp_config::default_iceberg_mode); incremental_update( updated_properties.leaders_preference, overrides.leaders_preference); incremental_update( diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 22517190c72bb..702e70de89a51 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -402,7 +402,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.write_caching, i.flush_ms, i.flush_bytes, - i.iceberg_enabled, + i.iceberg_mode, i.leaders_preference, i.remote_read, i.remote_write, diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 9bba1c6c14831..e57214c39239a 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -629,8 +629,8 @@ struct incremental_topic_updates property_update> write_caching; property_update> flush_ms; property_update> flush_bytes; - property_update iceberg_enabled{ - storage::ntp_config::default_iceberg_enabled, + property_update iceberg_mode{ + storage::ntp_config::default_iceberg_mode, incremental_update_operation::none}; property_update> leaders_preference; @@ -672,7 +672,7 @@ struct incremental_topic_updates write_caching, flush_ms, flush_bytes, - iceberg_enabled, + iceberg_mode, leaders_preference, remote_read, remote_write, @@ -2877,7 +2877,7 @@ struct partition_state bool is_remote_fetch_enabled; bool is_cloud_data_available; ss::sstring read_replica_bucket; - bool iceberg_enabled; + ss::sstring iceberg_mode; partition_raft_state raft_state; auto serde_fields() { @@ -2898,7 +2898,7 @@ struct partition_state is_cloud_data_available, read_replica_bucket, raft_state, - iceberg_enabled); + iceberg_mode); } friend bool operator==(const partition_state&, const partition_state&) diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 938971813b8e6..53ff21427a455 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -651,7 +651,7 @@ struct instance_generator { tests::random_optional([] { return tests::random_duration_ms(); }), tests::random_optional( [] { return random_generators::get_int(); }), - false, + model::iceberg_mode::disabled, std::nullopt, false, std::nullopt, diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index c2a38e0847d31..02bcabe364e9c 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -625,7 +625,7 @@ inline void rjson_serialize( write_exceptional_member_type(w, "write_caching", tps.write_caching); write_member(w, "flush_bytes", tps.flush_bytes); write_member(w, "flush_ms", tps.flush_ms); - write_member(w, "iceberg_enabled", tps.iceberg_enabled); + write_member(w, "iceberg_mode", tps.iceberg_mode); write_member( w, "iceberg_translation_interval_ms", 
@@ -701,7 +701,7 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) { read_member(rd, "write_caching", obj.write_caching); read_member(rd, "flush_bytes", obj.flush_bytes); read_member(rd, "flush_ms", obj.flush_ms); - read_member(rd, "iceberg_enabled", obj.iceberg_enabled); + read_member(rd, "iceberg_mode", obj.iceberg_mode); read_member( rd, "iceberg_translation_interval_ms", diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc index eae30d5b7c6e5..8103d2ffaaba7 100644 --- a/src/v/datalake/datalake_manager.cc +++ b/src/v/datalake/datalake_manager.cc @@ -101,7 +101,7 @@ ss::future<> datalake_manager::start() { } }); - // Handle topic properties changes (iceberg.enabled=true/false) + // Handle topic properties changes (iceberg_mode) auto topic_properties_registration = _topic_table->local().register_ntp_delta_notification( [this](cluster::topic_table::ntp_delta_range_t range) { @@ -162,7 +162,9 @@ void datalake_manager::on_group_notification(const model::ntp& ntp) { } auto it = _translators.find(ntp); // todo(iceberg) handle topic / partition disabling - if (!partition->is_leader() || !topic_cfg->properties.iceberg_enabled) { + auto iceberg_disabled = topic_cfg->properties.iceberg_mode + == model::iceberg_mode::disabled; + if (!partition->is_leader() || iceberg_disabled) { if (it != _translators.end()) { ssx::spawn_with_gate(_gate, [this, partition] { return stop_translator(partition->ntp()); diff --git a/src/v/datalake/tests/fixture.h b/src/v/datalake/tests/fixture.h index ef7633afb69de..78a18e5dbc105 100644 --- a/src/v/datalake/tests/fixture.h +++ b/src/v/datalake/tests/fixture.h @@ -67,7 +67,7 @@ class datalake_cluster_test_fixture : public archiver_cluster_fixture { ss::future<> create_iceberg_topic( model::topic topic, int num_partitions = 1, int16_t num_replicas = 3) { cluster::topic_properties props; - props.iceberg_enabled = true; + props.iceberg_mode = model::iceberg_mode::value_schema_id_prefix; props.iceberg_translation_interval_ms = 50ms; return cluster_test_fixture::create_topic( {model::kafka_namespace, topic}, diff --git a/src/v/datalake/translation/tests/state_machine_test.cc b/src/v/datalake/translation/tests/state_machine_test.cc index 397dd13fafde6..dc026b9abe676 100644 --- a/src/v/datalake/translation/tests/state_machine_test.cc +++ b/src/v/datalake/translation/tests/state_machine_test.cc @@ -21,18 +21,23 @@ struct translator_stm_fixture : stm_raft_fixture { return builder.create_stm(logger(), node.raft().get()); } - ss::future<> update_iceberg_config(bool value) { - co_await parallel_for_each_node([value](raft_node_instance& node) { + ss::future<> update_iceberg_config(model::iceberg_mode mode) { + co_await parallel_for_each_node([mode](raft_node_instance& node) { auto log = node.raft()->log(); log->set_overrides( - storage::ntp_config::default_overrides{.iceberg_enabled = value}); + storage::ntp_config::default_overrides{.iceberg_mode = mode}); return ss::make_ready_future<>(); }); } - ss::future<> enable_iceberg() { return update_iceberg_config(true); } + ss::future<> enable_iceberg() { + return update_iceberg_config( + model::iceberg_mode::value_schema_id_prefix); + } - ss::future<> disable_iceberg() { return update_iceberg_config(false); } + ss::future<> disable_iceberg() { + return update_iceberg_config(model::iceberg_mode::disabled); + } template ss::future<> for_each_stm(Func&& f) { diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 
14cba271407c8..463fb91dfb912 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -329,13 +329,12 @@ create_topic_properties_update( flush_bytes_validator{}); continue; } - if (cfg.name == topic_property_iceberg_enabled) { - parse_and_set_bool( + if (cfg.name == topic_property_iceberg_mode) { + parse_and_set_property( tp_ns, - update.properties.iceberg_enabled, + update.properties.iceberg_mode, cfg.value, kafka::config_resource_operation::set, - storage::ntp_config::default_iceberg_enabled, iceberg_config_validator{}); continue; } diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 2a31b8ac70644..266fe2906d859 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -103,7 +103,7 @@ consteval describe_configs_type property_config_type() { std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v; + std::is_same_v || std::is_same_v; constexpr auto is_long_type = is_long || // Long type since seconds is atleast a 35-bit signed integral @@ -772,21 +772,22 @@ config_response_container_t make_topic_configs( }); } - if (config_property_requested( - config_keys, topic_property_iceberg_enabled)) { - add_topic_config( + if (config_property_requested(config_keys, topic_property_iceberg_mode)) { + add_topic_config( result, - topic_property_iceberg_enabled, - storage::ntp_config::default_iceberg_enabled, - topic_property_iceberg_enabled, + topic_property_iceberg_mode, + storage::ntp_config::default_iceberg_mode, + topic_property_iceberg_mode, override_if_not_default( - std::make_optional(topic_properties.iceberg_enabled), - storage::ntp_config::default_iceberg_enabled), + std::make_optional( + topic_properties.iceberg_mode), + storage::ntp_config::default_iceberg_mode), true, maybe_make_documentation( - include_documentation, - "Iceberg format translation enabled on this topic if true."), - [](const bool& b) { return b ? 
"true" : "false"; }); + include_documentation, "Iceberg enablement mode for the topic."), + [](const model::iceberg_mode& mode) { + return ssx::sformat("{}", mode); + }); } if (config::shard_local_cfg().development_enable_cloud_topics()) { diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 0f37ae2a85cb0..64009c17a9678 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -334,6 +334,14 @@ struct noop_bool_validator { } }; +template +struct noop_validator_with_tn { + std::optional + operator()(model::topic_namespace_view, const ss::sstring&, const T&) { + return std::nullopt; + } +}; + struct segment_size_validator { std::optional operator()(const ss::sstring&, const size_t& value) { @@ -460,12 +468,16 @@ struct flush_bytes_validator { struct iceberg_config_validator { std::optional operator()( - model::topic_namespace_view tns, const ss::sstring&, bool value) { + model::topic_namespace_view tns, + const ss::sstring&, + const model::iceberg_mode& value) { if (!model::is_user_topic(tns)) { return fmt::format( "Iceberg configuration cannot be altered on non user topics"); } - if (!config::shard_local_cfg().iceberg_enabled() && value) { + if ( + !config::shard_local_cfg().iceberg_enabled() + && value != model::iceberg_mode::disabled) { return fmt::format( "Iceberg disabled in the cluster configuration, enable it by " "setting: {}", @@ -521,6 +533,50 @@ using replication_factor_validator = config_validator_list< replication_factor_must_be_odd, replication_factor_must_be_greater_or_equal_to_minimum>; +template< + typename T, + typename Validator = noop_validator_with_tn, + typename ParseFunc = decltype(boost::lexical_cast)> +requires requires( + model::topic_namespace_view tn, + const T& value, + const ss::sstring& str, + Validator validator, + ParseFunc parse) { + { parse(str) } -> std::convertible_to; + { + validator(tn, str, value) + } -> std::convertible_to>; +} +void parse_and_set_property( + model::topic_namespace_view tn, + cluster::property_update& property, + const std::optional& value, + config_resource_operation op, + Validator validator = noop_validator{}, + ParseFunc parse = boost::lexical_cast) { + // remove property value + if (op == config_resource_operation::remove) { + property.op = cluster::incremental_update_operation::remove; + return; + } + // set property value if preset, otherwise do nothing + if (op == config_resource_operation::set && value) { + property.op = cluster::incremental_update_operation::set; + try { + auto v = parse(*value); + auto v_error = validator(tn, *value, v); + if (v_error) { + throw validation_error(*v_error); + } + property.value = std::move(v); + } catch (const std::runtime_error&) { + throw boost::bad_lexical_cast(); + } + return; + } +} + template< typename T, typename Validator = noop_validator, diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 50c2845dbcc73..2e79446b9af1e 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -73,7 +73,7 @@ bool is_supported(std::string_view name) { topic_property_write_caching, topic_property_flush_ms, topic_property_flush_bytes, - topic_property_iceberg_enabled, + topic_property_iceberg_mode, topic_property_leaders_preference, topic_property_iceberg_translation_interval_ms, topic_property_delete_retention_ms, diff --git 
a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 76f5418726065..4dff42ef603a8 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -335,13 +335,12 @@ create_topic_properties_update( flush_bytes_validator{}); continue; } - if (cfg.name == topic_property_iceberg_enabled) { - parse_and_set_bool( + if (cfg.name == topic_property_iceberg_mode) { + parse_and_set_property( tp_ns, - update.properties.iceberg_enabled, + update.properties.iceberg_mode, cfg.value, op, - storage::ntp_config::default_iceberg_enabled, iceberg_config_validator{}); continue; } diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index ad29efdca8385..89a6feb92e821 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -259,9 +259,10 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.flush_bytes = get_config_value( config_entries, topic_property_flush_bytes); - cfg.properties.iceberg_enabled - = get_bool_value(config_entries, topic_property_iceberg_enabled) - .value_or(storage::ntp_config::default_iceberg_enabled); + cfg.properties.iceberg_mode + = get_config_value( + config_entries, topic_property_iceberg_mode) + .value_or(storage::ntp_config::default_iceberg_mode); cfg.properties.leaders_preference = get_leaders_preference(config_entries); diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index e3897689d3d67..14731355eec4c 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -96,8 +96,8 @@ inline constexpr std::string_view inline constexpr std::string_view topic_property_mpx_virtual_cluster_id = "redpanda.virtual.cluster.id"; -inline constexpr std::string_view topic_property_iceberg_enabled - = "redpanda.iceberg.enabled"; +inline constexpr std::string_view topic_property_iceberg_mode + = "redpanda.iceberg.mode"; inline constexpr std::string_view topic_property_leaders_preference = "redpanda.leaders.preference"; diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index 2b2fba8803ba2..79b26d4a14bb5 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -280,16 +280,15 @@ struct iceberg_config_validator { c.configs.begin(), c.configs.end(), [](const createable_topic_config& cfg) { - return cfg.name == topic_property_iceberg_enabled; + return cfg.name == topic_property_iceberg_mode; }); if (it == c.configs.end() || !it->value.has_value()) { return true; } - bool enabled_with_topic_override = false; + model::iceberg_mode parsed_mode; try { - enabled_with_topic_override = string_switch(it->value.value()) - .match("true", true) - .match("false", false); + parsed_mode = boost::lexical_cast( + it->value.value()); } catch (...) { return false; } @@ -298,7 +297,7 @@ struct iceberg_config_validator { // at the cluster level, it cannot be enabled with a topic // override. 
return config::shard_local_cfg().iceberg_enabled() - || !enabled_with_topic_override; + || parsed_mode == model::iceberg_mode::disabled; } }; diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 83313d19a5e01..91d1de95a626f 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -375,7 +375,7 @@ FIXTURE_TEST( "write.caching", "flush.ms", "flush.bytes", - "redpanda.iceberg.enabled", + "redpanda.iceberg.mode", "redpanda.leaders.preference", "redpanda.iceberg.translation.interval.ms", "delete.retention.ms", @@ -848,19 +848,19 @@ FIXTURE_TEST(test_incremental_alter_config_remove, alter_config_test_fixture) { } FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { - auto do_create_topic = [&](model::topic tp, bool iceberg) { - absl::flat_hash_map properties; - properties.emplace( - "redpanda.iceberg.enabled", (iceberg ? "true" : "false")); - return create_topic(tp, properties); - }; + auto do_create_topic = + [&](model::topic tp, ss::sstring iceberg_mode = "key_value") { + absl::flat_hash_map properties; + properties.emplace("redpanda.iceberg.mode", iceberg_mode); + return create_topic(tp, properties); + }; model::topic topic1{"test1"}; model::topic topic2{"topic2"}; { // Try creating a topic with iceberg enabled while it is // disabled in cluster config. - auto resp = do_create_topic(topic1, true); + auto resp = do_create_topic(topic1); BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1); BOOST_REQUIRE_EQUAL( resp.data.topics[0].error_code, kafka::error_code::invalid_config); @@ -869,13 +869,13 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { { // create a topic without iceberg and try enabling iceberg // while it is disabled at the cluster lvel. - auto resp = do_create_topic(topic2, false); + auto resp = do_create_topic(topic2, "disabled"); BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1); BOOST_REQUIRE_EQUAL( resp.data.topics[0].error_code, kafka::error_code::none); absl::flat_hash_map properties; - properties.emplace("redpanda.iceberg.enabled", "true"); + properties.emplace("redpanda.iceberg.mode", "value_schema_id_prefix"); auto alter_resp = alter_configs( make_alter_topic_config_resource_cv(topic2, properties)); BOOST_REQUIRE_EQUAL(alter_resp.data.responses.size(), 1); @@ -892,8 +892,9 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { pair, kafka::config_resource_operation>> properties; properties.emplace( - "redpanda.iceberg.enabled", - std::make_pair("true", kafka::config_resource_operation::set)); + "redpanda.iceberg.mode", + std::make_pair( + "value_schema_id_prefix", kafka::config_resource_operation::set)); auto resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv(topic2, properties)); @@ -909,7 +910,7 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { { // Attempt to create the topic again. - auto resp = do_create_topic(topic1, true); + auto resp = do_create_topic(topic1); BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1); BOOST_REQUIRE_EQUAL( resp.data.topics[0].error_code, kafka::error_code::none); @@ -917,10 +918,10 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { { // alter the iceberg config of an existing topic, should work. - for (auto prop : {false, true}) { - ss::sstring prop_str = prop ? 
"true" : "false"; + for (const auto& prop : + {"disabled", "key_value", "value_schema_id_prefix"}) { absl::flat_hash_map properties; - properties.emplace("redpanda.iceberg.enabled", prop_str); + properties.emplace("redpanda.iceberg.mode", prop); auto resp = alter_configs( make_alter_topic_config_resource_cv(topic2, properties)); @@ -932,14 +933,14 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { auto describe_resp = describe_configs(topic2); assert_property_value( - topic2, "redpanda.iceberg.enabled", prop_str, describe_resp); + topic2, "redpanda.iceberg.mode", prop, describe_resp); } } { // same as above, with incremental alter - for (auto prop : {false, true}) { - ss::sstring prop_str = prop ? "true" : "false"; + for (const auto& prop : + {"disabled", "key_value", "value_schema_id_prefix"}) { absl::flat_hash_map< ss::sstring, std::pair< @@ -947,8 +948,8 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { kafka::config_resource_operation>> properties; properties.emplace( - "redpanda.iceberg.enabled", - std::make_pair(prop_str, kafka::config_resource_operation::set)); + "redpanda.iceberg.mode", + std::make_pair(prop, kafka::config_resource_operation::set)); auto resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv( @@ -961,7 +962,7 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { auto describe_resp = describe_configs(topic2); assert_property_value( - topic2, "redpanda.iceberg.enabled", prop_str, describe_resp); + topic2, "redpanda.iceberg.mode", prop, describe_resp); } } @@ -978,7 +979,7 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { .set_value(std::vector{}); absl::flat_hash_map properties; - properties.emplace("redpanda.iceberg.enabled", "true"); + properties.emplace("redpanda.iceberg.mode", "key_value"); auto resp = alter_configs(make_alter_topic_config_resource_cv( model::kafka_consumer_offsets_topic, properties)); @@ -992,8 +993,9 @@ FIXTURE_TEST(test_iceberg_property, alter_config_test_fixture) { pair, kafka::config_resource_operation>> incr_properties; incr_properties.emplace( - "redpanda.iceberg.enabled", - std::make_pair("true", kafka::config_resource_operation::set)); + "redpanda.iceberg.mode", + std::make_pair( + "value_schema_id_prefix", kafka::config_resource_operation::set)); auto incr_resp = incremental_alter_configs( make_incremental_alter_topic_config_resource_cv( model::kafka_consumer_offsets_topic, incr_properties)); diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index e0dad8165853a..ac2dd9bde2d07 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -1148,9 +1148,9 @@ "type": "long", "description": "Next cloud offset" }, - "iceberg_enabled": { - "type": "boolean", - "description": "Whether iceberg translation is enabled on this replica." 
+ "iceberg_mode": { + "type": "string", + "description": "Iceberg enablement mode for the topic" }, "raft_state": { "type": "raft_replica_state", diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index 4ca5d81854756..79bdc4a10bf5c 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -803,7 +803,7 @@ admin_server::get_partition_state_handler( replica.is_cloud_data_available = state.is_cloud_data_available; replica.start_cloud_offset = state.start_cloud_offset; replica.next_cloud_offset = state.next_cloud_offset; - replica.iceberg_enabled = state.iceberg_enabled; + replica.iceberg_mode = state.iceberg_mode; fill_raft_state(replica, std::move(state)); response.replicas.push(std::move(replica)); } diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 30c777ba6e2f6..7bb06507ff042 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -33,7 +33,8 @@ class ntp_config { // is handled during adl/serde decode). static constexpr bool default_remote_delete{true}; static constexpr bool legacy_remote_delete{false}; - static constexpr bool default_iceberg_enabled{false}; + static constexpr model::iceberg_mode default_iceberg_mode + = model::iceberg_mode::disabled; static constexpr bool default_cloud_topic_enabled{false}; static constexpr std::chrono::milliseconds read_replica_retention{3600000}; @@ -78,7 +79,7 @@ class ntp_config { std::optional flush_ms; std::optional flush_bytes; - bool iceberg_enabled{default_iceberg_enabled}; + model::iceberg_mode iceberg_mode{default_iceberg_mode}; bool cloud_topic_enabled{default_cloud_topic_enabled}; std::optional iceberg_translation_interval_ms{std::nullopt}; @@ -343,12 +344,15 @@ class ntp_config { return cleanup_policy_override().value_or(cluster_default); } - bool iceberg_enabled() const { + model::iceberg_mode iceberg_mode() const { if (!config::shard_local_cfg().iceberg_enabled) { - return false; + return model::iceberg_mode::disabled; } - return _overrides ? _overrides->iceberg_enabled - : default_iceberg_enabled; + return _overrides ? 
_overrides->iceberg_mode : default_iceberg_mode; + } + + bool iceberg_enabled() const { + return iceberg_mode() != model::iceberg_mode::disabled; } bool cloud_topic_enabled() const { diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc index 4f1adbad94ab5..5103787853e77 100644 --- a/src/v/storage/types.cc +++ b/src/v/storage/types.cc @@ -114,7 +114,7 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { "remote_delete: {}, segment_ms: {}, " "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " - "flush_bytes: {} iceberg_enabled: {}, iceberg_translation_interval_ms: " + "flush_bytes: {} iceberg_mode: {}, iceberg_translation_interval_ms: " "{}}}", v.compaction_strategy, v.cleanup_policy_bitflags, @@ -131,7 +131,7 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { v.write_caching, v.flush_ms, v.flush_bytes, - v.iceberg_enabled, + v.iceberg_mode, v.iceberg_translation_interval_ms); if (config::shard_local_cfg().development_enable_cloud_topics()) { diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index ed03624b5d847..91a3a6042602c 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -39,7 +39,7 @@ class TopicSpec: PROPERTY_WRITE_CACHING = "write.caching" PROPERTY_FLUSH_MS = "flush.ms" PROPERTY_FLUSH_BYTES = "flush.bytes" - PROPERTY_ICEBERG_ENABLED = "redpanda.iceberg.enabled" + PROPERTY_ICEBERG_MODE = "redpanda.iceberg.mode" PROPERTY_ICEBERG_TRANSLATION_INTERVAL = "redpanda.iceberg.translation.interval.ms" PROPERTY_DELETE_RETENTION_MS = "delete.retention.ms" diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index 1154b847e9bbc..71c886f511f5c 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -93,7 +93,8 @@ def test_avro_schema(self, cloud_storage_type, query_engine): redpanda=self.redpanda, filesystem_catalog_mode=True, include_query_engines=[query_engine]) as dl: - dl.create_iceberg_enabled_topic(self.topic_name) + dl.create_iceberg_enabled_topic( + self.topic_name, iceberg_mode="value_schema_id_prefix") avro_serde_client = self._get_serde_client(SchemaType.AVRO, SerdeClientType.Golang, self.topic_name, count) diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py index 94a4a752fdd69..a5d66bdd672fc 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -83,9 +83,10 @@ def create_iceberg_enabled_topic(self, name, partitions=1, replicas=1, + iceberg_mode="key_value", translation_interval_ms=3000, config: dict[str, Any] = dict()): - config[TopicSpec.PROPERTY_ICEBERG_ENABLED] = "true" + config[TopicSpec.PROPERTY_ICEBERG_MODE] = iceberg_mode config[TopicSpec. 
PROPERTY_ICEBERG_TRANSLATION_INTERVAL] = translation_interval_ms rpk = RpkTool(self.redpanda) diff --git a/tests/rptest/tests/datalake/rest_catalog_connection_test.py b/tests/rptest/tests/datalake/rest_catalog_connection_test.py index 8c04cda8ee33b..e1f4747b48e37 100644 --- a/tests/rptest/tests/datalake/rest_catalog_connection_test.py +++ b/tests/rptest/tests/datalake/rest_catalog_connection_test.py @@ -92,8 +92,8 @@ def test_redpanda_connection_to_rest_catalog(self, cloud_storage_type): topic = TopicSpec(name='datalake-test-topic', partition_count=3) self.client().create_topic(topic) - self.client().alter_topic_config(topic.name, - "redpanda.iceberg.enabled", "true") + self.client().alter_topic_config(topic.name, "redpanda.iceberg.mode", + "key_value") producer = self.start_producer(topic_name=topic.name) # wait for the producer to finish diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index dc86a33debbad..af13f464bd051 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -275,12 +275,11 @@ def test_describe_topics_with_documentation_and_types(self): doc_string= "Maximum number of bytes that are not flushed per partition. If the configured threshold is reached, the log is automatically flushed even if it has not been explicitly requested." ), - "redpanda.iceberg.enabled": + "redpanda.iceberg.mode": ConfigProperty( - config_type="BOOLEAN", - value="false", - doc_string= - "Iceberg format translation enabled on this topic if true."), + config_type="STRING", + value="disabled", + doc_string="Iceberg enablement mode for the topic."), "redpanda.leaders.preference": ConfigProperty( config_type="STRING", diff --git a/tests/rptest/tests/polaris_catalog_smoke_test.py b/tests/rptest/tests/polaris_catalog_smoke_test.py index cb4a7f24e720d..74ef45c93fb79 100644 --- a/tests/rptest/tests/polaris_catalog_smoke_test.py +++ b/tests/rptest/tests/polaris_catalog_smoke_test.py @@ -235,8 +235,8 @@ def test_connecting_to_catalog(self, cloud_storage_type, with_tls): topic = TopicSpec(partition_count=1, replication_factor=1) client = DefaultClient(self.redpanda) client.create_topic(topic) - client.alter_topic_config(topic.name, - TopicSpec.PROPERTY_ICEBERG_ENABLED, "true") + client.alter_topic_config(topic.name, TopicSpec.PROPERTY_ICEBERG_MODE, + "key_value") rpk = RpkTool(self.redpanda) for i in range(10): rpk.produce(topic.name, "key", f"value-{i}") diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index ff1a194ff4945..902bf3f7e9dfc 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -138,7 +138,7 @@ def read_topic_properties_serde(rdr: Reader, version): } if version >= 10: topic_properties |= { - 'iceberg_enabled': rdr.read_bool(), + 'iceberg_mode': rdr.read_serde_enum(), 'leaders_preference': rdr.read_optional(read_leaders_preference), 'cloud_topic_enabled': rdr.read_bool(), 'iceberg_translation_interval_ms': @@ -305,7 +305,7 @@ def incr_topic_upd(rdr: Reader, version): } if version >= 7: incr_obj |= { - 'iceberg_enabled': rdr.read_bool(), + 'iceberg_mode': rdr.read_serde_enum(), 'leaders_preference': rdr.read_optional(read_leaders_preference), 'iceberg_delete': rdr.read_optional(Reader.read_bool), From 23829f562d615d39b60e8f18aae3cfc7802d50e7 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 20 Nov 2024 21:00:32 -0800 Subject: [PATCH 08/12] datalake/configs: remove datalake_translation_interval_ms 
--- .../tests/topic_manifest_test.cc | 1 - src/v/cluster/topic_configuration.cc | 2 -- src/v/cluster/topic_properties.cc | 9 ++------ src/v/cluster/topic_properties.h | 5 ----- src/v/cluster/topic_table.cc | 3 --- src/v/cluster/types.cc | 4 +--- src/v/cluster/types.h | 3 --- src/v/compat/cluster_generator.h | 1 - src/v/compat/cluster_json.h | 8 ------- src/v/config/configuration.cc | 9 -------- src/v/config/configuration.h | 2 -- src/v/datalake/datalake_manager.cc | 21 ++++++++++++------- src/v/datalake/datalake_manager.h | 3 ++- src/v/datalake/tests/fixture.h | 1 - src/v/datalake/tests/translator_test.cc | 8 ++++++- src/v/kafka/server/handlers/alter_configs.cc | 10 +-------- .../handlers/configs/config_response_utils.cc | 14 ------------- src/v/kafka/server/handlers/create_topics.cc | 1 - .../handlers/incremental_alter_configs.cc | 7 ------- src/v/kafka/server/handlers/topics/types.cc | 4 ---- src/v/kafka/server/handlers/topics/types.h | 3 --- src/v/kafka/server/tests/alter_config_test.cc | 1 - src/v/storage/ntp_config.h | 10 --------- src/v/storage/types.cc | 6 ++---- tests/rptest/clients/types.py | 1 - .../tests/datalake/datalake_services.py | 3 --- .../datalake/rest_catalog_connection_test.py | 1 - tests/rptest/tests/describe_topics_test.py | 7 ------- .../tests/polaris_catalog_smoke_test.py | 2 -- tools/offline_log_viewer/controller.py | 2 -- 30 files changed, 29 insertions(+), 123 deletions(-) diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 81ff0e8da2339..515a1c6a1afbf 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -482,7 +482,6 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { model::iceberg_mode::disabled, std::nullopt, false, - std::nullopt, tristate{}, std::nullopt, }; diff --git a/src/v/cluster/topic_configuration.cc b/src/v/cluster/topic_configuration.cc index b0f17d21a9316..e8f320f7e0c5f 100644 --- a/src/v/cluster/topic_configuration.cc +++ b/src/v/cluster/topic_configuration.cc @@ -57,8 +57,6 @@ storage::ntp_config topic_configuration::make_ntp_config( .flush_bytes = properties.flush_bytes, .iceberg_mode = properties.iceberg_mode, .cloud_topic_enabled = properties.cloud_topic_enabled, - .iceberg_translation_interval_ms - = properties.iceberg_translation_interval_ms, .tombstone_retention_ms = properties.delete_retention_ms, }); } diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index 236227baf6458..c6f7e67607531 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -41,7 +41,6 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "flush_bytes: {}, " "remote_label: {}, iceberg_mode: {}, " "leaders_preference: {}, " - "iceberg_translation_interval_ms: {}, " "delete_retention_ms: {}, " "iceberg_delete: {}", properties.compression, @@ -79,7 +78,6 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.remote_label, properties.iceberg_mode, properties.leaders_preference, - properties.iceberg_translation_interval_ms, properties.delete_retention_ms, properties.iceberg_delete); @@ -126,9 +124,8 @@ bool topic_properties::has_overrides() const { || write_caching.has_value() || flush_ms.has_value() || flush_bytes.has_value() || remote_label.has_value() || (iceberg_mode != storage::ntp_config::default_iceberg_mode) - || leaders_preference.has_value() - || 
iceberg_translation_interval_ms.has_value() - || delete_retention_ms.is_engaged() || iceberg_delete.has_value(); + || leaders_preference.has_value() || delete_retention_ms.is_engaged() + || iceberg_delete.has_value(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -170,7 +167,6 @@ topic_properties::get_ntp_cfg_overrides() const { ret.flush_bytes = flush_bytes; ret.iceberg_mode = iceberg_mode; ret.cloud_topic_enabled = cloud_topic_enabled; - ret.iceberg_translation_interval_ms = iceberg_translation_interval_ms; ret.tombstone_retention_ms = delete_retention_ms; return ret; } @@ -263,7 +259,6 @@ adl::from(iobuf_parser& parser) { model::iceberg_mode::disabled, std::nullopt, false, - std::nullopt, tristate{disable_tristate}, std::nullopt, }; diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index f78978683bba3..93797c69ad494 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -75,7 +75,6 @@ struct topic_properties model::iceberg_mode iceberg_mode, std::optional leaders_preference, bool cloud_topic_enabled, - std::optional iceberg_translation_interval, tristate delete_retention_ms, std::optional iceberg_delete) : compression(compression) @@ -118,7 +117,6 @@ struct topic_properties , iceberg_mode(iceberg_mode) , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) - , iceberg_translation_interval_ms(iceberg_translation_interval) , delete_retention_ms(delete_retention_ms) , iceberg_delete(iceberg_delete) {} @@ -193,8 +191,6 @@ struct topic_properties bool cloud_topic_enabled{storage::ntp_config::default_cloud_topic_enabled}; - std::optional iceberg_translation_interval_ms; - tristate delete_retention_ms{disable_tristate}; // Should we delete the corresponding iceberg table when deleting the topic. 
std::optional iceberg_delete; @@ -244,7 +240,6 @@ struct topic_properties iceberg_mode, leaders_preference, cloud_topic_enabled, - iceberg_translation_interval_ms, delete_retention_ms, iceberg_delete); } diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index f965e5c7a9e52..3ad69bdad6d1e 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1031,9 +1031,6 @@ topic_properties topic_table::update_topic_properties( storage::ntp_config::default_iceberg_mode); incremental_update( updated_properties.leaders_preference, overrides.leaders_preference); - incremental_update( - updated_properties.iceberg_translation_interval_ms, - overrides.iceberg_translation_interval_ms); incremental_update( updated_properties.delete_retention_ms, overrides.delete_retention_ms); incremental_update( diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 702e70de89a51..34e2546ed16af 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -374,8 +374,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " - "remote_read: {}, remote_write: {}", - "iceberg_translation_interval_ms: {}, iceberg_delete: {}", + "remote_read: {}, remote_write: {}, iceberg_delete: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -406,7 +405,6 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.leaders_preference, i.remote_read, i.remote_write, - i.iceberg_translation_interval_ms, i.iceberg_delete); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index e57214c39239a..c5dfd11f7e254 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -634,8 +634,6 @@ struct incremental_topic_updates incremental_update_operation::none}; property_update> leaders_preference; - property_update> - iceberg_translation_interval_ms; property_update> delete_retention_ms; property_update> iceberg_delete; @@ -676,7 +674,6 @@ struct incremental_topic_updates leaders_preference, remote_read, remote_write, - iceberg_translation_interval_ms, delete_retention_ms, iceberg_delete); } diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 53ff21427a455..08b598bf489e4 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -654,7 +654,6 @@ struct instance_generator { model::iceberg_mode::disabled, std::nullopt, false, - std::nullopt, tristate{disable_tristate}, std::nullopt}; } diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index 02bcabe364e9c..26c1dd3229f01 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -626,10 +626,6 @@ inline void rjson_serialize( write_member(w, "flush_bytes", tps.flush_bytes); write_member(w, "flush_ms", tps.flush_ms); write_member(w, "iceberg_mode", tps.iceberg_mode); - write_member( - w, - "iceberg_translation_interval_ms", - tps.iceberg_translation_interval_ms); write_member(w, "delete_retention_ms", tps.delete_retention_ms); w.EndObject(); } @@ -702,10 +698,6 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) { read_member(rd, "flush_bytes", obj.flush_bytes); read_member(rd, "flush_ms", obj.flush_ms); read_member(rd, "iceberg_mode", obj.iceberg_mode); - read_member( - rd, - "iceberg_translation_interval_ms", - 
obj.iceberg_translation_interval_ms); read_member(rd, "delete_retention_ms", obj.delete_retention_ms); } diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 862f340236672..23f6a94900978 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3680,15 +3680,6 @@ configuration::configuration() .visibility = visibility::user, }, false) - , iceberg_translation_interval_ms_default( - *this, - "iceberg_translation_interval_ms_default", - "How often an Iceberg enabled topic is checked for new data to " - "translate. You can override this value at topic level using " - "redpanda.iceberg.translation.interval.ms.", - {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, - std::chrono::milliseconds(1min), - {.min = 10ms}) , iceberg_catalog_commit_interval_ms( *this, "iceberg_catalog_commit_interval_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 6a91807e419bf..f49305764f290 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -698,8 +698,6 @@ struct configuration final : public config_store { // datalake configurations enterprise> iceberg_enabled; - bounded_property - iceberg_translation_interval_ms_default; bounded_property iceberg_catalog_commit_interval_ms; property iceberg_catalog_base_location; diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc index 8103d2ffaaba7..df73ab2f4c47c 100644 --- a/src/v/datalake/datalake_manager.cc +++ b/src/v/datalake/datalake_manager.cc @@ -65,8 +65,8 @@ datalake_manager::datalake_manager( size_t( std::floor(memory_limit / _effective_max_translator_buffered_data)), "datalake_parallel_translations")) - , _translation_ms_conf(config::shard_local_cfg() - .iceberg_translation_interval_ms_default.bind()) {} + , _iceberg_commit_interval( + config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind()) {} datalake_manager::~datalake_manager() = default; ss::future<> datalake_manager::start() { @@ -131,7 +131,7 @@ ss::future<> datalake_manager::start() { _topic_table->local().unregister_ntp_delta_notification( topic_properties_registration); }); - _translation_ms_conf.watch([this] { + _iceberg_commit_interval.watch([this] { ssx::spawn_with_gate(_gate, [this]() { for (const auto& [group, _] : _translators) { on_group_notification(group); @@ -150,6 +150,15 @@ ss::future<> datalake_manager::stop() { co_await std::move(f); } +std::chrono::milliseconds datalake_manager::translation_interval_ms() const { + // This aims to have multiple translations within a single commit interval + // window. A minimum interval is in place to disallow frequent translations + // and hence tiny parquet files. This is generally optimized for higher + // throughputs that accumulate enough data within a commit interval window. + static constexpr std::chrono::milliseconds min_translation_interval{5s}; + return std::max(min_translation_interval, _iceberg_commit_interval() / 3); +} + void datalake_manager::on_group_notification(const model::ntp& ntp) { auto partition = _partition_mgr->local().get(ntp); if (!partition || !model::is_user_topic(ntp)) { @@ -178,9 +187,7 @@ void datalake_manager::on_group_notification(const model::ntp& ntp) { start_translator(partition); } else { // check if translation interval changed. 
- auto target_interval - = topic_cfg->properties.iceberg_translation_interval_ms.value_or( - _translation_ms_conf()); + auto target_interval = translation_interval_ms(); if (it->second->translation_interval() != target_interval) { it->second->reset_translation_interval(target_interval); } @@ -203,7 +210,7 @@ void datalake_manager::start_translator( &_cloud_data_io, _schema_mgr.get(), _type_resolver.get(), - partition->get_ntp_config().iceberg_translation_interval_ms(), + translation_interval_ms(), _sg, _effective_max_translator_buffered_data, &_parallel_translations); diff --git a/src/v/datalake/datalake_manager.h b/src/v/datalake/datalake_manager.h index 62eefb7725abf..4630731e376a7 100644 --- a/src/v/datalake/datalake_manager.h +++ b/src/v/datalake/datalake_manager.h @@ -71,6 +71,7 @@ class datalake_manager : public ss::peering_sharded_service { using translator = std::unique_ptr; using translator_map = chunked_hash_map; + std::chrono::milliseconds translation_interval_ms() const; void on_group_notification(const model::ntp&); void start_translator(ss::lw_shared_ptr); ss::future<> stop_translator(const model::ntp&); @@ -99,7 +100,7 @@ class datalake_manager : public ss::peering_sharded_service { translator_map _translators; using deferred_action = ss::deferred_action>; std::vector _deregistrations; - config::binding _translation_ms_conf; + config::binding _iceberg_commit_interval; // Translation requires buffering data batches in memory for efficient // output representation, this controls the maximum bytes buffered in memory diff --git a/src/v/datalake/tests/fixture.h b/src/v/datalake/tests/fixture.h index 78a18e5dbc105..6bd03f9acefe7 100644 --- a/src/v/datalake/tests/fixture.h +++ b/src/v/datalake/tests/fixture.h @@ -68,7 +68,6 @@ class datalake_cluster_test_fixture : public archiver_cluster_fixture { model::topic topic, int num_partitions = 1, int16_t num_replicas = 3) { cluster::topic_properties props; props.iceberg_mode = model::iceberg_mode::value_schema_id_prefix; - props.iceberg_translation_interval_ms = 50ms; return cluster_test_fixture::create_topic( {model::kafka_namespace, topic}, num_partitions, diff --git a/src/v/datalake/tests/translator_test.cc b/src/v/datalake/tests/translator_test.cc index daabf148c73f3..55134e4968373 100644 --- a/src/v/datalake/tests/translator_test.cc +++ b/src/v/datalake/tests/translator_test.cc @@ -11,6 +11,7 @@ #include "datalake/tests/fixture.h" #include "kafka/client/client.h" #include "kafka/client/test/utils.h" +#include "test_utils/scoped_config.h" #include "test_utils/test.h" namespace kc = kafka::client; @@ -25,6 +26,9 @@ class PartitionTranslatorTestFixture add_node(); } co_await wait_for_all_members(5s); + reduced_commit_interval.get("iceberg_catalog_commit_interval_ms") + .set_value(10000ms); + co_await create_iceberg_topic(test_topic.tp); // create a kafka client for the cluster. auto* rp = instance(model::node_id{0}); @@ -71,6 +75,8 @@ class PartitionTranslatorTestFixture }); } + scoped_config reduced_commit_interval; + model::topic_namespace test_topic{ model::kafka_namespace, model::topic{"test"}}; @@ -83,7 +89,7 @@ TEST_F_CORO(PartitionTranslatorTestFixture, TestBasic) { std::vector> background; background.reserve(5); background.push_back(produce_data_for(ntp, test_runtime)); - // roundrobin leadership across test topic replicas. 
+ // roundrobin leadership across test topic replicas background.push_back(shuffle_leadership(ntp, test_runtime)); for (auto i = 0; i < 3; i++) { auto f = ss::do_with( diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 463fb91dfb912..3dddacada1401 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 33, + std::tuple_size_v == 32, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -363,14 +363,6 @@ create_topic_properties_update( delete_retention_ms_validator{}); continue; } - - if (cfg.name == topic_property_iceberg_translation_interval_ms) { - parse_and_set_optional_duration( - update.properties.iceberg_translation_interval_ms, - cfg.value, - kafka::config_resource_operation::set); - continue; - } if (cfg.name == topic_property_iceberg_delete) { parse_and_set_optional_bool_alpha( update.properties.iceberg_delete, diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 266fe2906d859..dadf54560768e 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -727,20 +727,6 @@ config_response_container_t make_topic_configs( [](const bool& b) { return b ? "true" : "false"; }); } - add_topic_config_if_requested( - config_keys, - result, - topic_property_iceberg_translation_interval_ms, - config::shard_local_cfg().iceberg_translation_interval_ms_default(), - topic_property_iceberg_translation_interval_ms, - topic_properties.iceberg_translation_interval_ms, - include_synonyms, - maybe_make_documentation( - include_documentation, - "Controls how often iceberg translation is attempted on the topic " - "partitions."), - &describe_as_string); - add_topic_config_if_requested( config_keys, result, diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 2e79446b9af1e..56db192b3ad2c 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -75,7 +75,6 @@ bool is_supported(std::string_view name) { topic_property_flush_bytes, topic_property_iceberg_mode, topic_property_leaders_preference, - topic_property_iceberg_translation_interval_ms, topic_property_delete_retention_ms, topic_property_iceberg_delete}); diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 4dff42ef603a8..30f866fe71e9a 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -361,13 +361,6 @@ create_topic_properties_update( } throw validation_error("Cloud topics is not enabled"); } - if (cfg.name == topic_property_iceberg_translation_interval_ms) { - parse_and_set_optional_duration( - update.properties.iceberg_translation_interval_ms, - cfg.value, - op); - continue; - } if (cfg.name == topic_property_delete_retention_ms) { parse_and_set_tristate( update.properties.delete_retention_ms, diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 89a6feb92e821..3e1d42941d011 100644 --- 
a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -266,10 +266,6 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.leaders_preference = get_leaders_preference(config_entries); - cfg.properties.iceberg_translation_interval_ms - = get_duration_value( - config_entries, topic_property_iceberg_translation_interval_ms, true); - cfg.properties.delete_retention_ms = get_delete_retention_ms( config_entries); diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index 14731355eec4c..ab3304353791c 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -105,9 +105,6 @@ inline constexpr std::string_view topic_property_leaders_preference inline constexpr std::string_view topic_property_cloud_topic_enabled = "redpanda.cloud_topic.enabled"; -inline constexpr std::string_view topic_property_iceberg_translation_interval_ms - = "redpanda.iceberg.translation.interval.ms"; - inline constexpr std::string_view topic_property_iceberg_delete = "redpanda.iceberg.delete"; diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 91d1de95a626f..6f2be1cfdd7a6 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -377,7 +377,6 @@ FIXTURE_TEST( "flush.bytes", "redpanda.iceberg.mode", "redpanda.leaders.preference", - "redpanda.iceberg.translation.interval.ms", "delete.retention.ms", "redpanda.iceberg.delete", }; diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 7bb06507ff042..cf500a95cd673 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -81,8 +81,6 @@ class ntp_config { std::optional flush_bytes; model::iceberg_mode iceberg_mode{default_iceberg_mode}; bool cloud_topic_enabled{default_cloud_topic_enabled}; - std::optional - iceberg_translation_interval_ms{std::nullopt}; // Should not be enabled at the same time as any other tiered storage // properties. @@ -362,14 +360,6 @@ class ntp_config { return _overrides ? _overrides->cloud_topic_enabled : default_cloud_topic_enabled; } - std::chrono::milliseconds iceberg_translation_interval_ms() const { - auto default_value - = config::shard_local_cfg().iceberg_translation_interval_ms_default(); - return _overrides - ? 
_overrides->iceberg_translation_interval_ms.value_or( - default_value) - : default_value; - } private: model::ntp _ntp; diff --git a/src/v/storage/types.cc b/src/v/storage/types.cc index 5103787853e77..ef91edf16fe97 100644 --- a/src/v/storage/types.cc +++ b/src/v/storage/types.cc @@ -114,8 +114,7 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { "remote_delete: {}, segment_ms: {}, " "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " - "flush_bytes: {} iceberg_mode: {}, iceberg_translation_interval_ms: " - "{}}}", + "flush_bytes: {} iceberg_mode: {} }}", v.compaction_strategy, v.cleanup_policy_bitflags, v.segment_size, @@ -131,8 +130,7 @@ operator<<(std::ostream& o, const ntp_config::default_overrides& v) { v.write_caching, v.flush_ms, v.flush_bytes, - v.iceberg_mode, - v.iceberg_translation_interval_ms); + v.iceberg_mode); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print(o, ", cloud_topic_enabled: {}", v.cloud_topic_enabled); diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index 91a3a6042602c..63f880a3bd3b5 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -40,7 +40,6 @@ class TopicSpec: PROPERTY_FLUSH_MS = "flush.ms" PROPERTY_FLUSH_BYTES = "flush.bytes" PROPERTY_ICEBERG_MODE = "redpanda.iceberg.mode" - PROPERTY_ICEBERG_TRANSLATION_INTERVAL = "redpanda.iceberg.translation.interval.ms" PROPERTY_DELETE_RETENTION_MS = "delete.retention.ms" class CompressionTypes(str, Enum): diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py index a5d66bdd672fc..32b2acf72343a 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -84,11 +84,8 @@ def create_iceberg_enabled_topic(self, partitions=1, replicas=1, iceberg_mode="key_value", - translation_interval_ms=3000, config: dict[str, Any] = dict()): config[TopicSpec.PROPERTY_ICEBERG_MODE] = iceberg_mode - config[TopicSpec. - PROPERTY_ICEBERG_TRANSLATION_INTERVAL] = translation_interval_ms rpk = RpkTool(self.redpanda) rpk.create_topic(topic=name, partitions=partitions, diff --git a/tests/rptest/tests/datalake/rest_catalog_connection_test.py b/tests/rptest/tests/datalake/rest_catalog_connection_test.py index e1f4747b48e37..05fa75771d4b2 100644 --- a/tests/rptest/tests/datalake/rest_catalog_connection_test.py +++ b/tests/rptest/tests/datalake/rest_catalog_connection_test.py @@ -25,7 +25,6 @@ def __init__(self, test_context): cloud_storage_enable_remote_write=False), extra_rp_conf={ "iceberg_enabled": True, - "iceberg_translation_interval_ms_default": 3000, "iceberg_catalog_commit_interval_ms": 10000 }) self.catalog_service = IcebergRESTCatalog( diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index af13f464bd051..2ce2b80f2d7e1 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -287,13 +287,6 @@ def test_describe_topics_with_documentation_and_types(self): doc_string= "Preferred location (e.g. rack) for partition leaders of this topic." ), - "redpanda.iceberg.translation.interval.ms": - ConfigProperty( - config_type="LONG", - value="60000", - doc_string= - "Controls how often iceberg translation is attempted on the topic partitions." 
- ), "delete.retention.ms": ConfigProperty( config_type="LONG", diff --git a/tests/rptest/tests/polaris_catalog_smoke_test.py b/tests/rptest/tests/polaris_catalog_smoke_test.py index 74ef45c93fb79..f3b5b41e8af37 100644 --- a/tests/rptest/tests/polaris_catalog_smoke_test.py +++ b/tests/rptest/tests/polaris_catalog_smoke_test.py @@ -81,8 +81,6 @@ def _start_redpanad(self, catalog_prefix, client_id, client_secret, client_id, "iceberg_rest_catalog_client_secret": client_secret, - "iceberg_translation_interval_ms_default": - 3000, "iceberg_catalog_commit_interval_ms": 10000, "iceberg_rest_catalog_prefix": diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 902bf3f7e9dfc..687f6832f83ab 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -141,8 +141,6 @@ def read_topic_properties_serde(rdr: Reader, version): 'iceberg_mode': rdr.read_serde_enum(), 'leaders_preference': rdr.read_optional(read_leaders_preference), 'cloud_topic_enabled': rdr.read_bool(), - 'iceberg_translation_interval_ms': - rdr.read_optional(Reader.read_int64), 'delete_retention_ms': rdr.read_tristate(Reader.read_int64), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } From 1ba9882e8d4f3a6593c818af221559c60e30b93f Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 21 Nov 2024 07:59:47 -0800 Subject: [PATCH 09/12] datalake/translation: use record translator based on iceberg mode --- src/v/datalake/BUILD | 1 + src/v/datalake/datalake_manager.cc | 39 +++++++++++++++++-- src/v/datalake/datalake_manager.h | 3 +- src/v/datalake/tests/fixture.h | 2 +- .../translation/partition_translator.cc | 9 ++--- .../translation/partition_translator.h | 5 ++- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/v/datalake/BUILD b/src/v/datalake/BUILD index 5dee666a30829..a75e4a2fd12aa 100644 --- a/src/v/datalake/BUILD +++ b/src/v/datalake/BUILD @@ -177,6 +177,7 @@ redpanda_cc_library( ":catalog_schema_manager", ":cloud_data_io", ":logger", + ":record_translator", ":types", "//src/v/base", "//src/v/cluster", diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc index df73ab2f4c47c..f67804f77d543 100644 --- a/src/v/datalake/datalake_manager.cc +++ b/src/v/datalake/datalake_manager.cc @@ -19,6 +19,7 @@ #include "datalake/coordinator/frontend.h" #include "datalake/logger.h" #include "datalake/record_schema_resolver.h" +#include "datalake/record_translator.h" #include "raft/group_manager.h" #include "schema/registry.h" @@ -26,6 +27,37 @@ namespace datalake { +namespace { + +static std::unique_ptr +make_type_resolver(model::iceberg_mode mode, schema::registry& sr) { + switch (mode) { + case model::iceberg_mode::disabled: + vassert( + false, + "Cannot make record translator when iceberg is disabled, logic bug."); + case model::iceberg_mode::key_value: + return std::make_unique(); + case model::iceberg_mode::value_schema_id_prefix: + return std::make_unique(sr); + } +} + +static std::unique_ptr +make_record_translator(model::iceberg_mode mode) { + switch (mode) { + case model::iceberg_mode::disabled: + vassert( + false, + "Cannot make record translator when iceberg is disabled, logic bug."); + case model::iceberg_mode::key_value: + return std::make_unique(); + case model::iceberg_mode::value_schema_id_prefix: + return std::make_unique(); + } +} +} // namespace + datalake_manager::datalake_manager( model::node_id self, ss::sharded* group_mgr, @@ -184,7 +216,7 @@ void 
datalake_manager::on_group_notification(const model::ntp& ntp) { // By now we know the partition is a leader and iceberg is enabled, so // there has to be a translator, spin one up if it doesn't already exist. if (it == _translators.end()) { - start_translator(partition); + start_translator(partition, topic_cfg->properties.iceberg_mode); } else { // check if translation interval changed. auto target_interval = translation_interval_ms(); @@ -195,7 +227,7 @@ void datalake_manager::on_group_notification(const model::ntp& ntp) { } void datalake_manager::start_translator( - ss::lw_shared_ptr partition) { + ss::lw_shared_ptr partition, model::iceberg_mode mode) { auto it = _translators.find(partition->ntp()); vassert( it == _translators.end(), @@ -209,7 +241,8 @@ void datalake_manager::start_translator( _features, &_cloud_data_io, _schema_mgr.get(), - _type_resolver.get(), + make_type_resolver(mode, *_schema_registry), + make_record_translator(mode), translation_interval_ms(), _sg, _effective_max_translator_buffered_data, diff --git a/src/v/datalake/datalake_manager.h b/src/v/datalake/datalake_manager.h index 4630731e376a7..7c562f0af72e0 100644 --- a/src/v/datalake/datalake_manager.h +++ b/src/v/datalake/datalake_manager.h @@ -73,7 +73,8 @@ class datalake_manager : public ss::peering_sharded_service { std::chrono::milliseconds translation_interval_ms() const; void on_group_notification(const model::ntp&); - void start_translator(ss::lw_shared_ptr); + void start_translator( + ss::lw_shared_ptr, model::iceberg_mode); ss::future<> stop_translator(const model::ntp&); model::node_id _self; diff --git a/src/v/datalake/tests/fixture.h b/src/v/datalake/tests/fixture.h index 6bd03f9acefe7..ff3cf7be6771b 100644 --- a/src/v/datalake/tests/fixture.h +++ b/src/v/datalake/tests/fixture.h @@ -67,7 +67,7 @@ class datalake_cluster_test_fixture : public archiver_cluster_fixture { ss::future<> create_iceberg_topic( model::topic topic, int num_partitions = 1, int16_t num_replicas = 3) { cluster::topic_properties props; - props.iceberg_mode = model::iceberg_mode::value_schema_id_prefix; + props.iceberg_mode = model::iceberg_mode::key_value; return cluster_test_fixture::create_topic( {model::kafka_namespace, topic}, num_partitions, diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc index 1e800063796f2..faeb5a844586c 100644 --- a/src/v/datalake/translation/partition_translator.cc +++ b/src/v/datalake/translation/partition_translator.cc @@ -88,7 +88,8 @@ partition_translator::partition_translator( ss::sharded* features, std::unique_ptr* cloud_io, schema_manager* schema_mgr, - type_resolver* type_resolver, + std::unique_ptr type_resolver, + std::unique_ptr record_translator, std::chrono::milliseconds translation_interval, ss::scheduling_group sg, size_t reader_max_bytes, @@ -102,10 +103,8 @@ partition_translator::partition_translator( , _features(features) , _cloud_io(cloud_io) , _schema_mgr(schema_mgr) - // TODO: type resolver and record translator should be constructed based on - // topic configs. 
- , _type_resolver(type_resolver) - , _record_translator(std::make_unique()) + , _type_resolver(std::move(type_resolver)) + , _record_translator(std::move(record_translator)) , _partition_proxy(std::make_unique( kafka::make_partition_proxy(_partition))) , _jitter{translation_interval, translation_jitter} diff --git a/src/v/datalake/translation/partition_translator.h b/src/v/datalake/translation/partition_translator.h index 1b243b2c36569..59ca380903226 100644 --- a/src/v/datalake/translation/partition_translator.h +++ b/src/v/datalake/translation/partition_translator.h @@ -72,7 +72,8 @@ class partition_translator { ss::sharded* features, std::unique_ptr* cloud_io, schema_manager* schema_mgr, - type_resolver* type_resolver, + std::unique_ptr type_resolver, + std::unique_ptr record_translator, std::chrono::milliseconds translation_interval, ss::scheduling_group sg, size_t reader_max_bytes, @@ -117,7 +118,7 @@ class partition_translator { ss::sharded* _features; std::unique_ptr* _cloud_io; schema_manager* _schema_mgr; - type_resolver* _type_resolver; + std::unique_ptr _type_resolver; std::unique_ptr _record_translator; std::unique_ptr _partition_proxy; using jitter_t From a81ee528ef01e676d4501cc4d8d19207605f8ee0 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 22 Nov 2024 11:07:05 -0800 Subject: [PATCH 10/12] datalake/tests: bump coordinator snapshot timeout .. it seems to take longer in debug builds for the coordinator to snapshot --- tests/rptest/tests/datalake/coordinator_retention_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rptest/tests/datalake/coordinator_retention_test.py b/tests/rptest/tests/datalake/coordinator_retention_test.py index e126c7058c388..cf8a66819cd23 100644 --- a/tests/rptest/tests/datalake/coordinator_retention_test.py +++ b/tests/rptest/tests/datalake/coordinator_retention_test.py @@ -72,7 +72,7 @@ def do_test_retention(self, dl: DatalakeServices): try: wait_until( self.wait_until_coordinator_snapshots, - timeout_sec=20, + timeout_sec=30, backoff_sec=3, err_msg= "Timed out waiting for coordinator partitions to snapshot.") From a87e6ce9c79eb4e491076618c63d488e8d2623f6 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 21 Nov 2024 13:20:01 -0800 Subject: [PATCH 11/12] datalake/translation: prevent offset gaps and stuck translation Normally gaps shouldn't happen with translation due to the enforcement of max_collectible_offset. However the following sequence of actions can create a gap. iceberg enabled iceberg disabled <-- compaction --> iceberg enabled This is an unfixable gap if compaction cleaned up an offset range adjacent to last translated offset. The fix just plugs the gap by adjusting the begin offset of the range that is to be committed with the coordinator. This is a rare case but can result in a stuck translation if it happens. 
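
A minimal sketch of the scenario and the adjustment (hypothetical Python with
made-up offsets and helper name; the real logic lives in
checkpoint_translated_data() below):

    # Translation covered offsets up to reader_begin_offset - 1. If compaction
    # ran while iceberg was disabled, the first batch actually read can start
    # past reader_begin_offset, leaving a hole in the coordinator's ranges.
    def checkpoint_range(reader_begin_offset, translated_start, translated_last):
        if reader_begin_offset != translated_start:
            # Report the range from the requested begin offset so the
            # coordinator still sees contiguous offset ranges.
            translated_start = reader_begin_offset
        return (translated_start, translated_last)

    # Translated up to 100, iceberg disabled, compaction removes 101..150,
    # iceberg re-enabled: the next read begins at 101 but the first surviving
    # batch starts at 151.
    assert checkpoint_range(101, 151, 200) == (101, 200)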
--- .../translation/partition_translator.cc | 20 ++++++++++++++++++- .../translation/partition_translator.h | 4 +++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc index faeb5a844586c..1920559890978 100644 --- a/src/v/datalake/translation/partition_translator.cc +++ b/src/v/datalake/translation/partition_translator.cc @@ -279,7 +279,7 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) { if ( translation_result && co_await checkpoint_translated_data( - parent_rcn, std::move(translation_result.value()))) { + parent_rcn, read_begin_offset, std::move(translation_result.value()))) { co_return translation_success::yes; } co_return translation_success::no; @@ -288,10 +288,28 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) { ss::future partition_translator::checkpoint_translated_data( retry_chain_node& rcn, + kafka::offset reader_begin_offset, coordinator::translated_offset_range translated_range) { if (translated_range.files.empty()) { co_return checkpoint_result::yes; } + if (reader_begin_offset != translated_range.start_offset) { + // This is possible if there is a gap in offsets range, eg from + // compaction. Normally that shouldn't be the case, as translation + // enforces max_collectible_offset which prevents compaction or other + // forms of retention from kicking in before translation actually + // happens. However there could be a sequence of enabling / disabling + // iceberg configuration on the topic that can temporarily unblock + // compaction thus creating gaps. Here we adjust the offset range to + // so the coordinator sees a contiguous offset range. + vlog( + _logger.info, + "detected an offset range gap in [{}, {}), adjusting the begin " + "offset to avoid gaps in coordinator tracked offsets.", + reader_begin_offset, + translated_range.start_offset); + translated_range.start_offset = reader_begin_offset; + } auto last_offset = translated_range.last_offset; coordinator::add_translated_data_files_request request; request.tp = _partition->ntp().tp; diff --git a/src/v/datalake/translation/partition_translator.h b/src/v/datalake/translation/partition_translator.h index 59ca380903226..cf04a6d52fbc4 100644 --- a/src/v/datalake/translation/partition_translator.h +++ b/src/v/datalake/translation/partition_translator.h @@ -105,7 +105,9 @@ class partition_translator { using checkpoint_result = ss::bool_class; ss::future checkpoint_translated_data( - retry_chain_node& parent, coordinator::translated_offset_range); + retry_chain_node& parent, + kafka::offset reader_begin_offset, + coordinator::translated_offset_range task_result); kafka::offset min_offset_for_translation() const; // Returns max consumable offset for translation. 
From 2d923519b67c211a39c9f36e1b5ddd7a76212110 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 22 Nov 2024 18:27:24 -0800 Subject: [PATCH 12/12] datalake/tests: add a reproducer for gaps with compaction Test that can reproduce gaps due to compaction due to a series of enable/disable iceberg commands --- .../tests/datalake/compaction_gaps_test.py | 116 ++++++++++++++++++ .../tests/datalake/datalake_services.py | 32 +++++ .../tests/datalake/query_engine_base.py | 5 + 3 files changed, 153 insertions(+) create mode 100644 tests/rptest/tests/datalake/compaction_gaps_test.py diff --git a/tests/rptest/tests/datalake/compaction_gaps_test.py b/tests/rptest/tests/datalake/compaction_gaps_test.py new file mode 100644 index 0000000000000..07767e244e166 --- /dev/null +++ b/tests/rptest/tests/datalake/compaction_gaps_test.py @@ -0,0 +1,116 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from time import time +from rptest.clients.kafka_cat import KafkaCat +from rptest.clients.types import TopicSpec +from rptest.services.kgo_verifier_services import KgoVerifierProducer +from rptest.tests.datalake.datalake_services import DatalakeServices +from rptest.tests.datalake.query_engine_base import QueryEngineType +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.redpanda import SISettings +from rptest.tests.datalake.utils import supported_storage_types +from ducktape.mark import matrix +from ducktape.utils.util import wait_until +from rptest.services.cluster import cluster +from rptest.utils.mode_checks import skip_debug_mode + + +class CompactionGapsTest(RedpandaTest): + def __init__(self, test_ctx, *args, **kwargs): + super(CompactionGapsTest, self).__init__( + test_ctx, + num_brokers=1, + si_settings=SISettings(test_context=test_ctx, fast_uploads=True), + extra_rp_conf={ + "iceberg_enabled": "true", + "iceberg_catalog_commit_interval_ms": 5000, + "datalake_coordinator_snapshot_max_delay_secs": 10, + "log_compaction_interval_ms": 2000 + }, + *args, + **kwargs) + self.test_ctx = test_ctx + self.topic_name = "test" + self.segment_size = 5 * 1024 * 1024 + self.kafka_cat = KafkaCat(self.redpanda) + + def partition_segments(self) -> int: + assert len(self.redpanda.nodes) == 1, self.redpanda.nodes + node = self.redpanda.nodes[0] + storage = self.redpanda.node_storage(node) + topic_partitions = storage.partitions("kafka", self.topic_name) + assert len(topic_partitions) == 1, len(topic_partitions) + segment_count = len(topic_partitions[0].segments) + self.redpanda.logger.debug(f"Current segment count: {segment_count}") + return segment_count + + def wait_until_segment_count(self, count): + wait_until( + lambda: self.partition_segments() == count, + timeout_sec=30, + backoff_sec=3, + err_msg=f"Timed out waiting for segment count to reach {count}") + + def produce_until_segment_count(self, count): + timeout_sec = 30 + deadline = time() + timeout_sec + while True: + current_segment_count = self.partition_segments() + if current_segment_count >= count: + return + if time() > deadline: + assert False, f"Unable to reach segment count {count} in {timeout_sec}s, current count {current_segment_count}" + KgoVerifierProducer.oneshot(self.test_ctx, + self.redpanda, + self.topic_name, + 2024, + 10000, + 
key_set_cardinality=2) + + def ensure_translation(self, dl: DatalakeServices): + (_, max_offset) = self.kafka_cat.list_offsets(topic=self.topic_name, + partition=0) + self.redpanda.logger.debug( + f"Ensuring translation until: {max_offset - 1}") + dl.wait_for_translation_until_offset(self.topic_name, max_offset - 1) + + def do_test_no_gaps(self, dl: DatalakeServices): + + dl.create_iceberg_enabled_topic(self.topic_name, + iceberg_mode="key_value", + config={ + "cleanup.policy": + TopicSpec.CLEANUP_COMPACT, + "segment.bytes": self.segment_size + }) + + for _ in range(5): + self.produce_until_segment_count(5) + # # Ensure everything is translated + self.ensure_translation(dl) + # # Disable iceberg + dl.set_iceberg_mode_on_topic(self.topic_name, "disabled") + # Append more data + self.produce_until_segment_count(8) + # # Compact the data + # # One closed segment and one open (current) segment + self.wait_until_segment_count(2) + # # Enable iceberg again + dl.set_iceberg_mode_on_topic(self.topic_name, "key_value") + + @cluster(num_nodes=4) + @skip_debug_mode + @matrix(cloud_storage_type=supported_storage_types()) + def test_translation_no_gaps(self, cloud_storage_type): + with DatalakeServices(self.test_ctx, + redpanda=self.redpanda, + include_query_engines=[QueryEngineType.TRINO + ]) as dl: + self.do_test_no_gaps(dl) diff --git a/tests/rptest/tests/datalake/datalake_services.py b/tests/rptest/tests/datalake/datalake_services.py index 32b2acf72343a..f2af2ccc3481f 100644 --- a/tests/rptest/tests/datalake/datalake_services.py +++ b/tests/rptest/tests/datalake/datalake_services.py @@ -92,6 +92,10 @@ def create_iceberg_enabled_topic(self, replicas=replicas, config=config) + def set_iceberg_mode_on_topic(self, topic: str, mode: str): + rpk = RpkTool(self.redpanda) + rpk.alter_topic_config(topic, "redpanda.iceberg.mode", mode) + def wait_for_iceberg_table(self, namespace, table, timeout, backoff_sec): client = self.catalog_service.client("redpanda-iceberg-catalog") @@ -109,6 +113,34 @@ def table_created(): f"Timed out waiting {namespace}.{table} to be created in the catalog" ) + def wait_for_translation_until_offset(self, + topic, + offset, + partition=0, + timeout=30, + backoff_sec=5): + self.wait_for_iceberg_table("redpanda", topic, timeout, backoff_sec) + table_name = f"redpanda.{topic}" + + def translation_done(): + offsets = dict( + map( + lambda e: (e.engine_name(), + e.max_translated_offset(table_name, partition)), + self.query_engines)) + self.redpanda.logger.debug( + f"Current translated offsets: {offsets}") + return all( + [offset <= max_offset for _, max_offset in offsets.items()]) + + wait_until( + translation_done, + timeout_sec=timeout, + backoff_sec=backoff_sec, + err_msg= + f"Timed out waiting for iceberg translation until offset: {offset}" + ) + def wait_for_translation(self, topic, msg_count, diff --git a/tests/rptest/tests/datalake/query_engine_base.py b/tests/rptest/tests/datalake/query_engine_base.py index 2fbf26b129494..c97ad156b5493 100644 --- a/tests/rptest/tests/datalake/query_engine_base.py +++ b/tests/rptest/tests/datalake/query_engine_base.py @@ -50,3 +50,8 @@ def count_table(self, table) -> int: query = f"select count(*) from {table}" with self.run_query(query) as cursor: return int(cursor.fetchone()[0]) + + def max_translated_offset(self, table, partition) -> int: + query = f"select max(redpanda.offset) from {table} where redpanda.partition={partition}" + with self.run_query(query) as cursor: + return int(cursor.fetchone()[0])