-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
81 lines (57 loc) · 2.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import sys
import time
import requests
from flask import Flask, request
from cloudevents.http import from_http, CloudEvent, to_structured
from keptn import Keptn, start_polling
app = Flask(__name__)
# Port on which to listen for cloudevents
PORT = os.getenv('RCV_PORT', '8080')
# Path to which cloudevents are sent
PATH = os.getenv('RCV_PATH', '/')
@app.route(PATH, methods=["POST"])
def gotevent():
# create a CloudEvent
event = from_http(request.headers, request.get_data())
keptn = Keptn(event)
keptn.handle_cloud_event()
return "", 204
def deployment_triggered(keptn: Keptn, shkeptncontext: str, event, data):
print("In deployment_triggered:")
print(" ", shkeptncontext)
print(" ", event)
print(" ", data)
keptn.send_task_started_cloudevent(message="Deployment Started")
# keptn add-resource --project=XYZ --resource=project-resource.txt
project_resource = keptn.get_project_resource('project-resource.txt')
print("project_resource=", project_resource)
# keptn add-resource --project=XYZ --stage=STAGE --resource=stage-resource.txt
stage_resource = keptn.get_stage_resource('stage-resource.txt')
print("stage_resource=", stage_resource)
# keptn add-resource --project=XYZ --stage=STAGE --service=SERVICE --resource=service-resource.txt
service_resource = keptn.get_service_resource('service-resource.txt')
print("service_resource=", service_resource)
time.sleep(5)
keptn.send_task_finished_cloudevent(message="Deployment finished")
# register handler
Keptn.on('deployment.triggered', deployment_triggered)
if __name__ == "__main__":
if "KEPTN_API_TOKEN" in os.environ and "KEPTN_ENDPOINT" in os.environ and os.environ["KEPTN_API_TOKEN"] and os.environ["KEPTN_ENDPOINT"]:
print("Found environment variables KEPTN_ENDPOINT and KEPTN_API_TOKEN, polling events from API")
thread = start_polling(os.environ["KEPTN_ENDPOINT"], os.environ["KEPTN_API_TOKEN"])
if not thread:
print("ERROR: Failed to start polling thread, exiting")
sys.exit(1)
print("Exit using CTRL-C")
# wait til exit (e.g., using CTRL C)
while thread.is_alive():
try:
thread.join(0.5)
except KeyboardInterrupt:
print("Exiting...")
sys.exit(0)
else:
# run flask app with HTTP endpoint
print("Running on port", PORT, "on path", PATH)
app.run(host='0.0.0.0', port=PORT)