Skip to content

Commit

Permalink
Merge pull request #23207 from bashtanov/migrations-high-level-api-impl
Browse files Browse the repository at this point in the history
Migrations (topic un-/mount) high level api implementation
  • Loading branch information
mmaslankaprv authored Sep 20, 2024
2 parents 9f013b0 + 65f7d79 commit ac7e0c1
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 70 deletions.
4 changes: 4 additions & 0 deletions src/v/cluster/data_migrated_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ migrated_resource_state get_resource_state<inbound_migration>(state state) {
case state::finished:
case state::cancelled:
return migrated_resource_state::non_restricted;
case state::deleted:
vassert(false, "a migration cannot be in deleted state");
}
}

Expand All @@ -79,6 +81,8 @@ migrated_resource_state get_resource_state<outbound_migration>(state state) {
case state::finished:
case state::cancelled:
return migrated_resource_state::non_restricted;
case state::deleted:
vassert(false, "a migration cannot be in deleted state");
}
}

Expand Down
94 changes: 72 additions & 22 deletions src/v/cluster/data_migration_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,27 +772,34 @@ void backend::to_advance_if_done(
}
}

ss::future<> backend::advance(id migration_id, state sought_state) {
std::error_code ec;
if (sought_state == state::deleted) {
ec = co_await _frontend.remove_migration(migration_id);
} else {
ec = co_await _frontend.update_migration_state(
migration_id, sought_state);
}
vlogl(
dm_log,
(ec == make_error_code(errc::success)) ? ss::log_level::debug
: ss::log_level::warn,
"request to advance migration {} into state {} has "
"been processed with error code {}",
migration_id,
sought_state,
ec);
}

void backend::spawn_advances() {
for (auto& [migration_id, advance_info] : _advance_requests) {
if (advance_info.sent) {
continue;
}
advance_info.sent = true;
auto& sought_state = advance_info.sought_state;
auto sought_state = advance_info.sought_state;
ssx::spawn_with_gate(_gate, [this, migration_id, sought_state]() {
return _frontend.update_migration_state(migration_id, sought_state)
.then([migration_id, sought_state](std::error_code ec) {
vlogl(
dm_log,
(ec == make_error_code(errc::success))
? ss::log_level::debug
: ss::log_level::warn,
"request to advance migration {} into state {} has "
"been processed with error code {}",
migration_id,
sought_state,
ec);
});
return advance(migration_id, sought_state);
});
}
}
Expand Down Expand Up @@ -867,6 +874,12 @@ ss::future<> backend::handle_migration_update(id id) {
vlog(dm_log.debug, "dropping migration {} reconciliation state", id);
co_await drop_migration_reconciliation_rstate(old_it);
}
// delete old advance requests
if (auto it = _advance_requests.find(id); it != _advance_requests.end()) {
if (!new_state || it->second.sought_state <= new_state) {
_advance_requests.erase(it);
}
}
// create new state if needed
if (new_maybe_metadata) {
const auto& new_metadata = new_maybe_metadata->get();
Expand All @@ -875,16 +888,15 @@ ss::future<> backend::handle_migration_update(id id) {
vlog(
dm_log.debug, "creating migration {} reconciliation state", id);
auto new_it = _migration_states.emplace_hint(old_it, id, scope);
co_await reconcile_migration(new_it->second, new_metadata);
if (scope.topic_work_needed || scope.partition_work_needed) {
co_await reconcile_migration(new_it->second, new_metadata);
} else {
// yes it is done as there is nothing to do
to_advance_if_done(new_it);
}
need_wakeup = true;
}
}
// delete old advance requests
if (auto it = _advance_requests.find(id); it != _advance_requests.end()) {
if (!new_state || it->second.sought_state <= new_state) {
_advance_requests.erase(it);
}
}

