func.py
import io
import json
import logging
import os
import sys
from typing import Dict

import requests

from mint import MintMetric
from summary_stat import SummaryStat
from metric_mapping import namespace_map


def process_metrics(body: Dict):
    logging.getLogger().info(f"process_metrics: {body}")
    namespace = body.get("namespace")
    metric_name = body.get("name")
    oci_dimensions: Dict[str, str] = body.get("dimensions", {})
    # Include the resourceGroup and compartmentId in oci_dimensions so they can be
    # mapped using the dimension_mapping
    oci_dimensions["resourceGroup"] = body.get("resourceGroup")
    oci_dimensions["compartmentId"] = body.get("compartmentId")
    datapoints = body.get("datapoints")

    metric_map = namespace_map.get(namespace)
    if metric_map is None:
        logging.getLogger().error(
            f"Could not find a metric mapping for namespace '{namespace}'"
        )
        return

    # IMPORT_ALL_METRICS is expected to hold "true" or "false"; bool() on any
    # non-empty string is always True, so parse the flag explicitly.
    import_all_metrics = os.environ["IMPORT_ALL_METRICS"].strip().lower() in ("true", "1")
    logging.getLogger().info(f"import_all_metrics: {import_all_metrics}")
    value_or_none = metric_map.value_from_oci_metric_name(
        metric_name, oci_dimensions, datapoints
    )
    if value_or_none is None:
        if import_all_metrics:
            key_namespace = namespace.replace("oci_", "")
            key = f"cloud.oci.{key_namespace}.{metric_name}"
            min_value = float("inf")
            max_value = float("-inf")
            sum_value = 0.0
            timestamp = sys.maxsize
            for datapoint in datapoints:
                timestamp = min(timestamp, datapoint.get("timestamp"))
                value = float(datapoint.get("value"))
                min_value = min(min_value, value)
                max_value = max(max_value, value)
                sum_value += value
            mint_metric = MintMetric(
                key,
                SummaryStat(min_value, max_value, sum_value, len(datapoints)),
                {
                    "oci.resource_group": oci_dimensions.get("resourceGroup"),
                    "oci.compartment_id": oci_dimensions.get("compartmentId"),
                },
                timestamp,
            )
            logging.getLogger().info(f"mint_metric: {mint_metric}")
            push_metrics_to_dynatrace(mint_metric)
        else:
            logging.getLogger().debug(
                f"Could not find a mapping for metric '{metric_name}' in namespace '{namespace}'"
            )
        return
    dynatrace_metric_key, result = value_or_none
    dimensions = metric_map.dimensions(oci_dimensions)
    mint_metric = MintMetric(
        dynatrace_metric_key, result.value, dimensions, result.timestamp
    )
    logging.getLogger().info(f"process_metrics: Mint Metric: {mint_metric}")
    push_metrics_to_dynatrace(mint_metric)
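
# Note: based on the keys read in process_metrics, a single incoming OCI
# Monitoring metric event is assumed to look roughly like the sketch below
# (the exact field set depends on the OCI Service Connector configuration):
#   {
#       "namespace": "oci_computeagent",
#       "name": "CpuUtilization",
#       "resourceGroup": "example-resource-group",
#       "compartmentId": "ocid1.compartment.oc1..example",
#       "dimensions": {"resourceId": "ocid1.instance.oc1..example"},
#       "datapoints": [{"timestamp": 1700000000000, "value": 12.5, "count": 1}]
#   }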

METRIC_INGEST_ENDPOINT = "/api/v2/metrics/ingest"


def push_metrics_to_dynatrace(mint_metric: MintMetric):
    try:
        dynatrace_api_key = os.environ["DYNATRACE_API_KEY"]
        tenant_url = os.environ["DYNATRACE_TENANT"]
        # Remove the trailing slash if it exists
        if tenant_url.endswith("/"):
            tenant_url = tenant_url[:-1]
        # Append the metric ingest endpoint to the tenant url
        tenant_url = f"{tenant_url}{METRIC_INGEST_ENDPOINT}"
        headers = {
            "Content-Type": "text/plain",
            "Authorization": f"Api-Token {dynatrace_api_key}",
        }
        response = requests.post(tenant_url, data=str(mint_metric), headers=headers)
        logging.getLogger().info(response.text)
    except Exception as ex:
        logging.getLogger().error(str(ex))
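
# The request body above is str(mint_metric), so MintMetric.__str__ is assumed
# to serialize to the Dynatrace metrics-ingest (MINT) line protocol, e.g. for a
# summary value something along the lines of:
#   cloud.oci.computeagent.CpuUtilization,oci.compartment_id=ocid1.compartment.oc1..example gauge,min=12.5,max=12.5,sum=12.5,count=1 1700000000000
# Refer to the Dynatrace metric ingestion protocol docs for the exact syntax.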


def handler(ctx, data: io.BytesIO = None):
    try:
        body = json.loads(data.getvalue())
        if isinstance(body, list):
            # Batch of CloudEvents format
            for b in body:
                process_metrics(b)
        else:
            # Single CloudEvent
            process_metrics(body)
    except Exception as ex:
        logging.getLogger().error(str(ex))
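

if __name__ == "__main__":
    # Minimal local smoke-test sketch (not part of the deployed OCI function):
    # it feeds a hand-built metric event through handler(). The environment
    # values below are placeholders; a real Dynatrace tenant URL and API token
    # are required for the ingest request to actually succeed, and the sample
    # namespace/metric must exist in namespace_map to take the mapped path.
    logging.basicConfig(level=logging.INFO)
    os.environ.setdefault("IMPORT_ALL_METRICS", "true")
    os.environ.setdefault("DYNATRACE_TENANT", "https://example.live.dynatrace.com")
    os.environ.setdefault("DYNATRACE_API_KEY", "dt0c01.placeholder")

    sample_event = {
        "namespace": "oci_computeagent",
        "name": "CpuUtilization",
        "resourceGroup": "example-resource-group",
        "compartmentId": "ocid1.compartment.oc1..example",
        "dimensions": {"resourceId": "ocid1.instance.oc1..example"},
        "datapoints": [{"timestamp": 1700000000000, "value": 12.5, "count": 1}],
    }
    handler(None, io.BytesIO(json.dumps(sample_event).encode("utf-8")))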