Airflow example


You can easily use the package with Apache Airflow. Below is an example of a pipeline that calculates a table, loads the calculated data into S3, and replaces a partition in the production table.
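
At its core the pipeline is a handful of ripley calls. Here is the same flow as a condensed standalone sketch, using the calls from the DAG below (connection parameters are hardcoded, '2024-01-01' stands in for the DAG's logical date, and the default.events table is assumed to exist already):

from clickhouse_driver import Client
from ripley import from_clickhouse
from ripley.clickhouse_models.s3_settings import ClickhouseS3SettingsModel

# wrap a native clickhouse-driver client with ripley
clickhouse = from_clickhouse(Client(host='localhost', port=9000, user='default', password='', database='default'))

# 1. calculate: copy the structure of default.events and fill it with corrected data
events = clickhouse.get_table_by_name('events')
clickhouse.create_db('maintenance')
clickhouse.create_table_as(from_table=events, table='fixed_events', db='maintenance')
clickhouse.exec("INSERT INTO maintenance.fixed_events (value, day) VALUES ('FIXED_VALUE', '2024-01-01')")

# 2. upload: dump the calculated table to S3 (MinIO)
fixed_events = clickhouse.get_table_by_name('fixed_events', 'maintenance')
s3_settings = ClickhouseS3SettingsModel(
    url='http://localhost:9001/ripley/2024-01-01',
    access_key_id='ripley_key',
    secret_access_key='ripley_secret',
)
clickhouse.insert_table_to_s3(table=fixed_events, s3_settings=s3_settings)

# 3. replace: swap the broken partition of the production table with the fixed one
clickhouse.replace_partition(from_table=fixed_events, to_table=events, partition='2024-01-01')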

File structure:

├── dags
│   └── ripley_example.py
├── docker-compose.yml
├── Dockerfile
└── entrypoint.sh
docker-compose.yml
version: '3'
services:
  postgres:
    image: postgres:13
    container_name: ripley_postgres
    network_mode: host
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  webserver:
    build: .
    restart: always
    container_name: ripley_airflow_web
    network_mode: host
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost/airflow
      - AIRFLOW__WEBSERVER__SECRET_KEY=mysecretkey
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    entrypoint: /entrypoint.sh
    command: webserver

  scheduler:
    build: .
    restart: always
    container_name: ripley_airflow_scheduler
    network_mode: host
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost/airflow
      - AIRFLOW__WEBSERVER__SECRET_KEY=mysecretkey
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    entrypoint: /entrypoint.sh
    command: scheduler

  clickhouse:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: ripley_clickhouse
    network_mode: host
    logging:
      driver: none
    healthcheck:
      test: wget --no-verbose --tries=1 --spider localhost:8123/ping || exit 1
      interval: 2s
      timeout: 2s
      retries: 16

  s3:
    image: quay.io/minio/minio:RELEASE.2024-10-13T13-34-11Z
    container_name: ripley_s3
    network_mode: host
    logging:
      driver: none
    command:
      - server
      - --address=localhost:9001
      - /data
    environment:
      - MINIO_ROOT_USER=ripley_key
      - MINIO_ROOT_PASSWORD=ripley_secret
Dockerfile
FROM apache/airflow:2.8.0

RUN pip install --no-cache-dir ripley clickhouse-driver==0.2.9 boto3==1.35.43

COPY dags/ /opt/airflow/dags/
COPY entrypoint.sh /entrypoint.sh

USER root
RUN chmod +x /entrypoint.sh

USER airflow
ENTRYPOINT ["/entrypoint.sh"]
entrypoint.sh
#!/usr/bin/env bash
airflow db init
airflow users create \
    --username admin \
    --firstname FIRST_NAME \
    --lastname LAST_NAME \
    --role Admin \
    --email [email protected] \
    --password admin

exec airflow "$1"

DAG example (dags/ripley_example.py):

💡 You can create custom Operators (such as ReplacePartitions, SyncTableFromS3, S3Upload, etc.) and use op_kwargs + logical_date + DAG-level Params for dynamic parameters. You don't have to think about logs or write raw SQL statements. A sketch of such an operator follows the DAG code below.

import abc
import logging
from typing import Any, Type, Dict

import boto3
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from clickhouse_driver import Client
from pendulum.datetime import DateTime as PDateTime
from ripley import from_clickhouse
from ripley.clickhouse_models.s3_settings import ClickhouseS3SettingsModel


class AbstractRipleyCall(metaclass=abc.ABCMeta):
    def __init__(self, ti: TaskInstance = None, params: dict = None) -> None:
        self._ti = ti
        self._params = params or {}
        # enable ripley logger
        logging.getLogger('ripley').propagate = True
        # get connections params etc. hardcoded for example...
        # clickhouse_connection = DbApiHook.get_connection('clickhouse')
        client = Client(
            port=9000,
            user='default',
            password='',
            database='default',
            host='localhost',
        )

        self._clickhouse = from_clickhouse(client)
        # get connections params etc. hardcoded for example...
        # clickhouse_connection = DbApiHook.get_connection('s3')
        self._s3 = boto3.client(
            's3',
            endpoint_url='http://localhost:9001',
            region_name='us-east-1',
            use_ssl=False,
            verify=False,
            aws_access_key_id='ripley_key',
            aws_secret_access_key='ripley_secret',
        )

    @abc.abstractmethod
    def run(self) -> Any:
        pass

    @property
    def logical_date(self) -> PDateTime:
        return self._params['logical_date']

    @classmethod
    def air_runner(cls: Type['AbstractRipleyCall'], ti: TaskInstance = None, **kwargs: Dict) -> Any:
        return cls(ti, kwargs).run()


class InitExampleStructure(AbstractRipleyCall):
    def run(self) -> Any:
        for bucket in self._s3.list_buckets().get('Buckets', []):
            bucket_name = bucket['Name']
            files = self._s3.list_objects_v2(Bucket=bucket_name).get('Contents', [])
            files = [{'Key': f['Key']} for f in files]

            if files:
                self._s3.delete_objects(Bucket=bucket_name, Delete={'Objects': files})

            self._s3.delete_bucket(Bucket=bucket_name)

        self._s3.create_bucket(Bucket='ripley')
        self._clickhouse.exec('DROP DATABASE IF EXISTS maintenance')
        self._clickhouse.create_db('maintenance')
        # let's say we have a partition ('2024-01-01') with incorrect events
        self._clickhouse.exec(f"""CREATE OR REPLACE TABLE default.events (
          value String,
          day Date
        )
        ENGINE MergeTree() ORDER BY value PARTITION BY day AS (SELECT 'old_value', '{self.logical_date.date()}')""")


class CalculateEvents(AbstractRipleyCall):
    def run(self) -> Any:
        from_table = self._clickhouse.get_table_by_name('events')
        self._clickhouse.create_table_as(
            from_table=from_table,
            table='fixed_events',
            db='maintenance',
        )

        sql = f"INSERT INTO maintenance.fixed_events (value, day) VALUES ('FIXED_VALUE', '{self.logical_date.date()}')"
        logging.info(sql)
        self._clickhouse.exec(sql)


class UploadEventsToS3(AbstractRipleyCall):
    def run(self) -> Any:
        # get connections params etc. hardcoded for example...
        # clickhouse_connection = DbApiHook.get_connection('s3')
        settings = ClickhouseS3SettingsModel(
            url=f"http://localhost:9001/ripley/{self.logical_date.date()}",
            access_key_id='ripley_key',
            secret_access_key='ripley_secret',
        )

        to_s3_table = self._clickhouse.get_table_by_name('fixed_events', 'maintenance')
        self._clickhouse.insert_table_to_s3(table=to_s3_table, s3_settings=settings)


class FixProductionEvents(AbstractRipleyCall):
    def run(self) -> Any:
        prod_table = self._clickhouse.get_table_by_name('events')
        fixed_table = self._clickhouse.get_table_by_name('fixed_events', 'maintenance')
        self._clickhouse.replace_partition(
            from_table=fixed_table,
            to_table=prod_table,
            partition=f'{self.logical_date.date()}',
        )

        sql = 'SELECT * FROM default.events'
        logging.info('fixed events %s', self._clickhouse.exec(sql))


dag = DAG('ripley_example', schedule_interval=None, start_date=days_ago(1))
init = PythonOperator(
    task_id='init',
    python_callable=InitExampleStructure.air_runner,
    dag=dag,
)

calculate_events = PythonOperator(
    task_id='calculate_events',
    python_callable=CalculateEvents.air_runner,
    dag=dag,
)


upload_to_s3 = PythonOperator(
    task_id='upload_fixed_events_to_s3',
    python_callable=UploadEventsToS3.air_runner,
    dag=dag,
)


fix_production_events = PythonOperator(
    task_id='fix_production_events',
    python_callable=FixProductionEvents.air_runner,
    dag=dag,
)


init >> calculate_events >> upload_to_s3 >> fix_production_events
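
For instance, the last step could be wrapped in a custom operator. Below is a minimal sketch; ReplacePartitionsOperator is a hypothetical name, not part of ripley or Airflow, and the connection parameters are hardcoded as in the example above:

from airflow.models import BaseOperator
from clickhouse_driver import Client
from ripley import from_clickhouse


class ReplacePartitionsOperator(BaseOperator):
    """Replaces a partition of a production table with the same partition of a staging table."""

    # '{{ ds }}' and other Jinja templates are rendered for this field at runtime
    template_fields = ('partition',)

    def __init__(self, *, from_table: str, from_db: str, to_table: str, partition: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.from_table = from_table
        self.from_db = from_db
        self.to_table = to_table
        self.partition = partition

    def execute(self, context):
        # connection params hardcoded for the example, as in AbstractRipleyCall above
        clickhouse = from_clickhouse(Client(host='localhost', port=9000, user='default', password='', database='default'))
        clickhouse.replace_partition(
            from_table=clickhouse.get_table_by_name(self.from_table, self.from_db),
            to_table=clickhouse.get_table_by_name(self.to_table),
            partition=self.partition,
        )

The fix_production_events task could then be declared as ReplacePartitionsOperator(task_id='fix_production_events', from_table='fixed_events', from_db='maintenance', to_table='events', partition='{{ ds }}', dag=dag).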

How to run:

$ mkdir logs
$ sudo chmod 777 ./logs
$ docker-compose build
$ docker-compose up -d

Open http://localhost:8080/ and log in with admin / admin. Enable and run the ripley_example DAG:

(Screenshots: the pipeline graph view and the logs of each task.)
