subminute support
yimuniao committed Jul 24, 2017
1 parent d17013a commit 68dae62
Showing 22 changed files with 326 additions and 87 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -20,6 +20,8 @@ The default location of the configuration file used by collectd-cloudwatch plugin
* __host__ - Manual override for EC2 Instance ID and Host information propagated by collectd
* __proxy_server_name__ - Manual override for the proxy server name, used by the plugin to connect to AWS CloudWatch at *.amazonaws.com.
* __proxy_server_port__ - Manual override for the proxy server port, used by the plugin to connect to AWS CloudWatch at *.amazonaws.com.
* __enable_high_definition_metrics__ - Enables high-resolution metrics; values are published to CloudWatch with a storage resolution of one second.
* __flush_interval_in_seconds__ - How often, in seconds, the plugin flushes buffered metrics to CloudWatch (see the illustrative setup after the sample configuration below).
* __whitelist_pass_through__ - Used to enable potentially unsafe regular expressions. By default, a whitelist line consisting only of a regex such as `.*` or `.+` is automatically disabled.
Setting this value to True may result in a large number of metrics being published. Before changing this parameter, read [pricing information](https://aws.amazon.com/cloudwatch/pricing/) to understand how to estimate your bill.
* __push_asg__ - Used to include the Auto-Scaling Group as a dimension for all metrics (see `Adding additional dimensions to metrics` below for details)
@@ -39,6 +41,8 @@ push_asg = False
push_constant = True
constant_dimension_value = "ALL"
debug = False
enable_high_definition_metrics = False
flush_interval_in_seconds = 60
```
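
For a sub-minute setup, the two new options are used together. An illustrative (not default) configuration:

```
enable_high_definition_metrics = True
flush_interval_in_seconds = 5
```

With these values the plugin publishes metrics at one-second storage resolution and flushes its buffer every five seconds.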


8 changes: 7 additions & 1 deletion src/cloudwatch/config/plugin.conf
@@ -16,7 +16,6 @@ whitelist_pass_through = False
# The debug parameter enables verbose logging of published metrics
debug = False


# Whether or not to push the ASG as part of the dimension.
# WARNING: ENABLING THIS WILL LEAD TO CREATING A LARGE NUMBER OF METRICS.
push_asg = False
@@ -32,3 +31,10 @@ constant_dimension_value = "ALL"

# The proxy_server_port is used for connecting *.amazonaws.com, to put metrics.
#proxy_server_port = 8080

# Enables high-resolution metrics: when set to True, values are stored in CloudWatch with a one-second storage resolution.
#enable_high_definition_metrics = False

# The flush_interval_in_seconds setting controls how often, in seconds, the plugin flushes buffered metrics to CloudWatch.
#flush_interval_in_seconds = 60

7 changes: 3 additions & 4 deletions src/cloudwatch/modules/client/baserequestbuilder.py
@@ -17,7 +17,7 @@ class BaseRequestBuilder(object):
_ALGORITHM = "AWS4-HMAC-SHA256"
_V4_TERMINATOR = "aws4_request"

def __init__(self, credentials, region, service, action, api_version):
def __init__(self, credentials, region, service, action, api_version, enable_high_definition_metrics=False):
self.credentials = credentials
self.region = region
self.datestamp = None
@@ -26,10 +26,9 @@ def __init__(self, credentials, region, service, action, api_version):
self.api_version = api_version
self.aws_timestamp = None
self.payload = "" # for HTTP GET payload is always empty
self.querystring_builder = QuerystringBuilder()
self.querystring_builder = QuerystringBuilder(enable_high_definition_metrics)
self.signer = Signer(credentials, region, self.service, self._ALGORITHM)



def _init_timestamps(self):
""" Initializes timestamp and datestamp values """
self.datestamp = get_datestamp()
56 changes: 38 additions & 18 deletions src/cloudwatch/modules/client/putclient.py
@@ -1,10 +1,12 @@
import re
import os

from ..plugininfo import PLUGIN_NAME, PLUGIN_VERSION
from requestbuilder import RequestBuilder
from ..logger.logger import get_logger
from requests.adapters import HTTPAdapter
from requests.sessions import Session
from tempfile import gettempdir


class PutClient(object):
@@ -24,20 +26,36 @@ class PutClient(object):
_DEFAULT_CONNECTION_TIMEOUT = 1
_DEFAULT_RESPONSE_TIMEOUT = 3
_TOTAL_RETRIES = 1
_LOG_FILE_MAX_SIZE = 10*1024*1024

def __init__(self, config_helper, connection_timeout=_DEFAULT_CONNECTION_TIMEOUT, response_timeout=_DEFAULT_RESPONSE_TIMEOUT):
self.request_builder = RequestBuilder(config_helper.credentials, config_helper.region)
self.request_builder = RequestBuilder(config_helper.credentials, config_helper.region, config_helper.enable_high_definition_metrics)
self._validate_and_set_endpoint(config_helper.endpoint)
self.timeout = (connection_timeout, response_timeout)
self.proxy_server_name = config_helper.proxy_server_name
self.proxy_server_port = config_helper.proxy_server_port
if (self.proxy_server_name != None):
self._LOGGER.info("Using proxy server: " + self.proxy_server_name)
if (self.proxy_server_port != None ):
self._LOGGER.info("Using proxy server port: " + self.proxy_server_port)
else:
self._LOGGER.info("No proxy server is in use")

self.debug = config_helper.debug
self.config = config_helper
self._prepare_session()
if config_helper.debug:
if config_helper.proxy_server_name is not None:
self._LOGGER.info("Using proxy server: " + config_helper.proxy_server_name)
if config_helper.proxy_server_port is not None:
self._LOGGER.info("Using proxy server port: " + config_helper.proxy_server_port)
else:
self._LOGGER.info("No proxy server is in use")

def _prepare_session(self):
self.session = Session()
if self.proxy_server_name is not None:
proxy_server = self.proxy_server_name
if self.proxy_server_port is not None:
proxy_server = proxy_server + ":" + self.proxy_server_port
proxies = {'https': proxy_server}
self.session.proxies.update(proxies)
self.session.mount("http://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))
self.session.mount("https://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))

def _validate_and_set_endpoint(self, endpoint):
pattern = re.compile("http[s]?://*/")
if pattern.match(endpoint) or "localhost" in endpoint:
@@ -56,6 +74,9 @@ def put_metric_data(self, namespace, metric_list):

if not self._is_namespace_consistent(namespace, metric_list):
raise ValueError("Metric list contains metrics with namespace different than the one passed as argument.")
# Re-read credentials from the config helper so the request is signed with the most recent ones
credentials = self.config.credentials
self.request_builder.credentials = credentials
self.request_builder.signer.credentials = credentials
request = self.request_builder.create_signed_request(namespace, metric_list)
try:
self._run_request(request)
@@ -77,16 +98,15 @@ def _run_request(self, request):
"""
Executes HTTP GET request with timeout using the endpoint defined upon client creation.
"""
session = Session()
if (self.proxy_server_name != None):
proxy_server = self.proxy_server_name
if (self.proxy_server_port != None):
proxy_server = proxy_server +":"+self.proxy_server_port
proxies = {'https': proxy_server }
session.proxies.update(proxies)
session.mount("http://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))
session.mount("https://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))
result = session.get(self.endpoint + "?" + request, headers=self._get_custom_headers(), timeout=self.timeout)
if self.debug:
file_path = gettempdir() + "/collectd_plugin_request_trace_log"
if os.path.isfile(file_path) and os.path.getsize(file_path) > self._LOG_FILE_MAX_SIZE:
os.remove(file_path)
with open(file_path, "a") as logfile:
logfile.write("curl -i -v -connect-timeout 1 -m 3 -w %{http_code}:%{http_connect}:%{content_type}:%{time_namelookup}:%{time_redirect}:%{time_pretransfer}:%{time_connect}:%{time_starttransfer}:%{time_total}:%{speed_download} -A \"collectd/1.0\" \'" + self.endpoint + "?" + request + "\'")
logfile.write("\n\n")

result = self.session.get(self.endpoint + "?" + request, headers=self._get_custom_headers(), timeout=self.timeout)
result.raise_for_status()
return result
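
When `debug` is enabled, each request is additionally appended to a trace log in the system temp directory as a replayable curl command. A hypothetical logged line (endpoint and querystring abbreviated) could look like:

```
curl -i -v --connect-timeout 1 -m 3 -w %{http_code}:%{http_connect}:%{content_type}:... -A "collectd/1.0" 'https://monitoring.us-west-2.amazonaws.com/?Action=PutMetricData&...'
```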

6 changes: 6 additions & 0 deletions src/cloudwatch/modules/client/querystringbuilder.py
@@ -21,6 +21,10 @@ class QuerystringBuilder(object):
_STAT_MIN = _STATISTICS_KEY + "Minimum"
_STAT_SUM = _STATISTICS_KEY + "Sum"
_STAT_SAMPLE = _STATISTICS_KEY + "SampleCount"
_STORAGE_RESOLUTION = "StorageResolution"

def __init__(self, enable_high_definition_metrics=False):
self.enable_high_definition_metrics = enable_high_definition_metrics

def build_querystring(self, metric_list, request_map):
"""
@@ -52,6 +56,8 @@ def _build_metric_map(self, metric_list):
metric_prefix = self._METRIC_PREFIX + str(metric_index) + "."
metric_map[metric_prefix + self._METRIC_NAME_KEY] = metric.metric_name
metric_map[metric_prefix + self._TIMESTAMP_KEY] = metric.timestamp
if self.enable_high_definition_metrics:
metric_map[metric_prefix + self._STORAGE_RESOLUTION] = "1"
self._add_dimensions(metric, metric_map, metric_prefix)
self._add_values(metric, metric_map, metric_prefix)
metric_index += 1
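
A minimal sketch of what `_build_metric_map` produces for a single metric when high-definition support is on — each metric member simply gains a `StorageResolution` entry (the metric name and timestamp below are hypothetical):

```python
# Hypothetical key/value pairs for one metric with
# enable_high_definition_metrics = True (value/statistic keys omitted).
metric_map = {
    "MetricData.member.1.MetricName": "cpu.percent.active",
    "MetricData.member.1.Timestamp": "2017-07-24T12:00:01Z",
    "MetricData.member.1.StorageResolution": "1",  # one-second resolution
}
```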
4 changes: 2 additions & 2 deletions src/cloudwatch/modules/client/requestbuilder.py
@@ -13,8 +13,8 @@ class RequestBuilder(BaseRequestBuilder):
_ACTION = "PutMetricData"
_API_VERSION = "2010-08-01"

def __init__(self, credentials, region):
super(self.__class__, self).__init__(credentials, region, self._SERVICE, self._ACTION, self._API_VERSION)
def __init__(self, credentials, region, enable_high_definition_metrics):
super(self.__class__, self).__init__(credentials, region, self._SERVICE, self._ACTION, self._API_VERSION, enable_high_definition_metrics)
self.namespace = ""

def create_signed_request(self, namespace, metric_list):
15 changes: 15 additions & 0 deletions src/cloudwatch/modules/configuration/confighelper.py
@@ -47,6 +47,8 @@ def __init__(self, config_path=_DEFAULT_CONFIG_PATH, metadata_server=_METADATA_S
self.push_asg = False
self.push_constant = False
self.constant_dimension_value = ''
self.enable_high_definition_metrics = False
self.flush_interval_in_seconds = ''
self._load_configuration()
self.whitelist = Whitelist(WhitelistConfigReader(self.WHITELIST_CONFIG_PATH, self.pass_through).get_regex_list(), self.BLOCKED_METRIC_PATH)

@@ -77,6 +79,8 @@ def _load_configuration(self):
self._load_hostname()
self._load_proxy_server_name()
self._load_proxy_server_port()
self.enable_high_definition_metrics = self.config_reader.enable_high_definition_metrics
self._load_flush_interval_in_seconds()
self._set_endpoint()
self._set_ec2_endpoint()
self._load_autoscaling_group()
@@ -163,6 +167,17 @@ def _load_proxy_server_port(self):
else:
self.proxy_server_port = None

def _load_flush_interval_in_seconds(self):
"""
Load flush_interval_in_seconds from the configuration file. If the file does not contain
a flush_interval_in_seconds entry, or the value is not an integer between 1 and 60,
fall back to '60'.
"""
if self.config_reader.flush_interval_in_seconds in [str(x) for x in range(1, 61)]:
self.flush_interval_in_seconds = self.config_reader.flush_interval_in_seconds
else:
self.flush_interval_in_seconds = "60"
self._LOGGER.warning("flush_interval_in_seconds in configuration is invalid: " + str(self.config_reader.flush_interval_in_seconds) + " use the default value: " + self.flush_interval_in_seconds)

def _set_endpoint(self):
""" Creates endpoint from region information """
if self.region is "localhost":
9 changes: 9 additions & 0 deletions src/cloudwatch/modules/configuration/configreader.py
@@ -23,6 +23,7 @@ class ConfigReader(object):

_LOGGER = get_logger(__name__)
_DEBUG_DEFAULT_VALUE = False
_ENABLE_HIGH_DEFINITION_METRICS_DEFAULT_VALUE = False
_PASS_THROUGH_DEFAULT_VALUE = False
_PUSH_ASG_DEFAULT_VALUE = False
_PUSH_CONSTANT_DEFAULT_VALUE = False
@@ -36,6 +37,8 @@ class ConfigReader(object):
CONSTANT_DIMENSION_KEY = "constant_dimension_value"
PROXY_SERVER_NAME_KEY = "proxy_server_name"
PROXY_SERVER_PORT_KEY = "proxy_server_port"
ENABLE_HIGH_DEFINITION_METRICS = "enable_high_definition_metrics"
FLUSH_INTERVAL_IN_SECONDS = "flush_interval_in_seconds"

def __init__(self, config_path):
self.config_path = config_path
@@ -47,6 +50,10 @@ def __init__(self, config_path):
self.push_asg = self._PUSH_ASG_DEFAULT_VALUE
self.push_constant = self._PUSH_CONSTANT_DEFAULT_VALUE
self.constant_dimension_value = ''
self.proxy_server_name = ''
self.proxy_server_port = ''
self.enable_high_definition_metrics = self._ENABLE_HIGH_DEFINITION_METRICS_DEFAULT_VALUE
self.flush_interval_in_seconds = ''
try:
self.reader_utils = ReaderUtils(config_path)
self._parse_config_file()
@@ -64,6 +71,8 @@ def _parse_config_file(self):
self.region = self.reader_utils.get_string(self.REGION_CONFIG_KEY)
self.proxy_server_name = self.reader_utils.get_string(self.PROXY_SERVER_NAME_KEY)
self.proxy_server_port = self.reader_utils.get_string(self.PROXY_SERVER_PORT_KEY)
self.enable_high_definition_metrics = self.reader_utils.try_get_boolean(self.ENABLE_HIGH_DEFINITION_METRICS, self._ENABLE_HIGH_DEFINITION_METRICS_DEFAULT_VALUE)
self.flush_interval_in_seconds = self.reader_utils.get_string(self.FLUSH_INTERVAL_IN_SECONDS)
self.pass_through = self.reader_utils.try_get_boolean(self.PASS_THROUGH_CONFIG_KEY, self._PASS_THROUGH_DEFAULT_VALUE)
self.debug = self.reader_utils.try_get_boolean(self.DEBUG_CONFIG_KEY, self._DEBUG_DEFAULT_VALUE)
self.push_asg = self.reader_utils.try_get_boolean(self.PUSH_ASG_KEY, self._PUSH_ASG_DEFAULT_VALUE)
6 changes: 3 additions & 3 deletions src/cloudwatch/modules/configuration/metadatareader.py
@@ -23,6 +23,8 @@ class MetadataReader(object):

def __init__(self, metadata_server):
self.metadata_server = metadata_server
self.session = Session()
self.session.mount("http://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))

def get_region(self):
""" Get the region value from the metadata service, if the last character of region is A it is automatically trimmed """
@@ -57,9 +59,7 @@ def _get_metadata(self, request):
'http://169.254.169.254/latest/meta-data/placement/availability-zone/'
then the request part is 'latest/meta-data/placement/availability-zone/'.
"""
session = Session()
session.mount("http://", HTTPAdapter(max_retries=self._TOTAL_RETRIES))
result = session.get(self.metadata_server + request, timeout=self._REQUEST_TIMEOUT)
result = self.session.get(self.metadata_server + request, timeout=self._REQUEST_TIMEOUT)
if result.status_code is codes.ok:
return str(result.text)
else:
46 changes: 34 additions & 12 deletions src/cloudwatch/modules/flusher.py
@@ -29,6 +29,10 @@ def __init__(self, config_helper):
self.metric_map = {}
self.last_flush_time = time.time()
self.nan_key_set = set()
self.enable_high_definition_metrics = config_helper.enable_high_definition_metrics
self.flush_interval_in_seconds = int(config_helper.flush_interval_in_seconds if config_helper.flush_interval_in_seconds else self._FLUSH_INTERVAL_IN_SECONDS)
self.max_metrics_to_aggregate = self._MAX_METRICS_PER_PUT_REQUEST if self.enable_high_definition_metrics else self._MAX_METRICS_TO_AGGREGATE
self.client = PutClient(self.config)

def is_numerical_value(self, value):
"""
@@ -72,10 +76,12 @@ def _flush_if_need(self, current_time):
self._flush()

def _is_flush_time(self, current_time):
return (current_time - self.last_flush_time) + self._FLUSH_DELTA_IN_SECONDS >= self._FLUSH_INTERVAL_IN_SECONDS
if self.enable_high_definition_metrics:
return (current_time - self.last_flush_time) >= self.flush_interval_in_seconds + self._FLUSH_DELTA_IN_SECONDS
return (current_time - self.last_flush_time) + self._FLUSH_DELTA_IN_SECONDS >= self.flush_interval_in_seconds
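
The two branches are deliberately asymmetric. A small illustration (assuming a `_FLUSH_DELTA_IN_SECONDS` of 1; the actual constant is defined elsewhere in the module): standard mode may flush up to one delta early, while high-definition mode waits one delta past the interval:

```python
# Assumed: flush_interval_in_seconds = 60, _FLUSH_DELTA_IN_SECONDS = 1
elapsed = 59.5
print(elapsed + 1 >= 60)   # True  -> standard mode flushes now
print(elapsed >= 60 + 1)   # False -> high-definition mode keeps buffering
```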

def record_nan_value(self, key, value_list):
if not key in self.nan_key_set:
if key not in self.nan_key_set:
self._LOGGER.warning(
"Adding Metric value is not numerical, key: " + key + " value: " + str(value_list.values))
self.nan_key_set.add(key)
@@ -86,19 +92,34 @@ def _aggregate_metric(self, value_list):
If the size of metric_map is above the limit, new metric will not be added and the value_list will be dropped.
"""
nan_value_count = 0
key = self._get_metric_key(value_list)
dimension_key = self._get_metric_key(value_list)
adjusted_time = int(value_list.time)
if self.config.debug:
self._LOGGER.info("Received key"+dimension_key + " Adjusted_time: " + str(adjusted_time) + " Original time: " + str(value_list.time))
key = dimension_key
if self.enable_high_definition_metrics:
key = dimension_key + "-" + str(adjusted_time)
if key in self.metric_map:
nan_value_count = self._add_values_to_metrics(self.metric_map[key], value_list)
else:
if len(self.metric_map) < self._MAX_METRICS_TO_AGGREGATE:
metrics = MetricDataBuilder(self.config, value_list).build()
nan_value_count = self._add_values_to_metrics(metrics, value_list)
if nan_value_count != len(value_list.values):
self.metric_map[key] = metrics
if len(self.metric_map) < self.max_metrics_to_aggregate:
nan_value_count = self._add_metric_to_queue(value_list, adjusted_time, key)
else:
self._LOGGER.warning("Batching queue overflow detected. Dropping metric.")
if self.enable_high_definition_metrics:
self._flush()
nan_value_count = self._add_metric_to_queue(value_list, adjusted_time, key)
else:
self._LOGGER.warning("Batching queue overflow detected. Dropping metric.")
if nan_value_count:
self.record_nan_value(key, value_list)
self.record_nan_value(dimension_key, value_list)
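
A sketch of the aggregation-key derivation (the helper below is hypothetical, distilled from the branch above): standard mode aggregates all samples of a metric under one key, while high-definition mode appends the second-granularity timestamp so every second gets its own map entry:

```python
def aggregation_key(dimension_key, timestamp, high_definition):
    # High-definition metrics get one entry per second instead of
    # being aggregated across the whole flush interval.
    if high_definition:
        return dimension_key + "-" + str(int(timestamp))
    return dimension_key

print(aggregation_key("cpu-0-percent-active", 1500854401.2, True))
# cpu-0-percent-active-1500854401
print(aggregation_key("cpu-0-percent-active", 1500854401.2, False))
# cpu-0-percent-active
```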

def _add_metric_to_queue(self, value_list, adjusted_time, key):
nan_value_count = 0
metrics = MetricDataBuilder(self.config, value_list, adjusted_time).build()
nan_value_count = self._add_values_to_metrics(metrics, value_list)
if nan_value_count != len(value_list.values):
self.metric_map[key] = metrics
return nan_value_count

def _get_metric_key(self, value_list):
"""
@@ -129,7 +150,7 @@ def _flush(self):
Batches and puts metrics to CloudWatch
"""
self.last_flush_time = time.time()
self.client = PutClient(self.config)
metric_map_size = len(self.metric_map)
if self.metric_map:
prepare_batch = self._prepare_batch()
try:
Expand All @@ -141,7 +162,8 @@ def _flush(self):
if len(metric_batch) < self._MAX_METRICS_PER_PUT_REQUEST:
break
except StopIteration, e:
self._LOGGER.error("_flush error: "+ str(e))
if metric_map_size % self._MAX_METRICS_PER_PUT_REQUEST != 0 or len(self.metric_map) != 0:
self._LOGGER.error("_flush error: " + str(e) + " Original map size: " + str(metric_map_size))

def _prepare_batch(self):
"""