Skip to content

Commit

Permalink
refactored cloudwatch output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
nathalapooja committed Dec 27, 2024
1 parent 96814cb commit 4781687
Showing 1 changed file with 5 additions and 15 deletions.
20 changes: 5 additions & 15 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush
}

static int entity_add_resource_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) {
flb_plg_info(ctx->ins, "entity_add_resource_key_attributes is called");
char ts[KEY_ATTRIBUTES_MAX_LEN];
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"keyAttributes\":{",0)) {
Expand All @@ -250,9 +249,7 @@ static int entity_add_resource_key_attributes(struct flb_cloudwatch *ctx, struct
goto error;
}
if(stream->entity->key_attributes->platform != NULL && strlen(stream->entity->key_attributes->platform) != 0) {
flb_plg_info(ctx->ins, "stream entity resource platform %s", stream->entity->key_attributes->platform);
if (strncmp(stream->entity->key_attributes->platform, EKS_PLATFORM, 3) == 0) {
flb_plg_info(ctx->ins, "setting platform to eks %s", stream->entity->key_attributes->platform);
if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"ResourceType\":\"","AWS::EKS::Cluster","\"")) {
goto error;
}
Expand Down Expand Up @@ -456,7 +453,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
"\"logEvents\":[", 13)) {
goto error;
}
flb_plg_info(ctx->ins, "entity %s", buf->out_buf);

return 0;

Expand Down Expand Up @@ -811,7 +807,7 @@ int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
return -1;
}

flb_plg_info(ctx->ins, "cloudwatch:PutLogEvents: events=%d, payload=%d bytes", i, offset);
flb_plg_debug(ctx->ins, "cloudwatch:PutLogEvents: events=%d, payload=%d bytes", i, offset);
ret = put_log_events(ctx, buf, buf->current_stream, (size_t) offset);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to send log events");
Expand Down Expand Up @@ -1190,7 +1186,6 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
flb_free(entity->key_attributes->platform);
}
entity->key_attributes->platform = flb_strndup(val.via.str.ptr, val.via.str.size);
flb_plg_info(ctx->ins,"entity platform is added %s", entity->key_attributes->platform);
}
if(strncmp(key.via.str.ptr, "aws_entity_cluster",key.via.str.size ) == 0 ) {
if(entity->key_attributes->cluster_name == NULL) {
Expand All @@ -1199,7 +1194,6 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
flb_free(entity->key_attributes->cluster_name);
}
entity->key_attributes->cluster_name = flb_strndup(val.via.str.ptr, val.via.str.size);
flb_plg_info(ctx->ins,"entity cluster name is added %s", entity->key_attributes->cluster_name);
}
}
if (strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) {
Expand Down Expand Up @@ -1334,9 +1328,6 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
update_or_create_entity(ctx,stream,map);
// Prepare a buffer to pack the modified map
if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) {
if (strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0) {
flb_plg_info(ctx->ins, "stream->entity->root_filter_count %d", stream->entity->root_filter_count);
}
msgpack_packer pk;
msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write);
remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count);
Expand Down Expand Up @@ -1896,8 +1887,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
int num_headers = 1;
int retry = FLB_TRUE;

flb_plg_info(ctx->ins, "Sending log events to log stream %s", stream->name);
flb_plg_info(ctx->ins, "data buf %s", buf->out_buf);
flb_plg_debug(ctx->ins, "Sending log events to log stream %s", stream->name);

/* stream is being used, update expiration */
stream->expiration = time(NULL) + FOUR_HOURS_IN_SECONDS;
Expand All @@ -1920,9 +1910,9 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
}

if (c) {
flb_plg_info(ctx->ins, "PutLogEvents http status=%d", c->resp.status);
flb_plg_info(ctx->ins, "PutLogEvents http data=%s", c->resp.data);
flb_plg_info(ctx->ins, "PutLogEvents http payload=%s", c->resp.payload);
flb_plg_debug(ctx->ins, "PutLogEvents http status=%d", c->resp.status);
flb_plg_debug(ctx->ins, "PutLogEvents http data=%s", c->resp.data);
flb_plg_debug(ctx->ins, "PutLogEvents http payload=%s", c->resp.payload);

if (c->resp.status == 200) {
if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
Expand Down

0 comments on commit 4781687

Please sign in to comment.