Skip to content

Commit

Permalink
Merge pull request #23633 from bharathv/ib_pt2
Browse files Browse the repository at this point in the history
datalake: introduce partition translator
  • Loading branch information
bharathv authored Oct 23, 2024
2 parents 3a7aeea + a71f368 commit 18c7d15
Show file tree
Hide file tree
Showing 66 changed files with 1,963 additions and 144 deletions.
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
false,
std::nullopt,
false,
std::nullopt,
};

auto random_initial_revision_id
Expand Down
46 changes: 26 additions & 20 deletions src/v/cluster/tests/cluster_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,38 +237,41 @@ class cluster_test_fixture {
return groups;
}

void create_topic(
ss::future<> create_topic(
model::topic_namespace_view tp_ns,
int partitions = 1,
int16_t replication_factor = 1) {
int16_t replication_factor = 1,
std::optional<cluster::topic_properties> custom_properties
= std::nullopt) {
vassert(!_instances.empty(), "no nodes in the cluster");
// wait until there is a controller stm leader.
tests::cooperative_spin_wait_with_timeout(10s, [this] {
co_await tests::cooperative_spin_wait_with_timeout(10s, [this] {
return std::any_of(
_instances.begin(), _instances.end(), [](auto& it) {
return it.second->app.controller->linearizable_barrier()
.get();
return it.second->app.controller->is_raft0_leader();
});
}).get();
});
auto leader_it = std::find_if(
_instances.begin(), _instances.end(), [](auto& it) {
return it.second->app.controller->is_raft0_leader();
});
auto& app_0 = leader_it->second->app;
cluster::topic_configuration_vector cfgs = {
cluster::topic_configuration{
tp_ns.ns, tp_ns.tp, partitions, replication_factor}};
auto results = app_0.controller->get_topics_frontend()
auto topic_cfg = cluster::topic_configuration{
tp_ns.ns, tp_ns.tp, partitions, replication_factor};
if (custom_properties) {
topic_cfg.properties = custom_properties.value();
}
cluster::topic_configuration_vector cfgs = {std::move(topic_cfg)};
auto results = co_await app_0.controller->get_topics_frontend()
.local()
.create_topics(
cluster::without_custom_assignments(std::move(cfgs)),
model::no_timeout)
.get();
model::no_timeout);
BOOST_REQUIRE_EQUAL(results.size(), 1);
auto& result = results.at(0);
BOOST_REQUIRE_EQUAL(result.ec, cluster::errc::success);
auto& leaders = app_0.controller->get_partition_leaders().local();
tests::cooperative_spin_wait_with_timeout(10s, [&]() {
co_await tests::cooperative_spin_wait_with_timeout(10s, [&]() {
auto md = app_0.metadata_cache.local().get_topic_metadata(
result.tp_ns);
return md && md->get_assignments().size() == partitions
Expand All @@ -278,7 +281,7 @@ class cluster_test_fixture {
[&](const cluster::assignments_set::value_type& p) {
return leaders.get_leader(tp_ns, p.second.id);
});
}).get();
});
}

std::tuple<redpanda_thread_fixture*, ss::lw_shared_ptr<cluster::partition>>
Expand All @@ -296,21 +299,24 @@ class cluster_test_fixture {
return std::make_tuple(nullptr, nullptr);
}

