diff --git a/README.md b/README.md
index f19ce28..3f54f4c 100644
--- a/README.md
+++ b/README.md
@@ -15,25 +15,9 @@ Export issue metadata & agile metrics, transform and load to OLAP data storage.
 
 > You can fork this repository and refine the tool the way you want. Or use it as it is - this will allow you to build basic analytics on the tasks from Yandex.Tracker.
 
-## On-premise arch example
-
-![](/docs/images/agile_metrics.png)
-
-So, you can install Clickhouse with Proxy via [Ansible role inside project (previous versions)](https://github.com/akimrx/yandex-tracker-exporter/tree/v0.1.19/ansible).
-Edit the inventory file `ansible/inventory/hosts.yml` and just run ansible-playbook.
-
-> **Attention:**
-> For the role to work correctly, docker must be installed on the target server.
-
-Example:
-```bash
-
-pip3 install -r requirements-dev.txt
-cd ansible
-ansible-playbook -i inventory/hosts.yml playbooks/clickhouse.yml --limit agile
-```
-
-Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role)
+Requires:
+* Python `>=3.10`
+* Clickhouse + specific [tables](#migration)
 
 ## Usage
 
@@ -45,7 +29,7 @@ Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansi
 # prepare virtual environment
 python3 -m venv venv
 source venv/bin/activate
-python3 setup.py install
+make install
 
 # configure environment variables
 export EXPORTER_TRACKER__TOKEN=your_token
@@ -68,7 +52,7 @@ pip3 install tracker-exporter
 tracker-exporter
 ```
 
-#### Use .env file
+#### Configure via .env file
 
 Read about the settings [here](#environment-variables-settings)
 
@@ -86,6 +70,31 @@ docker-compose up -d
 docker logs tracker-exporter -f
 ```
 
+## On-premise arch example
+
+![](/docs/images/agile_metrics.png)
+
+### On-premise Clickhouse
+
+You can install Clickhouse with a proxy via the [Ansible role inside the project (previous versions)](https://github.com/akimrx/yandex-tracker-exporter/tree/v0.1.19/ansible).
+Edit the inventory file `ansible/inventory/hosts.yml` and run the playbook.
+
+> **Attention:**
+> For the role to work correctly, Docker must be installed on the target server.
+
+Example Clickhouse installation:
+```bash
+git clone https://github.com/akimrx/yandex-tracker-exporter.git
+cd yandex-tracker-exporter
+git checkout v0.1.19
+python3 -m venv venv && source venv/bin/activate
+pip3 install -r requirements-dev.txt
+cd ansible
+ansible-playbook -i inventory/hosts.yml playbooks/clickhouse.yml --limit agile
+```
+
+Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role)
 
 ## Yandex.Cloud – Cloud Functions
 
@@ -237,7 +246,7 @@ run_migration
 
 # Configuration via environment variables
 
-See config declaration [here](/src/config.py)
+See config declaration [here](/tracker_exporter/config.py)
 
 ## General settings
 
@@ -290,6 +299,10 @@ See config declaration [here](/src/config.py)
 | `EXPORTER_CLICKHOUSE__ISSUES_TABLE` | Clickhouse table for issues metadata. Default: `issues`
 | `EXPORTER_CLICKHOUSE__ISSUE_METRICS_TABLE` | Clickhouse table for issue metrics. Default: `issue_metrics`
 | `EXPORTER_CLICKHOUSE__AUTO_DEDUPLICATE` | Execute `OPTIMIZE` after each `INSERT`. Default is `True`
+| `EXPORTER_CLICKHOUSE__BACKOFF_BASE_DELAY` | Base delay (seconds) for the backoff strategy. Default: `0.5`
+| `EXPORTER_CLICKHOUSE__BACKOFF_EXPO_FACTOR` | Exponential factor by which the delay is multiplied on every retry. Default: `2.5`
+| `EXPORTER_CLICKHOUSE__BACKOFF_MAX_TRIES` | Max tries for the backoff strategy. Default: `3`
+| `EXPORTER_CLICKHOUSE__BACKOFF_JITTER` | Enable jitter (randomized delay) for retries. Default: `True`
 
 ## State settings
 
@@ -314,8 +327,8 @@ See config declaration [here](/src/config.py)
 | `EXPORTER_MONITORING__METRICS_HOST` | DogStatsD / statsd host. Default: `localhost`
 | `EXPORTER_MONITORING__METRICS_PORT` | DogStatsD / statsd port. Default: `8125`
 | `EXPORTER_MONITORING__METRICS_BASE_PREFIX` | Prefix for metrics name. Default: `tracker_exporter`
-| `EXPORTER_MONITORING__METRICS_BASE_LABELS` | Tags for metrics. Default: `["project:internal",]`
-| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable send exception stacktrace to Sentry. Default is empty
+| `EXPORTER_MONITORING__METRICS_BASE_LABELS` | List of tags for metrics. Example: `["project:internal",]`. Default is empty
+| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable sending exception stacktraces to Sentry. Default is `False`
 | `EXPORTER_MONITORING__SENTRY_DSN` | Sentry DSN. Default is empty

diff --git a/TODO.md b/TODO.md
index 465018a..fbc37a9 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,9 +1,9 @@
 # Project roadmap
 
-- [ ] refactoring code
+- [x] refactoring code
+- [x] CI
 - [ ] pytest
-- [ ] CI
 - [ ] stateful mode
 - [ ] helm chart
 - [ ] docker image

diff --git a/tracker_exporter/config.py b/tracker_exporter/config.py
index 6abbde2..6acde89 100644
--- a/tracker_exporter/config.py
+++ b/tracker_exporter/config.py
@@ -23,7 +23,7 @@ class MonitoringSettings(BaseSettings):
     metrics_host: str = "localhost"
     metrics_port: int = 8125
     metrics_base_prefix: str = "tracker_exporter"
-    metrics_base_labels: List[str] = ["project:internal",]
+    metrics_base_labels: List[str] = []
     sentry_enabled: bool = False
     sentry_dsn: str | None = None
 
@@ -52,6 +52,10 @@ class ClickhouseSettings(BaseSettings):
     issues_table: str = "issues"
     issue_metrics_table: str = "issue_metrics"
     auto_deduplicate: bool = True
+    backoff_base_delay: int | float = 0.5
+    backoff_expo_factor: int | float = 2.5
+    backoff_max_tries: int = 3
+    backoff_jitter: bool = True
 
     @validator("serverless_proxy_id", pre=True, always=True)
     def validate_serverless_proxy_id(cls, value: str | None, values: dict) -> str:
@@ -145,15 +149,24 @@ class StateSettings(BaseSettings):
     jsonfile_s3_secret_key: str | None = None
     custom_storage_params: dict = {}
 
-    # @root_validator(pre=True)
-    # def validate_state(cls, values) -> str:
-    #     stateful = values.get("stateful")
-    #     storage = values.get("storage")
-
-    #     if stateful and storage is None:
-    #         raise ConfigurationError("Can't run ETL in stateful mode choose storage")
+    @root_validator(pre=True)
+    def validate_state(cls, values) -> dict:
+        jsonfile_strategy = values.get("jsonfile_strategy")
+        jsonfile_s3_bucket = values.get("jsonfile_s3_bucket")
+        jsonfile_s3_endpoint = values.get("jsonfile_s3_endpoint")
+        jsonfile_s3_access_key = values.get("jsonfile_s3_access_key")
+        jsonfile_s3_secret_key = values.get("jsonfile_s3_secret_key")
+        s3_is_configured = all((
+            jsonfile_s3_bucket,
+            jsonfile_s3_endpoint,
+            jsonfile_s3_access_key,
+            jsonfile_s3_secret_key
+        ))
+
+        if jsonfile_strategy == JsonStorageStrategies.s3 and not s3_is_configured:
+            raise ConfigurationError("S3 must be configured for JSONFileStorage with S3 strategy.")
 
-    #     return values
+        return values
 
     class Config:
         extra = "ignore"
@@ -189,7 +202,7 @@ class Settings(BaseSettings):
         "end_date",
         "start_time",
         "end_time",
-        "moved_at"
+        "moved_at",
     )
 
     @validator("closed_issue_statuses", pre=True, always=True)
@@ -208,7 +221,7 @@ def validate_closed_issue_statuses(cls, value: str) -> list:
     def validate_not_nullable_fields(cls, value: str) -> list:
         if not isinstance(value, (str, list, tuple)):
             raise ConfigurationError(
-                "Invalid not_nullable_fields. Example: created_at,deadline,updated_at. Received: %s",
+                "Invalid NOT_NULLABLE_FIELDS. Example: created_at,deadline,updated_at. Received: %s",
                 value
            )
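For reference, the retry schedule the new `backoff_*` settings imply: the first retry waits `backoff_base_delay` seconds, each subsequent delay is multiplied by `backoff_expo_factor`, up to `backoff_max_tries` attempts, optionally randomized by jitter. The project's actual `backoff` decorator is not part of this diff, so the following is only a minimal sketch of those semantics:

```python
import random
import time
from functools import wraps


def backoff(exceptions, base_delay=0.5, expo_factor=2.5, max_tries=3, jitter=True):
    """Minimal sketch of an exponential backoff decorator (not the project's implementation)."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_tries):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_tries - 1:
                        raise  # out of tries, re-raise the last error
                    delay = base_delay * expo_factor ** attempt  # 0.5s, then 1.25s, ...
                    if jitter:
                        delay *= random.uniform(0.5, 1.5)  # spread out concurrent retries
                    time.sleep(delay)
        return wrapper
    return decorator
```

The keyword arguments mirror the values now passed from `config.clickhouse` to the real decorator later in this diff.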
diff --git a/tracker_exporter/etl.py b/tracker_exporter/etl.py
index 9222eff..c52c600 100644
--- a/tracker_exporter/etl.py
+++ b/tracker_exporter/etl.py
@@ -140,7 +140,7 @@ def _export_and_transform(
         monitoring.send_gauge_metric("issues_without_metrics", value=issues_without_metrics)
         logger.info(
-            f"Total issues: {len(issues)}, total cycle time metrics: {len(metrics)}, "
+            f"Total issues: {len(issues)}, total metrics: {len(metrics)}, "
             f"ignored issues with empty metrics: {issues_without_metrics}"
         )
         return issues, metrics, last_updated_at
@@ -200,7 +200,7 @@ def run(
             else:
                 if all((stateful, self.state, possible_new_state)):
                     self.state.set(self.state_key, possible_new_state)
-                monitoring.send_gauge_metric("last_update_timestamp", value=int(time.time()))  # TODO (akimrx): convert possible_new_state to timestamp instead time.time()
+                monitoring.send_gauge_metric("last_update_timestamp", value=int(time.time()))
             finally:
                 monitoring.send_gauge_metric("etl_upload_status", value=1 if success else 2)
         else:

diff --git a/tracker_exporter/main.py b/tracker_exporter/main.py
index d8637bc..56f0dae 100644
--- a/tracker_exporter/main.py
+++ b/tracker_exporter/main.py
@@ -53,6 +53,7 @@
 def signal_handler(sig, frame) -> None:  # pylint: disable=W0613
+    """Graceful shutdown."""
     if sig in (signal.SIGINT, signal.SIGTERM,):
         logger.warning(f"Received {signal.Signals(sig).name}, graceful shutdown...")
         scheduler.shutdown()
@@ -60,6 +61,7 @@ def signal_handler(sig, frame) -> None:  # pylint: disable=W0613
 
 def configure_sentry() -> None:
+    """Configure the Sentry client for sending exception stacktraces."""
     if config.monitoring.sentry_enabled:
         assert config.monitoring.sentry_dsn is not None
         sentry_sdk.init(
@@ -73,28 +75,35 @@ def configure_sentry() -> None:
     )
 
 
+def configure_jsonfile_storage() -> JsonStateStorage:
+    """Configure and return the JSON file storage for the StateKeeper."""
+    match config.state.jsonfile_strategy:
+        case JsonStorageStrategies.local:
+            storage_strategy = LocalFileStorageStrategy(config.state.jsonfile_path)
+        case JsonStorageStrategies.s3:
+            raise NotImplementedError
+        case _:
+            raise ValueError
+    return JsonStateStorage(storage_strategy)
+
+
 def configure_state_service() -> StateKeeper | None:
+    """Configure the StateKeeper for ETL stateful mode."""
     if not config.stateful:
         return
 
     match config.state.storage:
         case StateStorageTypes.jsonfile:
-            match config.state.jsonfile_strategy:
-                case JsonStorageStrategies.local:
-                    storage_strategy = LocalFileStorageStrategy(config.state.jsonfile_path)
-                case JsonStorageStrategies.s3:
-                    raise NotImplementedError
-                case _:
-                    raise ValueError
-            state_service = StateKeeper(JsonStateStorage(storage_strategy))
+            storage = configure_jsonfile_storage()
         case StateStorageTypes.redis:
             raise NotImplementedError
         case _:
             raise ValueError
-    return state_service
+    return StateKeeper(storage)
 
 
 def run_etl(ignore_exceptions: bool = False) -> None:
+    """Start the ETL process."""
     etl = YandexTrackerETL(
         tracker_client=YandexTrackerClient(),
         clickhouse_client=ClickhouseClient(),
@@ -112,6 +121,7 @@
 
 def main() -> None:
+    """Entry point for the CLI command."""
     configure_sentry()
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
@@ -119,7 +129,7 @@ def main() -> None:
     scheduler.add_job(
         run_etl,
         trigger="interval",
-        name="issues_cycle_time_exporter",
+        name="tracker_etl_default",
         minutes=int(config.etl_interval_minutes),
         max_instances=1,
         next_run_time=datetime.now() + timedelta(seconds=5)
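The refactor above splits storage construction out of `configure_state_service`. Wired together, stateful mode looks roughly like this; the `set()` call is taken from `etl.py` above, while the `get()` call, the state key, and the `StateKeeper` import location are illustrative assumptions:

```python
# Hypothetical wiring, mirroring configure_state_service() in main.py.
from tracker_exporter.services.state import (  # assumed module for StateKeeper; its import is not shown in this diff
    JsonStateStorage,
    LocalFileStorageStrategy,
    StateKeeper,
)

storage = JsonStateStorage(LocalFileStorageStrategy("/tmp/exporter_state.json"))
state = StateKeeper(storage)

# The ETL resumes from the last stored watermark and advances it on success:
last_updated_at = state.get("tracker_etl_default")  # assumed getter, symmetric to set()
state.set("tracker_etl_default", "2023-01-01T00:00:00")
```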
diff --git a/tracker_exporter/models/issue.py b/tracker_exporter/models/issue.py
index 661af80..6a1c023 100644
--- a/tracker_exporter/models/issue.py
+++ b/tracker_exporter/models/issue.py
@@ -25,14 +25,15 @@
 class TrackerIssueMetric(Base):
     """This object represents a issue metrics for TrackerIssue object."""
 
-    def __init__(self,
-                 issue_key: str,
-                 status_name: str,
-                 status_transitions_count: int,
-                 duration: int,
-                 busdays_duration: int,
-                 last_seen: str):
-
+    def __init__(
+        self,
+        issue_key: str,
+        status_name: str,
+        status_transitions_count: int,
+        duration: int,
+        busdays_duration: int,
+        last_seen: str
+    ) -> None:
         self.issue_key = issue_key
         self.status_name = status_name
         self.status_transitions_count = status_transitions_count
@@ -52,7 +53,7 @@ def __init__(self, issue: Issues) -> None:
         self._transform(self._issue)
 
     def _transform(self, issue: Issues) -> None:
-        """Formation of a task object based on its metadata."""
+        """Transform an issue into useful data."""
         logger.debug(f"Transforming issue {issue.key}...")
 
         self.queue: str = issue.queue.key
@@ -88,12 +89,14 @@ def _transform(self, issue: Issues) -> None:
             self.moved_by: str = None
 
     def _on_changelog_issue_moved(self, event: object) -> None:
+        """Actions when the 'issue moved' event is triggered."""
         logger.debug(f"Moved issue found: {self.issue_key}")
         self.was_moved = True
         self.moved_by = validate_resource(event.updatedBy, "email")
         self.moved_at = convert_datetime(event.updatedAt)
 
     def _on_changelog_issue_workflow(self, event: object) -> None:
+        """Actions when the 'issue workflow' event is triggered."""
        logger.debug(f"Issue workflow fields found: {event.fields}")
 
         if len(event.fields) < 2:
@@ -153,8 +156,6 @@ def _on_changelog_issue_workflow(self, event: object) -> None:
     def metrics(self) -> List[TrackerIssueMetric]:
         """
         All metrics are based on status change events in the task history.
-        The method has the ability to filter only the necessary statuses
-        passed in the argument.
 
         The metric of being in the status is considered only after
         the end of being in the calculated status.
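To make the docstring above concrete: a status only accrues its metric once the issue leaves that status, so each metric covers a closed interval between two changelog events. A toy calculation of the two duration fields, with `numpy.busday_count` standing in for business-day math (the project's own date helpers are not shown in this diff):

```python
from datetime import datetime

import numpy as np

# Two status-change events from an issue's changelog (illustrative values).
entered_status = datetime.fromisoformat("2023-05-01T10:00:00")
left_status = datetime.fromisoformat("2023-05-05T18:30:00")

duration = int((left_status - entered_status).total_seconds())  # 376200 seconds
busdays_duration = int(np.busday_count(entered_status.date(), left_status.date()))  # 4 (Mon-Fri)

print(duration, busdays_duration)
```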
diff --git a/tracker_exporter/services/clickhouse.py b/tracker_exporter/services/clickhouse.py
index 4aeedf6..111df60 100644
--- a/tracker_exporter/services/clickhouse.py
+++ b/tracker_exporter/services/clickhouse.py
@@ -68,7 +68,13 @@ def _prepare_query_params(self):
         return params
 
-    @backoff((ConnectionError, Timeout))
+    @backoff(
+        exceptions=(ConnectionError, Timeout),
+        base_delay=config.clickhouse.backoff_base_delay,
+        expo_factor=config.clickhouse.backoff_expo_factor,
+        max_tries=config.clickhouse.backoff_max_tries,
+        jitter=config.clickhouse.backoff_jitter,
+    )
     def execute(self, query: str) -> Response | None:
         url = f"{self.proto}://{self.host}:{self.port}"
         params = self._prepare_query_params()

diff --git a/tracker_exporter/services/state.py b/tracker_exporter/services/state.py
index 82f3d9b..9af6afb 100644
--- a/tracker_exporter/services/state.py
+++ b/tracker_exporter/services/state.py
@@ -10,12 +10,14 @@
 
 class S3FileStorageStrategy(JSONFileStorageStrategy):
+    """Strategy for storing a JSON file in remote object storage (S3)."""
     def __init__(self, file_path: str):
         self.file_path = file_path
         raise NotImplementedError  # TODO (akimrx): implement
 
 
 class LocalFileStorageStrategy(JSONFileStorageStrategy):
+    """Strategy for storing a JSON file in the local file system."""
     def __init__(self, file_path: str, raise_if_not_exists: bool = False):
         self.file_path = file_path
         self.raise_if_not_exists = raise_if_not_exists

diff --git a/tracker_exporter/services/tracker.py b/tracker_exporter/services/tracker.py
index 6b9e020..f557ec6 100644
--- a/tracker_exporter/services/tracker.py
+++ b/tracker_exporter/services/tracker.py
@@ -27,7 +27,6 @@ def __init__(
         cloud_org_id: str | None = config.tracker.cloud_org_id,
         timeout: int = config.tracker.timeout,
         retries: int = config.tracker.max_retries,
-        timezone: str = "UTC",
         lang: YandexTrackerLanguages = config.tracker.language
     ) -> None:
         self.client = TrackerClient(

diff --git a/tracker_exporter/utils/helpers.py b/tracker_exporter/utils/helpers.py
index 4d85351..e8c6dd8 100644
--- a/tracker_exporter/utils/helpers.py
+++ b/tracker_exporter/utils/helpers.py
@@ -112,10 +112,10 @@ def to_snake_case(text: str) -> str:
     if text.strip() == "":
         return text.strip()
 
-    text = re.sub(r'(?<=[a-z])(?=[A-Z])', '_', text)
-    text = re.sub(r'(?<=[a-z])(?=\d)', '_', text)
-    text = re.sub(r'(?<=\d)(?=[a-z])', '_', text)
-    text = re.sub(r'[^a-zA-Z0-9_]', '_', text)
+    text = re.sub(r"(?<=[a-z])(?=[A-Z])", "_", text)
+    text = re.sub(r"(?<=[a-z])(?=\d)", "_", text)
+    text = re.sub(r"(?<=\d)(?=[a-z])", "_", text)
+    text = re.sub(r"[^a-zA-Z0-9_]", "_", text)
     return text.lower()
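The quote-style change in `to_snake_case` above does not alter behavior. For reference, the four substitutions split at lower-to-upper, lower-to-digit, and digit-to-lower boundaries, then replace all remaining non-alphanumerics before lowercasing:

```python
from tracker_exporter.utils.helpers import to_snake_case

print(to_snake_case("issueStatusType"))  # issue_status_type
print(to_snake_case("sprint5days"))      # sprint_5_days
print(to_snake_case("foo-bar baz"))      # foo_bar_baz
```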