if (_is_coordinator && need_wakeup) {
wakeup();
Expand Down Expand Up @@ -918,6 +930,11 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) {
delta.type,
migration_id);
auto& mrstate = _migration_states.find(migration_id)->second;
if (
!mrstate.scope.partition_work_needed
&& !mrstate.scope.topic_work_needed) {
co_return;
}
auto& tstate = mrstate.outstanding_topics[nt];
clear_tstate_belongings(nt, tstate);
tstate.clear();
Expand Down Expand Up @@ -1434,7 +1451,40 @@ backend::get_work_scope(const migration_metadata& metadata) {
return std::visit(
[&metadata](const auto& migration) {
migration_direction_tag<std::decay_t<decltype(migration)>> tag;
return get_work_scope(tag, metadata);
auto scope = get_work_scope(tag, metadata);
if (migration.auto_advance && !scope.sought_state) {
switch (metadata.state) {
case state::planned:
scope.sought_state = state::preparing;
break;
case state::prepared:
scope.sought_state = state::executing;
break;
case state::executed:
scope.sought_state = state::cut_over;
break;
case state::finished:
scope.sought_state = state::deleted;
break;
case state::cancelled:
// An auto-advance migration can only be cancelled manually if
// it got stuck. Let's not deleted it automatically in case
// we'd like to investigate how it happened.
break;
case state::deleted:
vassert(false, "A migration cannot be in a deleted state");
case state::preparing:
case state::executing:
case state::cut_over:
case state::canceling:
vassert(
false,
"Work scope not found for migration {} transient state {}",
metadata.id,
metadata.state);
}
}
return scope;
},
metadata.migration);
}
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/data_migration_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class backend {
check_ntp_states_locally(check_ntp_states_request&& req);
void
to_advance_if_done(migration_reconciliation_states_t::const_iterator it);
ss::future<> advance(id migration_id, state sought_state);
void spawn_advances();

/* topic work */
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/data_migration_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ bool migrations_table::is_valid_state_transition(state current, state target) {
* in other way than deleting migration object
**/
case state::cancelled:
[[fallthrough]];
case state::finished:
[[fallthrough]];
case state::deleted:
return false;
}
}
Expand Down
22 changes: 16 additions & 6 deletions src/v/cluster/data_migration_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ data_migration copy_migration(const data_migration& migration) {
}

inbound_migration inbound_migration::copy() const {
return inbound_migration{.topics = topics.copy(), .groups = groups.copy()};
return inbound_migration{
.topics = topics.copy(),
.groups = groups.copy(),
.auto_advance = auto_advance};
}

outbound_migration outbound_migration::copy() const {
return outbound_migration{
.topics = topics.copy(), .groups = groups.copy(), .copy_to = copy_to};
.topics = topics.copy(),
.groups = groups.copy(),
.copy_to = copy_to,
.auto_advance = auto_advance};
}

std::ostream& operator<<(std::ostream& o, state state) {
Expand All @@ -65,6 +71,8 @@ std::ostream& operator<<(std::ostream& o, state state) {
return o << "canceling";
case state::cancelled:
return o << "cancelled";
case state::deleted:
return o << "deleted";
}
}

Expand Down Expand Up @@ -114,19 +122,21 @@ std::ostream& operator<<(std::ostream& o, const copy_target& t) {
std::ostream& operator<<(std::ostream& o, const inbound_migration& dm) {
fmt::print(
o,
"{{topics: {}, consumer_groups: {}}}",
"{{topics: {}, consumer_groups: {}, auto_advance: {}}}",
fmt::join(dm.topics, ", "),
fmt::join(dm.groups, ", "));
fmt::join(dm.groups, ", "),
dm.auto_advance);
return o;
}

std::ostream& operator<<(std::ostream& o, const outbound_migration& dm) {
fmt::print(
o,
"{{topics: {}, consumer_groups: {}, copy_to: {}}}",
"{{topics: {}, consumer_groups: {}, copy_to: {}, auto_advance: {}}}",
fmt::join(dm.topics, ", "),
fmt::join(dm.groups, ", "),
dm.copy_to);
dm.copy_to,
dm.auto_advance);
return o;
}

Expand Down
15 changes: 11 additions & 4 deletions src/v/cluster/data_migration_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ enum class state {
finished,
canceling,
cancelled,
deleted // a migration cannot use it
};
std::ostream& operator<<(std::ostream& o, state);

