diff --git a/src/v/kafka/protocol/create_partitions.h b/src/v/kafka/protocol/create_partitions.h
index 6fbf97054b407..51f06dda6e9ac 100644
--- a/src/v/kafka/protocol/create_partitions.h
+++ b/src/v/kafka/protocol/create_partitions.h
@@ -43,6 +43,39 @@ struct create_partitions_response final {
     create_partitions_response_data data;
 
+    create_partitions_response() = default;
+
+    create_partitions_response(
+      error_code ec,
+      std::optional<ss::sstring> error_message,
+      create_partitions_response current_resp,
+      create_partitions_request request_data,
+      std::vector<create_partitions_topic>::difference_type request_topic_end) {
+        data.results.reserve(
+          current_resp.data.results.size() + request_data.data.topics.size());
+        std::transform(
+          current_resp.data.results.begin(),
+          current_resp.data.results.end(),
+          std::back_inserter(data.results),
+          [ec, &error_message](const create_partitions_topic_result& r) {
+              return create_partitions_topic_result{
+                .name = r.name,
+                .error_code = ec,
+                .error_message = error_message,
+              };
+          });
+        std::transform(
+          request_data.data.topics.begin(),
+          request_data.data.topics.begin() + request_topic_end,
+          std::back_inserter(data.results),
+          [ec, &error_message](const create_partitions_topic& t) {
+              return create_partitions_topic_result{
+                .name = t.name,
+                .error_code = ec,
+                .error_message = error_message};
+          });
+    }
+
     void encode(protocol::encoder& writer, api_version version) {
         data.encode(writer, version);
     }
diff --git a/src/v/kafka/protocol/find_coordinator.h b/src/v/kafka/protocol/find_coordinator.h
index 155660088d6ca..66a819f506aa8 100644
--- a/src/v/kafka/protocol/find_coordinator.h
+++ b/src/v/kafka/protocol/find_coordinator.h
@@ -56,9 +56,14 @@ struct find_coordinator_response final {
     find_coordinator_response() = default;
 
     find_coordinator_response(
-      error_code error, model::node_id node, ss::sstring host, int32_t port)
+      error_code error,
+      std::optional<ss::sstring> error_message,
+      model::node_id node,
+      ss::sstring host,
+      int32_t port)
       : data({
         .error_code = error,
+        .error_message = std::move(error_message),
         .node_id = node,
         .host = std::move(host),
         .port = port,
@@ -67,10 +72,15 @@ struct find_coordinator_response final {
     find_coordinator_response(
       model::node_id node, ss::sstring host, int32_t port)
       : find_coordinator_response(
-        error_code::none, node, std::move(host), port) {}
+        error_code::none, std::nullopt, node, std::move(host), port) {}
+
+    find_coordinator_response(error_code error, ss::sstring error_message)
+      : find_coordinator_response(
+        error, std::move(error_message), model::node_id(-1), "", -1) {}
 
     explicit find_coordinator_response(error_code error)
-      : find_coordinator_response(error, model::node_id(-1), "", -1) {}
+      : find_coordinator_response(
+        error, std::nullopt, model::node_id(-1), "", -1) {}
 
     void encode(protocol::encoder& writer, api_version version) {
         data.encode(writer, version);
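The merge constructor added to create_partitions_response above rewrites results already accumulated in a partial response with one shared error and appends the not-yet-processed request topics (up to request_topic_end) with the same error. A sketch of the intended call site; the identical call appears in the create_partitions handler later in this patch, where resp and valid_range_end are handler locals:

    // Fail closed when the audit subsystem cannot record the request:
    // every topic, already processed or still pending, reports the same
    // error code and message.
    if (!ctx.audit()) {
        co_return co_await ctx.respond(create_partitions_response(
          error_code::broker_not_available,
          "Broker not available - audit system failure",
          std::move(resp),
          std::move(request),
          std::distance(request.data.topics.begin(), valid_range_end)));
    }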
diff --git a/src/v/kafka/protocol/offset_for_leader_epoch.h b/src/v/kafka/protocol/offset_for_leader_epoch.h
index 279fb909a2366..417fc08e4761c 100644
--- a/src/v/kafka/protocol/offset_for_leader_epoch.h
+++ b/src/v/kafka/protocol/offset_for_leader_epoch.h
@@ -43,6 +43,48 @@ struct offset_for_leader_epoch_response final {
     offset_for_leader_epoch_response_data data;
 
+    offset_for_leader_epoch_response() = default;
+
+    offset_for_leader_epoch_response(
+      error_code ec,
+      offset_for_leader_epoch_request current_request,
+      std::vector<offset_for_leader_topic_result> unauthorized_results) {
+        data.topics.reserve(
+          current_request.data.topics.size() + unauthorized_results.size());
+
+        std::transform(
+          current_request.data.topics.begin(),
+          current_request.data.topics.end(),
+          std::back_inserter(data.topics),
+          [ec](offset_for_leader_topic& o) {
+              std::vector<epoch_end_offset> offsets;
+              offsets.reserve(o.partitions.size());
+              std::transform(
+                o.partitions.begin(),
+                o.partitions.end(),
+                std::back_inserter(offsets),
+                [ec](const offset_for_leader_partition& ol) {
+                    return epoch_end_offset{
+                      .error_code = ec, .partition = ol.partition};
+                });
+              return offset_for_leader_topic_result{
+                .topic = std::move(o.topic), .partitions = std::move(offsets)};
+          });
+        std::for_each(
+          unauthorized_results.begin(),
+          unauthorized_results.end(),
+          [ec](offset_for_leader_topic_result& r) {
+              std::for_each(
+                r.partitions.begin(),
+                r.partitions.end(),
+                [ec](epoch_end_offset& o) { o.error_code = ec; });
+          });
+        std::move(
+          unauthorized_results.begin(),
+          unauthorized_results.end(),
+          std::back_inserter(data.topics));
+    }
+
     static epoch_end_offset make_epoch_end_offset(
       model::partition_id p_id,
       kafka::error_code ec,
diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc
index 91a40963f32b3..d4a9b1870f3b5 100644
--- a/src/v/kafka/server/handlers/alter_configs.cc
+++ b/src/v/kafka/server/handlers/alter_configs.cc
@@ -340,6 +340,17 @@ ss::future<response_ptr> alter_configs_handler::handle(
       alter_configs_resource,
       alter_configs_resource_response>(ctx, groupped);
 
+    if (!ctx.audit()) {
+        auto responses = make_audit_failure_response<
+          alter_configs_resource_response,
+          alter_configs_resource>(
+          std::move(groupped), std::move(unauthorized_responsens));
+        co_return co_await ctx.respond(
+          assemble_alter_config_response<
+            alter_configs_response,
+            alter_configs_resource_response>(std::move(responses)));
+    }
+
     std::vector<ss::future<std::vector<alter_configs_resource_response>>>
       futures;
     futures.reserve(2);
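Note the ordering that the handler change below (and the other handler diffs in this patch) establishes: ACLs are evaluated first so the authorization outcome is captured for the audit event, the audit record is then enqueued, and only afterwards is the ACL result acted upon. A minimal sketch of the pattern; resource and make_error_response are illustrative placeholders, not names from this patch:

    // 1. Evaluate ACLs; request_context records the outcome for auditing.
    auto authz = ctx.authorized(security::acl_operation::alter, resource);
    // 2. Try to enqueue the audit event; fail closed if that is impossible.
    if (!ctx.audit()) {
        co_return co_await ctx.respond(
          make_error_response(error_code::broker_not_available));
    }
    // 3. Only now act on the authorization result.
    if (!authz) {
        co_return co_await ctx.respond(
          make_error_response(error_code::cluster_authorization_failed));
    }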
diff --git a/src/v/kafka/server/handlers/alter_partition_reassignments.cc b/src/v/kafka/server/handlers/alter_partition_reassignments.cc
index 9a06fddcdf6ec..3423af6a6aad6 100644
--- a/src/v/kafka/server/handlers/alter_partition_reassignments.cc
+++ b/src/v/kafka/server/handlers/alter_partition_reassignments.cc
@@ -335,8 +335,30 @@ static ss::future<response_ptr> do_handle(alter_op_context& octx) {
         return octx.rctx.respond(std::move(octx.response));
     }
 
-    if (!octx.rctx.authorized(
-          security::acl_operation::alter, security::default_cluster_name)) {
+    auto authz = octx.rctx.authorized(
+      security::acl_operation::alter, security::default_cluster_name);
+
+    auto additional_resources = [&octx]() {
+        std::vector<model::topic> topics;
+        topics.reserve(octx.request.data.topics.size());
+        std::transform(
+          octx.request.data.topics.begin(),
+          octx.request.data.topics.end(),
+          std::back_inserter(topics),
+          [](const reassignable_topic& t) { return t.name; });
+
+        return topics;
+    };
+
+    if (!octx.rctx.audit(std::move(additional_resources))) {
+        octx.response.data.error_code = error_code::broker_not_available;
+        octx.response.data.error_message
+          = "Broker not available - audit system failure";
+
+        return octx.rctx.respond(std::move(octx.response));
+    }
+
+    if (!authz) {
         vlog(
           klog.debug,
           "Failed cluster authorization. "
          "Requires ALTER permissions on the "
diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h
index 20e4131fbfa16..ce110b222131d 100644
--- a/src/v/kafka/server/handlers/configs/config_utils.h
+++ b/src/v/kafka/server/handlers/configs/config_utils.h
@@ -73,6 +73,49 @@ T make_error_alter_config_resource_response(
       .resource_type = resource.resource_type,
       .resource_name = resource.resource_name};
 }
+
+template<typename R, typename T>
+std::vector<R> make_audit_failure_response(
+  groupped_resources<T>&& resources, std::vector<R> unauthorized_responses) {
+    std::vector<R> responses;
+
+    auto gen_resp = [](const T& res) {
+        return make_error_alter_config_resource_response<R>(
+          res,
+          error_code::broker_not_available,
+          "Broker not available - audit system failure");
+    };
+
+    responses.reserve(
+      resources.broker_changes.size() + resources.topic_changes.size()
+      + unauthorized_responses.size());
+
+    std::transform(
+      resources.broker_changes.begin(),
+      resources.broker_changes.end(),
+      std::back_inserter(responses),
+      gen_resp);
+
+    std::transform(
+      resources.topic_changes.begin(),
+      resources.topic_changes.end(),
+      std::back_inserter(responses),
+      gen_resp);
+
+    std::for_each(
+      unauthorized_responses.begin(), unauthorized_responses.end(), [](R& r) {
+          r.error_code = error_code::broker_not_available;
+          r.error_message = "Broker not available - audit system failure";
+      });
+
+    std::move(
+      unauthorized_responses.begin(),
+      unauthorized_responses.end(),
+      std::back_inserter(responses));
+
+    return {responses};
+}
+
 /**
  * Authorizes groupped alter configuration resources, it returns not authorized
  * responsens and modifies passed in group_resources
diff --git a/src/v/kafka/server/handlers/create_partitions.cc b/src/v/kafka/server/handlers/create_partitions.cc
index c28e224cc3b62..cb5807a91cc1c 100644
--- a/src/v/kafka/server/handlers/create_partitions.cc
+++ b/src/v/kafka/server/handlers/create_partitions.cc
@@ -180,6 +180,18 @@ ss::future<response_ptr> create_partitions_handler::handle(
           return ctx.authorized(security::acl_operation::alter, tp.name);
       });
 
+    if (!ctx.audit()) {
+        auto distance = std::distance(
+          request.data.topics.begin(), valid_range_end);
+
+        co_return co_await ctx.respond(create_partitions_response(
+          error_code::broker_not_available,
+          "Broker not available - audit system failure",
+          std::move(resp),
+          std::move(request),
+          distance));
+    }
+
     // check duplicates
     valid_range_end = validate_range_duplicates(
       request.data.topics.begin(),
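The delete_records rewrite below follows the same request-partitioning idiom used throughout this patch: std::partition moves unauthorized topics to the tail of the request, the tail is mapped to per-topic error results, and the valid range is shrunk so only authorized topics proceed. In outline (topics, results, and make_error_result are illustrative, not names from this patch):

    // Unauthorized entries end up in [unauthorized_it, topics.end()).
    auto unauthorized_it = std::partition(
      topics.begin(), topics.end(), [&ctx](const auto& t) {
          return ctx.authorized(security::acl_operation::remove, t.name);
      });
    // Convert the unauthorized tail into error results...
    std::transform(
      unauthorized_it, topics.end(), std::back_inserter(results),
      [](const auto& t) {
          return make_error_result(t, error_code::topic_authorization_failed);
      });
    // ...and drop it so later stages see only authorized topics.
    topics.erase(unauthorized_it, topics.end());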
diff --git a/src/v/kafka/server/handlers/delete_records.cc b/src/v/kafka/server/handlers/delete_records.cc
index feb70c8a7a859..5af5391a5e8e7 100644
--- a/src/v/kafka/server/handlers/delete_records.cc
+++ b/src/v/kafka/server/handlers/delete_records.cc
@@ -52,9 +52,6 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
         return make_partition_errors(t, error_code::policy_violation);
     }
 
-    const auto is_authorized = [&ctx](const delete_records_topic& t) {
-        return ctx.authorized(security::acl_operation::remove, t.name);
-    };
     const auto is_deletable = [](const cluster::topic_configuration& cfg) {
         if (cfg.is_read_replica()) {
             return false;
@@ -83,9 +80,6 @@ validate_at_topic_level(request_context& ctx, const delete_records_topic& t) {
       model::topic_namespace_view(model::kafka_namespace, t.name));
     if (!cfg) {
         return make_partition_errors(t, error_code::unknown_topic_or_partition);
-    }
-    if (!is_authorized(t)) {
-        return make_partition_errors(t, error_code::topic_authorization_failed);
     } else if (!is_deletable(*cfg)) {
         return make_partition_errors(t, error_code::policy_violation);
     } else if (is_nodelete_topic(t)) {
@@ -181,44 +175,89 @@ delete_records_handler::handle(request_context ctx, ss::smp_service_group) {
     log_request(ctx.header(), request);
     delete_records_response response;
-    std::vector<ss::future<delete_records_partition_result>> fs;
-    for (auto& topic : request.data.topics) {
-        /// Topic level validation, errors will be all the same for each
-        /// partition under the topic. Validation for individual partitions may
-        /// happen in the inner for loop below.
-        auto topic_level_errors = validate_at_topic_level(ctx, topic);
-        if (!topic_level_errors.empty()) {
-            response.data.topics.push_back(delete_records_topic_result{
-              .name = topic.name, .partitions = std::move(topic_level_errors)});
-            continue;
-        }
-        for (auto& partition : topic.partitions) {
-            auto ktp = model::ktp(topic.name, partition.partition_index);
-            auto shard = ctx.shards().shard_for(ktp);
-            if (!shard) {
-                fs.push_back(
-                  ss::make_ready_future<delete_records_partition_result>(
-                    make_partition_error(
-                      ktp, error_code::unknown_topic_or_partition)));
-                continue;
-            }
-            auto f
-              = ctx.partition_manager()
-                  .invoke_on(
-                    *shard,
-                    [ktp,
-                     timeout = request.data.timeout_ms,
-                     o = partition.offset](cluster::partition_manager& pm) {
-                        return prefix_truncate(pm, ktp, o, timeout);
-                    })
-                  .handle_exception([ktp](std::exception_ptr eptr) {
-                      vlog(klog.error, "Caught unexpected exception: {}", eptr);
-                      return make_partition_error(
-                        ktp, error_code::unknown_server_error);
-                  });
-            fs.push_back(std::move(f));
-        }
+
+    auto begin = request.data.topics.begin();
+    auto valid_range_end = request.data.topics.end();
+
+    const auto is_authorized = [&ctx](const delete_records_topic& t) {
+        return ctx.authorized(security::acl_operation::remove, t.name);
+    };
+
+    auto unauthorized_it = std::partition(
+      begin, valid_range_end, is_authorized);
+
+    if (!ctx.audit()) {
+        response.data.topics.reserve(request.data.topics.size());
+        std::transform(
+          request.data.topics.begin(),
+          request.data.topics.end(),
+          std::back_inserter(response.data.topics),
+          [](const delete_records_topic& t) {
+              auto errs = make_partition_errors(
+                t, error_code::broker_not_available);
+              return delete_records_topic_result{
+                .name = t.name, .partitions = std::move(errs)};
+          });
+
+        co_return co_await ctx.respond(std::move(response));
     }
 
+    std::transform(
+      unauthorized_it,
+      valid_range_end,
+      std::back_inserter(response.data.topics),
+      [](const delete_records_topic& t) {
+          auto errs = make_partition_errors(
+            t, error_code::topic_authorization_failed);
+          return delete_records_topic_result{
+            .name = t.name, .partitions = std::move(errs)};
+      });
+    valid_range_end = unauthorized_it;
+
+    std::vector<ss::future<delete_records_partition_result>> fs;
+
+    std::for_each(
+      begin,
+      valid_range_end,
+      [&fs, &ctx, &response, &request](const delete_records_topic& topic) {
+          /// Topic level validation, errors will be all the same for each
+          /// partition under the topic. Validation for individual partitions
+          /// may happen in the inner for loop below.
+ auto topic_level_errors = validate_at_topic_level(ctx, topic); + if (!topic_level_errors.empty()) { + response.data.topics.push_back(delete_records_topic_result{ + .name = topic.name, + .partitions = std::move(topic_level_errors)}); + return; + } + for (auto& partition : topic.partitions) { + auto ktp = model::ktp(topic.name, partition.partition_index); + auto shard = ctx.shards().shard_for(ktp); + if (!shard) { + fs.push_back( + ss::make_ready_future(make_partition_error( + ktp, error_code::unknown_topic_or_partition))); + return; + } + auto f + = ctx.partition_manager() + .invoke_on( + *shard, + [ktp, + timeout = request.data.timeout_ms, + o = partition.offset](cluster::partition_manager& pm) { + return prefix_truncate(pm, ktp, o, timeout); + }) + .handle_exception([ktp](std::exception_ptr eptr) { + vlog( + klog.error, "Caught unexpected exception: {}", eptr); + return make_partition_error( + ktp, error_code::unknown_server_error); + }); + fs.push_back(std::move(f)); + } + }); + /// Perform prefix truncation on partitions auto results = co_await ss::when_all_succeed(fs.begin(), fs.end()); diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index dc7e8242b1414..ef746a9d9d740 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -891,7 +891,16 @@ fetch_handler::handle(request_context rctx, ss::smp_service_group ssg) { [&octx] { return octx.should_stop_fetch(); }, [&octx] { return fetch_topic_partitions(octx); }); }) - .then([&octx] { return std::move(octx).send_response(); }); + .then([&octx] { + // NOTE: Audit call doesn't happen until _after_ the fetch + // is done. This was done for the sake of simplicity and + // because fetch doesn't alter the state of the broker + if (!octx.rctx.audit()) { + return std::move(octx).send_error_response( + error_code::broker_not_available); + } + return std::move(octx).send_response(); + }); }); }); } @@ -1080,6 +1089,19 @@ ss::future op_context::send_response() && { return rctx.respond(std::move(final_response)); } +ss::future op_context::send_error_response(error_code ec) && { + fetch_response resp; + resp.data.error_code = ec; + + if (session_ctx.is_sessionless()) { + resp.data.session_id = invalid_fetch_session_id; + } else { + resp.data.session_id = session_ctx.session()->id(); + } + + return rctx.respond(std::move(resp)); +} + op_context::response_placeholder::response_placeholder( fetch_response::iterator it, op_context* ctx) : _it(it) diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 546208b262637..beff867de121a 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -120,6 +120,8 @@ struct op_context { ss::future send_response() &&; + ss::future send_error_response(error_code ec) &&; + response_iterator response_begin() { return iteration_order.begin(); } response_iterator response_end() { return iteration_order.end(); } diff --git a/src/v/kafka/server/handlers/find_coordinator.cc b/src/v/kafka/server/handlers/find_coordinator.cc index 481c46e322f97..20963f0b8c0b0 100644 --- a/src/v/kafka/server/handlers/find_coordinator.cc +++ b/src/v/kafka/server/handlers/find_coordinator.cc @@ -14,6 +14,7 @@ #include "cluster/tx_registry_frontend.h" #include "config/configuration.h" #include "kafka/protocol/errors.h" +#include "kafka/protocol/find_coordinator.h" #include "kafka/server/coordinator_ntp_mapper.h" #include "kafka/server/rm_group_frontend.h" #include "model/metadata.h" @@ -84,7 +85,15 @@ 
ss::future find_coordinator_handler::handle( transactional_id tx_id(request.data.key); - if (!ctx.authorized(security::acl_operation::describe, tx_id)) { + auto authz = ctx.authorized(security::acl_operation::describe, tx_id); + + if (!ctx.audit()) { + return ctx.respond(find_coordinator_response( + error_code::broker_not_available, + "Broker not available - audit system failure")); + } + + if (!authz) { return ctx.respond(find_coordinator_response( error_code::transactional_id_authorization_failed)); } @@ -112,8 +121,16 @@ ss::future find_coordinator_handler::handle( find_coordinator_response(error_code::unsupported_version)); } - if (!ctx.authorized( - security::acl_operation::describe, group_id(request.data.key))) { + auto authz = ctx.authorized( + security::acl_operation::describe, group_id(request.data.key)); + + if (!ctx.audit()) { + return ctx.respond(find_coordinator_response( + error_code::broker_not_available, + "Broker not available - audit system failure")); + } + + if (!authz) { return ctx.respond( find_coordinator_response(error_code::group_authorization_failed)); } diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 4e91543aa65e9..886eb2ec0df09 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -473,6 +473,17 @@ ss::future incremental_alter_configs_handler::handle( incremental_alter_configs_resource, resp_resource_t>(ctx, groupped); + if (!ctx.audit()) { + auto responses = make_audit_failure_response< + resp_resource_t, + incremental_alter_configs_resource>( + std::move(groupped), std::move(unauthorized_responsens)); + + co_return co_await ctx.respond(assemble_alter_config_response< + incremental_alter_configs_response, + resp_resource_t>(std::move(responses))); + } + std::vector>> futures; futures.reserve(2); futures.push_back(alter_topic_configuration( diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 4a4c00ca8969f..c8eb9e0c10f0b 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -289,6 +289,7 @@ static ss::future> get_topic_metadata( std::move(res)); } + std::vector topics_to_be_created; std::vector> new_topics; for (auto& topic : *request.data.topics) { @@ -326,10 +327,36 @@ static ss::future> get_topic_metadata( std::move(topic.name), error_code::topic_authorization_failed)); continue; } - new_topics.push_back( - create_topic(ctx, std::move(topic.name), is_node_isolated)); + topics_to_be_created.emplace_back(std::move(topic.name)); } + if (!ctx.audit()) { + std::for_each(res.begin(), res.end(), [](metadata_response::topic& t) { + t.error_code = error_code::broker_not_available; + }); + + std::transform( + topics_to_be_created.begin(), + topics_to_be_created.end(), + std::back_inserter(res), + [](model::topic& t) { + return metadata_response::topic{ + .error_code = error_code::broker_not_available, + .name = std::move(t)}; + }); + + return ss::make_ready_future>( + std::move(res)); + } + + std::for_each( + topics_to_be_created.begin(), + topics_to_be_created.end(), + [&new_topics, &ctx, is_node_isolated](model::topic& t) { + new_topics.emplace_back( + create_topic(ctx, std::move(t), is_node_isolated)); + }); + return ss::when_all_succeed(new_topics.begin(), new_topics.end()) .then([res = std::move(res)]( std::vector topics) mutable { diff --git a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc 
b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc index 831e2125130e3..42837b5e96292 100644 --- a/src/v/kafka/server/handlers/offset_for_leader_epoch.cc +++ b/src/v/kafka/server/handlers/offset_for_leader_epoch.cc @@ -231,11 +231,19 @@ ss::future offset_for_leader_epoch_handler::handle( request.data.topics.erase(it, request.data.topics.end()); } + if (!ctx.audit()) { + co_return co_await ctx.respond(offset_for_leader_epoch_response( + error_code::broker_not_available, + std::move(request), + std::move(unauthorized))); + } + + offset_for_leader_epoch_response response; + // fetch offsets auto results = co_await get_offsets_for_leader_epochs( ctx, std::move(request.data.topics)); - offset_for_leader_epoch_response response; response.data.topics = std::move(results); // merge with unauthorized topics diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 0bf1f3b3e5501..7d8271fad4b91 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -499,16 +499,6 @@ produce_topic(produce_ctx& octx, produce_request::topic& topic) { partitions_dispatched.reserve(topic.partitions.size()); for (auto& part : topic.partitions) { - if (!octx.rctx.authorized(security::acl_operation::write, topic.name)) { - partitions_dispatched.push_back(ss::now()); - partitions_produced.push_back( - ss::make_ready_future( - produce_response::partition{ - .partition_index = part.partition_index, - .error_code = error_code::topic_authorization_failed})); - continue; - } - const auto& kafka_noproduce_topics = config::shard_local_cfg().kafka_noproduce_topics(); const auto is_noproduce_topic = std::find( @@ -700,9 +690,13 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { || !ctx.authorized( security::acl_operation::write, transactional_id(*request.data.transactional_id))) { + auto ec = error_code::transactional_id_authorization_failed; + + if (!ctx.audit()) [[unlikely]] { + ec = error_code::broker_not_available; + } return process_result_stages::single_stage( - ctx.respond(request.make_error_response( - error_code::transactional_id_authorization_failed))); + ctx.respond(request.make_error_response(ec))); } // Note that authorization to a transactionalId implies // ProducerId authorization @@ -727,6 +721,58 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { return process_result_stages::single_stage(ctx.respond( request.make_error_response(error_code::invalid_required_acks))); } + + // Must now validate if we are authorized, we will remove items from the + // request that are not authorized and create response entries for those. 
+ // Once authz is checked, then attempt audit + auto unauthorized_it = std::partition( + request.data.topics.begin(), + request.data.topics.end(), + [&ctx](const topic_produce_data& t) { + return ctx.authorized(security::acl_operation::write, t.name); + }); + + const auto unauthorized_contains_topic = [&unauthorized_it, + &request](const model::topic& t) { + return std::any_of( + unauthorized_it, + request.data.topics.end(), + [t](const topic_produce_data& tp) { return tp.name == t; }); + }; + + // We do not want to audit if the request contains the audit topic + // however we _definitely_ want to audit if it was contained in the + // unauthorized list + if ( + !(ctx.request_contains_audit_topic() + && !unauthorized_contains_topic(model::kafka_audit_logging_topic)) + && !ctx.audit()) { + return process_result_stages::single_stage(ctx.respond( + request.make_error_response(error_code::broker_not_available))); + } + + resp.data.responses.reserve( + std::distance(unauthorized_it, request.data.topics.end())); + std::transform( + unauthorized_it, + request.data.topics.end(), + std::back_inserter(resp.data.responses), + [](const topic_produce_data& t) { + topic_produce_response r; + r.name = t.name; + r.partitions.reserve(t.partitions.size()); + for (const auto& p : t.partitions) { + r.partitions.emplace_back(partition_produce_response{ + .partition_index = p.partition_index, + .error_code = error_code::topic_authorization_failed, + }); + } + + return r; + }); + + request.data.topics.erase(unauthorized_it, request.data.topics.end()); + ss::promise<> dispatched_promise; auto dispatched_f = dispatched_promise.get_future(); auto produced_f = ss::do_with( @@ -756,7 +802,10 @@ produce_handler::handle(request_context ctx, ss::smp_service_group ssg) { return when_all_succeed(produced.begin(), produced.end()) .then( [&octx](std::vector topics) { - octx.response.data.responses = std::move(topics); + std::move( + topics.begin(), + topics.end(), + std::back_inserter(octx.response.data.responses)); }) .then([&octx] { // send response immediately diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index 3d4856ee09898..4409a176601c3 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -16,6 +16,7 @@ #include "config/configuration.h" #include "config/node_config.h" #include "features/feature_table.h" +#include "kafka/protocol/schemata/list_groups_response.h" #include "kafka/server/connection_context.h" #include "kafka/server/coordinator_ntp_mapper.h" #include "kafka/server/errors.h" @@ -515,21 +516,43 @@ ss::future list_groups_handler::handle( resp.data.error_code = error; resp.data.groups = std::move(groups); - if (ctx.authorized( - security::acl_operation::describe, security::default_cluster_name)) { - co_return co_await ctx.respond(std::move(resp)); + auto cluster_authz = ctx.authorized( + security::acl_operation::describe, security::default_cluster_name); + + if (!cluster_authz) { + // remove groups from response that should not be visible + auto non_visible_it = std::partition( + resp.data.groups.begin(), + resp.data.groups.end(), + [&ctx](const listed_group& group) { + return ctx.authorized( + security::acl_operation::describe, group.group_id); + }); + + resp.data.groups.erase(non_visible_it, resp.data.groups.end()); } - // remove groups from response that should not be visible - auto non_visible_it = std::partition( - resp.data.groups.begin(), - resp.data.groups.end(), - [&ctx](const listed_group& group) { - return ctx.authorized( - 
security::acl_operation::describe, group.group_id); - }); + auto additional_resources_func = [&resp, cluster_authz]() { + std::vector groups; + if (!cluster_authz) { + return groups; + } + + groups.reserve(resp.data.groups.size()); + std::transform( + resp.data.groups.begin(), + resp.data.groups.end(), + std::back_inserter(groups), + [](const listed_group& g) { return g.group_id; }); + + return groups; + }; - resp.data.groups.erase(non_visible_it, resp.data.groups.end()); + if (!ctx.audit(std::move(additional_resources_func))) { + resp.data.groups.clear(); + resp.data.error_code = error_code::broker_not_available; + co_return co_await ctx.respond(std::move(resp)); + } co_return co_await ctx.respond(std::move(resp)); } @@ -631,7 +654,15 @@ process_result_stages join_group_handler::handle( ctx.respond(join_group_response(error_code::policy_violation))); } - if (!ctx.authorized(security::acl_operation::read, request.data.group_id)) { + auto authz = ctx.authorized( + security::acl_operation::read, request.data.group_id); + + if (!ctx.audit()) { + return process_result_stages::single_stage( + ctx.respond(join_group_response(error_code::broker_not_available))); + } + + if (!authz) { return process_result_stages::single_stage(ctx.respond( join_group_response(error_code::group_authorization_failed))); } @@ -662,6 +693,21 @@ ss::future delete_groups_handler::handle( return ctx.authorized(security::acl_operation::remove, group); }); + if (!ctx.audit()) { + std::vector resp; + resp.reserve(request.data.groups_names.size()); + std::transform( + request.data.groups_names.begin(), + request.data.groups_names.end(), + std::back_inserter(resp), + [](const kafka::group_id& g) { + return deletable_group_result{ + .group_id = g, .error_code = error_code::broker_not_available}; + }); + + co_return co_await ctx.respond(delete_groups_response(std::move(resp))); + } + std::vector unauthorized( std::make_move_iterator(unauthorized_it), std::make_move_iterator(request.data.groups_names.end())); @@ -899,8 +945,13 @@ offset_fetch_handler::handle(request_context ctx, ss::smp_service_group) { log_request(ctx.header(), request); if (!ctx.authorized( security::acl_operation::describe, request.data.group_id)) { - co_return co_await ctx.respond( - offset_fetch_response(error_code::group_authorization_failed)); + if (!ctx.audit()) { + co_return co_await ctx.respond( + offset_fetch_response(error_code::broker_not_available)); + } else { + co_return co_await ctx.respond( + offset_fetch_response(error_code::group_authorization_failed)); + } } /* @@ -926,6 +977,13 @@ offset_fetch_handler::handle(request_context ctx, ss::smp_service_group) { topic.name, authz_quiet{true}); }); + + if (!ctx.audit()) { + resp.data.topics.clear(); + resp.data.error_code = error_code::broker_not_available; + co_return co_await ctx.respond(std::move(resp)); + } + resp.data.topics.erase(unauthorized, resp.data.topics.end()); co_return co_await ctx.respond(std::move(resp)); @@ -941,6 +999,11 @@ offset_fetch_handler::handle(request_context ctx, ss::smp_service_group) { return ctx.authorized(security::acl_operation::describe, topic.name); }); + if (!ctx.audit()) { + co_return co_await ctx.respond( + offset_fetch_response(error_code::broker_not_available)); + } + std::vector unauthorized( std::make_move_iterator(unauthorized_it), std::make_move_iterator(request.data.topics->end())); @@ -972,8 +1035,13 @@ offset_delete_handler::handle(request_context ctx, ss::smp_service_group) { log_request(ctx.header(), request); if (!ctx.authorized( 
security::acl_operation::remove, request.data.group_id)) { - co_return co_await ctx.respond( - offset_fetch_response(error_code::group_authorization_failed)); + if (!ctx.audit()) { + co_return co_await ctx.respond( + offset_delete_response(error_code::broker_not_available)); + } else { + co_return co_await ctx.respond( + offset_delete_response(error_code::group_authorization_failed)); + } } /// Remove unauthorized topics from request @@ -984,6 +1052,11 @@ offset_delete_handler::handle(request_context ctx, ss::smp_service_group) { return ctx.authorized(security::acl_operation::read, topic.name); }); + if (!ctx.audit()) { + co_return co_await ctx.respond( + offset_delete_response(error_code::broker_not_available)); + } + std::vector unauthorized( std::make_move_iterator(unauthorized_it), std::make_move_iterator(request.data.topics.end())); @@ -1060,6 +1133,31 @@ delete_topics_handler::handle(request_context ctx, ss::smp_service_group) { return ctx.authorized(security::acl_operation::remove, topic); }); + if (!ctx.audit()) { + delete_topics_response resp; + std::transform( + request.data.topic_names.begin(), + request.data.topic_names.end(), + std::back_inserter(resp.data.responses), + [](const model::topic& t) { + return deletable_topic_result{ + .name = t, .error_code = error_code::broker_not_available}; + }); + + std::transform( + request.data.topics.begin(), + request.data.topics.end(), + std::back_inserter(resp.data.responses), + [](const delete_topic_state& t) { + return deletable_topic_result{ + .name = t.name, + .error_code = error_code::broker_not_available, + }; + }); + + co_return co_await ctx.respond(std::move(resp)); + } + std::vector unauthorized( std::make_move_iterator(unauthorized_it), std::make_move_iterator(request.data.topic_names.end())); @@ -1186,8 +1284,12 @@ ss::future init_producer_id_handler::handle( security::acl_operation::write, transactional_id(*request.data.transactional_id))) { init_producer_id_response reply; - reply.data.error_code - = error_code::transactional_id_authorization_failed; + if (!ctx.audit()) { + reply.data.error_code = error_code::broker_not_available; + } else { + reply.data.error_code + = error_code::transactional_id_authorization_failed; + } return ctx.respond(reply); } @@ -1269,15 +1371,24 @@ ss::future init_producer_id_handler::handle( } } + bool cluster_authorized = false; + if (!permitted) { - if (!ctx.authorized( - security::acl_operation::idempotent_write, - security::default_cluster_name)) { - init_producer_id_response reply; - reply.data.error_code - = error_code::cluster_authorization_failed; - return ctx.respond(reply); - } + cluster_authorized = ctx.authorized( + security::acl_operation::idempotent_write, + security::default_cluster_name); + } + + if (!ctx.audit()) { + init_producer_id_response reply; + reply.data.error_code = error_code::broker_not_available; + return ctx.respond(std::move(reply)); + } + + if (!permitted && !cluster_authorized) { + init_producer_id_response reply; + reply.data.error_code = error_code::cluster_authorization_failed; + return ctx.respond(std::move(reply)); } return ctx.id_allocator_frontend() @@ -1534,6 +1645,27 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { } } + if (!octx.rctx.audit()) { + offset_commit_response resp( + octx.request, error_code::broker_not_available); + for (auto& unauthorized : octx.unauthorized_tps) { + offset_commit_response_topic tmp; + tmp.name = unauthorized.first; + tmp.partitions = std::move(unauthorized.second); + 
resp.data.topics.emplace_back(std::move(tmp)); + } + + for (auto& nonexistent : octx.nonexistent_tps) { + offset_commit_response_topic tmp; + tmp.name = nonexistent.first; + tmp.partitions = std::move(nonexistent.second); + resp.data.topics.emplace_back(std::move(tmp)); + } + + return process_result_stages::single_stage( + octx.rctx.respond(std::move(resp))); + } + // all of the topics either don't exist or failed authorization if (unlikely(octx.request.data.topics.empty())) { offset_commit_response resp; @@ -1616,6 +1748,21 @@ describe_groups_handler::handle(request_context ctx, ss::smp_service_group) { return ctx.authorized(security::acl_operation::describe, id); }); + if (!ctx.audit()) { + describe_groups_response resp; + resp.data.groups.reserve(request.data.groups.size()); + std::transform( + request.data.groups.begin(), + request.data.groups.end(), + std::back_inserter(resp.data.groups), + [](const kafka::group_id& g) { + return described_group{ + .error_code = error_code::broker_not_available, .group_id = g}; + }); + + co_return co_await ctx.respond(std::move(resp)); + } + std::vector unauthorized( std::make_move_iterator(unauthorized_it), std::make_move_iterator(request.data.groups.end())); @@ -1715,6 +1862,11 @@ list_transactions_handler::handle(request_context ctx, ss::smp_service_group) { response.data.transaction_states.push_back(std::move(tx_state)); } } + + if (!ctx.audit()) { + response.data.transaction_states.clear(); + response.data.error_code = error_code::broker_not_available; + } } else { // In this 2 errors not coordinator got request and we just return empty // array diff --git a/src/v/security/audit/audit_log_manager.cc b/src/v/security/audit/audit_log_manager.cc index fdf3de37e65bc..0784442e3b0fd 100644 --- a/src/v/security/audit/audit_log_manager.cc +++ b/src/v/security/audit/audit_log_manager.cc @@ -1,11 +1,12 @@ -// Copyright 2023 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ #include "security/audit/audit_log_manager.h" diff --git a/src/v/security/audit/audit_log_manager.h b/src/v/security/audit/audit_log_manager.h index 7b15dc4ece6e9..3ac96dd4ea5a5 100644 --- a/src/v/security/audit/audit_log_manager.h +++ b/src/v/security/audit/audit_log_manager.h @@ -1,11 +1,12 @@ -// Copyright 2023 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ #pragma once #include "cluster/fwd.h" diff --git a/src/v/security/audit/client_probe.h b/src/v/security/audit/client_probe.h index 54526bff9f0a8..54038224e216a 100644 --- a/src/v/security/audit/client_probe.h +++ b/src/v/security/audit/client_probe.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once diff --git a/src/v/security/audit/logger.cc b/src/v/security/audit/logger.cc index 26e24f90648e8..7fb4ee558eb03 100644 --- a/src/v/security/audit/logger.cc +++ b/src/v/security/audit/logger.cc @@ -1,14 +1,12 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ - #include "cluster/logger.h" namespace security::audit { diff --git a/src/v/security/audit/logger.h b/src/v/security/audit/logger.h index 01b835067fdf2..049b8cbf11c61 100644 --- a/src/v/security/audit/logger.h +++ b/src/v/security/audit/logger.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once #include diff --git a/src/v/security/audit/probe.h b/src/v/security/audit/probe.h index 9005b6fe170f6..cc70f97a978bc 100644 --- a/src/v/security/audit/probe.h +++ b/src/v/security/audit/probe.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once diff --git a/src/v/security/audit/probes.cc b/src/v/security/audit/probes.cc index decb9c7158f87..1819a26acb17f 100644 --- a/src/v/security/audit/probes.cc +++ b/src/v/security/audit/probes.cc @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #include "config/configuration.h" diff --git a/src/v/security/audit/schemas/application_activity.h b/src/v/security/audit/schemas/application_activity.h index 9fd38f9eb904c..3e752a66d80ef 100644 --- a/src/v/security/audit/schemas/application_activity.h +++ b/src/v/security/audit/schemas/application_activity.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once @@ -48,12 +47,14 @@ class api_activity final : public ocsf_base_event { : ocsf_base_event( category_uid::application_activity, class_uid::api_activity, + ocsf_redpanda_metadata_cloud_profile(), severity_id, time, activity_id) , _activity_id(activity_id) , _actor(std::move(actor)) , _api(std::move(api)) + , _cloud(cloud{.provider = ""}) , _dst_endpoint(std::move(dst_endpoint)) , _http_request(std::move(http_request)) , _resources(std::move(resources)) @@ -78,6 +79,7 @@ class api_activity final : public ocsf_base_event { activity_id _activity_id; actor _actor; api _api; + cloud _cloud; network_endpoint _dst_endpoint; std::optional _http_request; std::vector _resources; @@ -97,6 +99,8 @@ class api_activity final : public ocsf_base_event { ::json::rjson_serialize(w, a._actor); w.Key("api"); ::json::rjson_serialize(w, a._api); + w.Key("cloud"); + ::json::rjson_serialize(w, a._cloud); w.Key("dst_endpoint"); ::json::rjson_serialize(w, a._dst_endpoint); if (a._http_request) { diff --git a/src/v/security/audit/schemas/iam.h b/src/v/security/audit/schemas/iam.h index 22e36061b7e63..eaf76b777d63a 100644 --- a/src/v/security/audit/schemas/iam.h +++ b/src/v/security/audit/schemas/iam.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. 
* - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once diff --git a/src/v/security/audit/schemas/schemas.h b/src/v/security/audit/schemas/schemas.h index f65101285cfd1..3dbb569bd642a 100644 --- a/src/v/security/audit/schemas/schemas.h +++ b/src/v/security/audit/schemas/schemas.h @@ -1,14 +1,12 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ - #pragma once #include "security/audit/schemas/types.h" @@ -102,6 +100,22 @@ class ocsf_base_event : public ocsf_base_impl { , _time(time) , _type_uid(get_ocsf_type(this->_class_uid, activity_id)) {} + template + ocsf_base_event( + category_uid category_uid, + class_uid class_uid, + metadata metadata, + severity_id severity_id, + timestamp_t time, + T activity_id) + : _category_uid(category_uid) + , _class_uid(class_uid) + , _metadata(std::move(metadata)) + , _severity_id(severity_id) + , _start_time(time) + , _time(time) + , _type_uid(get_ocsf_type(this->_class_uid, activity_id)) {} + virtual size_t hash() const = 0; void rjson_serialize(::json::Writer<::json::StringBuffer>& w) const { diff --git a/src/v/security/audit/schemas/tests/ocsf_schemas_test.cc b/src/v/security/audit/schemas/tests/ocsf_schemas_test.cc index b4b0cf6e199fb..8dfd0f4e526aa 100644 --- a/src/v/security/audit/schemas/tests/ocsf_schemas_test.cc +++ b/src/v/security/audit/schemas/tests/ocsf_schemas_test.cc @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #include "json/json.h" @@ -164,6 +163,20 @@ static const ss::sstring metadata_ser{ } )"}; +static const ss::sstring metadata_cloud_profile_ser{ + R"( +{ + "product": { + "name": "Redpanda", + "vendor_name": "Redpanda Data, Inc.", + "version": ")" + + ss::sstring{redpanda_git_version()} + R"(" + }, + "profiles": ["cloud"], + "version": "1.0.0" +} + )"}; + static const sa::http_header test_header{ .name = "Accept-Encoding", .value = "application/json"}; @@ -247,7 +260,7 @@ BOOST_AUTO_TEST_CASE(validate_api_activity) { "category_uid": 6, "class_uid": 6003, "metadata": )" - + metadata_ser + R"(, + + metadata_cloud_profile_ser + R"(, "severity_id": 1, "time": )" + ss::to_sstring(now) + R"(, @@ -261,6 +274,7 @@ BOOST_AUTO_TEST_CASE(validate_api_activity) { }, "api": )" + api_create_topic_ser + R"(, + "cloud": { "provider": "" }, "dst_endpoint": )" + rp_kafka_endpoint_ser + R"(, "http_request": )" @@ -739,6 +753,7 @@ BOOST_AUTO_TEST_CASE(make_api_activity_event_authorized) { "name": "http" }} }}, + "cloud": {{ "provider": "" }}, "dst_endpoint": {{ "ip": "10.1.1.1", "port": 23456, @@ -778,7 +793,7 @@ BOOST_AUTO_TEST_CASE(make_api_activity_event_authorized) { "unmapped": {{}} }} )", - fmt::arg("metadata", metadata_ser), + fmt::arg("metadata", metadata_cloud_profile_ser), fmt::arg("time", ss::to_sstring(api_activity.get_time())), fmt::arg("username", username), fmt::arg("method", method), @@ -927,6 +942,7 @@ BOOST_AUTO_TEST_CASE(make_api_activity_event_authorized_authn_disabled) { "name": "http" }} }}, + "cloud": {{ "provider": "" }}, "dst_endpoint": {{ "ip": "10.1.1.1", "port": 23456, @@ -966,7 +982,7 @@ BOOST_AUTO_TEST_CASE(make_api_activity_event_authorized_authn_disabled) { "unmapped": {{}} }} )", - fmt::arg("metadata", metadata_ser), + fmt::arg("metadata", metadata_cloud_profile_ser), fmt::arg("time", ss::to_sstring(api_activity.get_time())), fmt::arg("method", method), fmt::arg("user_agent", user_agent), diff --git a/src/v/security/audit/schemas/types.h b/src/v/security/audit/schemas/types.h index 3b898b8397def..595a664698090 100644 --- a/src/v/security/audit/schemas/types.h +++ b/src/v/security/audit/schemas/types.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once @@ -127,9 +126,12 @@ static inline const product& redpanda_product() { struct metadata { product product; + std::vector profiles; ss::sstring version; - auto equality_fields() const { return std::tie(product, version); } + auto equality_fields() const { + return std::tie(product, profiles, version); + } }; static inline const metadata& ocsf_redpanda_metadata() { @@ -139,6 +141,15 @@ static inline const metadata& ocsf_redpanda_metadata() { return ocsf_metadata; } +static inline const metadata& ocsf_redpanda_metadata_cloud_profile() { + static const metadata ocsf_metadata{ + .product = redpanda_product(), + .profiles = {"cloud"}, + .version = ss::sstring{ocsf_api_version}}; + + return ocsf_metadata; +} + struct network_endpoint { std::vector intermediate_ips; net::unresolved_address addr; @@ -278,6 +289,12 @@ struct http_request { return std::tie(http_headers, http_method, url, user_agent, version); } }; + +struct cloud { + ss::sstring provider; + + auto equality_fields() const { return std::tie(provider); } +}; } // namespace security::audit namespace json { @@ -315,6 +332,10 @@ inline void rjson_serialize(Writer& w, const sa::metadata& m) { w.StartObject(); w.Key("product"); rjson_serialize(w, m.product); + if (!m.profiles.empty()) { + w.Key("profiles"); + rjson_serialize(w, m.profiles); + } w.Key("version"); rjson_serialize(w, m.version); w.EndObject(); @@ -518,6 +539,13 @@ rjson_serialize(Writer& w, const sa::http_request& r) { rjson_serialize(w, r.version); w.EndObject(); } + +inline void rjson_serialize(Writer& w, const sa::cloud& c) { + w.StartObject(); + w.Key("provider"); + rjson_serialize(w, c.provider); + w.EndObject(); +} } // namespace json namespace std { diff --git a/src/v/security/audit/schemas/utils.cc b/src/v/security/audit/schemas/utils.cc index d181ae86e3ed6..fc4a39ad6f247 100644 --- a/src/v/security/audit/schemas/utils.cc +++ b/src/v/security/audit/schemas/utils.cc @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #include "security/audit/schemas/utils.h" diff --git a/src/v/security/audit/schemas/utils.h b/src/v/security/audit/schemas/utils.h index 661dd9bd2eef9..54422cab59632 100644 --- a/src/v/security/audit/schemas/utils.h +++ b/src/v/security/audit/schemas/utils.h @@ -1,12 +1,11 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ #pragma once diff --git a/src/v/security/audit/tests/audit_log_test.cc b/src/v/security/audit/tests/audit_log_test.cc index 6aea116963ab2..10f6dcb0314c5 100644 --- a/src/v/security/audit/tests/audit_log_test.cc +++ b/src/v/security/audit/tests/audit_log_test.cc @@ -1,11 +1,12 @@ -// Copyright 2023 Redpanda Data, Inc. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ #include "cluster/types.h" #include "kafka/types.h" diff --git a/src/v/security/audit/types.h b/src/v/security/audit/types.h index 3cf6efc715b0d..a8cd6335a54a3 100644 --- a/src/v/security/audit/types.h +++ b/src/v/security/audit/types.h @@ -1,14 +1,12 @@ /* * Copyright 2023 Redpanda Data, Inc. * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.md + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ - #pragma once #include "utils/string_switch.h" diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index d36ffa9d11b9f..ff84b0c0b7414 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -214,6 +214,12 @@ RUN /tinygo-wasi-transforms && \ ################################# +FROM base as ocsf +COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/ocsf-server / +RUN /ocsf-server && rm /ocsf-server + +################################# + FROM librdkafka as final COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/teleport / @@ -290,6 +296,8 @@ COPY --from=kgo-verifier /opt/kgo-verifier /opt/kgo-verifier COPY --from=byoc-mock /opt/redpanda-tests/go/byoc-mock/.rpk.managed-byoc /root/.local/bin/.rpk.managed-byoc COPY --from=keycloak /opt/keycloak/ /opt/keycloak/ COPY --from=wasi-transforms /opt/transforms/ /opt/transforms/ +COPY --from=ocsf /opt/ocsf-schema/ /opt/ocsf-schema/ +COPY --from=ocsf /opt/ocsf-server/ /opt/ocsf-server/ RUN ldconfig diff --git a/tests/docker/ducktape-deps/ocsf-server b/tests/docker/ducktape-deps/ocsf-server new file mode 100644 index 0000000000000..5d0ac1cdea435 --- /dev/null +++ b/tests/docker/ducktape-deps/ocsf-server @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -e + +OCSF_SCHEMA_VERSION=1.0.0 +OCSF_SERVER_VERSION=d3b26de39df9eb33c6d63e34a126c77c0811c7a0 + +wget "https://github.com/redpanda-data/ocsf-schema/archive/refs/tags/v${OCSF_SCHEMA_VERSION}.tar.gz" +wget "https://github.com/redpanda-data/ocsf-server/archive/${OCSF_SERVER_VERSION}.tar.gz" + +tar -xvzf v${OCSF_SCHEMA_VERSION}.tar.gz +rm v${OCSF_SCHEMA_VERSION}.tar.gz +mv ocsf-schema-${OCSF_SCHEMA_VERSION} /opt/ocsf-schema + +tar -xvzf ${OCSF_SERVER_VERSION}.tar.gz +rm ${OCSF_SERVER_VERSION}.tar.gz +mv ocsf-server-${OCSF_SERVER_VERSION} /opt/ocsf-server + +apt-get update +apt-get install -qq software-properties-common + +add-apt-repository -y ppa:rabbitmq/rabbitmq-erlang +apt-get update +apt-get install -qq elixir erlang-dev erlang-xmerl + +mix local.hex --force && mix local.rebar --force + +pushd /opt/ocsf-server +./build_server.sh +openssl req -new -newkey rsa:2048 -sha256 -days 365 -nodes -x509 -keyout server.key -out server.crt -subj "/O=Redpanda" -batch +popd diff --git a/tests/rptest/services/ocsf_server.py b/tests/rptest/services/ocsf_server.py new file mode 100644 index 0000000000000..ac9bba9b855e2 --- /dev/null +++ b/tests/rptest/services/ocsf_server.py @@ -0,0 +1,148 @@ +# Copyright 2023 Redpanda Data, Inc. 
diff --git a/tests/rptest/services/ocsf_server.py b/tests/rptest/services/ocsf_server.py
new file mode 100644
index 0000000000000..ac9bba9b855e2
--- /dev/null
+++ b/tests/rptest/services/ocsf_server.py
@@ -0,0 +1,148 @@
+# Copyright 2023 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import json
+import os
+import requests
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.services.service import Service
+from rptest.util import wait_until_result
+
+SERVER_DIR = '/opt/ocsf-server'
+SCHEMA_DIR = '/opt/ocsf-schema'
+KEY_FILE = os.path.join(SERVER_DIR, "server.key")
+CRT_FILE = os.path.join(SERVER_DIR, "server.crt")
+SCHEMA_SERVER_BIN = os.path.join(SERVER_DIR, "dist/bin/schema_server")
+
+CMD_TMPL = "HTTPS_KEY_FILE={key_file} HTTPS_CERT_FILE={cert_file} SCHEMA_DIR={schema_dir} {bin_file} {cmd}"
+
+HTTP_PORT = 8000
+
+
+class OcsfSchemaError(Exception):
+    """Exception raised by OCSF services
+    """
+    def __init__(self, error):
+        super(OcsfSchemaError, self).__init__(error)
+
+
+class OcsfServer(Service):
+    """Service used to start and interact with the OCSF server
+
+    This service provides the ability to validate records against the
+    OCSF schema
+    """
+    def __init__(self, context):
+        super(OcsfServer, self).__init__(context, num_nodes=1)
+
+    def _format_cmd(self, cmd: str):
+        return CMD_TMPL.format(key_file=KEY_FILE,
+                               cert_file=CRT_FILE,
+                               schema_dir=SCHEMA_DIR,
+                               bin_file=SCHEMA_SERVER_BIN,
+                               cmd=cmd)
+
+    def _start_cmd(self):
+        return self._format_cmd("daemon")
+
+    def _pid_cmd(self):
+        return self._format_cmd("pid")
+
+    def _stop_cmd(self):
+        return self._format_cmd("stop")
+
+    def _create_uri(self, node, path):
+        if node is None:
+            node = self.nodes[0]
+        hostname = node.account.hostname
+        return f'http://{hostname}:{HTTP_PORT}/api/{path}'
+
+    def get_api_version(self, node=None):
+        """Returns the current version of the OCSF server
+
+        Parameters
+        ----------
+        node: default=None
+
+        Returns
+        -------
+        Version of the OCSF server
+        """
+        uri = self._create_uri(node, 'version')
+        self.logger.debug(f'Getting version via "{uri}"')
+
+        def _wait_for_api_version():
+            r = requests.get(uri)
+            if r.status_code != 200:
+                return (False, None)
+            return (True, r)
+
+        r = wait_until_result(_wait_for_api_version,
+                              timeout_sec=5,
+                              backoff_sec=1,
+                              retry_on_exc=True,
+                              err_msg=f'Could not get API version from {uri}')
+
+        r = requests.get(uri)
+        if r.status_code != 200:
+            raise Exception(f'Unexpected status code: {r.status_code}')
+        return r.json()['version']
+
+    def validate_schema(self, schema, node=None):
+        """Validates a provided record against the OCSF schema
+
+        Will throw an OcsfSchemaError if the record fails to validate
+
+        Parameters
+        ----------
+        schema : json
+            The record to validate
+
+        node : default=None
+        """
+        uri = self._create_uri(node, 'validate')
+        self.logger.debug(
+            f'Attempting to validate schema {schema} against {uri}')
+        r = requests.post(uri,
+                          headers={
+                              'content-type': 'application/json',
+                              'accept': 'application/json'
+                          },
+                          json=schema)
+        if r.status_code != 200:
+            raise Exception(f'Unexpected status code: {r.status_code}')
+        self.logger.debug(f'Response from server: {r.json()}')
+        resp = r.json()
+        if len(resp) != 0:
+            raise OcsfSchemaError(json.dumps(resp))
+
+    def pids(self, node):
+        pid_cmd = self._pid_cmd()
+        self.logger.debug(f'Getting OCSF server pid with cmd "{pid_cmd}"')
+        try:
+            line = node.account.ssh_capture(pid_cmd,
+                                            allow_fail=False,
+                                            callback=int)
+            p = next(line)
+            if node.account.alive(p):
+                return [p]
+        except (RemoteCommandError, ValueError):
+            self.logger.warn('pid file not found for ocsf server')
+
+        return []
+
+    def start_node(self, node):
+        cmd = self._start_cmd()
+        self.logger.debug(f'Starting OCSF Server with cmd "{cmd}"')
+        node.account.ssh(cmd, allow_fail=False)
+
+    def stop_node(self, node):
+        cmd = self._stop_cmd()
+        self.logger.debug(f'Stopping OCSF server with cmd "{cmd}"')
+        node.account.ssh(cmd, allow_fail=True)
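Tests drive this service roughly as follows; a minimal sketch (the test class is hypothetical, mirroring the wiring added to audit_log_test.py below). Note the service occupies one cluster node, which is why the @cluster(num_nodes=...) annotations in the test diffs below each grow by one.

from rptest.services.ocsf_server import OcsfSchemaError, OcsfServer
from rptest.tests.redpanda_test import RedpandaTest


class ExampleAuditTest(RedpandaTest):  # hypothetical test class
    def __init__(self, test_context):
        super().__init__(test_context=test_context)
        self.ocsf_server = OcsfServer(test_context)  # claims one node

    def setUp(self):
        super().setUp()
        self.ocsf_server.start()
        self.logger.debug(
            f'OCSF server version: {self.ocsf_server.get_api_version()}')

    def check_record(self, record: dict):
        # Raises OcsfSchemaError carrying the server's error report if the
        # record does not conform to the OCSF schema.
        self.ocsf_server.validate_schema(record)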
diff --git a/tests/rptest/tests/audit_log_test.py b/tests/rptest/tests/audit_log_test.py
index 4b8a61e707396..ab03a22827464 100644
--- a/tests/rptest/tests/audit_log_test.py
+++ b/tests/rptest/tests/audit_log_test.py
@@ -7,6 +7,7 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
 
+import confluent_kafka as ck
 from functools import partial, reduce
 import json
 import re
@@ -14,17 +15,19 @@
 from ducktape.cluster.cluster import ClusterNode
 from ducktape.mark import ok_to_fail
 
+from rptest.clients.default import DefaultClient
+from rptest.clients.kcl import KCL
 from rptest.clients.rpk import RpkTool
 from rptest.services import tls
 from rptest.services.admin import Admin
 from rptest.services.cluster import cluster
 from rptest.services import redpanda
+from rptest.services.ocsf_server import OcsfServer
 from rptest.services.redpanda import LoggingConfig, MetricSamples, MetricsEndpoint, SecurityConfig
 from rptest.services.rpk_consumer import RpkConsumer
 from rptest.tests.cluster_config_test import wait_for_version_sync
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.util import wait_until, wait_until_result
-from rptest.utils.audit_schemas import validate_audit_schema
 
 
 class BaseTestItem:
@@ -257,6 +260,7 @@ def __init__(
         self.rpk = self.get_rpk()
         self.super_rpk = self.get_super_rpk()
         self.admin = Admin(self.redpanda)
+        self.ocsf_server = OcsfServer(test_context)
 
     def get_rpk_credentials(self, username: str, password: str,
                             mechanism: str) -> RpkTool:
@@ -300,6 +304,10 @@ def setUp(self):
         """Initializes the Redpanda node and waits for audit log to be present
         """
         super().setUp()
+        self.ocsf_server.start()
+        self.logger.debug(
+            f'Running OCSF Server Version {self.ocsf_server.get_api_version(None)}'
+        )
         self.wait_for_audit_log()
 
     def wait_for_audit_log(self):
@@ -342,6 +350,17 @@ def api_resource_match(expected_api_op, resource_entry, service_name,
             'operation'] == expected_api_op and resource_entry in record[
                 'resources']
 
+    @staticmethod
+    def multi_api_resource_match(expected: list[dict[str, dict[str, str]]],
+                                 service_name, record):
+        for items in expected:
+            for expected_api_op, resource_entry in items.items():
+                if AuditLogTestsBase.api_resource_match(
+                        expected_api_op, resource_entry, service_name, record):
+                    return True
+
+        return False
+
     @staticmethod
     def api_match(expected_api_op, service_name, record):
         return record['class_uid'] == 6003 and record['api']['service'][
@@ -416,11 +435,12 @@ def read_all_from_audit_log(self,
             List of records as json objects
         """
         class MessageMapper():
-            def __init__(self, logger, filter_fn, stop_cond):
+            def __init__(self, logger, filter_fn, stop_cond, ocsf_server):
                 self.logger = logger
                 self.records = []
                 self.filter_fn = filter_fn
                 self.stop_cond = stop_cond
+                self.ocsf_server = ocsf_server
                 self.next_offset_ingest = 0
 
             def ingest(self, records):
@@ -431,13 +451,22 @@ def ingest(self, records):
                 for rec in new_records:
                     self.logger.debug(f'{rec}')
                 self.logger.info(f"Ingested: {len(new_records)} records")
-                [validate_audit_schema(record) for record in new_records]
+                [
+                    self.ocsf_server.validate_schema(record)
+                    for record in new_records
+
] + for r in new_records: + if self.filter_fn(r): + self.logger.info(f'Selected {r}') + else: + self.logger.info(f'DID NOT SELECT {r}') self.records += [r for r in new_records if self.filter_fn(r)] def is_finished(self): return stop_cond(self.records) - mapper = MessageMapper(self.redpanda.logger, filter_fn, stop_cond) + mapper = MessageMapper(self.redpanda.logger, filter_fn, stop_cond, + self.ocsf_server) consumer = self.get_rpk_consumer(topic=self.audit_log, offset=start_offset) consumer.start() @@ -496,7 +525,7 @@ def __init__(self, test_context): 'trace' })) - @cluster(num_nodes=4) + @cluster(num_nodes=5) def test_audit_log_functioning(self): """ Ensures that the audit log can be produced to when the audit_enabled() @@ -555,7 +584,7 @@ def number_of_records_matching(filter_by, n_expected): _ = number_of_records_matching(api_keys, 1000) @ok_to_fail # https://github.com/redpanda-data/redpanda/issues/14565 - @cluster(num_nodes=3) + @cluster(num_nodes=4) def test_audit_log_metrics(self): """ Confirm that audit log metrics are present @@ -623,13 +652,24 @@ def __init__(self, test_context): 'kafka': 'trace' })) - @cluster(num_nodes=4) + self.kcl = KCL(self.redpanda) + self.default_client = DefaultClient(self.redpanda) + + @cluster(num_nodes=5) def test_management(self): """Validates management messages """ topic_name = 'test_mgmt_audit' + def alter_partition_reassignments_with_kcl( + kcl: KCL, topics: dict[str, dict[int, list[int]]]): + + kcl.alter_partition_reassignments(topics=topics) + + def alter_config(client, values: dict[str, Any], incremental: bool): + client.alter_broker_config(values, incremental) + tests = [ AbsoluteTestItem( f'Create Topic {topic_name}', @@ -638,6 +678,38 @@ def test_management(self): "name": f"{topic_name}", "type": "topic" }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Add partitions to {topic_name}', lambda: self.rpk. 
+ add_partitions(topic=topic_name, partitions=3), + partial(self.api_resource_match, "create_partitions", { + "name": f"{topic_name}", + "type": "topic" + }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Attempt group offset delete', + lambda: self.execute_command_ignore_error( + partial(self.rpk.offset_delete, "fake", {topic_name: [0]}) + ), + partial(self.api_resource_match, "offset_delete", { + "name": "fake", + "type": "group" + }, self.kafka_rpc_service_name), + 5), # expect five because rpk will retry + RangeTestItem( + f'Attempting delete records for {topic_name}', + lambda: self.execute_command_ignore_error( + partial(self.rpk.trim_prefix, topic_name, 0)), + partial(self.api_resource_match, "delete_records", { + "name": f"{topic_name}", + "type": "topic" + }, self.kafka_rpc_service_name), 1, 3), + AbsoluteTestItem( + f'Delete Topic {topic_name}', + lambda: self.rpk.delete_topic(topic=topic_name), + partial(self.api_resource_match, "delete_topics", { + "name": f"{topic_name}", + "type": "topic" + }, self.kafka_rpc_service_name), 1), AbsoluteTestItem( f'Create ACL', lambda: self.rpk.allow_principal(principal="test", @@ -676,6 +748,42 @@ def test_management(self): "acl_permission": "allow" } }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Delete group test', + lambda: self.execute_command_ignore_error( + partial(self.rpk.group_delete, "test")), + partial(self.api_resource_match, "delete_groups", { + "name": "test", + "type": "group" + }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Alter Partition Reassignments', + lambda: self.execute_command_ignore_error( + partial(alter_partition_reassignments_with_kcl, self.kcl, + {topic_name: { + 1: [0] + }})), + partial(self.api_resource_match, + "alter_partition_reassignments", { + "name": topic_name, + "type": "topic" + }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Alter Config (not-incremental)', + lambda: self.execute_command_ignore_error( + partial(alter_config, self.default_client, { + "log_message_timestamp_type": "CreateTime" + }, False)), + partial(self.api_match, "alter_configs", + self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Incremental Alter Config', + lambda: self.execute_command_ignore_error( + partial(alter_config, self.default_client, { + "log_message_timestamp_type": "CreateTime" + }, True)), + partial(self.api_match, "incremental_alter_configs", + self.kafka_rpc_service_name), 1), AbsoluteTestItem( f'List ACLs (no item)', lambda: self.rpk.acl_list(), partial(self.api_match, "list_acls", @@ -691,3 +799,58 @@ def test_management(self): test.generate_function() _ = self.find_matching_record(test.filter_function, test.valid_count, test.desc()) + + @cluster(num_nodes=5) + def test_produce(self): + """Validates produce audit messages + """ + + topic_name = 'test_produce_audit' + tx_topic_name = 'test_produce_tx_audit' + + self.rpk.create_topic(topic=topic_name, partitions=3) + self.rpk.create_topic(topic=tx_topic_name, partitions=3) + + def transaction_generate(): + producer = ck.Producer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'transactional.id': '1' + }) + producer.init_transactions() + producer.begin_transaction() + producer.produce(tx_topic_name, '0', '0', 1) + producer.produce(tx_topic_name, '0', '1', 2) + producer.flush() + + tests = [ + AbsoluteTestItem( + f'Produce one message to {topic_name}', lambda: self.rpk. 
+ produce(topic_name, key='Test key', msg='Test msg'), + partial(self.api_resource_match, "produce", { + "name": f'{topic_name}', + "type": "topic" + }, self.kafka_rpc_service_name), 1), + AbsoluteTestItem( + f'Produce two messages to {tx_topic_name}', + lambda: transaction_generate(), + partial(self.multi_api_resource_match, [{ + "produce": { + "name": f'{tx_topic_name}', + "type": "topic" + } + }, { + "produce": { + "name": "1", + "type": "transactional_id" + } + }], self.kafka_rpc_service_name), 4) + ] + + self.logger.debug("Modifying event types") + self.modify_audit_event_types(['produce']) + + for test in tests: + self.logger.info(f'Running test "{test.name}"') + test.generate_function() + _ = self.find_matching_record(test.filter_function, + test.valid_count, test.desc()) diff --git a/tests/rptest/utils/audit_schemas.py b/tests/rptest/utils/audit_schemas.py deleted file mode 100644 index 64d420bf4fb4b..0000000000000 --- a/tests/rptest/utils/audit_schemas.py +++ /dev/null @@ -1,463 +0,0 @@ -# Copyright 2023 Redpanda Data, Inc. -# -# Use of this software is governed by the Business Source License -# included in the file licenses/BSL.md -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0 - -import jsonschema -import jsonschema.exceptions - -PRODUCT_SCHEMA = { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - }, - 'vendor_name': { - 'type': 'string' - }, - 'version': { - 'type': 'string' - } - }, - 'required': ['name', 'vendor_name', 'version'], - 'additionalProperties': False -} - -USER_SCHEMA = { - 'type': 'object', - 'properties': { - 'credential_uid': { - 'type': 'string' - }, - 'domain': { - 'type': 'string' - }, - 'name': { - 'type': 'string' - }, - 'type_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 99] - } - }, - 'required': ['name', 'type_id'], - 'additionalProperties': False -} - -OCSF_BASE_SCHEMA = { - 'type': - 'object', - 'properties': { - 'category_uid': { - 'type': 'number', - 'enum': [1, 2, 3, 4, 5, 6] - }, - 'class_uid': { - 'type': - 'number', - 'enum': [ - 1001, 1002, 1003, 1004, 1005, 1006, 1007, 2001, 3001, 3002, - 3003, 3004, 3005, 3006, 4001, 4002, 4003, 4004, 4005, 4006, - 4007, 4008, 4009, 4010, 4011, 4012, 5001, 5002, 6001, 6002, - 6003, 6004 - ] - }, - 'count': { - 'type': 'number' - }, - 'end_time': { - 'type': 'number' - }, - 'metadata': { - 'type': 'object', - 'properties': { - 'version': { - 'type': 'string' - }, - 'product': PRODUCT_SCHEMA - }, - 'required': ['product', 'version'], - 'additionalProperties': False - }, - 'severity_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 4, 5, 6, 99] - }, - 'start_time': { - 'type': 'number' - }, - 'time': { - 'type': 'number' - }, - 'type_uid': { - 'type': 'number' - } - }, - 'required': [ - 'category_uid', 'class_uid', 'metadata', 'severity_id', 'time', - 'type_uid' - ], - 'additionalProperties': - False -} - -ENDPOINT_SCHEMA = { - 'type': 'object', - 'properties': { - 'intermediate_ips': { - 'type': 'array' - }, - 'ip': { - 'type': 'string' - }, - 'name': { - 'type': 'string' - }, - 'port': { - 'type': 'number' - }, - 'svc_name': { - 'type': 'string' - }, - 'uid': { - 'type': 'string' - } - }, - 'required': ['ip', 'port'], - 'additionalProperties': False -} - -AUTH_METADATA_SCHEMA = { - 'type': 'object', - 'properties': { - 'acl_authorization': { - 'type': 'object', - 'properties': { - 'host': { - 'type': 'string' - }, - 'op': { - 'type': 'string' - }, - 
'permission_type': { - 'type': 'string' - }, - 'principal': { - 'type': 'string' - } - }, - 'required': ['host', 'op', 'permission_type', 'principal'], - 'additionalProperties': False - }, - 'resource': { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - }, - 'pattern': { - 'type': 'string' - }, - 'type': { - 'type': 'string' - } - }, - 'required': ['name', 'pattern', 'type'], - 'additionalProperties': False - } - }, - 'required': ['acl_authorization', 'resource'], - 'additionalProperties': False -} - -API_ACTIVITY_SCHEMA = { - 'type': - 'object', - 'properties': { - 'activity_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 4, 99] - }, - 'actor': { - 'type': 'object', - 'properties': { - 'authorizations': { - 'type': 'array', - 'items': { - 'type': 'object', - 'properties': { - 'decision': { - 'type': 'string' - }, - 'policy': { - 'type': 'object', - 'properties': { - 'desc': { - 'type': 'string' - }, - 'name': { - 'type': 'string' - } - }, - 'required': ['desc', 'name'], - 'additionalProperties': False - }, - }, - 'required': ['decision'], - 'additionalProperties': False - } - }, - 'user': USER_SCHEMA - }, - 'required': ['authorizations', 'user'], - 'additionalProperties': False - }, - 'api': { - 'type': 'object', - 'properties': { - 'operation': { - 'type': 'string' - }, - 'service': { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - } - }, - 'required': ['name'] - } - }, - 'required': ['operation', 'service'], - 'additionalProperties': False - }, - 'dst_endpoint': ENDPOINT_SCHEMA, - 'http_request': { - 'type': - 'object', - 'properties': { - 'http_headers': { - 'type': 'array', - 'items': { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - }, - 'value': { - 'type': 'string' - } - }, - 'required': ['name', 'value'], - 'additionalProperties': False - } - }, - 'http_method': { - 'type': 'string' - }, - 'url': { - 'type': - 'object', - 'properties': { - 'hostname': { - 'type': 'string' - }, - 'path': { - 'type': 'string' - }, - 'port': { - 'type': 'number' - }, - 'scheme': { - 'type': 'string' - }, - 'url_string': { - 'type': 'string' - } - }, - 'required': - ['hostname', 'path', 'port', 'scheme', 'url_string'], - 'additionalProperties': - False - }, - 'user_agent': { - 'type': 'string' - }, - 'version': { - 'type': 'string' - } - }, - 'required': - ['http_headers', 'http_method', 'url', 'user_agent', 'version'], - 'additionalProperties': - False - }, - 'resources': { - 'type': 'array', - 'items': { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - }, - 'type': { - 'type': 'string' - } - }, - 'required': ['name', 'type'], - }, - 'additionalProperties': False - }, - 'src_endpoint': ENDPOINT_SCHEMA, - 'status_id': { - 'type': 'number', - 'enum': [0, 1, 2, 99] - }, - 'unmapped': { - 'type': 'object', - 'properties': { - 'authorization_metadata': AUTH_METADATA_SCHEMA - }, - 'required': [], - 'additionalProperties': False - } - }, - 'required': [ - 'activity_id', 'actor', 'api', 'dst_endpoint', 'src_endpoint', - 'status_id', 'unmapped' - ], - 'additionalProperties': - False -} - -APPLICATION_ACTIVITY_SCHEMA = { - 'type': 'object', - 'properties': { - 'activity_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 4, 99] - }, - 'app': PRODUCT_SCHEMA - }, - 'required': ['activity_id', 'app'], - 'additionalProperties': False -} - -AUTHENTICATION_SCHEMA = { - 'type': - 'object', - 'properties': { - 'activity_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 4, 99] - }, - 'auth_protocol': { - 'type': 'string' - }, - 
'auth_protocol_id': { - 'type': 'number', - 'enum': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 99] - }, - 'dst_endpoint': ENDPOINT_SCHEMA, - 'is_cleartext': { - 'type': 'boolean' - }, - 'is_mfa': { - 'type': 'boolean' - }, - 'service': { - 'type': 'object', - 'properties': { - 'name': { - 'type': 'string' - } - }, - 'required': ['name'] - }, - 'src_endpoint': ENDPOINT_SCHEMA, - 'status_id': { - 'type': 'number', - 'enum': [0, 1, 2, 99] - }, - 'status_detail': { - 'type': 'string' - }, - 'user': USER_SCHEMA - }, - 'required': [ - 'activity_id', 'auth_protocol_id', 'dst_endpoint', 'is_cleartext', - 'is_mfa', 'service', 'src_endpoint', 'status_id' - ], - 'additionalProperties': - False -} - - -# A proper schema is an application based schema + the fields that exist -# in the OCSF base schema. -def _create_schema(schema): - return { - 'type': 'object', - 'properties': schema['properties'] | OCSF_BASE_SCHEMA['properties'], - 'required': schema['required'] + OCSF_BASE_SCHEMA['required'], - 'additionalProperties': False - } - - -# All schemas outside of this list are nested within the supported structures -# and are not expected to be observed as-is in the audit log -VALID_AUDIT_SCHEMAS = [ - _create_schema(schema) for schema in - [API_ACTIVITY_SCHEMA, APPLICATION_ACTIVITY_SCHEMA, AUTHENTICATION_SCHEMA] -] - - -class AuditSchemeValidationException(Exception): - def __init__(self, excs): - failures = [str(exc) for exc in excs] - self.message = '\n'.join(failures) - - -def validate_audit_contents(message): - """ - This method verifies the validity of the contents of a message that - has already passed the schema check. - """ - if 'count' not in message: - return 'start_time' not in message and 'end_time' not in message - - if message['count'] <= 1: - return False - if 'start_time' not in message: - return False - if 'end_time' not in message: - return False - return True - - -def validate_audit_schema(message): - """ - Verify that a given message adheres to one of the supported audit log - record formats and if one of those checks pass, that the fields within - the records are themselves valid. - """ - excs = [] - for schema in VALID_AUDIT_SCHEMAS: - try: - jsonschema.validate(instance=message, schema=schema) - if not validate_audit_contents(message): - excs = [ - RuntimeError( - f'Message {message} failed to pass content validation, check "count", "start_time" and "end_time keys/values"' - ) - ] - break - return - except Exception as e: - excs.append(e) - assert len(excs) > 0 - raise AuditSchemeValidationException(excs)
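With audit_schemas.py deleted above, the hand-maintained jsonschema documents are gone and the OCSF schema served by OcsfServer becomes the single source of truth. A minimal sketch of the resulting ingest path, simplified from the MessageMapper change in audit_log_test.py; the msg['value'] shape is an assumption matching RpkConsumer's JSON message output:

import json


def ingest_and_filter(messages, ocsf_server, filter_fn):
    # Schema-check every record first: validate_schema raises OcsfSchemaError
    # (carrying the server's error report) for any non-conforming record.
    records = [json.loads(msg['value']) for msg in messages]
    for record in records:
        ocsf_server.validate_schema(record)
    # Only then apply the test-specific selection predicate.
    return [r for r in records if filter_fn(r)]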
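For context on the transactional case in test_produce: a standalone confluent_kafka sketch of the full transaction lifecycle (broker address and names hypothetical). The test stops after flush(), which is enough to emit produce audit events against both the topic and the transactional_id resources; a complete client would also commit:

import confluent_kafka as ck

producer = ck.Producer({
    'bootstrap.servers': 'localhost:9092',  # hypothetical broker address
    'transactional.id': 'example-tx'
})
producer.init_transactions()
producer.begin_transaction()
producer.produce('example-topic', value='v', key='k')
producer.commit_transaction()  # flushes outstanding messages and commits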