chore: readme, docstrings, configurable backoff for clickhose
akimrx committed Oct 18, 2023
1 parent 82eb3db commit 9cad8fd
Showing 10 changed files with 110 additions and 66 deletions.
61 changes: 37 additions & 24 deletions README.md
@@ -15,25 +15,9 @@ Export issue metadata & agile metrics, transform and load to OLAP data storage.

> You can fork this repository and refine the tool the way you want. Or use it as it is - this will allow you to build basic analytics on the tasks from Yandex.Tracker.
## On-premise arch example

![](/docs/images/agile_metrics.png)

So, you can install Clickhouse with Proxy via [Ansible role inside project (previous versions)](https://github.com/akimrx/yandex-tracker-exporter/tree/v0.1.19/ansible).
Edit the inventory file `ansible/inventory/hosts.yml` and just run ansible-playbook.

> **Attention:**
> For the role to work correctly, docker must be installed on the target server.
Example:
```bash

pip3 install -r requirements-dev.txt
cd ansible
ansible-playbook -i inventory/hosts.yml playbooks/clickhouse.yml --limit agile
```

Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role)
Require:
* Python `>=3.10.*`
* Clickhouse + specific [tables](#migration)

## Usage

@@ -45,7 +29,7 @@ Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role)
# prepare virtual environment
python3 -m venv venv
source venv/bin/activate
python3 setup.py install
make install

# configure environment variables
export EXPORTER_TRACKER__TOKEN=your_token
@@ -68,7 +52,7 @@ pip3 install tracker-exporter
tracker-exporter
```

#### Use .env file
#### Configure via .env file

Read about the settings [here](#environment-variables-settings)

@@ -86,6 +70,31 @@ docker-compose up -d
docker logs tracker-exporter -f
```

## On-premise arch example

![](/docs/images/agile_metrics.png)

### On-premise Clickhouse

So, you can install Clickhouse with Proxy via [Ansible role inside project (previous versions)](https://github.com/akimrx/yandex-tracker-exporter/tree/v0.1.19/ansible).
Edit the inventory file `ansible/inventory/hosts.yml` and just run ansible-playbook.

> **Attention:**
> For the role to work correctly, docker must be installed on the target server.
Example Clickhouse installation:
```bash
git clone https://github.com/akimrx/yandex-tracker-exporter.git
cd yandex-tracker-exporter
git checkout v0.1.19
python3 -m venv venv && source venv/bin/activate
pip3 install -r requirements-dev.txt
cd ansible
ansible-playbook -i inventory/hosts.yml playbooks/clickhouse.yml --limit agile
```

Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role)


## Yandex.Cloud – Cloud Functions

@@ -237,7 +246,7 @@ run_migration

# Configuration via environment variables

See config declaration [here](/src/config.py)
See config declaration [here](/tracker_exporter/config.py)

## General settings

@@ -290,6 +299,10 @@ See config declaration [here](/src/config.py)
| `EXPORTER_CLICKHOUSE__ISSUES_TABLE` | Clickhouse table for issues metadata. Default: `issues`
| `EXPORTER_CLICKHOUSE__ISSUE_METRICS_TABLE` | Clickhouse table for issue metrics. Default: `issue_metrics`
| `EXPORTER_CLICKHOUSE__AUTO_DEDUPLICATE` | Execute `OPTIMIZE` after each `INSERT`. Default is `True`
| `EXPORTER_CLICKHOUSE__BACKOFF_BASE_DELAY` | Base delay for backoff strategy. Default: `0.5` (sec)
| `EXPORTER_CLICKHOUSE__BACKOFF_EXPO_FACTOR` | Exponential multiplier applied to the delay on each retry. Default: `2.5`
| `EXPORTER_CLICKHOUSE__BACKOFF_MAX_TRIES` | Max tries for backoff strategy. Default: `3`
| `EXPORTER_CLICKHOUSE__BACKOFF_JITTER` | Enable jitter (randomize delay) for retries. Default: `True`
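
The three backoff knobs above combine into a standard exponential-backoff delay. A minimal sketch of how such a delay is typically derived from these settings (the exact formula inside the exporter's `backoff` helper is an assumption):

```python
import random


def backoff_delay(
    attempt: int,
    base_delay: float = 0.5,
    expo_factor: float = 2.5,
    jitter: bool = True,
) -> float:
    """Delay in seconds before retry `attempt` (0-based), using the defaults above."""
    delay = base_delay * (expo_factor ** attempt)
    if jitter:
        # Full jitter: pick a random point inside the exponential window.
        delay = random.uniform(0, delay)
    return delay


# Without jitter, the first three delays with the default settings:
print([backoff_delay(i, jitter=False) for i in range(3)])  # [0.5, 1.25, 3.125]
```

Jitter desynchronizes retries from multiple workers so they don't hammer Clickhouse at the same instant.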

## State settings

@@ -314,8 +327,8 @@ See config declaration [here](/src/config.py)
| `EXPORTER_MONITORING__METRICS_HOST` | DogStatsD / statsd host. Default: `localhost`
| `EXPORTER_MONITORING__METRICS_PORT` | DogStatsD / statsd port. Default: `8125`
| `EXPORTER_MONITORING__METRICS_BASE_PREFIX` | Prefix for metrics name. Default: `tracker_exporter`
| `EXPORTER_MONITORING__METRICS_BASE_LABELS` | Tags for metrics. Default: `["project:internal",]`
| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable send exception stacktrace to Sentry. Default is empty
| `EXPORTER_MONITORING__METRICS_BASE_LABELS` | List of tags for metrics. Example: `["project:internal",]`. Default is empty
| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable sending exception stacktraces to Sentry. Default is `False`
| `EXPORTER_MONITORING__SENTRY_DSN` | Sentry DSN. Default is empty


4 changes: 2 additions & 2 deletions TODO.md
@@ -1,9 +1,9 @@

# Project roadmap

- [ ] refactoring code
- [x] refactoring code
- [x] CI
- [ ] pytest
- [ ] CI
- [ ] stateful mode
- [ ] helm chart
- [ ] docker image
35 changes: 24 additions & 11 deletions tracker_exporter/config.py
@@ -23,7 +23,7 @@ class MonitoringSettings(BaseSettings):
metrics_host: str = "localhost"
metrics_port: int = 8125
metrics_base_prefix: str = "tracker_exporter"
metrics_base_labels: List[str] = ["project:internal",]
metrics_base_labels: List[str] = []
sentry_enabled: bool = False
sentry_dsn: str | None = None

@@ -52,6 +52,10 @@ class ClickhouseSettings(BaseSettings):
issues_table: str = "issues"
issue_metrics_table: str = "issue_metrics"
auto_deduplicate: bool = True
backoff_base_delay: int | float = 0.5
backoff_expo_factor: int | float = 2.5
backoff_max_tries: int = 3
backoff_jitter: bool = True

@validator("serverless_proxy_id", pre=True, always=True)
def validate_serverless_proxy_id(cls, value: str | None, values: dict) -> str:
@@ -145,15 +149,24 @@ class StateSettings(BaseSettings):
jsonfile_s3_secret_key: str | None = None
custom_storage_params: dict = {}

# @root_validator(pre=True)
# def validate_state(cls, values) -> str:
# stateful = values.get("stateful")
# storage = values.get("storage")

# if stateful and storage is None:
# raise ConfigurationError("Can't run ETL in stateful mode choose storage")
@root_validator(pre=True)
def validate_state(cls, values) -> dict:
jsonfile_strategy = values.get("jsonfile_strategy")
jsonfile_s3_bucket = values.get("jsonfile_s3_bucket")
jsonfile_s3_endpoint = values.get("jsonfile_s3_endpoint")
jsonfile_s3_access_key = values.get("jsonfile_s3_access_key")
jsonfile_s3_secret_key = values.get("jsonfile_s3_secret_key")
s3_is_configured = all((
jsonfile_s3_bucket,
jsonfile_s3_endpoint,
jsonfile_s3_access_key,
jsonfile_s3_secret_key
))

if jsonfile_strategy == JsonStorageStrategies.s3 and not s3_is_configured:
raise ConfigurationError("S3 must be configured for JSONFileStorage with S3 strategy.")

# return values
return values

class Config:
extra = "ignore"
@@ -189,7 +202,7 @@ class Settings(BaseSettings):
"end_date",
"start_time",
"end_time",
"moved_at"
"moved_at",
)

@validator("closed_issue_statuses", pre=True, always=True)
@@ -208,7 +221,7 @@ def validate_closed_issue_statuses(cls, value: str) -> list:
def validate_not_nullable_fields(cls, value: str) -> list:
if not isinstance(value, (str, list, tuple)):
raise ConfigurationError(
"Invalid not_nullable_fields. Example: created_at,deadline,updated_at. Received: %s",
"Invalid NOT_NULLABLE_FIELDS. Example: created_at,deadline,updated_at. Received: %s",
value
)

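
The new `validate_state` root validator boils down to one predicate: when the JSON-file strategy is `s3`, all four S3 parameters must be present. A minimal standalone sketch of that rule (function names here are illustrative, not the exporter's API):

```python
def s3_is_configured(bucket, endpoint, access_key, secret_key) -> bool:
    """True only when every S3 parameter is set (None or empty strings fail)."""
    return all((bucket, endpoint, access_key, secret_key))


def validate_state(strategy: str, **s3_params) -> None:
    # Mirrors the validator: the 's3' strategy demands a complete S3 config.
    if strategy == "s3" and not s3_is_configured(**s3_params):
        raise ValueError("S3 must be configured for JSONFileStorage with S3 strategy.")


validate_state("local")  # fine: the local strategy needs no S3 settings
```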
4 changes: 2 additions & 2 deletions tracker_exporter/etl.py
@@ -140,7 +140,7 @@ def _export_and_transform(

monitoring.send_gauge_metric("issues_without_metrics", value=issues_without_metrics)
logger.info(
f"Total issues: {len(issues)}, total cycle time metrics: {len(metrics)}, "
f"Total issues: {len(issues)}, total metrics: {len(metrics)}, "
f"ignored issues with empty metrics: {issues_without_metrics}"
)
return issues, metrics, last_updated_at
@@ -200,7 +200,7 @@ def run(
else:
if all((stateful, self.state, possible_new_state)):
self.state.set(self.state_key, possible_new_state)
monitoring.send_gauge_metric("last_update_timestamp", value=int(time.time())) # TODO (akimrx): convert possible_new_state to timestamp instead time.time()
monitoring.send_gauge_metric("last_update_timestamp", value=int(time.time()))
finally:
monitoring.send_gauge_metric("etl_upload_status", value=1 if success else 2)
else:
30 changes: 20 additions & 10 deletions tracker_exporter/main.py
@@ -53,13 +53,15 @@


def signal_handler(sig, frame) -> None: # pylint: disable=W0613
"""Graceful shutdown."""
if sig in (signal.SIGINT, signal.SIGTERM,):
logger.warning(f"Received {signal.Signals(sig).name}, graceful shutdown...")
scheduler.shutdown()
sys.exit(0)


def configure_sentry() -> None:
"""Configure the Sentry client for sending exception stacktraces."""
if config.monitoring.sentry_enabled:
assert config.monitoring.sentry_dsn is not None
sentry_sdk.init(
@@ -73,28 +75,35 @@ def configure_sentry() -> None:
)


def configure_jsonfile_storage() -> JsonStateStorage:
"""Configure and return a storage backend for StateKeeper."""
match config.state.jsonfile_strategy:
case JsonStorageStrategies.local:
storage_strategy = LocalFileStorageStrategy(config.state.jsonfile_path)
case JsonStorageStrategies.s3:
raise NotImplementedError
case _:
raise ValueError
return JsonStateStorage(storage_strategy)


def configure_state_service() -> StateKeeper | None:
"""Configure StateKeeper for ETL stateful mode."""
if not config.stateful:
return

match config.state.storage:
case StateStorageTypes.jsonfile:
match config.state.jsonfile_strategy:
case JsonStorageStrategies.local:
storage_strategy = LocalFileStorageStrategy(config.state.jsonfile_path)
case JsonStorageStrategies.s3:
raise NotImplementedError
case _:
raise ValueError
state_service = StateKeeper(JsonStateStorage(storage_strategy))
storage = configure_jsonfile_storage()
case StateStorageTypes.redis:
raise NotImplementedError
case _:
raise ValueError
return state_service
return StateKeeper(storage)


def run_etl(ignore_exceptions: bool = False) -> None:
"""Start ETL process."""
etl = YandexTrackerETL(
tracker_client=YandexTrackerClient(),
clickhouse_client=ClickhouseClient(),
@@ -112,14 +121,15 @@ def run_etl(ignore_exceptions: bool = False) -> None:


def main() -> None:
"""Entry point for CLI command."""
configure_sentry()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
scheduler.start()
scheduler.add_job(
run_etl,
trigger="interval",
name="issues_cycle_time_exporter",
name="tracker_etl_default",
minutes=int(config.etl_interval_minutes),
max_instances=1,
next_run_time=datetime.now() + timedelta(seconds=5)
23 changes: 12 additions & 11 deletions tracker_exporter/models/issue.py
@@ -25,14 +25,15 @@

class TrackerIssueMetric(Base):
"""This object represents issue metrics for a TrackerIssue object."""
def __init__(self,
issue_key: str,
status_name: str,
status_transitions_count: int,
duration: int,
busdays_duration: int,
last_seen: str):

def __init__(
self,
issue_key: str,
status_name: str,
status_transitions_count: int,
duration: int,
busdays_duration: int,
last_seen: str
) -> None:
self.issue_key = issue_key
self.status_name = status_name
self.status_transitions_count = status_transitions_count
@@ -52,7 +53,7 @@ def __init__(self, issue: Issues) -> None:
self._transform(self._issue)

def _transform(self, issue: Issues) -> None:
"""Formation of a task object based on its metadata."""
"""Transformation of an issue into useful data."""
logger.debug(f"Transforming issue {issue.key}...")

self.queue: str = issue.queue.key
@@ -88,12 +89,14 @@ def _transform(self, issue: Issues) -> None:
self.moved_by: str = None

def _on_changelog_issue_moved(self, event: object) -> None:
"""Actions when the 'issue moved' event is triggered."""
logger.debug(f"Moved issue found: {self.issue_key}")
self.was_moved = True
self.moved_by = validate_resource(event.updatedBy, "email")
self.moved_at = convert_datetime(event.updatedAt)

def _on_changelog_issue_workflow(self, event: object) -> None:
"""Actions when the 'issue workflow' event is triggered."""
logger.debug(f"Issue workflow fields found: {event.fields}")

if len(event.fields) < 2:
@@ -153,8 +156,6 @@ def _on_changelog_issue_workflow(self, event: object) -> None:
def metrics(self) -> List[TrackerIssueMetric]:
"""
All metrics are based on status change events in the task history.
The method has the ability to filter only the necessary statuses
passed in the argument.
The metric of time spent in a status is calculated
only after the issue has left that status.
8 changes: 7 additions & 1 deletion tracker_exporter/services/clickhouse.py
@@ -68,7 +68,13 @@ def _prepare_query_params(self):

return params

@backoff((ConnectionError, Timeout))
@backoff(
exceptions=(ConnectionError, Timeout),
base_delay=config.clickhouse.backoff_base_delay,
expo_factor=config.clickhouse.backoff_expo_factor,
max_tries=config.clickhouse.backoff_max_tries,
jitter=config.clickhouse.backoff_jitter,
)
def execute(self, query: str) -> Response | None:
url = f"{self.proto}://{self.host}:{self.port}"
params = self._prepare_query_params()
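
For context, a decorator with the signature used above can be sketched as follows. This is an illustrative reimplementation, not the exporter's actual `backoff` helper:

```python
import functools
import random
import time


def backoff(exceptions, base_delay=0.5, expo_factor=2.5, max_tries=3, jitter=True):
    """Retry the wrapped callable on `exceptions` with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_tries:
                        raise  # retries exhausted, surface the last error
                    time.sleep(random.uniform(0, delay) if jitter else delay)
                    delay *= expo_factor
        return wrapper
    return decorator


@backoff(exceptions=(ConnectionError,), base_delay=0.1, expo_factor=2.5, max_tries=3)
def execute(query: str) -> str:
    ...  # issue the HTTP request to Clickhouse here
```

Wiring the decorator arguments to `config.clickhouse.*`, as the diff does, is what makes the retry behavior tunable via environment variables.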
2 changes: 2 additions & 0 deletions tracker_exporter/services/state.py
@@ -10,12 +10,14 @@


class S3FileStorageStrategy(JSONFileStorageStrategy):
"""Strategy for storing a JSON file in the remote object storage (S3)."""
def __init__(self, file_path: str):
self.file_path = file_path
raise NotImplementedError # TODO (akimrx): implement


class LocalFileStorageStrategy(JSONFileStorageStrategy):
"""Strategy for storing a JSON file in the local file system."""
def __init__(self, file_path: str, raise_if_not_exists: bool = False):
self.file_path = file_path
self.raise_if_not_exists = raise_if_not_exists
1 change: 0 additions & 1 deletion tracker_exporter/services/tracker.py
@@ -27,7 +27,6 @@ def __init__(
cloud_org_id: str | None = config.tracker.cloud_org_id,
timeout: int = config.tracker.timeout,
retries: int = config.tracker.max_retries,
timezone: str = "UTC",
lang: YandexTrackerLanguages = config.tracker.language
) -> None:
self.client = TrackerClient(
8 changes: 4 additions & 4 deletions tracker_exporter/utils/helpers.py
@@ -112,10 +112,10 @@ def to_snake_case(text: str) -> str:
if text.strip() == "":
return text.strip()

text = re.sub(r'(?<=[a-z])(?=[A-Z])', '_', text)
text = re.sub(r'(?<=[a-z])(?=\d)', '_', text)
text = re.sub(r'(?<=\d)(?=[a-z])', '_', text)
text = re.sub(r'[^a-zA-Z0-9_]', '_', text)
text = re.sub(r"(?<=[a-z])(?=[A-Z])", "_", text)
text = re.sub(r"(?<=[a-z])(?=\d)", "_", text)
text = re.sub(r"(?<=\d)(?=[a-z])", "_", text)
text = re.sub(r"[^a-zA-Z0-9_]", "_", text)

return text.lower()

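
A couple of worked examples make the four substitutions above (only quote style changed in this commit) easier to follow; this sketch simply reproduces the function from the diff:

```python
import re


def to_snake_case(text: str) -> str:
    """Convert camelCase / PascalCase identifiers to snake_case."""
    if text.strip() == "":
        return text.strip()

    text = re.sub(r"(?<=[a-z])(?=[A-Z])", "_", text)  # wordBoundary -> word_Boundary
    text = re.sub(r"(?<=[a-z])(?=\d)", "_", text)     # letter-to-digit boundary
    text = re.sub(r"(?<=\d)(?=[a-z])", "_", text)     # digit-to-letter boundary
    text = re.sub(r"[^a-zA-Z0-9_]", "_", text)        # any other char becomes "_"
    return text.lower()


print(to_snake_case("StoryPoints3"))  # -> story_points_3
print(to_snake_case("agileMetrics"))  # -> agile_metrics
```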
