diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 384baecc2..322898711 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -17,11 +17,13 @@ jobs: os: - ubuntu-22.04 - windows-2022 - python-version: [ "3.10", "3.11" ] + python-version: ["3.10", "3.11"] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 + with: + submodules: true - name: Install Python and set up Poetry uses: bakdata/ci-templates/actions/python-setup-poetry@v1.5.3 @@ -69,7 +71,7 @@ jobs: - name: Install docs dependencies run: poetry install --with docs - + - name: Check markdown, toml, css formatting uses: dprint/check@v2.2 if: runner.os == 'Linux' @@ -83,21 +85,21 @@ jobs: uses: bakdata/ci-templates/.github/workflows/python-poetry-publish-snapshot.yaml@v1.25.0 secrets: pypi-token: ${{ secrets.TEST_PYPI_TOKEN }} - + publish-docs-from-main: runs-on: ubuntu-22.04 if: ${{ github.ref == 'refs/heads/main' }} needs: [test] steps: - uses: actions/checkout@v3 - + - name: Publish docs from main branch uses: ./.github/actions/update-docs with: username: ${{ secrets.GH_USERNAME }} email: ${{ secrets.GH_EMAIL }} token: ${{ secrets.GH_TOKEN }} - version: main + version: main publish-dev-docs-from-pr: runs-on: ubuntu-22.04 @@ -118,11 +120,11 @@ jobs: docs: - added|deleted|modified: 'docs/**' - - name: Publish dev docs from PR + - name: Publish dev docs from PR if: steps.docs-changes.outputs.docs == 'true' uses: ./.github/actions/update-docs with: username: ${{ secrets.GH_USERNAME }} email: ${{ secrets.GH_EMAIL }} token: ${{ secrets.GH_TOKEN }} - version: dev + version: dev diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..6980a7f09 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "examples"] + path = examples + url = https://github.com/bakdata/kpops-examples diff --git a/docs/docs/resources/examples/defaults.md b/docs/docs/resources/examples/defaults.md index 96dfd3642..f70d2dfa9 100644 --- a/docs/docs/resources/examples/defaults.md +++ b/docs/docs/resources/examples/defaults.md @@ -1,26 +1,15 @@ # Example `defaults.yaml` files -## [ATM Fraud Pipeline](https://github.com/bakdata/kpops/tree/main/examples/bakdata/atm-fraud-detection){target=_blank} +## [KPOps examples](https://github.com/bakdata/kpops-examples){target=_blank} - - -??? example "defaults.yaml" - ```yaml - --8<-- - https://raw.githubusercontent.com/bakdata/kpops/main/examples/bakdata/atm-fraud-detection/defaults.yaml - --8<-- - ``` - - - -## [Word-count Pipeline](https://github.com/bakdata/kpops-examples/tree/main/word-count){target=_blank} +Shared defaults for [ATM Fraud Pipeline](https://github.com/bakdata/kpops-examples/tree/main/atm-fraud){target=_blank} and [Word-count Pipeline](https://github.com/bakdata/kpops-examples/tree/main/word-count){target=_blank} ??? example "defaults.yaml" ```yaml --8<-- - https://raw.githubusercontent.com/bakdata/kpops-examples/main/word-count/defaults.yaml + https://raw.githubusercontent.com/bakdata/kpops-examples/main/defaults.yaml --8<-- ``` diff --git a/docs/docs/user/examples/atm-fraud-pipeline.md b/docs/docs/user/examples/atm-fraud-pipeline.md index cfc811c70..90b4ea36a 100644 --- a/docs/docs/user/examples/atm-fraud-pipeline.md +++ b/docs/docs/user/examples/atm-fraud-pipeline.md @@ -41,7 +41,7 @@ postgresql bitnami/postgresql ??? example "PostgreSQL Example Helm chart values (`postgresql.yaml`)" ```yaml auth: - database: app_db + database: app_db enablePostgresUser: true password: AppPassword postgresPassword: StrongPassword @@ -74,20 +74,21 @@ kubectl port-forward --namespace kpops service/k8kafka-cp-kafka-connect 8083:808 -1. Export environment variables in your terminal: +1. Clone the [kpops-examples repository](https://github.com/bakdata/kpops-examples){target=_blank} and `cd` into the directory. + +2. Install KPOps `pip install -r requirements.txt`. + +3. Export environment variables in your terminal: ```shell export DOCKER_REGISTRY=bakdata && \ export NAMESPACE=kpops ``` -2. Deploy the pipeline +4. Deploy the pipeline ```shell - poetry run kpops deploy ./examples/bakdata/atm-fraud-detection/pipeline.yaml \ - --pipeline-base-dir ./examples \ - --config ./examples/bakdata/atm-fraud-detection/config.yaml \ - --execute + kpops deploy atm-fraud/pipeline.yaml --execute ``` !!! Note @@ -116,7 +117,7 @@ You should be able to see pipeline shown in the image below: !!! Attention - Kafka Connect needs some time to set up the connector. + Kafka Connect needs some time to set up the connector. Moreover, Streams Explorer needs a while to scrape the information from Kafka connect. Therefore, it might take a bit until you see the whole graph. @@ -146,11 +147,7 @@ helm --namespace kpops uninstall postgresql 2. Remove the pipeline ```shell - poetry run kpops clean ./examples/bakdata/atm-fraud-detection/pipeline.yaml \ - --pipeline-base-dir ./examples \ - --config ./examples/bakdata/atm-fraud-detection/config.yaml \ - --verbose \ - --execute + kpops clean atm-fraud/pipeline.yaml --verbose --execute ``` !!! Note @@ -166,12 +163,12 @@ helm --namespace kpops uninstall postgresql - `deploy` fails: 1. Read the error message. - 2. Try to correct the mistakes if there were any. Likely the configuration is not correct or the port-forwarding is not working as intended. + 2. Try to correct the mistakes if there were any. Likely the configuration is incorrect, or the port-forwarding is not working as intended. 3. Run `clean`. 4. Run `deploy --dry-run` to avoid havig to `clean` again. If an error is dropped, start over from step 1. 5. If the dry-run is succesful, run `deploy`. - `clean` fails: 1. Read the error message. - 2. Try to correct the indicated mistakes if there were any. Likely the configuration is not correct or the port-forwarding is not working as intended. + 2. Try to correct the indicated mistakes if there were any. Likely the configuration is incorrect, or the port-forwarding is not working as intended. 3. Run `clean`. 4. If `clean` fails, follow the steps in [teardown](../getting-started/teardown.md). diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md index e5e6fd4fe..ab3437ad1 100644 --- a/docs/docs/user/getting-started/quick-start.md +++ b/docs/docs/user/getting-started/quick-start.md @@ -83,9 +83,7 @@ kubectl port-forward --namespace kpops service/k8kafka-cp-kafka-connect 8083:808 4. Deploy the pipeline ```shell - kpops deploy word-count/pipeline.yaml \ - --defaults word-count/defaults.yaml \ - --execute + kpops deploy word-count/pipeline.yaml --execute ``` !!! Note @@ -145,10 +143,7 @@ helm --namespace kpops uninstall redis 2. Remove the pipeline ```shell - kpops clean word-count/pipeline.yaml \ - --defaults word-count/defaults.yaml \ - --verbose \ - --execute + kpops clean word-count/pipeline.yaml --verbose --execute ``` !!! Note diff --git a/examples b/examples new file mode 160000 index 000000000..f7613426d --- /dev/null +++ b/examples @@ -0,0 +1 @@ +Subproject commit f7613426dffe5a1d6332c9e3cc7f0bfb23396e68 diff --git a/examples/bakdata/atm-fraud-detection/config.yaml b/examples/bakdata/atm-fraud-detection/config.yaml deleted file mode 100644 index c20493eb7..000000000 --- a/examples/bakdata/atm-fraud-detection/config.yaml +++ /dev/null @@ -1,19 +0,0 @@ -topic_name_config: - default_error_topic_name: "${pipeline.name}-${component.name}-dead-letter-topic" - default_output_topic_name: "${pipeline.name}-${component.name}-topic" - -kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" - -schema_registry: - enabled: true - url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081" - -kafka_rest: - url: "http://localhost:8082" - -kafka_connect: - url: "http://localhost:8083" - -defaults_path: . - -pipeline_base_dir: examples diff --git a/examples/bakdata/atm-fraud-detection/defaults.yaml b/examples/bakdata/atm-fraud-detection/defaults.yaml deleted file mode 100644 index 2183f91d6..000000000 --- a/examples/bakdata/atm-fraud-detection/defaults.yaml +++ /dev/null @@ -1,32 +0,0 @@ -pipeline-component: - prefix: "" - -kubernetes-app: - namespace: ${NAMESPACE} - -kafka-app: - app: - streams: - brokers: ${config.kafka_brokers} - schemaRegistryUrl: ${config.schema_registry.url} - optimizeLeaveGroupBehavior: false - -producer-app: - to: - topics: - ${output_topic_name}: - partitions_count: 3 - -streams-app: - app: - labels: - pipeline: ${pipeline.name} - streams: - optimizeLeaveGroupBehavior: false - to: - topics: - ${error_topic_name}: - type: error - partitions_count: 1 - ${output_topic_name}: - partitions_count: 3 diff --git a/examples/bakdata/atm-fraud-detection/pipeline.yaml b/examples/bakdata/atm-fraud-detection/pipeline.yaml deleted file mode 100644 index d166a21f4..000000000 --- a/examples/bakdata/atm-fraud-detection/pipeline.yaml +++ /dev/null @@ -1,104 +0,0 @@ -- type: producer-app - name: account-producer - app: - replicaCount: 1 - image: ${DOCKER_REGISTRY}/atm-demo-accountproducer - imageTag: 1.0.0 - schedule: 0 12 * * * - suspend: true - prometheus: - jmx: - enabled: false - debug: true - -- type: producer-app - name: transaction-avro-producer - app: - replicaCount: 1 - image: ${DOCKER_REGISTRY}/atm-demo-transactionavroproducer - imageTag: 1.0.0 - commandLine: - REAL_TX: 19 - ITERATION: 20 - schedule: 0 12 * * * - suspend: true - prometheus: - jmx: - enabled: false - debug: true - -- type: streams-app - name: transaction-joiner - app: - replicaCount: 1 - image: ${DOCKER_REGISTRY}/atm-demo-transactionjoiner - imageTag: 1.0.0 - commandLine: - PRODUCTIVE: false - annotations: - consumerGroup: atm-transactionjoiner-atm-fraud-joinedtransactions-topic - prometheus: - jmx: - enabled: false - debug: true - -- type: streams-app - name: fraud-detector - app: - replicaCount: 1 - image: ${DOCKER_REGISTRY}/atm-demo-frauddetector - imageTag: 1.0.0 - commandLine: - PRODUCTIVE: false - annotations: - consumerGroup: atm-frauddetector-atm-fraud-possiblefraudtransactions-topic - prometheus: - jmx: - enabled: false - debug: true - -- type: streams-app - name: account-linker - from: - components: - fraud-detector: - type: input - account-producer: - role: accounts - app: - replicaCount: 1 - image: ${DOCKER_REGISTRY}/atm-demo-accountlinker - imageTag: 1.0.0 - commandLine: - PRODUCTIVE: false - annotations: - consumerGroup: atm-accountlinker-atm-fraud-output-topic - prometheus: - jmx: - enabled: false - debug: true - -- type: kafka-sink-connector - name: postgresql-connector - app: - connector.class: io.confluent.connect.jdbc.JdbcSinkConnector - tasks.max: 1 - topics: ${pipeline.name}-account-linker-topic - connection.url: jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db - connection.user: app1 - connection.password: AppPassword - connection.ds.pool.size: 5 - insert.mode: insert - insert.mode.databaselevel: true - value.converter: io.confluent.connect.avro.AvroConverter - value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081 - key.converter: org.apache.kafka.connect.storage.StringConverter - transforms: flatten - transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value - auto.create: true - table.name.format: fraud_transactions - errors.deadletterqueue.context.headers.enable: true - errors.deadletterqueue.topic.name: postgres-request-sink-dead-letters - errors.deadletterqueue.topic.replication.factor: 1 - errors.tolerance: all - pk.mode: record_value diff --git a/tests/pipeline/snapshots/snap_test_example.py b/tests/pipeline/snapshots/snap_test_example.py index 77ba66496..187c2fa59 100644 --- a/tests/pipeline/snapshots/snap_test_example.py +++ b/tests/pipeline/snapshots/snap_test_example.py @@ -10,9 +10,11 @@ snapshots['TestExample.test_atm_fraud atm-fraud-pipeline'] = [ { 'app': { - 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-accountproducer', 'imageTag': '1.0.0', + 'labels': { + 'pipeline': 'atm-fraud' + }, 'prometheus': { 'jmx': { 'enabled': False @@ -25,14 +27,15 @@ 'extraOutputTopics': { }, 'optimizeLeaveGroupBehavior': False, - 'outputTopic': 'bakdata-atm-fraud-detection-account-producer-topic', + 'outputTopic': 'atm-fraud-account-producer-topic', 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' }, 'suspend': True }, + 'debug': True, 'name': 'account-producer', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -44,7 +47,7 @@ 'models': { }, 'topics': { - 'bakdata-atm-fraud-detection-account-producer-topic': { + 'atm-fraud-account-producer-topic': { 'configs': { }, 'partitions_count': 3 @@ -60,9 +63,11 @@ 'ITERATION': 20, 'REAL_TX': 19 }, - 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-transactionavroproducer', 'imageTag': '1.0.0', + 'labels': { + 'pipeline': 'atm-fraud' + }, 'prometheus': { 'jmx': { 'enabled': False @@ -75,14 +80,15 @@ 'extraOutputTopics': { }, 'optimizeLeaveGroupBehavior': False, - 'outputTopic': 'bakdata-atm-fraud-detection-transaction-avro-producer-topic', + 'outputTopic': 'atm-fraud-transaction-avro-producer-topic', 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' }, 'suspend': True }, + 'debug': True, 'name': 'transaction-avro-producer', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -94,7 +100,7 @@ 'models': { }, 'topics': { - 'bakdata-atm-fraud-detection-transaction-avro-producer-topic': { + 'atm-fraud-transaction-avro-producer-topic': { 'configs': { }, 'partitions_count': 3 @@ -112,11 +118,10 @@ 'commandLine': { 'PRODUCTIVE': False }, - 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-transactionjoiner', 'imageTag': '1.0.0', 'labels': { - 'pipeline': 'bakdata-atm-fraud-detection' + 'pipeline': 'atm-fraud' }, 'prometheus': { 'jmx': { @@ -126,18 +131,19 @@ 'replicaCount': 1, 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', - 'errorTopic': 'bakdata-atm-fraud-detection-transaction-joiner-dead-letter-topic', + 'errorTopic': 'atm-fraud-transaction-joiner-dead-letter-topic', 'inputTopics': [ - 'bakdata-atm-fraud-detection-transaction-avro-producer-topic' + 'atm-fraud-transaction-avro-producer-topic' ], 'optimizeLeaveGroupBehavior': False, - 'outputTopic': 'bakdata-atm-fraud-detection-transaction-joiner-topic', + 'outputTopic': 'atm-fraud-transaction-joiner-topic', 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' } }, + 'debug': True, 'name': 'transaction-joiner', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -149,13 +155,13 @@ 'models': { }, 'topics': { - 'bakdata-atm-fraud-detection-transaction-joiner-dead-letter-topic': { + 'atm-fraud-transaction-joiner-dead-letter-topic': { 'configs': { }, 'partitions_count': 1, 'type': 'error' }, - 'bakdata-atm-fraud-detection-transaction-joiner-topic': { + 'atm-fraud-transaction-joiner-topic': { 'configs': { }, 'partitions_count': 3 @@ -173,11 +179,10 @@ 'commandLine': { 'PRODUCTIVE': False }, - 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-frauddetector', 'imageTag': '1.0.0', 'labels': { - 'pipeline': 'bakdata-atm-fraud-detection' + 'pipeline': 'atm-fraud' }, 'prometheus': { 'jmx': { @@ -187,18 +192,19 @@ 'replicaCount': 1, 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', - 'errorTopic': 'bakdata-atm-fraud-detection-fraud-detector-dead-letter-topic', + 'errorTopic': 'atm-fraud-fraud-detector-dead-letter-topic', 'inputTopics': [ - 'bakdata-atm-fraud-detection-transaction-joiner-topic' + 'atm-fraud-transaction-joiner-topic' ], 'optimizeLeaveGroupBehavior': False, - 'outputTopic': 'bakdata-atm-fraud-detection-fraud-detector-topic', + 'outputTopic': 'atm-fraud-fraud-detector-topic', 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' } }, + 'debug': True, 'name': 'fraud-detector', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -210,13 +216,13 @@ 'models': { }, 'topics': { - 'bakdata-atm-fraud-detection-fraud-detector-dead-letter-topic': { + 'atm-fraud-fraud-detector-dead-letter-topic': { 'configs': { }, 'partitions_count': 1, 'type': 'error' }, - 'bakdata-atm-fraud-detection-fraud-detector-topic': { + 'atm-fraud-fraud-detector-topic': { 'configs': { }, 'partitions_count': 3 @@ -234,11 +240,10 @@ 'commandLine': { 'PRODUCTIVE': False }, - 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-accountlinker', 'imageTag': '1.0.0', 'labels': { - 'pipeline': 'bakdata-atm-fraud-detection' + 'pipeline': 'atm-fraud' }, 'prometheus': { 'jmx': { @@ -248,20 +253,21 @@ 'replicaCount': 1, 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', - 'errorTopic': 'bakdata-atm-fraud-detection-account-linker-dead-letter-topic', + 'errorTopic': 'atm-fraud-account-linker-dead-letter-topic', 'extraInputTopics': { 'accounts': [ - 'bakdata-atm-fraud-detection-account-producer-topic' + 'atm-fraud-account-producer-topic' ] }, 'inputTopics': [ - 'bakdata-atm-fraud-detection-fraud-detector-topic' + 'atm-fraud-fraud-detector-topic' ], 'optimizeLeaveGroupBehavior': False, - 'outputTopic': 'bakdata-atm-fraud-detection-account-linker-topic', + 'outputTopic': 'atm-fraud-account-linker-topic', 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' } }, + 'debug': True, 'from': { 'components': { 'account-producer': { @@ -276,7 +282,7 @@ }, 'name': 'account-linker', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -288,13 +294,13 @@ 'models': { }, 'topics': { - 'bakdata-atm-fraud-detection-account-linker-dead-letter-topic': { + 'atm-fraud-account-linker-dead-letter-topic': { 'configs': { }, 'partitions_count': 1, 'type': 'error' }, - 'bakdata-atm-fraud-detection-account-linker-topic': { + 'atm-fraud-account-linker-topic': { 'configs': { }, 'partitions_count': 3 @@ -309,13 +315,13 @@ 'app': { 'config': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', - 'connector': 'postgresql-connector' + 'connector': 'atm-fraud-postgresql-connector' }, 'connectorType': 'sink' }, 'name': 'postgresql-connector', 'namespace': '${NAMESPACE}', - 'prefix': '', + 'prefix': 'atm-fraud-', 'repo_config': { 'repo_auth_flags': { 'insecure_skip_tls_verify': False @@ -341,18 +347,167 @@ 'insert.mode': 'insert', 'insert.mode.databaselevel': True, 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', - 'name': 'postgresql-connector', + 'name': 'atm-fraud-postgresql-connector', 'pk.mode': 'record_value', 'table.name.format': 'fraud_transactions', 'tasks.max': 1, - 'topics': 'bakdata-atm-fraud-detection-account-linker-topic', + 'topics': 'atm-fraud-account-linker-topic', 'transforms': 'flatten', 'transforms.flatten.type': 'org.apache.kafka.connect.transforms.Flatten$Value', 'value.converter': 'io.confluent.connect.avro.AvroConverter', 'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081' }, 'name': 'postgresql-connector', - 'prefix': '', + 'prefix': 'atm-fraud-', + 'resetter_values': { + }, + 'type': 'kafka-sink-connector' + } +] + +snapshots['TestExample.test_word_count word-count-pipeline'] = [ + { + 'app': { + 'image': 'bakdata/kpops-demo-sentence-producer', + 'imageTag': '1.0.0', + 'labels': { + 'pipeline': 'word-count' + }, + 'prometheus': { + 'jmx': { + 'enabled': False + } + }, + 'replicaCount': 1, + 'streams': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'extraOutputTopics': { + }, + 'optimizeLeaveGroupBehavior': False, + 'outputTopic': 'word-count-data-producer-topic', + 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' + } + }, + 'debug': True, + 'name': 'data-producer', + 'namespace': '${NAMESPACE}', + 'prefix': 'word-count-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-streams-bootstrap', + 'url': 'https://bakdata.github.io/streams-bootstrap/' + }, + 'to': { + 'models': { + }, + 'topics': { + 'word-count-data-producer-topic': { + 'configs': { + }, + 'partitions_count': 3 + } + } + }, + 'type': 'producer-app', + 'version': '2.9.0' + }, + { + 'app': { + 'commandLine': { + 'PRODUCTIVE': False + }, + 'image': 'bakdata/kpops-demo-word-count-app', + 'imageTag': '1.0.0', + 'labels': { + 'pipeline': 'word-count' + }, + 'prometheus': { + 'jmx': { + 'enabled': False + } + }, + 'replicaCount': 1, + 'streams': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'errorTopic': 'word-count-word-counter-dead-letter-topic', + 'inputTopics': [ + 'word-count-data-producer-topic' + ], + 'optimizeLeaveGroupBehavior': False, + 'outputTopic': 'word-count-word-counter-topic', + 'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081/' + } + }, + 'debug': True, + 'name': 'word-counter', + 'namespace': '${NAMESPACE}', + 'prefix': 'word-count-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-streams-bootstrap', + 'url': 'https://bakdata.github.io/streams-bootstrap/' + }, + 'to': { + 'models': { + }, + 'topics': { + 'word-count-word-counter-dead-letter-topic': { + 'configs': { + }, + 'partitions_count': 1, + 'type': 'error' + }, + 'word-count-word-counter-topic': { + 'configs': { + 'cleanup.policy': 'compact' + }, + 'partitions_count': 3, + 'type': 'output' + } + } + }, + 'type': 'streams-app', + 'version': '2.9.0' + }, + { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'word-count-redis-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'redis-sink-connector', + 'namespace': '${NAMESPACE}', + 'prefix': 'word-count-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, + 'app': { + 'connector.class': 'com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector', + 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', + 'name': 'word-count-redis-sink-connector', + 'redis.database': 0, + 'redis.hosts': 'redis-headless:6379', + 'tasks.max': 1, + 'topics': 'word-count-word-counter-topic', + 'value.converter': 'org.apache.kafka.connect.storage.StringConverter' + }, + 'name': 'redis-sink-connector', + 'prefix': 'word-count-', 'resetter_values': { }, 'type': 'kafka-sink-connector' diff --git a/tests/pipeline/test_example.py b/tests/pipeline/test_example.py index 5d6d587d0..2cb7e2553 100644 --- a/tests/pipeline/test_example.py +++ b/tests/pipeline/test_example.py @@ -1,3 +1,6 @@ +import os +from pathlib import Path + import pytest import yaml from snapshottest.module import SnapshotTest @@ -7,17 +10,42 @@ runner = CliRunner() +EXAMPLES_PATH = Path("examples").absolute() + -@pytest.mark.usefixtures("mock_env") +@pytest.mark.usefixtures("mock_env", "load_yaml_file_clear_cache") class TestExample: + @pytest.fixture(scope="class", autouse=True) + def cd(self): + cwd = Path.cwd().absolute() + os.chdir(EXAMPLES_PATH) + yield + os.chdir(cwd) + + def test_cwd(self): + assert Path.cwd() == EXAMPLES_PATH + + def test_word_count(self, snapshot: SnapshotTest): + result = runner.invoke( + app, + [ + "generate", + "word-count/pipeline.yaml", + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + enriched_pipeline: dict = yaml.safe_load(result.stdout) + snapshot.assert_match(enriched_pipeline, "word-count-pipeline") + def test_atm_fraud(self, snapshot: SnapshotTest): result = runner.invoke( app, [ "generate", - "./examples/bakdata/atm-fraud-detection/pipeline.yaml", - "--config", - "./examples/bakdata/atm-fraud-detection", + "atm-fraud/pipeline.yaml", ], catch_exceptions=False, )