Skip to content

Commit

Permalink
Added kafka splitting setup
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 committed Oct 11, 2024
1 parent 2b07413 commit 83f3963
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 3 deletions.
2 changes: 1 addition & 1 deletion mirrord-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.9.2
version: 1.9.3

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
32 changes: 30 additions & 2 deletions mirrord-operator/templates/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ rules:
- get
- list
- watch
{{- if .Values.operator.sqsSplitting }}
# For patching target workloads to use different queue.
{{- if or .Values.operator.sqsSplitting .Values.operator.kafkaSplitting }}
# For patching target workloads to use different queue/topic.
- apiGroups:
- apps
resources:
Expand Down Expand Up @@ -111,6 +111,34 @@ rules:
verbs:
- update
{{- end }}
{{- if .Values.operator.kafkaSplitting }}
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkaephemeraltopics
verbs:
- get
- list
- watch
- create
- delete
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkaclientconfigs
verbs:
- get
- list
- watch
- apiGroups:
- queues.mirrord.metalbear.co
resources:
- mirrordkafkatopicsconsumers
verbs:
- get
- list
- watch
{{- end }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
Expand Down
273 changes: 273 additions & 0 deletions mirrord-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,276 @@ spec:
subresources:
status: {}
{{ end }}
{{ if .Values.operator.kafkaSplitting}}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkaclientconfigs.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaClientConfig
plural: mirrordkafkaclientconfigs
shortNames: []
singular: mirrordkafkaclientconfig
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of parent configuration.
jsonPath: .spec.parent
name: PARENT
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaClientConfigSpec via `CustomResource`
properties:
spec:
description: Configuration to use when creating operator's Kafka client. Resources of this kind should live in the operator's namespace.
properties:
parent:
description: Name of parent resource to use as base when resolving final configuration.
nullable: true
type: string
properties:
description: |-
Properties to set.
When performing Kafka splitting, the operator will override `group.id` property.
The list of all available properties can be found [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
items:
description: Property to use when creating operator's Kafka client.
properties:
name:
description: Name of the property, e.g `bootstrap.servers`.
type: string
value:
description: Value for the property, e.g `kafka.default.svc.cluster.local:9092`. `null` clears the property from parent resource when resolving the final configuration.
nullable: true
type: string
required:
- name
type: object
type: array
required:
- properties
type: object
required:
- spec
title: MirrordKafkaClientConfig
type: object
served: true
storage: true
subresources: {}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkaephemeraltopics.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaEphemeralTopic
plural: mirrordkafkaephemeraltopics
shortNames: []
singular: mirrordkafkaephemeraltopic
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of the topic.
jsonPath: .spec.name
name: NAME
type: string
- description: Name of MirrordKafkaClientProperties to use when creating Kafka client.
jsonPath: .spec.clientConfig
name: CLIENT-CONFIG
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaEphemeralTopicSpec via `CustomResource`
properties:
spec:
description: |-
Ephemeral topic created in your Kafka cluster for the purpose of running a Kafka splitting session.
Resources of this kind should live in the operator's namespace. They will be used to clean up topics that are no longer used.
properties:
clientConfig:
description: Links to [`MirrordKafkaClientConfigSpec`] resource living in the same namespace.
type: string
name:
description: Name of the topic.
type: string
required:
- clientConfig
- name
type: object
required:
- spec
title: MirrordKafkaEphemeralTopic
type: object
served: true
storage: true
subresources: {}
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mirrordkafkatopicsconsumers.queues.mirrord.metalbear.co
spec:
group: queues.mirrord.metalbear.co
names:
categories: []
kind: MirrordKafkaTopicsConsumer
plural: mirrordkafkatopicsconsumers
shortNames: []
singular: mirrordkafkatopicsconsumer
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Name of the topic consumer workload.
jsonPath: .spec.consumerName
name: CONSUMER-NAME
type: string
- description: Kind of the topic consumer workload.
jsonPath: .spec.consumerKind
name: CONSUMER-KIND
type: string
- description: Api version of the topic consumer workload.
jsonPath: .spec.consumerApiVersion
name: CONSUMER-API-VERSION
type: string
- description: Timeout for consumer workload restart.
jsonPath: .spec.consumerRestartTimeout
name: CONSUMER-RESTART-TIMEOUT
type: string
name: v1alpha
schema:
openAPIV3Schema:
description: Auto-generated derived type for MirrordKafkaTopicsConsumerSpec via `CustomResource`
properties:
spec:
description: |-
Defines splittable Kafka topics consumed by some workload living in the same namespace.
# Concurrent splitting
Concurrent Kafka splitting sessions are allowed, as long as they use the same topic id or their topics' `nameSources` do not overlap.
# Example
```yaml apiVersion: queues.mirrord.metalbear.co/v1alpha kind: MirrordKafkaTopicsConsumer metadata: name: example namespace: default spec: consumerName: example-deployment consumerApiVersion: apps/v1 consumerKind: Deployment topics: - id: example-topic nameSources: - directEnvVar: container: example-container name: KAFKA_TOPIC_NAME groupIdSources: - directEnvVar: container: example-container name: KAFKA_GROUP_ID clientConfig: example-config ```
1. Creating the resource below will enable Kafka splitting on a deployment `example-deployment` living in namespace `default`. Id `example-topic` can be then used in the mirrord config to split the topic for the duration of the mirrord session.
2. Topic name will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_TOPIC_NAME` defined directly in `example-container`.
3. Consumer group id used by the mirrord operator will be resolved based on `example-deployment`'s pod template by extracting value of variable `KAFKA_GROUP_ID` defined directly in `example-container`.
4. For the duration of the session, `example-deployment` will be patched - the mirrord operator will substitute topic name in `KAFKA_TOPIC_NAME` variable with a name of an ephemeral Kafka topic.
5. Local application will see a different value of the `KAFKA_TOPIC_NAME` - it will be a name of another ephemeral Kafka topic.
6. `MirrordKafkaClientConfig` named `example-config` living in mirrord operator's namespace will be used to manage ephemeral Kafka topics and consume/produce messages.
properties:
consumerApiVersion:
description: Workload api version, for example `apps/v1`.
type: string
consumerKind:
description: Workload kind, for example `Deployment`.
type: string
consumerName:
description: Workload name, for example `my-deployment`.
type: string
consumerRestartTimeout:
description: |-
Timeout for waiting until workload patch takes effect, that is at least one pod reads from the ephemeral topic.
Specified in seconds. Defaults to 60s.
format: uint32
minimum: 0.0
nullable: true
type: integer
topics:
description: List of consumed splittable topics.
items:
description: Splittable Kafka topic consumed by some remote target.
properties:
clientConfig:
description: Links to [`MirrordKafkaClientConfig`] in the operator's namespace. This config will be used to manage ephemeral Kafka topics and consume/produce messages.
type: string
groupIdSources:
description: All occurrences of this topic's group id in the workload's pod template.
items:
description: Source of some topic property required for Kafka splitting.
oneOf:
- required:
- directEnvVar
properties:
directEnvVar:
description: Environment variable with value defined directly in the pod template.
properties:
container:
description: Name of the container.
type: string
variable:
description: Name of the variable.
type: string
required:
- container
- variable
type: object
type: object
type: array
id:
description: Id of this topic. Can be used in mirrord config to identify this topic.
type: string
nameSources:
description: All occurrences of this topic's name in the workload's pod template.
items:
description: Source of some topic property required for Kafka splitting.
oneOf:
- required:
- directEnvVar
properties:
directEnvVar:
description: Environment variable with value defined directly in the pod template.
properties:
container:
description: Name of the container.
type: string
variable:
description: Name of the variable.
type: string
required:
- container
- variable
type: object
type: object
type: array
required:
- clientConfig
- groupIdSources
- id
- nameSources
type: object
type: array
required:
- consumerApiVersion
- consumerKind
- consumerName
- topics
type: object
required:
- spec
title: MirrordKafkaTopicsConsumer
type: object
served: true
storage: true
subresources: {}
{{ end }}
2 changes: 2 additions & 0 deletions mirrord-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ spec:
{{- end }}
- name: OPERATOR_SQS_SPLITTING
value: {{ .Values.operator.sqsSplitting | ternary "true" "false" | quote }}
- name: OPERATOR_KAFKA_SPLITTING
value: {{ .Values.operator.kafkaSplitting | ternary "true" "false" | quote }}
- name: OPERATOR_JSON_LOG
value: {{ .Values.operator.jsonLog | ternary "true" "false" | quote }}
- name: OPERATOR_AGENT_CONFIG
Expand Down
2 changes: 2 additions & 0 deletions mirrord-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ operator:
jsonLog: false
# Has to be set to `true` in order to use the SQS queue splitting feature.
sqsSplitting: false
# Has to be set to `false` in order to use the Kafka queue splitting feature.
kafkaSplitting: false
# imagePullSecrets:
# - name: value

Expand Down

0 comments on commit 83f3963

Please sign in to comment.