void shuffle_leadership(model::ntp ntp) {
ss::future<> shuffle_leadership(model::ntp ntp) {
BOOST_REQUIRE(!_instances.empty());
auto& app = _instances.begin()->second.get()->app;
auto& leaders = app.controller->get_partition_leaders().local();
auto current_leader = leaders.get_leader(ntp);
if (!current_leader) {
return;
return ss::now();
}
auto& leader_app = _instances.at(*current_leader).get()->app;
auto partition = leader_app.partition_manager.local().get(ntp);
BOOST_REQUIRE(partition);
partition
->transfer_leadership(
raft::transfer_leadership_request{.group = partition->group()})
.get();
auto current_leader_id = current_leader.value()();
auto new_leader_id = model::node_id{
++current_leader_id % static_cast<int>(_instances.size())};
return partition
->transfer_leadership(raft::transfer_leadership_request{
.group = partition->group(), .target = new_leader_id})
.discard_result();
}

protected:
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/topic_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ storage::ntp_config topic_configuration::make_ntp_config(
.flush_bytes = properties.flush_bytes,
.iceberg_enabled = properties.iceberg_enabled,
.cloud_topic_enabled = properties.cloud_topic_enabled,
.iceberg_translation_interval_ms
= properties.iceberg_translation_interval_ms,
});
}
return {
Expand Down
16 changes: 10 additions & 6 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace cluster {
std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
fmt::print(
o,
"{{compression: {}, cleanup_policy_bitflags: {}, compaction_strategy: "
"{{ compression: {}, cleanup_policy_bitflags: {}, compaction_strategy: "
"{}, retention_bytes: {}, retention_duration_ms: {}, segment_size: {}, "
"timestamp_type: {}, recovery_enabled: {}, shadow_indexing: {}, "
"read_replica: {}, read_replica_bucket: {}, "
Expand All @@ -40,7 +40,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"flush_ms: {}, "
"flush_bytes: {}, "
"remote_label: {}, iceberg_enabled: {}, "
"leaders_preference: {}",
"leaders_preference: {} ",
"iceberg_translation_interval_ms: {} ",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -75,7 +76,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.flush_bytes,
properties.remote_label,
properties.iceberg_enabled,
properties.leaders_preference);
properties.leaders_preference,
properties.iceberg_translation_interval_ms);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand All @@ -86,7 +88,6 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {

return o;
}

bool topic_properties::is_compacted() const {
if (!cleanup_policy_bitflags) {
return false;
Expand Down Expand Up @@ -121,7 +122,8 @@ bool topic_properties::has_overrides() const {
|| write_caching.has_value() || flush_ms.has_value()
|| flush_bytes.has_value() || remote_label.has_value()
|| (iceberg_enabled != storage::ntp_config::default_iceberg_enabled)
|| leaders_preference.has_value();
|| leaders_preference.has_value()
|| iceberg_translation_interval_ms.has_value();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -163,6 +165,7 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.flush_bytes = flush_bytes;
ret.iceberg_enabled = iceberg_enabled;
ret.cloud_topic_enabled = cloud_topic_enabled;
ret.iceberg_translation_interval_ms = iceberg_translation_interval_ms;
return ret;
}

Expand Down Expand Up @@ -253,7 +256,8 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
std::nullopt,
false,
std::nullopt,
false};
false,
std::nullopt};
}

} // namespace reflection
11 changes: 8 additions & 3 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ struct topic_properties
std::optional<size_t> flush_bytes,
bool iceberg_enabled,
std::optional<config::leaders_preference> leaders_preference,
bool cloud_topic_enabled)
bool cloud_topic_enabled,
std::optional<std::chrono::milliseconds> iceberg_translation_interval)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -114,7 +115,8 @@ struct topic_properties
, flush_bytes(flush_bytes)
, iceberg_enabled(iceberg_enabled)
, leaders_preference(std::move(leaders_preference))
, cloud_topic_enabled(cloud_topic_enabled) {}
, cloud_topic_enabled(cloud_topic_enabled)
, iceberg_translation_interval_ms(iceberg_translation_interval) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -187,6 +189,8 @@ struct topic_properties

bool cloud_topic_enabled{storage::ntp_config::default_cloud_topic_enabled};

std::optional<std::chrono::milliseconds> iceberg_translation_interval_ms;

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -231,7 +235,8 @@ struct topic_properties
remote_topic_namespace_override,
iceberg_enabled,
leaders_preference,
cloud_topic_enabled);
cloud_topic_enabled,
iceberg_translation_interval_ms);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,9 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) {
storage::ntp_config::default_iceberg_enabled);
incremental_update(
updated_properties.leaders_preference, overrides.leaders_preference);
incremental_update(
updated_properties.iceberg_translation_interval_ms,
overrides.iceberg_translation_interval_ms);

