normalization: clean up logging
geeli123 committed Nov 20, 2024
1 parent 613f106 · commit a3eeb31
Showing 2 changed files with 7 additions and 8 deletions.
2 changes: 0 additions & 2 deletions src/normalize/norm_utils.py
@@ -4,7 +4,6 @@


 def check_config(config: dict):
-    print(config)
     assert config["s3_bucket"]["uri"]
     assert config["s3_bucket"]["public_key"]
     assert config["s3_bucket"]["secret_key"]
@@ -24,7 +23,6 @@ def get_last_processed_timestamp(s3, bucket, key):
     try:
         response = s3.get_object(Bucket=bucket, Key=key)
         last_processed_epoch_timestamp = float(json.loads(response["Body"].read())["last_processed"])
-        print(last_processed_epoch_timestamp)
         return dt.datetime.fromtimestamp(
             last_processed_epoch_timestamp, dt.timezone.utc
         )
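For context, a minimal sketch of how get_last_processed_timestamp might read after this cleanup. The imports, the module-level LOGGER, and the except clause are assumptions — the diff cuts off before the function's error handling:

import datetime as dt
import json
import logging

LOGGER = logging.getLogger(__name__)


def get_last_processed_timestamp(s3, bucket, key):
    # Fetch the checkpoint object and convert its epoch timestamp to an
    # aware UTC datetime; the bare debug print() has been dropped.
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        last_processed_epoch_timestamp = float(
            json.loads(response["Body"].read())["last_processed"]
        )
        return dt.datetime.fromtimestamp(
            last_processed_epoch_timestamp, dt.timezone.utc
        )
    except s3.exceptions.NoSuchKey:
        # Assumed fallback (not shown in the diff): no checkpoint object yet,
        # so signal the caller to start from the configured start date.
        LOGGER.warning("No state object found at s3://%s/%s", bucket, key)
        return None

The removed print(config) and print(last_processed_epoch_timestamp) calls were bare debug output; dropping them leaves the module's LOGGER calls as the only logging.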
13 changes: 7 additions & 6 deletions src/normalize/normalize.py
@@ -102,6 +102,8 @@ def parse_files(s3, s3_fs, source_prefix, destination_prefix, start_date, state_

     cur_time = pytz.UTC.localize(dt.datetime.utcnow())
     cur_processing = last_processed
+
+    global_data_written = False
     while cur_processing <= cur_time:
         date_partition = os.path.join(
             source_key,
@@ -182,20 +184,19 @@
             )
             write_data(s3_fs, alerts_pa, s3_uri)
             new_data_written = True
 
-        # Update the last processed timestamp
-        if max_epoch_timestamp == last_processed:
-            LOGGER.warning(
-                f"No new data found in partition: {date_partition} - is this expected?"
-            )
         if new_data_written:
+            global_data_written = True
             LOGGER.info(
                 f"Updating last processed timestamp to "
                 f"maximum file timestamp: {dt.datetime.utcfromtimestamp(max_epoch_timestamp).isoformat()}"
             )
             update_last_processed_timestamp(s3, state_bucket, state_key, max_epoch_timestamp)
         cur_processing = (cur_processing + dt.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
+    if not global_data_written:
+        LOGGER.warning(
+            f"No new data written - is this expected?"
+        )
 
 @click.command()
 @click.option("-f", "--feed_id", required=True, type=str, help="feed ID to be scraped")
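Taken together, the normalize.py change layers a run-level flag over the existing per-partition one: new_data_written still gates the checkpoint update for each daily partition, while the new global_data_written records whether any partition produced output, so a single warning fires at the end of a run that wrote nothing, instead of one per empty partition. A stripped-down sketch of that control flow, with hypothetical process_partition and update_checkpoint helpers standing in for the real parsing and S3 code:

import logging

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


def process_partition(date_partition) -> bool:
    # Hypothetical stand-in for the real per-partition parse/write step.
    # Returns True when it wrote new data for this partition.
    return False  # placeholder


def update_checkpoint(date_partition) -> None:
    # Hypothetical stand-in for update_last_processed_timestamp.
    LOGGER.info("Updating last processed timestamp for %s", date_partition)


def parse_files_skeleton(partitions) -> None:
    # Run-level flag (added in this commit): set if ANY partition writes data.
    global_data_written = False
    for date_partition in partitions:
        # Per-partition flag: True when this partition produced output.
        new_data_written = process_partition(date_partition)
        if new_data_written:
            global_data_written = True
            # Only advance the checkpoint when something was actually written.
            update_checkpoint(date_partition)
    if not global_data_written:
        # One run-level warning replaces the old per-partition
        # "No new data found" warning that fired for every empty partition.
        LOGGER.warning("No new data written - is this expected?")


parse_files_skeleton(["2024-11-19", "2024-11-20"])

Collapsing the per-partition warning into a single end-of-run check keeps the logs quiet when a feed simply has no new files for a stretch of days, which matches the commit's stated aim of cleaning up logging.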
