From f46721fc64f2f1bbf2b186c0c14f0d64312de60e Mon Sep 17 00:00:00 2001 From: zhihonl <61301537+zhihonl@users.noreply.github.com> Date: Tue, 22 Oct 2024 10:36:25 -0400 Subject: [PATCH 1/3] Increment filter count when dynamic entity fields are modified (#25) --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 21 +++++++++++++------ plugins/out_cloudwatch_logs/cloudwatch_logs.h | 3 +++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 6d0dcec2389..21831059b0c 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -1034,16 +1034,20 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map kube_key = val.via.map.ptr[j].key; kube_val = val.via.map.ptr[j].val; if(strncmp(kube_key.via.str.ptr, "aws_entity_service_name", kube_key.via.str.size) == 0) { - if(entity->key_attributes->name == NULL) { + if(!entity->service_name_found) { entity->filter_count++; - } else { + entity->service_name_found++; + } + if(entity->key_attributes->name != NULL) { flb_free(entity->key_attributes->name); } entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); } else if(strncmp(kube_key.via.str.ptr, "aws_entity_environment", kube_key.via.str.size) == 0) { - if(entity->key_attributes->environment == NULL) { + if(!entity->environment_found) { entity->filter_count++; - } else { + entity->environment_found++; + } + if(entity->key_attributes->environment != NULL) { flb_free(entity->key_attributes->environment); } entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); @@ -1072,9 +1076,11 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map } entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); } else if(strncmp(kube_key.via.str.ptr, "aws_entity_name_source", 
kube_key.via.str.size) == 0) { - if(entity->attributes->name_source == NULL) { + if(!entity->name_source_found) { entity->filter_count++; - } else { + entity->name_source_found++; + } + if(entity->attributes->name_source != NULL) { flb_free(entity->attributes->name_source); } entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size); @@ -1128,6 +1134,9 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre memset(stream->entity->attributes, 0, sizeof(entity_attributes)); stream->entity->filter_count = 0; stream->entity->root_filter_count = 0; + stream->entity->service_name_found = 0; + stream->entity->environment_found = 0; + stream->entity->name_source_found = 0; parse_entity(ctx,stream->entity,map, map.via.map.size); } else { diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 21816176fe2..df912e7b67f 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -36,6 +36,9 @@ typedef struct entity { struct entity_key_attributes *key_attributes; struct entity_attributes *attributes; int filter_count; + int service_name_found; + int environment_found; + int name_source_found; int root_filter_count; }entity; From ba00160fde19002bf3b6698b53d92e22efa33747 Mon Sep 17 00:00:00 2001 From: zhihonl <61301537+zhihonl@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:07:44 -0400 Subject: [PATCH 2/3] Move messagepack destroy to later stage to prevent memory issue (#26) --- plugins/out_cloudwatch_logs/cloudwatch_api.c | 28 ++++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 21831059b0c..af59e8b398f 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -1169,6 +1169,12 @@ int process_and_send(struct flb_cloudwatch *ctx, 
const char *input_plugin, msgpack_object emf_payload; /* msgpack::sbuffer is a simple buffer implementation. */ msgpack_sbuffer mp_sbuf; + /* + * Msgpack objects used to store msgpack after filtering out fields + * with aws entity prefix + */ + msgpack_sbuffer filtered_sbuf; + msgpack_unpacked modified_unpacked; struct log_stream *stream; @@ -1216,6 +1222,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, map = root.via.array.ptr[1]; map_size = map.via.map.size; + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_init(&filtered_sbuf); + msgpack_unpacked_init(&modified_unpacked); + } stream = get_log_stream(ctx, tag, map); if (!stream) { flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); @@ -1225,21 +1235,15 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, update_or_create_entity(ctx,stream,map); // Prepare a buffer to pack the modified map if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) { - msgpack_sbuffer sbuf; - msgpack_sbuffer_init(&sbuf); msgpack_packer pk; - msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write); remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count); // Now, unpack the modified data into a new msgpack_object - msgpack_unpacked modified_unpacked; - msgpack_unpacked_init(&modified_unpacked); size_t modified_offset = 0; - if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) { + if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, filtered_sbuf.size, &modified_offset)) { map = modified_unpacked.data; } - msgpack_sbuffer_destroy(&sbuf); - msgpack_unpacked_destroy(&modified_unpacked); } } @@ -1360,6 +1364,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, if (ret == 0) { i++; 
} + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } } msgpack_unpacked_destroy(&result); @@ -1375,6 +1383,10 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, error: msgpack_unpacked_destroy(&result); + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } return -1; } From a24118a00472bc2e780a09318d0cf7c04974ecde Mon Sep 17 00:00:00 2001 From: zhihonl <61301537+zhihonl@users.noreply.github.com> Date: Tue, 22 Oct 2024 13:37:27 -0400 Subject: [PATCH 3/3] House cleaning changes to address upstream merge blockers (#27) --- plugins/filter_aws/aws.c | 3 ++- plugins/filter_kubernetes/kube_meta.c | 23 ++++++++++++-------- plugins/filter_kubernetes/kube_meta.h | 4 ++++ plugins/filter_kubernetes/kubernetes.c | 9 ++++---- plugins/out_cloudwatch_logs/cloudwatch_api.c | 17 +++++++++------ 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/plugins/filter_aws/aws.c b/plugins/filter_aws/aws.c index 89ac94dacd8..6a7c6b5108a 100644 --- a/plugins/filter_aws/aws.c +++ b/plugins/filter_aws/aws.c @@ -751,7 +751,8 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "enable_entity", "false", 0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity), - "Enable entity prefix for necessary fields" + "Enable entity prefix for fields used for constructing entity." 
+ " This currently only affects instance ID" }, {0} }; diff --git a/plugins/filter_kubernetes/kube_meta.c b/plugins/filter_kubernetes/kube_meta.c index 94cfc711be8..663a2d48910 100644 --- a/plugins/filter_kubernetes/kube_meta.c +++ b/plugins/filter_kubernetes/kube_meta.c @@ -348,7 +348,8 @@ static int get_meta_file_info(struct flb_kube *ctx, const char *namespace, static int get_meta_info_from_request(struct flb_kube *ctx, struct flb_upstream *upstream, const char *namespace, - const char *resource, + const char *resource_type, + const char *resource_name, char **buffer, size_t *size, int *root_type, char* uri) @@ -389,9 +390,9 @@ static int get_meta_info_from_request(struct flb_kube *ctx, } ret = flb_http_do(c, &b_sent); - flb_plg_debug(ctx->ins, "Request (ns=%s, resource=%s) http_do=%i, " + flb_plg_debug(ctx->ins, "Request (ns=%s, %s=%s) http_do=%i, " "HTTP Status: %i", - namespace, resource, ret, c->resp.status); + namespace, resource_type, resource_name, ret, c->resp.status); if (ret != 0 || c->resp.status != 200) { if (c->resp.payload_size > 0) { @@ -441,7 +442,7 @@ static int get_pods_from_kubelet(struct flb_kube *ctx, } flb_plg_debug(ctx->ins, "Send out request to Kubelet for pods information."); - packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, podname, + packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, FLB_KUBE_POD, podname, &buf, &size, &root_type, uri); } @@ -482,10 +483,10 @@ static int get_api_server_configmap(struct flb_kube *ctx, flb_plg_debug(ctx->ins, "Send out request to API Server for configmap information"); if(ctx->use_kubelet) { - packed = get_meta_info_from_request(ctx,ctx->kubernetes_upstream, namespace, configmap, + packed = get_meta_info_from_request(ctx,ctx->kubernetes_upstream, namespace,FLB_KUBE_CONFIGMAP, configmap, &buf, &size, &root_type, uri); } else { - packed = get_meta_info_from_request(ctx,ctx->upstream, namespace, configmap, + packed = get_meta_info_from_request(ctx,ctx->upstream, namespace, 
FLB_KUBE_CONFIGMAP, configmap, &buf, &size, &root_type, uri); } } @@ -530,7 +531,7 @@ static int get_api_server_info(struct flb_kube *ctx, } flb_plg_debug(ctx->ins, "Send out request to API Server for pods information"); - packed = get_meta_info_from_request(ctx, ctx->upstream, namespace, podname, + packed = get_meta_info_from_request(ctx, ctx->upstream, namespace,FLB_KUBE_POD, podname, &buf, &size, &root_type, uri); } @@ -785,7 +786,9 @@ static void cb_results_workload(const char *name, const char *value, /* * Search workload based on the following priority - * where the top is highest priority + * where the top is highest priority. This is done + * to find the owner of the pod which helps with + * determining the upper-level management of the pod * 1. Deployment name * 2. StatefulSet name * 3. DaemonSet name @@ -1116,6 +1119,7 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, msgpack_object ann_map; struct flb_kube_props props = {0}; struct service_attributes *tmp_service_attributes = {0}; + /* * - reg_buf: is a msgpack Map containing meta captured using Regex * @@ -1937,7 +1941,8 @@ int flb_kube_meta_get(struct flb_kube *ctx, return 0; } -int flb_kube_meta_release(struct flb_kube_meta *meta) { +int flb_kube_meta_release(struct flb_kube_meta *meta) +{ int r = 0; if (meta->namespace) { diff --git a/plugins/filter_kubernetes/kube_meta.h b/plugins/filter_kubernetes/kube_meta.h index f5cfc143877..1b4f3684e42 100644 --- a/plugins/filter_kubernetes/kube_meta.h +++ b/plugins/filter_kubernetes/kube_meta.h @@ -60,6 +60,10 @@ struct flb_kube_meta { #define FLB_KUBE_API_CONFIGMAP_FMT "/api/v1/namespaces/%s/configmaps/%s" #define FLB_KUBELET_PODS "/pods" +/* Constants for possible kubernetes resources */ +#define FLB_KUBE_POD "pod" +#define FLB_KUBE_CONFIGMAP "configmap" + int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config); int flb_kube_meta_fetch(struct flb_kube *ctx); int flb_kube_dummy_meta_get(char **out_buf, size_t 
*out_size); diff --git a/plugins/filter_kubernetes/kubernetes.c b/plugins/filter_kubernetes/kubernetes.c index 54d2f6d85b4..b244eff1423 100644 --- a/plugins/filter_kubernetes/kubernetes.c +++ b/plugins/filter_kubernetes/kubernetes.c @@ -461,6 +461,7 @@ static int cb_kube_init(struct flb_filter_instance *f_ins, // Start the background thread if (pthread_create(&background_thread, NULL, update_pod_service_map, NULL) != 0) { flb_error("Failed to create background thread"); + background_thread = NULL; free(task_args); } } @@ -1167,7 +1168,7 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_BOOL, "use_pod_association", "false", 0, FLB_TRUE, offsetof(struct flb_kube, use_pod_association), "use custom endpoint to get pod to service name mapping" - }, + }, /* * The host used for pod to service name association , default is 127.0.0.1 * Will only check when "use_pod_association" config is set to true @@ -1185,7 +1186,7 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_STR, "pod_association_endpoint", "/kubernetes/pod-to-service-env-map", 0, FLB_TRUE, offsetof(struct flb_kube, pod_association_endpoint), "endpoint to connect with when performing pod to service name association" - }, + }, /* * The port for pod to service name association endpoint, default is 4311 * Will only check when "use_pod_association" config is set to true @@ -1241,13 +1242,13 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_BOOL, "pod_association_host_tls_verify", "true", 0, FLB_TRUE, offsetof(struct flb_kube, pod_association_host_tls_verify), "enable or disable verification of TLS peer certificate" - }, + }, { FLB_CONFIG_MAP_STR, "set_platform", NULL, 0, FLB_TRUE, offsetof(struct flb_kube, set_platform), "Set the platform that kubernetes is in. 
Possible values are k8s and eks" "This should only be used for testing purpose" - }, + }, /* EOF */ {0} }; diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index af59e8b398f..d351d0c64e3 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -360,7 +360,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, } // If we are missing the service name, the entity will get rejected by the frontend anyway // so do not emit entity unless service name is filled - if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes->name != NULL) { + if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL) { if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"entity\":{", 10)) { goto error; @@ -523,7 +523,10 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer return FLB_FALSE; } -// Helper function to remove keys from a nested map +/* + * Helper function to remove keys prefixed with aws_entity + * from a message pack map + */ void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, int filtered_fields) { const int remaining_kv_pairs = nested_map->size - filtered_fields; @@ -545,7 +548,10 @@ void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer * } } -// Main function to remove a key from a nested map inside the root map +/* + * Main function to remove keys prefixed with aws_entity + * from the root and nested message pack map + */ void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, msgpack_packer *pk,int root_filtered_fields, int filtered_fields) { if (root_map->type == MSGPACK_OBJECT_MAP) { msgpack_object_map root = root_map->via.map; @@ -1137,11 +1143,8 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct 
log_stream *stre stream->entity->service_name_found = 0; stream->entity->environment_found = 0; stream->entity->name_source_found = 0; - - parse_entity(ctx,stream->entity,map, map.via.map.size); - } else { - parse_entity(ctx,stream->entity,map, map.via.map.size); } + parse_entity(ctx,stream->entity,map, map.via.map.size); if (!stream->entity) { flb_plg_warn(ctx->ins, "Failed to generate entity"); }