auto& properties = tp->second.get_configuration().properties;
// no configuration change, no need to generate delta
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, "
"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
"remote_read: {}, remote_write: {}",
"iceberg_translation_interval_ms: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand Down Expand Up @@ -404,7 +405,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.iceberg_enabled,
i.leaders_preference,
i.remote_read,
i.remote_write);
i.remote_write,
i.iceberg_translation_interval_ms);
return o;
}

Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ struct incremental_topic_updates
incremental_update_operation::none};
property_update<std::optional<config::leaders_preference>>
leaders_preference;
property_update<std::optional<std::chrono::milliseconds>>
iceberg_translation_interval_ms;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -671,7 +673,8 @@ struct incremental_topic_updates
iceberg_enabled,
leaders_preference,
remote_read,
remote_write);
remote_write,
iceberg_translation_interval_ms);
}

friend std::ostream&
Expand Down
3 changes: 2 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,8 @@ struct instance_generator<cluster::topic_properties> {
[] { return random_generators::get_int<size_t>(); }),
false,
std::nullopt,
false};
false,
std::nullopt};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
8 changes: 8 additions & 0 deletions src/v/compat/cluster_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ inline void rjson_serialize(
write_member(w, "flush_bytes", tps.flush_bytes);
write_member(w, "flush_ms", tps.flush_ms);
write_member(w, "iceberg_enabled", tps.iceberg_enabled);
write_member(
w,
"iceberg_translation_interval_ms",
tps.iceberg_translation_interval_ms);
w.EndObject();
}

Expand Down Expand Up @@ -697,6 +701,10 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) {
read_member(rd, "flush_bytes", obj.flush_bytes);
read_member(rd, "flush_ms", obj.flush_ms);
read_member(rd, "iceberg_enabled", obj.iceberg_enabled);
read_member(
rd,
"iceberg_translation_interval_ms",
obj.iceberg_translation_interval_ms);
}

inline void rjson_serialize(
Expand Down
9 changes: 9 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3688,6 +3688,15 @@ configuration::configuration()
"topic-level settings.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
false)
, iceberg_translation_interval_ms_default(
*this,
"iceberg_translation_interval_ms_default",
"How often an Iceberg enabled topic is checked for new data to "
"translate. You can override this value at topic level using "
"redpanda.iceberg.translation.interval.ms.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::chrono::milliseconds(1min),
{.min = 10ms})
, development_enable_cloud_topics(
*this,
"development_enable_cloud_topics",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ struct configuration final : public config_store {

// datalake configurations
property<bool> iceberg_enabled;
bounded_property<std::chrono::milliseconds>
iceberg_translation_interval_ms_default;

configuration();

Expand Down
55 changes: 31 additions & 24 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,37 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "manager",
srcs = [
"datalake_manager.cc",
],
hdrs = [
"datalake_manager.h",
],
include_prefix = "datalake",
visibility = ["//visibility:public"],
deps = [
":logger",
":types",
"//src/v/base",
"//src/v/cluster",
"//src/v/datalake/coordinator:frontend",
"//src/v/model",
"//src/v/raft",
"//src/v/serde",
"//src/v/ssx:semaphore",
"@fmt",
"@seastar",
],
)
# TODO(iceberg): uncomment once partition_translator
# and dependencies are bazel friendly.
#redpanda_cc_library(
# name = "manager",
# srcs = [
# "datalake_manager.cc",
# ],
# hdrs = [
# "datalake_manager.h",
# ],
# include_prefix = "datalake",
# visibility = ["//visibility:public"],
# deps = [
# ":cloud_data_io",
# ":logger",
# ":types",
# "//src/v/base",
# "//src/v/cluster",
# "//src/v/config",
# "//src/v/container:chunked_hash_map",
# "//src/v/datalake/coordinator:frontend",
# "//src/v/datalake/translation:partition_translator",
# "//src/v/features",
# "//src/v/model",
# "//src/v/raft",
# "//src/v/serde",
# "//src/v/ssx:semaphore",
# "@fmt",
# "@seastar",
# ],
#)

redpanda_cc_library(
name = "table_definition",
Expand Down
Loading

0 comments on commit 18c7d15

Please sign in to comment.