# Airflow example
You can easily use the package with Apache Airflow. Here is an example of a pipeline that calculates a table, loads the calculated data into S3, and replaces a partition.
File structure:

```
├── dags
│   └── ripley_example.py
├── docker-compose.yml
├── Dockerfile
└── entrypoint.sh
```
docker-compose.yml
```yaml
version: '3'

services:
  postgres:
    image: postgres:13
    container_name: ripley_postgres
    network_mode: host
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  webserver:
    build: .
    restart: always
    container_name: ripley_airflow_web
    network_mode: host
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost/airflow
      - AIRFLOW__WEBSERVER__SECRET_KEY=mysecretkey
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    entrypoint: /entrypoint.sh
    command: webserver

  scheduler:
    build: .
    restart: always
    container_name: ripley_airflow_scheduler
    network_mode: host
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@localhost/airflow
      - AIRFLOW__WEBSERVER__SECRET_KEY=mysecretkey
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    entrypoint: /entrypoint.sh
    command: scheduler

  clickhouse:
    image: clickhouse/clickhouse-server:23.4.2.11-alpine
    container_name: ripley_clickhouse
    network_mode: host
    logging:
      driver: none
    healthcheck:
      test: wget --no-verbose --tries=1 --spider localhost:8123/ping || exit 1
      interval: 2s
      timeout: 2s
      retries: 16

  s3:
    image: quay.io/minio/minio:RELEASE.2024-10-13T13-34-11Z
    container_name: ripley_s3
    network_mode: host
    logging:
      driver: none
    command:
      - server
      - --address=localhost:9001
      - /data
    environment:
      - MINIO_ROOT_USER=ripley_key
      - MINIO_ROOT_PASSWORD=ripley_secret
```
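Once the stack is running (see *How to run* below), you can optionally confirm that ClickHouse and MinIO answer on the host ports configured above. The ClickHouse URL is the same endpoint the compose healthcheck hits; `/minio/health/live` is MinIO's standard liveness route, not something defined in this example:

```bash
# ClickHouse HTTP interface (same endpoint as the compose healthcheck), replies "Ok."
curl http://localhost:8123/ping

# MinIO S3 API on localhost:9001, standard liveness endpoint (HTTP 200 when healthy)
curl -I http://localhost:9001/minio/health/live
```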
Dockerfile
```dockerfile
FROM apache/airflow:2.8.0

RUN pip install --no-cache-dir ripley clickhouse-driver==0.2.9 boto3==1.35.43

COPY dags/ /opt/airflow/dags/
COPY entrypoint.sh /entrypoint.sh

USER root
RUN chmod +x /entrypoint.sh
USER airflow

ENTRYPOINT ["/entrypoint.sh"]
```
entrypoint.sh
```bash
#!/usr/bin/env bash

airflow db init
airflow users create \
  --username admin \
  --firstname FIRST_NAME \
  --lastname LAST_NAME \
  --role Admin \
  --email [email protected] \
  --password admin

exec airflow "$1"
```
DAG example (`dags/ripley_example.py`):
💡 You can create custom operators (such as `ReplacePartitions`, `SyncTableFromS3`, `S3Upload`, etc.) and use `op_kwargs` + `logical_date` + DAG-level `Params` for dynamic parameters. You don't have to think about logging or write raw SQL statements (a sketch of such an operator follows the DAG code below).
```python
import abc
import logging
from typing import Any, Type, Dict

import boto3
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from clickhouse_driver import Client
from pendulum.datetime import DateTime as PDateTime

from ripley import from_clickhouse
from ripley.clickhouse_models.s3_settings import ClickhouseS3SettingsModel


class AbstractRipleyCall(metaclass=abc.ABCMeta):
    def __init__(self, ti: TaskInstance = None, params: dict = None) -> None:
        self._ti = ti
        self._params = params or {}
        # enable ripley logger
        logging.getLogger('ripley').propagate = True

        # get connection params etc. hardcoded for example...
        # clickhouse_connection = DbApiHook.get_connection('clickhouse')
        client = Client(
            port=9000,
            user='default',
            password='',
            database='default',
            host='localhost',
        )
        self._clickhouse = from_clickhouse(client)

        # get connection params etc. hardcoded for example...
        # s3_connection = DbApiHook.get_connection('s3')
        self._s3 = boto3.client(
            's3',
            endpoint_url='http://localhost:9001',
            region_name='us-east-1',
            use_ssl=False,
            verify=False,
            aws_access_key_id='ripley_key',
            aws_secret_access_key='ripley_secret',
        )

    @abc.abstractmethod
    def run(self) -> Any:
        pass

    @property
    def logical_date(self) -> PDateTime:
        return self._params['logical_date']

    @classmethod
    def air_runner(cls: Type['AbstractRipleyCall'], ti: TaskInstance = None, **kwargs: Dict) -> Any:
        return cls(ti, kwargs).run()


class InitExampleStructure(AbstractRipleyCall):
    """Drops leftover buckets / databases and creates the initial example data."""

    def run(self) -> Any:
        for bucket in self._s3.list_buckets().get('Buckets', []):
            bucket_name = bucket['Name']
            files = self._s3.list_objects_v2(Bucket=bucket_name).get('Contents', [])
            files = [{'Key': f['Key']} for f in files]
            if files:
                self._s3.delete_objects(Bucket=bucket_name, Delete={'Objects': files})
            self._s3.delete_bucket(Bucket=bucket_name)

        self._s3.create_bucket(Bucket='ripley')
        self._clickhouse.exec('DROP DATABASE IF EXISTS maintenance')
        self._clickhouse.create_db('maintenance')
        # let's say we have a partition ('2024-01-01') with incorrect events
        self._clickhouse.exec(f"""CREATE OR REPLACE TABLE default.events (
            value String,
            day Date
        )
        ENGINE MergeTree() ORDER BY value PARTITION BY day AS (SELECT 'old_value', '{self.logical_date.date()}')""")


class CalculateEvents(AbstractRipleyCall):
    """Builds maintenance.fixed_events with the corrected data."""

    def run(self) -> Any:
        from_table = self._clickhouse.get_table_by_name('events')
        self._clickhouse.create_table_as(
            from_table=from_table,
            table='fixed_events',
            db='maintenance',
        )
        sql = f"INSERT INTO maintenance.fixed_events (value, day) VALUES ('FIXED_VALUE', '{self.logical_date.date()}')"
        logging.info(sql)
        self._clickhouse.exec(sql)


class UploadEventsToS3(AbstractRipleyCall):
    """Exports the corrected table to S3."""

    def run(self) -> Any:
        # get connection params etc. hardcoded for example...
        # s3_connection = DbApiHook.get_connection('s3')
        settings = ClickhouseS3SettingsModel(
            url=f"http://localhost:9001/ripley/{self.logical_date.date()}",
            access_key_id='ripley_key',
            secret_access_key='ripley_secret',
        )
        to_s3_table = self._clickhouse.get_table_by_name('fixed_events', 'maintenance')
        self._clickhouse.insert_table_to_s3(table=to_s3_table, s3_settings=settings)


class FixProductionEvents(AbstractRipleyCall):
    """Replaces the broken partition in default.events with the corrected one."""

    def run(self) -> Any:
        prod_table = self._clickhouse.get_table_by_name('events')
        fixed_table = self._clickhouse.get_table_by_name('fixed_events', 'maintenance')
        self._clickhouse.replace_partition(
            from_table=fixed_table,
            to_table=prod_table,
            partition=f'{self.logical_date.date()}',
        )
        sql = 'SELECT * FROM default.events'
        logging.info('fixed events %s', self._clickhouse.exec(sql))


dag = DAG('ripley_example', schedule_interval=None, start_date=days_ago(1))

init = PythonOperator(
    task_id='init',
    python_callable=InitExampleStructure.air_runner,
    dag=dag,
)
calculate_events = PythonOperator(
    task_id='calculate_events',
    python_callable=CalculateEvents.air_runner,
    dag=dag,
)
upload_to_s3 = PythonOperator(
    task_id='upload_fixed_events_to_s3',
    python_callable=UploadEventsToS3.air_runner,
    dag=dag,
)
fix_production_events = PythonOperator(
    task_id='fix_production_events',
    python_callable=FixProductionEvents.air_runner,
    dag=dag,
)

init >> calculate_events >> upload_to_s3 >> fix_production_events
```
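As suggested in the tip above, the boilerplate in `AbstractRipleyCall` can be folded into reusable operators. Here is a minimal sketch of what such an operator could look like; the class name `ReplacePartitionOperator`, its arguments, and the hardcoded connection are illustrative assumptions, not part of ripley:

```python
from airflow.models.baseoperator import BaseOperator
from clickhouse_driver import Client
from ripley import from_clickhouse


class ReplacePartitionOperator(BaseOperator):
    """Hypothetical operator: replaces one partition of `to_table` with the same partition of `from_table`."""

    # make `partition` templated so '{{ ds }}' is rendered from the run's logical date
    template_fields = ('partition',)

    def __init__(self, *, from_db: str, from_table: str, to_db: str, to_table: str, partition: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.from_db = from_db
        self.from_table = from_table
        self.to_db = to_db
        self.to_table = to_table
        self.partition = partition

    def execute(self, context):
        # hardcoded connection, as in the example above; a real operator would read an Airflow connection instead
        clickhouse = from_clickhouse(Client(host='localhost', port=9000, user='default', password='', database='default'))
        clickhouse.replace_partition(
            from_table=clickhouse.get_table_by_name(self.from_table, self.from_db),
            to_table=clickhouse.get_table_by_name(self.to_table, self.to_db),
            partition=self.partition,
        )
```

With an operator like this, the last task of the DAG above could be declared roughly as:

```python
fix_production_events = ReplacePartitionOperator(
    task_id='fix_production_events',
    from_db='maintenance', from_table='fixed_events',
    to_db='default', to_table='events',
    partition='{{ ds }}',  # rendered to the run's logical date, e.g. '2024-01-01'
    dag=dag,
)
```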
How to run:

```bash
$ mkdir logs
$ sudo chmod 777 ./logs
$ docker-compose build
$ docker-compose up -d
```
Open http://localhost:8080/ and log in with `admin` / `admin`. Enable and run the `ripley_example` DAG.
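Alternatively, the DAG can be unpaused and triggered with the standard Airflow CLI from inside the webserver container (container name as defined in `docker-compose.yml` above):

```bash
docker exec ripley_airflow_web airflow dags unpause ripley_example
docker exec ripley_airflow_web airflow dags trigger ripley_example
```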
For more information, see ClickhouseProtocol and Clickhouse Tests.