-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue_firehose_s3_backups.py
41 lines (35 loc) · 1.3 KB
/
queue_firehose_s3_backups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Recovers Kinesis S3 source record backup files, queueing each file to be proessed by recover_firehose_s3_backup.py"""
import boto3
import json
# Number of entries sent per send_message_batch call (the original code
# flushed at exactly this size).
_SQS_BATCH_SIZE = 10


def _send_batch(sqs, queue_url, entries):
    """Send one batch of entries to SQS and warn about any rejected entries."""
    response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
    # A batch request can succeed as a whole while individual entries are
    # rejected; those are reported in the response's 'Failed' list.
    failed = response.get('Failed', [])
    if failed:
        bodies = {entry['Id']: entry['MessageBody'] for entry in entries}
        for failure in failed:
            print("WARNING: failed to queue {}: {}".format(
                bodies.get(failure.get('Id')), failure.get('Message')))


def main(event, context):
    """Queue one SQS message per object found in an S3 bucket.

    Every object key that does not end in '/' is queued as a JSON message
    holding the bucket name, the object key and the target Kinesis stream.
    All pages of the bucket listing are processed, so listings larger than
    a single S3 response are no longer cut short.

    Args:
        event: dict with the keys 'bucket', 'queue_url' and
            'kinesis_stream', plus an optional 'prefix' that restricts
            the listing.
        context: unused.

    Returns:
        None.

    Raises:
        KeyError: if a required key is missing from ``event``.
    """
    s3 = boto3.client('s3')
    sqs = boto3.client('sqs')

    list_kwargs = {'Bucket': event['bucket']}
    if 'prefix' in event:
        list_kwargs['Prefix'] = event['prefix']

    queue_url = event['queue_url']
    messages = []

    # Walk every page of the listing instead of only the first response,
    # which silently dropped all keys past the first page.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(**list_kwargs):
        bucket = page['Name']
        # 'Contents' is absent when a page (or the whole listing) is empty.
        for obj in page.get('Contents', []):
            # Keys ending in '/' are folder placeholders, not backup files.
            if obj['Key'].endswith('/'):
                continue
            messages.append({
                # Ids only need to be unique within a single batch.
                'Id': str(len(messages) + 1),
                'MessageBody': json.dumps({
                    'bucket': bucket,
                    'item': obj['Key'],
                    'kinesis_stream': event['kinesis_stream']
                }),
                'MessageGroupId': event['bucket']
            })
            if len(messages) == _SQS_BATCH_SIZE:
                _send_batch(sqs, queue_url, messages)
                messages = []

    # Flush the final, partially filled batch.
    if messages:
        _send_batch(sqs, queue_url, messages)