-
Notifications
You must be signed in to change notification settings - Fork 8
/
test.py
67 lines (53 loc) · 1.69 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
import os
import boto3
import logging
from boto3.dynamodb.conditions import Key, Attr
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
# Set up root logging at INFO level. basicConfig only takes effect when the
# root logger has no handlers configured yet.
logging.basicConfig(level=logging.INFO)
# Low-level DynamoDB client, created once at import time and shared by the
# get/put helper functions in this module.
client = boto3.client('dynamodb')
def dynamodb_deserializer_to_json(item):
    """Convert a DynamoDB item into plain Python values.

    Args:
        item: Either a dict in DynamoDB attribute-value format, e.g.
            ``{"CustID": {"S": "42"}}`` (the shape stream records and the
            low-level client use), or a JSON document as ``str``/``bytes``.

    Returns:
        For a dict input, a new dict mapping each attribute name to its
        native Python value (DynamoDB numbers become ``decimal.Decimal``).
        For any other input, the result of ``json.loads(item)``.

    Raises:
        json.JSONDecodeError: If a str/bytes input is not valid JSON.
        TypeError: If the input is neither a dict nor something
            ``json.loads`` accepts, or a dict value is not a valid
            attribute-value descriptor.
    """
    if isinstance(item, dict):
        # Attribute maps arrive as already-parsed dicts; json.loads would
        # reject them, so unwrap the type descriptors instead.
        deserializer = TypeDeserializer()
        return {
            name: deserializer.deserialize(value)
            for name, value in item.items()
        }
    return json.loads(item)
def get_item_cust_profile(dynamo_table, cust_id):
    """Fetch a customer's row by its ``CustID`` key.

    Args:
        dynamo_table: Value forwarded as ``TableName`` to the client.
        cust_id: Customer id, sent as a string (``S``) key attribute.

    Returns:
        The response of ``client.get_item``, passed through unmodified.
    """
    print(cust_id)
    lookup_key = {'CustID': {'S': cust_id}}
    return client.get_item(TableName=dynamo_table, Key=lookup_key)
def put_item_cust_profile_table(dynamo_table, cust_profile):
    """Write a customer profile item to a DynamoDB table.

    Args:
        dynamo_table: Name of the target table, forwarded as ``TableName``.
        cust_profile: The item to store, forwarded as ``Item``. The
            low-level client expects attribute-value format, e.g.
            ``{"CustID": {"S": "42"}}``.

    Returns:
        The response of ``client.put_item``, passed through unmodified.
    """
    # Pass the parameter itself, not the literal string 'dynamo_table',
    # so the write goes to the table the caller asked for.
    return client.put_item(TableName=dynamo_table, Item=cust_profile)
def handler(event, context):
    """Update customer profiles from DynamoDB stream INSERT records.

    For every record whose ``eventName`` is ``"INSERT"``, the customer's
    ``TnValue`` is added to the ``TotalTnValue`` stored in the
    ``CustProfile`` table (a new profile is started when none exists), the
    ``CustStatus`` is recomputed, and the profile is written back. Records
    with any other event name are skipped.

    Args:
        event: Invocation payload; must contain a ``"Records"`` list.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': '"Success"'}`` once all records have
        been processed.

    Raises:
        KeyError: If ``event`` has no ``"Records"`` key, or an INSERT
            record lacks ``CustID`` / ``TnValue``.

    NOTE(review): the profile is read and then written in two separate
    calls, so concurrent invocations for the same customer could overwrite
    each other's totals -- confirm whether that matters here.
    """
    # The helper functions use the low-level client, whose TableName
    # argument is the table's name as a plain string.
    table_name = 'CustProfile'

    for record in event["Records"]:
        if record["eventName"] != "INSERT":
            continue

        stream_data = record["dynamodb"]
        # "Keys" holds only the primary-key attributes. When the stream
        # view includes "NewImage" it carries the full item (keys
        # included), so prefer it and fall back to "Keys" otherwise.
        attributes = stream_data.get("NewImage") or stream_data["Keys"]
        item = dynamodb_deserializer_to_json(attributes)
        cust_id = item["CustID"]
        tn_value = int(item["TnValue"])

        # get_item returns a response envelope; the stored row (if any) is
        # under "Item", which is absent when the customer has no profile.
        stored = get_item_cust_profile(table_name, cust_id).get("Item")
        logging.info("Stored profile for %s: %s", cust_id, stored)

        if stored:
            profile = dynamodb_deserializer_to_json(stored)
            profile["TotalTnValue"] = int(profile["TotalTnValue"]) + tn_value
        else:
            profile = {"CustID": cust_id, "TotalTnValue": tn_value}

        if profile["TotalTnValue"] < 1000:
            profile["CustStatus"] = "Normal"
        else:
            profile["CustStatus"] = "Elite"

        # The low-level client only accepts attribute-value format, so wrap
        # every plain value in its type descriptor before writing.
        serializer = TypeSerializer()
        typed_profile = {
            name: serializer.serialize(value)
            for name, value in profile.items()
        }
        put_item_cust_profile_table(table_name, typed_profile)

    return {
        'statusCode': 200,
        'body': json.dumps("Success")
    }