Skip to content

Commit

Permalink
Inspire Matomo: visits per day DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Aug 8, 2024
1 parent a1d0250 commit fc1cfb4
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ airflow standalone
### 6. Start Postgres with Docker Compose

If you're using Docker to manage your Postgres database, start the service.
IMPORTANT: Add the MATOMO_AUTH_TOKEN value to the Docker Compose file first.

```sh
docker-compose -f docker-compose.standalone.yaml up
Expand Down
15 changes: 15 additions & 0 deletions dags/common/models/inspire_matomo/inspire_matomo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy import Column, Date, DateTime, Integer, func
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class that the ORM model(s) in this module subclass.
Base = declarative_base()


class MatomoData(Base):
    """ORM mapping for the ``inspire_matomo_data`` table.

    Each row stores the Matomo visit and unique-visitor counts for a date.
    Column nullability and timestamp types mirror the Alembic migration that
    creates the table (all data columns NOT NULL, timezone-aware timestamps).
    """

    __tablename__ = "inspire_matomo_data"

    id = Column(Integer, primary_key=True)
    date = Column(Date, nullable=False)
    visits = Column(Integer, nullable=False)
    unique_visitors = Column(Integer, nullable=False)
    # The timestamps are filled with the database's now() on insert;
    # updated_at is additionally refreshed on every UPDATE issued via the ORM.
    created_at = Column(DateTime(timezone=True), nullable=False, default=func.now())
    updated_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=func.now(),
        onupdate=func.now(),
    )
96 changes: 96 additions & 0 deletions dags/inspire_matomo/inspire_visits_per_day_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import json
import os
from datetime import datetime

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from common.models.inspire_matomo.inspire_matomo import MatomoData
from common.operators.sqlalchemy_operator import sqlalchemy_task
from executor_config import kubernetes_executor_config
from inspire_matomo.utils import get_parameters
from tenacity import retry_if_exception_type, stop_after_attempt

# Default for the DAG's "date" param: the local date at the moment this module
# is imported, rendered as YYYY-MM-DD.
now = datetime.now()
formatted_date = f"{now:%Y-%m-%d}"


@dag(
    start_date=pendulum.today("UTC").add(days=-1),
    schedule="@monthly",
    params={"period": "day", "date": formatted_date},
)
def inspire_visits_per_day_dag():
    """Fetch Matomo visit counts for one date and upsert them into the database.

    Two tasks query the Matomo API (total visits and unique visitors) for the
    ``period``/``date`` given in the DAG params; a third task stores both
    numbers in ``MatomoData``, updating the row for that date when it already
    exists and inserting a new one otherwise.
    """

    def _fetch_metric(endpoint_key, params):
        """Request the metric behind ``endpoint_key`` and return the raw body."""
        http_hook = HttpHook(http_conn_id="matomo", method="GET")
        parameters = get_parameters(
            period=params.get("period"),
            date=params.get("date"),
            endpoint_key=endpoint_key,
        )
        token = os.environ.get("MATOMO_AUTH_TOKEN")
        response = http_hook.run_with_advanced_retry(
            endpoint="/index.php",
            data=parameters,
            headers={"Authorization": f"Bearer {token}"},
            _retry_args={
                "stop": stop_after_attempt(3),
                "retry": retry_if_exception_type(AirflowException),
            },
        )
        return response.text

    def _metric_value(raw_body, metric_name):
        """Parse a Matomo JSON body and return its ``value`` field as an int."""
        payload = json.loads(raw_body)
        value = payload.get("value") if isinstance(payload, dict) else None
        if value is None:
            # Fail with the response body in the message instead of an opaque
            # TypeError from int(None).
            raise AirflowException(
                f"Matomo response for {metric_name} has no 'value' field: "
                f"{raw_body}"
            )
        return int(value)

    @task(executor_config=kubernetes_executor_config)
    def fetch_visits_per_day(**kwargs):
        return _fetch_metric("visits_per_day", kwargs["params"])

    @task(executor_config=kubernetes_executor_config)
    def fetch_unique_visitors_per_day(**kwargs):
        return _fetch_metric("unique_visitors", kwargs["params"])

    @sqlalchemy_task(conn_id="superset")
    def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs):
        visits = _metric_value(visits_per_day, "visits per day")
        unique_visitors = _metric_value(
            unique_visitors_per_day, "unique visitors per day"
        )
        date = kwargs["params"].get("date")
        parsed_date = datetime.strptime(date, "%Y-%m-%d").date()

        # Look the row up by the requested date: the stored rows are keyed by
        # the run's "date" param, not by anything in the API payload.
        record = session.query(MatomoData).filter_by(date=parsed_date).first()
        if record:
            record.visits = visits
            record.unique_visitors = unique_visitors
        else:
            new_record = MatomoData(
                visits=visits,
                unique_visitors=unique_visitors,
                date=parsed_date,
            )
            session.add(new_record)

    visits_per_day = fetch_visits_per_day()
    unique_visitors_per_day = fetch_unique_visitors_per_day()
    populate_database(
        visits_per_day=visits_per_day,
        unique_visitors_per_day=unique_visitors_per_day,
    )


# Call the @dag-decorated function at import time and bind the result to a
# module-level name.
inspire_visits_per_day = inspire_visits_per_day_dag()
22 changes: 22 additions & 0 deletions dags/inspire_matomo/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os


def get_endpoint(key):
    """Return the Matomo API method name registered under ``key``.

    Raises:
        KeyError: if ``key`` is not one of the known endpoint keys.
    """
    methods_by_key = dict(
        visits_per_day="VisitsSummary.getVisits",
        unique_visitors="VisitsSummary.getUniqueVisitors",
    )
    return methods_by_key[key]


def get_parameters(period, date, endpoint_key):
    """Build the query parameters for a Matomo API request.

    Args:
        period: Value sent as the ``period`` parameter (e.g. ``"day"``).
        date: Value sent as the ``date`` parameter; converted with ``str()``.
        endpoint_key: Key understood by ``get_endpoint``; selects the API
            method sent as ``method``.

    Returns:
        A dict of request parameters. ``token_auth`` and ``idSite`` are read
        from the ``MATOMO_AUTH_TOKEN`` and ``MATOMO_SITE_ID`` environment
        variables and are ``None`` when those variables are unset.

    Raises:
        KeyError: if ``endpoint_key`` is not a known endpoint key.
    """
    return {
        "module": "API",
        "token_auth": os.environ.get("MATOMO_AUTH_TOKEN"),
        "idSite": os.environ.get("MATOMO_SITE_ID"),
        "date": str(date),
        "period": period,
        "format": "json",
        "method": get_endpoint(endpoint_key),
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Inspire Matomo Database Revision
Revision ID: 03e3056e748f
Revises: 101f23913167
Create Date: 2024-08-08 11:07:36.748634
"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "03e3056e748f"  # ID of this migration
down_revision: Union[str, None] = "101f23913167"  # the revision this one revises
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``inspire_matomo_data`` table with all columns NOT NULL."""
    columns = [
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("date", sa.Date, nullable=False),
        sa.Column("visits", sa.Integer, nullable=False),
        sa.Column("unique_visitors", sa.Integer, nullable=False),
        sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False),
        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
    ]
    op.create_table("inspire_matomo_data", *columns)


def downgrade() -> None:
    """Drop the ``inspire_matomo_data`` table, reversing ``upgrade``."""
    table_name = "inspire_matomo_data"
    op.drop_table(table_name)

0 comments on commit fc1cfb4

Please sign in to comment.