Skip to content

Commit

Permalink
Merge branch 'aws-fluent-bit-cherry-pick' of github.com:zhihonl/priva…
Browse files Browse the repository at this point in the history
…te-fluent-bit into add-entity
  • Loading branch information
nathalapooja committed Oct 21, 2024
2 parents 740a57a + 526512e commit 5dc2e63
Showing 1 changed file with 25 additions and 32 deletions.
57 changes: 25 additions & 32 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,14 @@ void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key,
msgpack_object_map root = root_map->via.map;

// Prepare to pack the modified root map (size may be unchanged or reduced)
msgpack_pack_map(pk, root.size-root_filtered_fields); // Assume no top-level key is removed
msgpack_pack_map(pk, root.size-root_filtered_fields);

for (uint32_t i = 0; i < root.size; i++) {
msgpack_object_kv root_kv = root.ptr[i];

// Check if this key matches the nested map key (e.g., "kubernetes")
if (root_kv.key.type == MSGPACK_OBJECT_STR &&
if (filtered_fields > 0 &&
root_kv.key.type == MSGPACK_OBJECT_STR &&
strncmp(root_kv.key.via.str.ptr, nested_map_key, root_kv.key.via.str.size) == 0 &&
root_kv.val.type == MSGPACK_OBJECT_MAP) {

Expand All @@ -566,7 +567,7 @@ void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key,

// Remove the unneeded key from the nested map
remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields);
} else if (root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) {
} else if (root_filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) {
} else {
// Pack other key-value pairs unchanged
msgpack_pack_object(pk, root_kv.key);
Expand Down Expand Up @@ -598,35 +599,9 @@ int process_event(struct flb_cloudwatch *ctx, struct cw_flush *buf,
char *tmp_buf_ptr;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;

if (ctx->add_entity && buf->current_stream->entity && buf->current_stream->entity->filter_count > 0) {
// Prepare a buffer to pack the modified map
msgpack_sbuffer sbuf;
msgpack_sbuffer_init(&sbuf);
msgpack_packer pk;
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
remove_unneeded_field(obj, "kubernetes",&pk,buf->current_stream->entity->root_filter_count, buf->current_stream->entity->filter_count);

// Now, unpack the modified data into a new msgpack_object
msgpack_unpacked modified_unpacked;
msgpack_unpacked_init(&modified_unpacked);
msgpack_object modified_obj;
size_t modified_offset = 0;
if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) {
modified_obj = modified_unpacked.data;
}

ret = flb_msgpack_to_json(tmp_buf_ptr,
buf->tmp_buf_size - buf->tmp_buf_offset,
&modified_obj);

msgpack_sbuffer_destroy(&sbuf);
msgpack_unpacked_destroy(&modified_unpacked);
} else {
ret = flb_msgpack_to_json(tmp_buf_ptr,
buf->tmp_buf_size - buf->tmp_buf_offset,
obj);
}
ret = flb_msgpack_to_json(tmp_buf_ptr,
buf->tmp_buf_size - buf->tmp_buf_offset,
obj);
if (ret <= 0) {
/*
* failure to write to buffer,
Expand Down Expand Up @@ -1239,6 +1214,24 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
}
if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
update_or_create_entity(ctx,stream,map);
// Prepare a buffer to pack the modified map
if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) {
msgpack_sbuffer sbuf;
msgpack_sbuffer_init(&sbuf);
msgpack_packer pk;
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count);

// Now, unpack the modified data into a new msgpack_object
msgpack_unpacked modified_unpacked;
msgpack_unpacked_init(&modified_unpacked);
size_t modified_offset = 0;
if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) {
map = modified_unpacked.data;
}
msgpack_sbuffer_destroy(&sbuf);
msgpack_unpacked_destroy(&modified_unpacked);
}
}

if (ctx->log_key) {
Expand Down

0 comments on commit 5dc2e63

Please sign in to comment.