Skip to content

Commit

Permalink
Merge pull request #14586 from michael-redpanda/audit-kafka-api-part-2
Browse files Browse the repository at this point in the history
Audit kafka api part 2
  • Loading branch information
piyushredpanda authored Nov 2, 2023
2 parents a7929e7 + c97fe3e commit bf5ff0f
Show file tree
Hide file tree
Showing 37 changed files with 1,100 additions and 660 deletions.
33 changes: 33 additions & 0 deletions src/v/kafka/protocol/create_partitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,39 @@ struct create_partitions_response final {

create_partitions_response_data data;

create_partitions_response() = default;

create_partitions_response(
error_code ec,
std::optional<ss::sstring> error_message,
create_partitions_response current_resp,
create_partitions_request request_data,
std::vector<create_partitions_topic>::difference_type request_topic_end) {
data.results.reserve(
current_resp.data.results.size() + request_data.data.topics.size());
std::transform(
current_resp.data.results.begin(),
current_resp.data.results.end(),
std::back_inserter(data.results),
[ec, &error_message](const create_partitions_topic_result& r) {
return create_partitions_topic_result{
.name = r.name,
.error_code = ec,
.error_message = error_message,
};
});
std::transform(
request_data.data.topics.begin(),
request_data.data.topics.begin() + request_topic_end,
std::back_inserter(data.results),
[ec, &error_message](const create_partitions_topic& t) {
return create_partitions_topic_result{
.name = t.name,
.error_code = ec,
.error_message = error_message};
});
}

void encode(protocol::encoder& writer, api_version version) {
data.encode(writer, version);
}
Expand Down
16 changes: 13 additions & 3 deletions src/v/kafka/protocol/find_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ struct find_coordinator_response final {
find_coordinator_response() = default;

find_coordinator_response(
error_code error, model::node_id node, ss::sstring host, int32_t port)
error_code error,
std::optional<ss::sstring> error_message,
model::node_id node,
ss::sstring host,
int32_t port)
: data({
.error_code = error,
.error_message = std::move(error_message),
.node_id = node,
.host = std::move(host),
.port = port,
Expand All @@ -67,10 +72,15 @@ struct find_coordinator_response final {
find_coordinator_response(
model::node_id node, ss::sstring host, int32_t port)
: find_coordinator_response(
error_code::none, node, std::move(host), port) {}
error_code::none, std::nullopt, node, std::move(host), port) {}

find_coordinator_response(error_code error, ss::sstring error_message)
: find_coordinator_response(
error, std::move(error_message), model::node_id(-1), "", -1) {}

explicit find_coordinator_response(error_code error)
: find_coordinator_response(error, model::node_id(-1), "", -1) {}
: find_coordinator_response(
error, std::nullopt, model::node_id(-1), "", -1) {}

void encode(protocol::encoder& writer, api_version version) {
data.encode(writer, version);
Expand Down
42 changes: 42 additions & 0 deletions src/v/kafka/protocol/offset_for_leader_epoch.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,48 @@ struct offset_for_leader_epoch_response final {

offset_for_leader_epoch_response_data data;

offset_for_leader_epoch_response() = default;

offset_for_leader_epoch_response(
error_code ec,
offset_for_leader_epoch_request current_request,
std::vector<offset_for_leader_topic_result> unauthorized_results) {
data.topics.reserve(
current_request.data.topics.size() + unauthorized_results.size());

std::transform(
current_request.data.topics.begin(),
current_request.data.topics.end(),
std::back_inserter(data.topics),
[ec](offset_for_leader_topic& o) {
std::vector<epoch_end_offset> offsets;
offsets.reserve(o.partitions.size());
std::transform(
o.partitions.begin(),
o.partitions.end(),
std::back_inserter(offsets),
[ec](const offset_for_leader_partition& ol) {
return epoch_end_offset{
.error_code = ec, .partition = ol.partition};
});
return offset_for_leader_topic_result{
.topic = std::move(o.topic), .partitions = std::move(offsets)};
});
std::for_each(
unauthorized_results.begin(),
unauthorized_results.end(),
[ec](offset_for_leader_topic_result& r) {
std::for_each(
r.partitions.begin(),
r.partitions.end(),
[ec](epoch_end_offset& o) { o.error_code = ec; });
});
std::move(
unauthorized_results.begin(),
unauthorized_results.end(),
std::back_inserter(data.topics));
}

static epoch_end_offset make_epoch_end_offset(
model::partition_id p_id,
kafka::error_code ec,
Expand Down
11 changes: 11 additions & 0 deletions src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,17 @@ ss::future<response_ptr> alter_configs_handler::handle(
alter_configs_resource,
alter_configs_resource_response>(ctx, groupped);

if (!ctx.audit()) {
auto responses = make_audit_failure_response<
alter_configs_resource_response,
alter_configs_resource>(
std::move(groupped), std::move(unauthorized_responsens));
co_return co_await ctx.respond(
assemble_alter_config_response<
alter_configs_response,
alter_configs_resource_response>(std::move(responses)));
}

std::vector<ss::future<std::vector<alter_configs_resource_response>>>
futures;
futures.reserve(2);
Expand Down
26 changes: 24 additions & 2 deletions src/v/kafka/server/handlers/alter_partition_reassignments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,30 @@ static ss::future<response_ptr> do_handle(alter_op_context& octx) {
return octx.rctx.respond(std::move(octx.response));
}

if (!octx.rctx.authorized(
security::acl_operation::alter, security::default_cluster_name)) {
auto authz = octx.rctx.authorized(
security::acl_operation::alter, security::default_cluster_name);

auto additional_resources = [&octx]() {
std::vector<model::topic> topics;
topics.reserve(octx.request.data.topics.size());
std::transform(
octx.request.data.topics.begin(),
octx.request.data.topics.end(),
std::back_inserter(topics),
[](const reassignable_topic& t) { return t.name; });

return topics;
};

if (!octx.rctx.audit(std::move(additional_resources))) {
octx.response.data.error_code = error_code::broker_not_available;
octx.response.data.error_message
= "Broker not available - audit system failure";

return octx.rctx.respond(std::move(octx.response));
}

if (!authz) {
vlog(
klog.debug,
"Failed cluster authorization. Requires ALTER permissions on the "
Expand Down
43 changes: 43 additions & 0 deletions src/v/kafka/server/handlers/configs/config_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,49 @@ T make_error_alter_config_resource_response(
.resource_type = resource.resource_type,
.resource_name = resource.resource_name};
}

template<typename R, typename T>
std::vector<std::vector<R>> make_audit_failure_response(
groupped_resources<T>&& resources, std::vector<R> unauthorized_responses) {
std::vector<R> responses;

auto gen_resp = [](const T& res) {
return make_error_alter_config_resource_response<R>(
res,
error_code::broker_not_available,
"Broker not available - audit system failure");
};

responses.reserve(
resources.broker_changes.size() + resources.topic_changes.size()
+ unauthorized_responses.size());

std::transform(
resources.broker_changes.begin(),
resources.broker_changes.end(),
std::back_inserter(responses),
gen_resp);

std::transform(
resources.topic_changes.begin(),
resources.topic_changes.end(),
std::back_inserter(responses),
gen_resp);

std::for_each(
unauthorized_responses.begin(), unauthorized_responses.end(), [](R& r) {
r.error_code = error_code::broker_not_available;
r.error_message = "Broker not available - audit system failure";
});

std::move(
unauthorized_responses.begin(),
unauthorized_responses.end(),
std::back_inserter(responses));

return {responses};
}

/**
* Authorizes groupped alter configuration resources, it returns not authorized
* responsens and modifies passed in group_resources<T>
Expand Down
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/create_partitions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ ss::future<response_ptr> create_partitions_handler::handle(
return ctx.authorized(security::acl_operation::alter, tp.name);
});

if (!ctx.audit()) {
auto distance = std::distance(
request.data.topics.begin(), valid_range_end);

co_return co_await ctx.respond(create_partitions_response(
error_code::broker_not_available,
"Broker not available - audit system failure",
std::move(resp),
std::move(request),
distance));
}

// check duplicates
valid_range_end = validate_range_duplicates(
request.data.topics.begin(),
Expand Down
123 changes: 81 additions & 42 deletions src/v/kafka/server/handlers/delete_records.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
return make_partition_errors(t, error_code::policy_violation);
}

const auto is_authorized = [&ctx](const delete_records_topic& t) {
return ctx.authorized(security::acl_operation::remove, t.name);
};
const auto is_deletable = [](const cluster::topic_configuration& cfg) {
if (cfg.is_read_replica()) {
return false;
Expand Down Expand Up @@ -83,9 +80,6 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
model::topic_namespace_view(model::kafka_namespace, t.name));
if (!cfg) {
return make_partition_errors(t, error_code::unknown_topic_or_partition);
}
if (!is_authorized(t)) {
return make_partition_errors(t, error_code::topic_authorization_failed);
} else if (!is_deletable(*cfg)) {
return make_partition_errors(t, error_code::policy_violation);
} else if (is_nodelete_topic(t)) {
Expand Down Expand Up @@ -181,44 +175,89 @@ delete_records_handler::handle(request_context ctx, ss::smp_service_group) {
log_request(ctx.header(), request);

delete_records_response response;
std::vector<ss::future<result_t>> fs;
for (auto& topic : request.data.topics) {
/// Topic level validation, errors will be all the same for each
/// partition under the topic. Validation for individual partitions may
/// happen in the inner for loop below.
auto topic_level_errors = validate_at_topic_level(ctx, topic);
if (!topic_level_errors.empty()) {
response.data.topics.push_back(delete_records_topic_result{
.name = topic.name, .partitions = std::move(topic_level_errors)});
continue;
}
for (auto& partition : topic.partitions) {
auto ktp = model::ktp(topic.name, partition.partition_index);
auto shard = ctx.shards().shard_for(ktp);
if (!shard) {
fs.push_back(
ss::make_ready_future<result_t>(make_partition_error(
ktp, error_code::unknown_topic_or_partition)));
continue;
}
auto f
= ctx.partition_manager()
.invoke_on(
*shard,
[ktp,
timeout = request.data.timeout_ms,
o = partition.offset](cluster::partition_manager& pm) {
return prefix_truncate(pm, ktp, o, timeout);
})
.handle_exception([ktp](std::exception_ptr eptr) {
vlog(klog.error, "Caught unexpected exception: {}", eptr);
return make_partition_error(
ktp, error_code::unknown_server_error);
});
fs.push_back(std::move(f));
}

auto begin = request.data.topics.begin();
auto valid_range_end = request.data.topics.end();

const auto is_authorized = [&ctx](const delete_records_topic& t) {
return ctx.authorized(security::acl_operation::remove, t.name);
};

auto unauthorized_it = std::partition(
begin, valid_range_end, is_authorized);

if (!ctx.audit()) {
response.data.topics.reserve(request.data.topics.size());
std::transform(
request.data.topics.begin(),
request.data.topics.end(),
std::back_inserter(response.data.topics),
[](const delete_records_topic& t) {
auto errs = make_partition_errors(
t, error_code::broker_not_available);
return delete_records_topic_result{
.name = t.name, .partitions = std::move(errs)};
});

co_return co_await ctx.respond(std::move(response));
}

std::transform(
unauthorized_it,
valid_range_end,
std::back_inserter(response.data.topics),
[](const delete_records_topic& t) {
auto errs = make_partition_errors(
t, error_code::topic_authorization_failed);
return delete_records_topic_result{
.name = t.name, .partitions = std::move(errs)};
});
valid_range_end = unauthorized_it;

std::vector<ss::future<result_t>> fs;

std::for_each(
begin,
valid_range_end,
[&fs, &ctx, &response, &request](const delete_records_topic& topic) {
/// Topic level validation, errors will be all the same for each
/// partition under the topic. Validation for individual partitions
/// may happen in the inner for loop below.
auto topic_level_errors = validate_at_topic_level(ctx, topic);
if (!topic_level_errors.empty()) {
response.data.topics.push_back(delete_records_topic_result{
.name = topic.name,
.partitions = std::move(topic_level_errors)});
return;
}
for (auto& partition : topic.partitions) {
auto ktp = model::ktp(topic.name, partition.partition_index);
auto shard = ctx.shards().shard_for(ktp);
if (!shard) {
fs.push_back(
ss::make_ready_future<result_t>(make_partition_error(
ktp, error_code::unknown_topic_or_partition)));
return;
}
auto f
= ctx.partition_manager()
.invoke_on(
*shard,
[ktp,
timeout = request.data.timeout_ms,
o = partition.offset](cluster::partition_manager& pm) {
return prefix_truncate(pm, ktp, o, timeout);
})
.handle_exception([ktp](std::exception_ptr eptr) {
vlog(
klog.error, "Caught unexpected exception: {}", eptr);
return make_partition_error(
ktp, error_code::unknown_server_error);
});
fs.push_back(std::move(f));
}
});

/// Perform prefix truncation on partitions
auto results = co_await ss::when_all_succeed(fs.begin(), fs.end());

Expand Down
Loading

0 comments on commit bf5ff0f

Please sign in to comment.