Expand Down Expand Up @@ -176,13 +177,15 @@ struct inbound_topic
*/
struct inbound_migration
: serde::
envelope<inbound_migration, serde::version<0>, serde::compat_version<0>> {
envelope<inbound_migration, serde::version<1>, serde::compat_version<0>> {
chunked_vector<inbound_topic> topics;
chunked_vector<consumer_group> groups;
// run the migration through stages without explicit user action
bool auto_advance = false;

inbound_migration copy() const;

auto serde_fields() { return std::tie(topics, groups); }
auto serde_fields() { return std::tie(topics, groups, auto_advance); }

friend bool operator==(const inbound_migration&, const inbound_migration&)
= default;
Expand Down Expand Up @@ -218,7 +221,7 @@ struct copy_target
struct outbound_migration
: serde::envelope<
outbound_migration,
serde::version<0>,
serde::version<1>,
serde::compat_version<0>> {
// topics which ownership should be released
chunked_vector<model::topic_namespace> topics;
Expand All @@ -227,10 +230,14 @@ struct outbound_migration
// optional target where the data should be copied to in the process of
// migration
std::optional<copy_target> copy_to;
// run the migration through stages without explicit user action
bool auto_advance = false;

outbound_migration copy() const;

auto serde_fields() { return std::tie(topics, groups, copy_to); }
auto serde_fields() {
return std::tie(topics, groups, copy_to, auto_advance);
}

friend bool operator==(const outbound_migration&, const outbound_migration&)
= default;
Expand Down
10 changes: 9 additions & 1 deletion src/v/redpanda/admin/api-doc/migration.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
},
"copy_to": {
"type": "bucket_configuration"
},
"auto_advance": {
"type": "boolean",
"description": "If set, migration does not need manual actions to progress"
}
}
},
Expand All @@ -66,6 +70,10 @@
"items": {
"type": "string"
}
},
"auto_advance": {
"type": "boolean",
"description": "If set, migration does not need manual actions to progress"
}
},
"required": [
Expand All @@ -81,7 +89,7 @@
},
"alias": {
"description": "Name of the topic in the destination cluster",
"type": "namespaced_topic"
"type": "namespaced_topic"
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/migrations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ ss::httpd::migration_json::inbound_migration_state to_admin_type(
for (auto& cg : idm.groups) {
migration.consumer_groups.push(cg);
}
migration.auto_advance = idm.auto_advance;
ret.migration = migration;
return ret;
}
Expand All @@ -87,6 +88,7 @@ ss::httpd::migration_json::outbound_migration_state to_admin_type(
for (auto& cg : odm.groups) {
migration.consumer_groups.push(cg);
}
migration.auto_advance = odm.auto_advance;
ret.migration = migration;
return ret;
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
apply_validator(validator, json_doc);
cluster::data_migrations::inbound_migration migration;

migration.auto_advance = true;
migration.topics = parse_inbound_topics(json_doc);
auto result = co_await _controller->get_data_migration_frontend()
.local()
Expand Down Expand Up @@ -170,6 +171,7 @@ admin_server::unmount_topics(std::unique_ptr<ss::http::request> req) {
apply_validator(validator, json_doc);
cluster::data_migrations::outbound_migration migration;

migration.auto_advance = true;
migration.topics = parse_topics(json_doc);
auto result = co_await _controller->get_data_migration_frontend()
.local()
Expand Down
25 changes: 23 additions & 2 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,15 @@ def from_response(cls, rsp: Response):


class NamespacedTopic:
def __init__(self, topic: str, namespace: str = "kafka"):
def __init__(self, topic: str, namespace: str | None = "kafka"):
self.ns = namespace
self.topic = topic

def as_dict(self):
return {'ns': self.ns, 'topic': self.topic}
ret = {'topic': self.topic}
if self.ns is not None:
ret['ns'] = self.ns
return ret

@classmethod
def from_json(cls, body: bytes):
Expand Down Expand Up @@ -1693,3 +1696,21 @@ def delete_data_migration(self,

path = f"migrations/{migration_id}"
return self._request("DELETE", path, node=node)

def unmount_topics(self,
topics: list[NamespacedTopic],
node: Optional[ClusterNode] = None):
path = "topics/unmount"
return self._request("POST",
path,
node=node,
json={"topics": [t.as_dict() for t in topics]})

def mount_topics(self,
topics: list[InboundTopic],
node: Optional[ClusterNode] = None):
path = "topics/mount"
return self._request("POST",
path,
node=node,
json={"topics": [t.as_dict() for t in topics]})
Loading

0 comments on commit ac7e0c1

Please sign in to comment.