forked from marcy-terui/jeffy
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathhandler.py
80 lines (63 loc) · 1.85 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json
import logging
import os
import uuid
from jeffy.framework import get_app
from jeffy.logging.handlers.firehose import KinesisFirehoseHandler
from jeffy.settings import Logging
from jeffy.sdk.kinesis import Kinesis
from jeffy.sdk.s3 import S3
from jeffy.sdk.sns import Sns
from jeffy.sdk.sqs import Sqs
import boto3
import requests
# Module-level jeffy application shared by every handler below.
# Logging is configured with two handlers: a default-constructed
# ``logging.StreamHandler`` and a ``KinesisFirehoseHandler`` whose stream
# name is read from the FIREHOSE_NAME environment variable (a missing
# variable raises ``KeyError`` when this module is imported).
app = get_app(
    logging=Logging(
        handlers=[
            logging.StreamHandler(),
            KinesisFirehoseHandler(stream_name=os.environ['FIREHOSE_NAME']),
        ],
    ),
)
@app.handlers.common()
def start_test(event, context):
    """Send one sample message to each configured downstream target.

    In order, this handler:

    1. POSTs the JSON document ``{"foo": "bar"}`` to ``API_URL``.
    2. Publishes subject ``foo`` / message ``bar`` to the SNS topic
       ``TOPIC_ARN``.
    3. Puts the record ``{'foo': 'bar'}`` on the Kinesis stream
       ``STREAM_NAME``.
    4. Sends ``hello world`` to the SQS queue ``QUEUE_URL``.
    5. Uploads the local file ``logo.png`` to the bucket ``BUCKET_NAME``.
    6. Puts an item with a random UUID ``id`` into the DynamoDB table
       ``TABLE_NAME``.

    All target names are read from environment variables at the moment
    each step runs, so earlier steps have already executed if a later
    variable is missing.

    Args:
        event: Invocation event; not read by this handler.
        context: Invocation context; not read by this handler.

    Returns:
        The string ``'ok'`` once every step has completed.

    Raises:
        KeyError: If a required environment variable is not set.
        requests.exceptions.Timeout: If the HTTP endpoint does not answer
            within the timeout.
    """
    # Requests never times out by default; without an explicit bound an
    # unresponsive endpoint would block this handler indefinitely.
    requests.post(
        os.environ['API_URL'],
        data=json.dumps({'foo': 'bar'}),
        headers={'content-type': 'application/json'},
        timeout=30,
    )

    Sns().publish(
        topic_arn=os.environ['TOPIC_ARN'],
        subject='foo',
        message='bar',
    )

    Kinesis().put_record(
        stream_name=os.environ['STREAM_NAME'],
        data={'foo': 'bar'},
        partition_key='partition_key',
    )

    Sqs().send_message(
        queue_url=os.environ['QUEUE_URL'],
        message='hello world',
    )

    S3().upload_file(
        file_path='logo.png',
        bucket_name=os.environ['BUCKET_NAME'],
        key='logo.png',
    )

    # A fresh UUID per invocation keeps every written item distinct.
    table = boto3.resource('dynamodb').Table(os.environ['TABLE_NAME'])
    table.put_item(Item={'id': str(uuid.uuid4())})

    return 'ok'
@app.handlers.rest_api()
def rest_api_test(event, context):
    """Return a fixed HTTP 200 response with a small JSON body.

    Neither ``event`` nor ``context`` is read; the response is always
    ``{"result": "ok."}`` served as ``application/json``.
    """
    payload = json.dumps({'result': 'ok.'})
    response = {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': payload,
    }
    return response
@app.handlers.sqs()
def sqs_test(event, context):
    """Return the event passed in by the ``sqs`` handler decorator, unchanged.

    ``context`` is not used.
    """
    return event
@app.handlers.sns()
def sns_test(event, context):
    """Return the event passed in by the ``sns`` handler decorator, unchanged.

    ``context`` is not used.
    """
    return event
@app.handlers.kinesis_streams()
def kinesis_test(event, context):
    """Return the event passed in by the ``kinesis_streams`` decorator, unchanged.

    ``context`` is not used.
    """
    return event
@app.handlers.dynamodb_streams()
def dynamodb_test(event, context):
    """Return the event passed in by the ``dynamodb_streams`` decorator, unchanged.

    ``context`` is not used.
    """
    return event
@app.handlers.s3()
def s3_test(event, context):
    """Drop the ``'body'`` entry from the event and return what remains.

    The event is modified in place and the same object is returned.
    ``context`` is not used.

    Raises:
        KeyError: If the event has no ``'body'`` key.
    """
    # NOTE(review): presumably 'body' holds the object's content, which is
    # stripped so the returned event stays small -- confirm against the
    # jeffy ``s3`` handler decorator.
    event.pop('body')
    return event