refactor(s3): Prefer s3 for checking file existence
Because it doesn't trigger an AccessDeniedException when the file is not found

Signed-off-by: Vincent Boutour <[email protected]>
ViBiOh committed Aug 7, 2024
1 parent 817ce8d commit 86ed42c
Showing 5 changed files with 46 additions and 37 deletions.
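
The rationale above can be sketched as follows. This is a minimal illustration with a hypothetical bucket and key, not the forwarder's code: when the caller lacks s3:ListBucket on the bucket, S3 reports a missing key to GetObject as AccessDenied rather than NoSuchKey, so a failed get cannot tell "not found" from "forbidden". ListObjectsV2 simply returns a listing that lacks the key, with no exception to misread.

    import boto3
    from botocore.exceptions import ClientError

    s3 = boto3.client("s3")

    def exists_via_get(bucket, key):
        # Without s3:ListBucket, a missing key surfaces as AccessDenied
        # instead of NoSuchKey, so this check is ambiguous.
        try:
            s3.get_object(Bucket=bucket, Key=key)
            return True
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return False
            raise  # possibly AccessDenied for a key that simply doesn't exist

    def exists_via_list(bucket, key):
        # A missing key just yields a listing without it, so existence and
        # permission failures stay distinguishable.
        response = s3.list_objects_v2(Bucket=bucket, Prefix=key)
        return any(obj["Key"] == key for obj in response.get("Contents", []))

This is the pattern the base_tags_cache.py change below adopts for the cache lock check.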
39 changes: 25 additions & 14 deletions aws/logs_monitoring/caching/base_tags_cache.py
@@ -1,16 +1,18 @@
-import os
-import logging
 import json
-from time import time
+import logging
+import os
 from random import randint
+from time import time
+
 import boto3
 from botocore.exceptions import ClientError
+
+from caching.common import convert_last_modified_time, get_last_modified_time
 from settings import (
     DD_S3_BUCKET_NAME,
-    DD_TAGS_CACHE_TTL_SECONDS,
     DD_S3_CACHE_LOCK_TTL_SECONDS,
+    DD_TAGS_CACHE_TTL_SECONDS,
 )
-from caching.common import get_last_modified_time
 from telemetry import send_forwarder_internal_metrics
 
 JITTER_MIN = 1
@@ -61,23 +63,32 @@ def write_cache_to_s3(self, data):
             self.logger.debug("Unable to write new cache to S3", exc_info=True)
 
     def acquire_s3_cache_lock(self):
+        key = self.get_cache_lock_with_prefix()
+
         """Acquire cache lock"""
-        cache_lock_object = self.s3_client.Object(
-            DD_S3_BUCKET_NAME, self.get_cache_lock_with_prefix()
-        )
         try:
-            file_content = cache_lock_object.get()
+            response = self.s3_client.list_objects_v2(
+                Bucket=DD_S3_BUCKET_NAME, Prefix=key
+            )
+            for content in response.get("Contents", []):
+                if content["Key"] != key:
+                    continue
+
+                # check lock file expiration
+                last_modified_unix_time = convert_last_modified_time(
+                    content["LastModified"]
+                )
+                if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time():
+                    return False
 
-            # check lock file expiration
-            last_modified_unix_time = get_last_modified_time(file_content)
-            if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time():
-                return False
         except Exception:
            self.logger.debug("Unable to get cache lock file")
 
         # lock file doesn't exist, create file to acquire lock
         try:
-            cache_lock_object.put(Body=(bytes("lock".encode("UTF-8"))))
+            self.s3_client.Object(DD_S3_BUCKET_NAME, key).put(
+                Body=(bytes("lock".encode("UTF-8")))
+            )
             send_forwarder_internal_metrics("s3_cache_lock_acquired")
             self.logger.debug("S3 cache lock acquired")
         except ClientError:
1 change: 1 addition & 0 deletions aws/logs_monitoring/caching/cloudwatch_log_group_cache.py
@@ -6,6 +6,7 @@
 
 import boto3
 from botocore.config import Config
+
 from caching.common import sanitize_aws_tag_string
 from settings import (
     DD_S3_BUCKET_NAME,
9 changes: 6 additions & 3 deletions aws/logs_monitoring/caching/common.py
@@ -1,6 +1,6 @@
-import os
 import datetime
 import logging
+import os
 import re
 from collections import defaultdict
 
@@ -19,8 +19,11 @@ def get_last_modified_time(s3_file):
     last_modified_date = datetime.datetime.strptime(
         last_modified_str, "%a, %d %b %Y %H:%M:%S %Z"
     )
-    last_modified_unix_time = int(last_modified_date.strftime("%s"))
-    return last_modified_unix_time
+    return convert_last_modified_time(last_modified_date)
+
+
+def convert_last_modified_time(last_modified_time):
+    return int(last_modified_time.strftime("%s"))
 
 
 def parse_get_resources_response_for_tags_by_arn(get_resources_page):
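
For reference, a rough sketch of the new helper's behavior (illustrative values only; it assumes the Lambda's Linux runtime, since strftime("%s") is a platform-specific extension rather than portable Python):

    import datetime

    from caching.common import convert_last_modified_time

    # list_objects_v2 reports LastModified as a timezone-aware datetime,
    # which the helper converts to epoch seconds via strftime("%s").
    last_modified = datetime.datetime(2024, 8, 7, tzinfo=datetime.timezone.utc)
    print(convert_last_modified_time(last_modified))  # 1722988800 in a UTC environment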
23 changes: 12 additions & 11 deletions aws/logs_monitoring/lambda_function.py
@@ -4,29 +4,30 @@
 # Copyright 2021 Datadog, Inc.
 
 import json
+import logging
+import os
+from hashlib import sha1
+
 import boto3
-import logging
 import requests
-from hashlib import sha1
-from datadog_lambda.wrapper import datadog_lambda_wrapper
 from datadog import api
-from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
-from steps.parsing import parse
-from steps.enrichment import enrich
-from steps.transformation import transform
-from steps.splitting import split
+from datadog_lambda.wrapper import datadog_lambda_wrapper
 
 from caching.cache_layer import CacheLayer
+from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
 from forwarder import Forwarder
 from settings import (
+    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_API_KEY,
-    DD_SKIP_SSL_VALIDATION,
     DD_API_URL,
     DD_FORWARDER_VERSION,
-    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_RETRY_KEYWORD,
+    DD_SKIP_SSL_VALIDATION,
 )
-
+from steps.enrichment import enrich
+from steps.parsing import parse
+from steps.splitting import split
+from steps.transformation import transform
 
 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
11 changes: 2 additions & 9 deletions aws/logs_monitoring/template.yaml
@@ -779,8 +779,9 @@ Resources:
                   - Fn::Sub: "${DdApiKeySecretArn}*"
                 Effect: Allow
               # Fetch Lambda resource tags for data enrichment
+              # Fetch Step Functions resource tags for data enrichment
               - Fn::If:
-                  - SetDdFetchLambdaTags
+                  - Fn::Or : [SetDdFetchLambdaTags, SetDdFetchStepFunctionsTags]
                   - Action:
                       - tag:GetResources
                     Resource: "*"
@@ -794,14 +795,6 @@
                     Resource: "*"
                     Effect: Allow
                   - Ref: AWS::NoValue
-              # Fetch Step Functions resource tags for data enrichment
-              - Fn::If:
-                  - SetDdFetchStepFunctionsTags
-                  - Action:
-                      - tag:GetResources
-                    Resource: "*"
-                    Effect: Allow
-                  - Ref: AWS::NoValue
               # Required for Lambda deployed in VPC
               - Fn::If:
                   - UseVPC
