diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
index 69f31776..2562217a 100644
--- a/.github/workflows/docker-image.yml
+++ b/.github/workflows/docker-image.yml
@@ -1,4 +1,4 @@
-name: Docker Image CI
+name: Druid Operator
on:
push:
@@ -18,5 +18,5 @@ jobs:
run: make lint
- name: Run helm template
run: make template
- - name: Build the Docker image
- run: docker build . -t druidio/druid-operator:$(date +%s)
+ - name: Run e2e tests
+ run: make e2e
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index c468cae4..797cc250 100644
--- a/LICENSE
+++ b/LICENSE
@@ -11,3 +11,17 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
+---
+Copyright 2023 Cloudnatively Services, Pvt Ltd
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
diff --git a/Makefile b/Makefile
index b5eb3445..28670857 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,20 @@
+# Image tag to use for all building/pushing image targets
+IMG_TAG ?= "latest"
# Image URL to use all building/pushing image targets
-IMG ?= "druid-operator:latest"
+IMG ?= "druid-operator"
+# Local Image URL to be pushed to the kind registry
+IMG_KIND ?= "localhost:5001/druid-operator"
+# NAMESPACE for druid operator e2e
+NAMESPACE_DRUID_OPERATOR ?= "druid-operator"
+# NAMESPACE for zk operator e2e
+NAMESPACE_ZK_OPERATOR ?= "zk-operator"
+# NAMESPACE for minio operator e2e
+NAMESPACE_MINIO_OPERATOR ?= "minio-operator"
+# NAMESPACE for druid app e2e
+NAMESPACE_DRUID ?= "druid"
+
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
-ENVTEST_K8S_VERSION = 1.24.2
+ENVTEST_K8S_VERSION = 1.25.0
# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
ifeq (,$(shell go env GOBIN))
@@ -77,11 +90,11 @@ template:
.PHONY: docker-build
docker-build: test ## Build docker image with the manager.
- docker build -t ${IMG} .
+ docker build -t ${IMG}:${IMG_TAG} .
.PHONY: docker-push
docker-push: ## Push docker image with the manager.
- docker push ${IMG}
+ docker push ${IMG}:${IMG_TAG}
##@ Deployment
@@ -99,7 +112,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified
.PHONY: deploy
deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
- cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
+ cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}:${IMG_TAG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
.PHONY: undeploy
@@ -137,3 +150,50 @@ $(CONTROLLER_GEN): $(LOCALBIN)
envtest: $(ENVTEST) ## Download envtest-setup locally if necessary.
$(ENVTEST): $(LOCALBIN)
test -s $(LOCALBIN)/setup-envtest || GOBIN=$(LOCALBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
+
+## e2e deployment
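+# Runs the full flow in e2e/e2e.sh: bootstrap a kind cluster with a local
+# registry, build and push the operator image, helm-install the druid and
+# minio operators, then apply e2e/configs/druid-cr.yaml and wait for rollout.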
+.PHONY: e2e
+e2e:
+ e2e/e2e.sh
+
+## Build Kind
+.PHONY: kind
+kind: ## Bootstrap Kind Locally
+ sh e2e/kind.sh
+
+## Make Docker build for kind registry
+.PHONY: docker-build-local
+docker-build-local: ## Build docker image for the kind registry.
+ docker build -t ${IMG_KIND}:${IMG_TAG} .
+
+## Make Docker push to kind registry
+.PHONY: docker-push-local
+docker-push-local: ## Push docker image to the kind registry.
+ docker push ${IMG_KIND}:${IMG_TAG}
+
+## Helm install to deploy the druid operator
+.PHONY: helm-install-druid-operator
+helm-install-druid-operator: ## Helm upgrade/install the druid-operator chart.
+ helm upgrade --install \
+ --namespace ${NAMESPACE_DRUID_OPERATOR} \
+ --create-namespace \
+ ${NAMESPACE_DRUID_OPERATOR} chart/ \
+ --set image.repository=${IMG_KIND} \
+ --set image.tag=${IMG_TAG}
+
+## Helm deploy the minio operator and a minio tenant
+.PHONY: helm-minio-install
+helm-minio-install: ## Install the minio operator and a minio tenant.
+ helm repo add minio https://operator.min.io/
+ helm repo update minio
+ helm upgrade --install \
+ --namespace ${NAMESPACE_MINIO_OPERATOR} \
+ --create-namespace \
+ ${NAMESPACE_MINIO_OPERATOR} minio/operator \
+ -f e2e/configs/minio-operator-override.yaml
+ helm upgrade --install \
+ --namespace ${NAMESPACE_DRUID} \
+ --create-namespace \
+ ${NAMESPACE_DRUID}-minio minio/tenant \
+ -f e2e/configs/minio-tenant-override.yaml
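+
+## The individual steps `make e2e` runs, handy when debugging one stage at a
+## time (assumes Docker is running and port 5001 is free for the kind registry):
+##   make kind
+##   make docker-build-local docker-push-local
+##   make helm-install-druid-operator helm-minio-install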
+
diff --git a/README.md b/README.md
index f6bac157..92eac69d 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
# Kubernetes Operator for Apache Druid
-![Build Status](https://github.com/druid-io/druid-operator/actions/workflows/docker-image.yml/badge.svg) ![Docker pull](https://img.shields.io/docker/pulls/druidio/druid-operator.svg) [![Latest Version](https://img.shields.io/github/tag/druid-io/druid-operator)](https://github.com/druid-io/druid-operator/releases)
+![Build Status](https://github.com/datainfrahq/druid-operator/actions/workflows/docker-image.yml/badge.svg) ![Docker pull](https://img.shields.io/docker/pulls/druidio/druid-operator.svg) [![Latest Version](https://img.shields.io/github/tag/druid-io/druid-operator)](https://github.com/druid-io/druid-operator/releases)
- druid-operator provisions and manages [Apache Druid](https://druid.apache.org/) cluster on kubernetes.
- druid-operator is designed to provision and manage [Apache Druid](https://druid.apache.org/) in distributed mode only.
@@ -14,11 +14,12 @@
- ```Druid``` CR belongs to api Group ```druid.apache.org``` and version ```v1alpha1```
### Notifications
-- Users may experience HPA issues with druid-operator with release 0.0.5, as described in the [issue]( https://github.com/druid-io/druid-operator/issues/160).
+- Users may experience HPA issues with druid-operator with release 0.0.5, as described in the [issue]( https://github.com/druid-io/druid-operator/issues/160).
- The latest release 0.0.6 has fixes for the above issue.
- The operator has moved from HPA apiVersion autoscaling/v2beta1 to autoscaling/v2beta2 API users will need to update there HPA Specs according v2beta2 api in order to work with the latest druid-operator release.
- Users may experience pvc deletion [issue](https://github.com/druid-io/druid-operator/issues/186) in release 0.0.6, this issue has been fixed in patch release 0.0.6.1.
- druid-operator has moved Ingress apiVersion networking/v1beta1 to networking/v1. Users will need to update there Ingress Spec in the druid CR according networking/v1 syntax. In case users are using schema validated CRD, the CRD will also be needed to be updated.
+- druid-operator has moved the PodDisruptionBudget apiVersion from policy/v1beta1 to policy/v1. Users will need Kubernetes 1.21+ to run druid-operator-0.0.10+.
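+- A minimal node-level `podDisruptionBudgetSpec` under `policy/v1` is sketched below; the spec fields are unchanged from `v1beta1`, and `maxUnavailable: 1` is only an example value:
+  ```yaml
+  nodes:
+    brokers:
+      podDisruptionBudgetSpec:
+        maxUnavailable: 1
+  ```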
### Note
ApacheĀ®, [Apache Druid, DruidĀ®](https://druid.apache.org/) are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. This project, druid-operator, is not an Apache Software Foundation project.
diff --git a/apis/druid/v1alpha1/druid_types.go b/apis/druid/v1alpha1/druid_types.go
index 6adc5e4e..9d39315b 100644
--- a/apis/druid/v1alpha1/druid_types.go
+++ b/apis/druid/v1alpha1/druid_types.go
@@ -7,7 +7,7 @@ import (
autoscalev2beta2 "k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
- "k8s.io/api/policy/v1beta1"
+ policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -208,7 +208,7 @@ type DruidNodeSpec struct {
PodLabels map[string]string `json:"podLabels,omitempty"`
// Optional
- PodDisruptionBudgetSpec *v1beta1.PodDisruptionBudgetSpec `json:"podDisruptionBudgetSpec,omitempty"`
+ PodDisruptionBudgetSpec *policyv1.PodDisruptionBudgetSpec `json:"podDisruptionBudgetSpec,omitempty"`
// Required
RuntimeProperties string `json:"runtime.properties"`
diff --git a/apis/druid/v1alpha1/zz_generated.deepcopy.go b/apis/druid/v1alpha1/zz_generated.deepcopy.go
index 667ad696..d628c102 100644
--- a/apis/druid/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/druid/v1alpha1/zz_generated.deepcopy.go
@@ -15,7 +15,7 @@ import (
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
- "k8s.io/api/policy/v1beta1"
+ policyv1 "k8s.io/api/policy/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@@ -223,7 +223,7 @@ func (in *DruidNodeSpec) DeepCopyInto(out *DruidNodeSpec) {
}
if in.PodDisruptionBudgetSpec != nil {
in, out := &in.PodDisruptionBudgetSpec, &out.PodDisruptionBudgetSpec
- *out = new(v1beta1.PodDisruptionBudgetSpec)
+ *out = new(policyv1.PodDisruptionBudgetSpec)
(*in).DeepCopyInto(*out)
}
if in.Services != nil {
diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go
index 40d348b4..030a260d 100644
--- a/controllers/druid/handler.go
+++ b/controllers/druid/handler.go
@@ -17,6 +17,7 @@ import (
"github.com/druid-io/druid-operator/apis/druid/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
+ policyv1 "k8s.io/api/policy/v1"
"k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -83,7 +84,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
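+ // NOTE: the make*EmptyObj()/make*ListEmptyObj() helpers are inlined below as
+ // typed zero-value struct literals (e.g. &v1.ConfigMap{}); controller-runtime
+ // does not need TypeMeta pre-set on typed objects.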
if _, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) { return makeCommonConfigMap(m, ls) },
- func() object { return makeConfigMapEmptyObj() },
+ func() object { return &v1.ConfigMap{} },
alwaysTrueIsEqualsFn, noopUpdaterFn, m, configMapNames, emitEvents); err != nil {
return err
}
@@ -142,7 +143,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
if _, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) { return nodeConfig, nil },
- func() object { return makeConfigMapEmptyObj() },
+ func() object { return &v1.ConfigMap{} },
alwaysTrueIsEqualsFn, noopUpdaterFn, m, configMapNames, emitEvents); err != nil {
return err
}
@@ -153,7 +154,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
for _, svc := range services {
if _, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) { return makeService(&svc, &nodeSpec, m, lm, nodeSpecUniqueStr) },
- func() object { return makeServiceEmptyObj() }, alwaysTrueIsEqualsFn,
+ func() object { return &v1.Service{} }, alwaysTrueIsEqualsFn,
func(prev, curr object) { (curr.(*v1.Service)).Spec.ClusterIP = (prev.(*v1.Service)).Spec.ClusterIP },
m, serviceNames, emitEvents); err != nil {
return err
@@ -170,7 +171,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
func() (object, error) {
return makeDeployment(&nodeSpec, m, lm, nodeSpecUniqueStr, fmt.Sprintf("%s-%s", commonConfigSHA, nodeConfigSHA), firstServiceName)
},
- func() object { return makeDeploymentEmptyObj() },
+ func() object { return &appsv1.Deployment{} },
deploymentIsEquals, noopUpdaterFn, m, deploymentNames, emitEvents); err != nil {
return err
} else if m.Spec.RollingDeploy {
@@ -184,7 +185,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
// will be sequential.
if m.Generation > 1 {
// Check Deployment rolling update status, if in-progress then stop here
- done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return makeDeploymentEmptyObj() }, emitEvents)
+ done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.Deployment{} }, emitEvents)
if !done {
return err
}
@@ -209,7 +210,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
func() (object, error) {
return makeStatefulSet(&nodeSpec, m, lm, nodeSpecUniqueStr, fmt.Sprintf("%s-%s", commonConfigSHA, nodeConfigSHA), firstServiceName)
},
- func() object { return makeStatefulSetEmptyObj() },
+ func() object { return &appsv1.StatefulSet{} },
statefulSetIsEquals, noopUpdaterFn, m, statefulSetNames, emitEvents); err != nil {
return err
} else if m.Spec.RollingDeploy {
@@ -227,7 +228,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
// will be sequential.
if m.Generation > 1 {
//Check StatefulSet rolling update status, if in-progress then stop here
- done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return makeStatefulSetEmptyObj() }, emitEvents)
+ done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.StatefulSet{} }, emitEvents)
if !done {
return err
}
@@ -244,7 +245,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
func() (object, error) {
return makeIngress(&nodeSpec, m, ls, nodeSpecUniqueStr)
},
- func() object { return makeIngressEmptyObj() },
+ func() object { return &networkingv1.Ingress{} },
alwaysTrueIsEqualsFn, noopUpdaterFn, m, ingressNames, emitEvents); err != nil {
return err
}
@@ -254,7 +255,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
if nodeSpec.PodDisruptionBudgetSpec != nil {
if _, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) { return makePodDisruptionBudget(&nodeSpec, m, lm, nodeSpecUniqueStr) },
- func() object { return makePodDisruptionBudgetEmptyObj() },
+ func() object { return &policyv1.PodDisruptionBudget{} },
alwaysTrueIsEqualsFn, noopUpdaterFn, m, podDisruptionBudgetNames, emitEvents); err != nil {
return err
}
@@ -266,7 +267,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
func() (object, error) {
return makeHorizontalPodAutoscaler(&nodeSpec, m, ls, nodeSpecUniqueStr)
},
- func() object { return makeHorizontalPodAutoscalerEmptyObj() },
+ func() object { return &autoscalev2beta2.HorizontalPodAutoscaler{} },
alwaysTrueIsEqualsFn, noopUpdaterFn, m, hpaNames, emitEvents); err != nil {
return err
}
@@ -276,7 +277,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
for _, pvc := range nodeSpec.PersistentVolumeClaim {
if _, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) { return makePersistentVolumeClaim(&pvc, &nodeSpec, m, lm, nodeSpecUniqueStr) },
- func() object { return makePersistentVolumeClaimEmptyObj() }, alwaysTrueIsEqualsFn,
+ func() object { return &v1.PersistentVolumeClaim{} }, alwaysTrueIsEqualsFn,
noopUpdaterFn,
m, pvcNames, emitEvents); err != nil {
return err
@@ -296,7 +297,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
updatedStatus := v1alpha1.DruidClusterStatus{}
updatedStatus.StatefulSets = deleteUnusedResources(sdk, m, statefulSetNames, ls,
- func() objectList { return makeStatefulSetListEmptyObj() },
+ func() objectList { return &appsv1.StatefulSetList{} },
func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
@@ -308,7 +309,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.StatefulSets)
updatedStatus.Deployments = deleteUnusedResources(sdk, m, deploymentNames, ls,
- func() objectList { return makeDeloymentListEmptyObj() },
+ func() objectList { return &appsv1.DeploymentList{} },
func(listObj runtime.Object) []object {
items := listObj.(*appsv1.DeploymentList).Items
result := make([]object, len(items))
@@ -320,7 +321,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.Deployments)
updatedStatus.HPAutoScalers = deleteUnusedResources(sdk, m, hpaNames, ls,
- func() objectList { return makeHorizontalPodAutoscalerListEmptyObj() },
+ func() objectList { return &autoscalev2beta2.HorizontalPodAutoscalerList{} },
func(listObj runtime.Object) []object {
items := listObj.(*autoscalev2beta2.HorizontalPodAutoscalerList).Items
result := make([]object, len(items))
@@ -332,7 +333,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.HPAutoScalers)
updatedStatus.Ingress = deleteUnusedResources(sdk, m, ingressNames, ls,
- func() objectList { return makeIngressListEmptyObj() },
+ func() objectList { return &networkingv1.IngressList{} },
func(listObj runtime.Object) []object {
items := listObj.(*networkingv1.IngressList).Items
result := make([]object, len(items))
@@ -344,9 +345,9 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.Ingress)
updatedStatus.PodDisruptionBudgets = deleteUnusedResources(sdk, m, podDisruptionBudgetNames, ls,
- func() objectList { return makePodDisruptionBudgetListEmptyObj() },
+ func() objectList { return &policyv1.PodDisruptionBudgetList{} },
func(listObj runtime.Object) []object {
- items := listObj.(*v1beta1.PodDisruptionBudgetList).Items
+ items := listObj.(*policyv1.PodDisruptionBudgetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
@@ -356,7 +357,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.PodDisruptionBudgets)
updatedStatus.Services = deleteUnusedResources(sdk, m, serviceNames, ls,
- func() objectList { return makeServiceListEmptyObj() },
+ func() objectList { return &v1.ServiceList{} },
func(listObj runtime.Object) []object {
items := listObj.(*v1.ServiceList).Items
result := make([]object, len(items))
@@ -368,7 +369,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
sort.Strings(updatedStatus.Services)
updatedStatus.ConfigMaps = deleteUnusedResources(sdk, m, configMapNames, ls,
- func() objectList { return makeConfigMapListEmptyObj() },
+ func() objectList { return &v1.ConfigMapList{} },
func(listObj runtime.Object) []object {
items := listObj.(*v1.ConfigMapList).Items
result := make([]object, len(items))
@@ -379,7 +380,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
}, emitEvents)
sort.Strings(updatedStatus.ConfigMaps)
- podList, _ := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return makePodList() }, func(listObj runtime.Object) []object {
+ podList, _ := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PodList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -433,7 +434,7 @@ func deleteSTSAndPVC(sdk client.Client, drd *v1alpha1.Druid, stsList, pvcList []
}
func checkIfCRExists(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) bool {
- _, err := readers.Get(context.TODO(), sdk, m.Name, m, func() object { return makeDruidEmptyObj() }, emitEvents)
+ _, err := readers.Get(context.TODO(), sdk, m.Name, m, func() object { return &v1alpha1.Druid{} }, emitEvents)
if err != nil {
return false
} else {
@@ -443,7 +444,7 @@ func checkIfCRExists(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitt
func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error {
- podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return makePodList() }, func(listObj runtime.Object) []object {
+ podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PodList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -459,7 +460,7 @@ func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmi
"druid_cr": drd.Name,
}
- pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvents, func() objectList { return makePersistentVolumeClaimListEmptyObj() }, func(listObj runtime.Object) []object {
+ pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -521,7 +522,7 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi
"druid_cr": m.Name,
}
- pvcList, err := readers.List(context.TODO(), sdk, m, pvcLabels, emitEvents, func() objectList { return makePersistentVolumeClaimListEmptyObj() }, func(listObj runtime.Object) []object {
+ pvcList, err := readers.List(context.TODO(), sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -533,7 +534,7 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi
return err
}
- stsList, err := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return makeStatefulSetListEmptyObj() }, func(listObj runtime.Object) []object {
+ stsList, err := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -581,7 +582,7 @@ func execCheckCrashStatus(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, m
func checkCrashStatus(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error {
- podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return makePodList() }, func(listObj runtime.Object) []object {
+ podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PodList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -747,7 +748,7 @@ func isObjFullyDeployed(sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, node
// NOTE: To be called only if generation > 1
func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error {
- getSTSList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return makeStatefulSetListEmptyObj() }, func(listObj runtime.Object) []object {
+ getSTSList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -770,7 +771,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe
// return nil, in case return err the program halts since sts would not be able
// we would like the operator to create sts.
- sts, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, func() object { return makeStatefulSetEmptyObj() }, emitEvent)
+ sts, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, func() object { return &appsv1.StatefulSet{} }, emitEvent)
if err != nil {
return nil
}
@@ -779,7 +780,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe
"component": nodeSpec.NodeType,
}
- pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvent, func() objectList { return makePersistentVolumeClaimListEmptyObj() }, func(listObj runtime.Object) []object {
+ pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvent, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
@@ -878,7 +879,7 @@ func getVolumeClaimTemplateSizes(sts object, nodeSpec *v1alpha1.DruidNodeSpec, p
func isVolumeExpansionEnabled(sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool {
for _, nodeVCT := range nodeSpec.VolumeClaimTemplates {
- sc, err := readers.Get(context.TODO(), sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return makeStorageClassEmptyObj() }, emitEvent)
+ sc, err := readers.Get(context.TODO(), sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return &storage.StorageClass{} }, emitEvent)
if err != nil {
return false
}
@@ -1345,7 +1346,7 @@ func makePodSpec(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, nodeSpecUn
}
spec := v1.PodSpec{
- NodeSelector: firstNonNilValue(m.Spec.NodeSelector, nodeSpec.NodeSelector).(map[string]string),
+ NodeSelector: firstNonNilValue(nodeSpec.NodeSelector, m.Spec.NodeSelector).(map[string]string),
TopologySpreadConstraints: getTopologySpreadConstraints(nodeSpec),
Tolerations: getTolerations(nodeSpec, m),
Affinity: getAffinity(nodeSpec, m),
@@ -1366,13 +1367,13 @@ func updateDefaultPortInProbe(probe *v1.Probe, defaultPort int32) *v1.Probe {
return probe
}
-func makePodDisruptionBudget(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, ls map[string]string, nodeSpecUniqueStr string) (*v1beta1.PodDisruptionBudget, error) {
+func makePodDisruptionBudget(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, ls map[string]string, nodeSpecUniqueStr string) (*policyv1.PodDisruptionBudget, error) {
pdbSpec := *nodeSpec.PodDisruptionBudgetSpec
pdbSpec.Selector = &metav1.LabelSelector{MatchLabels: ls}
- pdb := &v1beta1.PodDisruptionBudget{
+ pdb := &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
- APIVersion: "policy/v1beta1",
+ APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
@@ -1539,10 +1540,10 @@ func makeDeloymentListEmptyObj() *appsv1.DeploymentList {
}
}
-func makePodDisruptionBudgetListEmptyObj() *v1beta1.PodDisruptionBudgetList {
- return &v1beta1.PodDisruptionBudgetList{
+func makePodDisruptionBudgetListEmptyObj() *policyv1.PodDisruptionBudgetList {
+ return &policyv1.PodDisruptionBudgetList{
TypeMeta: metav1.TypeMeta{
- APIVersion: "policy/v1beta1",
+ APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
}
@@ -1611,10 +1612,10 @@ func makeDeploymentEmptyObj() *appsv1.Deployment {
}
}
-func makePodDisruptionBudgetEmptyObj() *v1beta1.PodDisruptionBudget {
- return &v1beta1.PodDisruptionBudget{
+func makePodDisruptionBudgetEmptyObj() *policyv1.PodDisruptionBudget {
+ return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
- APIVersion: "policy/v1beta1",
+ APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
}
diff --git a/controllers/druid/handler_test.go b/controllers/druid/handler_test.go
index 7774c8d7..ce51c75d 100644
--- a/controllers/druid/handler_test.go
+++ b/controllers/druid/handler_test.go
@@ -11,7 +11,7 @@ import (
"github.com/ghodss/yaml"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
- "k8s.io/api/policy/v1beta1"
+ policyv1 "k8s.io/api/policy/v1"
)
func TestMakeStatefulSetForBroker(t *testing.T) {
@@ -67,7 +67,7 @@ func TestMakePodDisruptionBudgetForBroker(t *testing.T) {
actual, _ := makePodDisruptionBudget(&nodeSpec, clusterSpec, makeLabelsForNodeSpec(&nodeSpec, clusterSpec, clusterSpec.Name, nodeSpecUniqueStr), nodeSpecUniqueStr)
addHashToObject(actual)
- expected := new(v1beta1.PodDisruptionBudget)
+ expected := new(policyv1.PodDisruptionBudget)
readAndUnmarshallResource("testdata/broker-pod-disruption-budget.yaml", &expected, t)
assertEquals(expected, actual, t)
diff --git a/controllers/druid/testdata/broker-pod-disruption-budget.yaml b/controllers/druid/testdata/broker-pod-disruption-budget.yaml
index 9f2b1474..8d6ae8d6 100644
--- a/controllers/druid/testdata/broker-pod-disruption-budget.yaml
+++ b/controllers/druid/testdata/broker-pod-disruption-budget.yaml
@@ -1,4 +1,4 @@
-apiVersion: policy/v1beta1
+apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
labels:
diff --git a/e2e/configs/druid-cr.yaml b/e2e/configs/druid-cr.yaml
new file mode 100644
index 00000000..448e4cf5
--- /dev/null
+++ b/e2e/configs/druid-cr.yaml
@@ -0,0 +1,240 @@
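+# Zookeeper-less Druid cluster used by the e2e suite: kubernetes-based service
+# discovery, a derby metadata store, and minio (s3 api) as deep storage, with
+# one replica of each node type.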
+apiVersion: "druid.apache.org/v1alpha1"
+kind: "Druid"
+metadata:
+ name: tiny-cluster
+spec:
+ image: apache/druid:25.0.0
+ # Optionally specify an image for all nodes; it can also be specified per node
+ # imagePullSecrets:
+ # - name: tutu
+ startScript: /druid.sh
+ podLabels:
+ environment: stage
+ release: alpha
+ podAnnotations:
+ dummy: k8s_extn_needs_atleast_one_annotation
+ readinessProbe:
+ httpGet:
+ path: /status/health
+ port: 8088
+ securityContext:
+ fsGroup: 0
+ runAsUser: 0
+ runAsGroup: 0
+ containerSecurityContext:
+ privileged: true
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
+ jvm.options: |-
+ -server
+ -XX:MaxDirectMemorySize=10240g
+ -Duser.timezone=UTC
+ -Dfile.encoding=UTF-8
+ -Dlog4j.debug
+ -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+ log4j.config: |-
+    <?xml version="1.0" encoding="UTF-8" ?>
+    <Configuration status="WARN">
+      <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+          <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
+        </Console>
+      </Appenders>
+      <Loggers>
+        <Root level="info">
+          <AppenderRef ref="Console"/>
+        </Root>
+      </Loggers>
+    </Configuration>
+ common.runtime.properties: |
+ #
+ # Zookeeper-less Druid Cluster
+ #
+ druid.zk.service.enabled=false
+ druid.discovery.type=k8s
+ druid.discovery.k8s.clusterIdentifier=druid-it
+ druid.serverview.type=http
+ druid.coordinator.loadqueuepeon.type=http
+ druid.indexer.runner.type=httpRemote
+ # Metadata Store
+ druid.metadata.storage.type=derby
+ druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
+ druid.metadata.storage.connector.host=localhost
+ druid.metadata.storage.connector.port=1527
+ druid.metadata.storage.connector.createTables=true
+ # Deep Storage
+ druid.storage.type=s3
+ druid.storage.bucket=druid
+ druid.storage.baseKey=druid/segments
+ druid.s3.accessKey=minio
+ druid.s3.secretKey=minio123
+ druid.s3.protocol=http
+ druid.s3.endpoint.signingRegion=us-east-1
+ druid.s3.enablePathStyleAccess=true
+ druid.s3.endpoint.url=http://minio1-hl.druid.svc.cluster.local:9000/
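+ # the endpoint above targets the minio tenant installed by `make helm-minio-install`
+ # into the druid namespace (headless service minio1-hl of the minio1 tenant)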
+ #
+ # Extensions
+ #
+ druid.extensions.loadList=["druid-avro-extensions", "druid-s3-extensions", "druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "druid-kubernetes-extensions"]
+ #
+ # Service discovery
+ #
+ druid.selectors.indexing.serviceName=druid/overlord
+ druid.selectors.coordinator.serviceName=druid/coordinator
+ druid.indexer.logs.type=s3
+ druid.indexer.logs.s3Bucket=druid
+ druid.indexer.logs.s3Prefix=druid/indexing-logs
+ druid.lookup.enableLookupSyncOnStartup=false
+ env:
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ - name: POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+
+ nodes:
+ brokers:
+ # Optionally specify for running broker as Deployment
+ # kind: Deployment
+ nodeType: "broker"
+ # Optionally specify for broker nodes
+ # imagePullSecrets:
+ # - name: tutu
+ druid.port: 8088
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
+ replicas: 1
+ runtime.properties: |
+ druid.service=druid/broker
+ # HTTP server threads
+ druid.broker.http.numConnections=5
+ druid.server.http.numThreads=40
+ # Processing threads and buffers
+ druid.processing.buffer.sizeBytes=25000000
+ druid.sql.enable=true
+ extra.jvm.options: |-
+ -Xmx512m
+ -Xms512m
+
+ coordinators:
+ # Optionally specify for running coordinator as Deployment
+ # kind: Deployment
+ nodeType: "coordinator"
+ druid.port: 8088
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
+ replicas: 1
+ runtime.properties: |
+ druid.service=druid/coordinator
+ # HTTP server threads
+ druid.coordinator.startDelay=PT30S
+ druid.coordinator.period=PT30S
+ # Configure this coordinator to also run as Overlord
+ druid.coordinator.asOverlord.enabled=true
+ druid.coordinator.asOverlord.overlordService=druid/overlord
+ druid.indexer.queue.startDelay=PT30S
+ extra.jvm.options: |-
+ -Xmx800m
+ -Xms800m
+
+ historicals:
+ nodeType: "historical"
+ druid.port: 8088
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
+ replicas: 1
+ runtime.properties: |
+ druid.service=druid/historical
+ druid.processing.buffer.sizeBytes=25000000
+ druid.processing.numThreads=2
+ # Segment storage
+ druid.segmentCache.locations=[{"path":"/druid/data/segments","maxSize":10737418240}]
+ druid.server.maxSize=10737418240
+ extra.jvm.options: |-
+ -Xmx512m
+ -Xms512m
+
+ routers:
+ nodeType: "router"
+ druid.port: 8088
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
+ replicas: 1
+ runtime.properties: |
+ druid.service=druid/router
+ # HTTP proxy
+ druid.router.http.numConnections=50
+ druid.router.http.readTimeout=PT5M
+ druid.router.http.numMaxThreads=100
+ druid.server.http.numThreads=100
+ # Service discovery
+ druid.router.defaultBrokerServiceName=druid/broker
+ druid.router.coordinatorServiceName=druid/coordinator
+ # Management proxy to coordinator / overlord: required for unified web console.
+ druid.router.managementProxy.enabled=true
+
+ middlemanagers:
+ nodeType: "middleManager"
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/middleManager"
+ druid.port: 8088
+ services:
+ - spec:
+ type: ClusterIP
+ clusterIP: None
+ replicas: 1
+ extra.jvm.options: |-
+ -Xmx512m
+ -Xms512m
+ runtime.properties: |
+ druid.service=druid/middleManager
+ druid.worker.capacity=1
+ druid.indexer.runner.javaOpts=-server -Xms128m -Xmx128m -XX:MaxDirectMemorySize=256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/druid/data/tmp -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+ druid.indexer.task.baseTaskDir=/druid/data/baseTaskDir
+ druid.server.http.numThreads=1
+ druid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000
+ druid.indexer.fork.property.druid.processing.numMergeBuffers=2
+ druid.indexer.fork.property.druid.processing.numThreads=1
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: druid-cluster
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - pods
+ - configmaps
+ verbs:
+ - '*'
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: druid-cluster
+subjects:
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: druid-cluster
+ apiGroup: rbac.authorization.k8s.io
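+# The Role/RoleBinding above grant the default ServiceAccount access to pods and
+# configmaps, which the druid-kubernetes-extensions discovery module needs for
+# the zookeeper-less setup (druid.discovery.type=k8s).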
diff --git a/e2e/configs/minio-operator-override.yaml b/e2e/configs/minio-operator-override.yaml
new file mode 100644
index 00000000..d0461022
--- /dev/null
+++ b/e2e/configs/minio-operator-override.yaml
@@ -0,0 +1,7 @@
+operator:
+ replicaCount: 1
+ env:
+ - name: MINIO_OPERATOR_TLS_ENABLE
+ value: "off"
+ - name: MINIO_CONSOLE_TLS_ENABLE
+ value: "off"
diff --git a/e2e/configs/minio-tenant-override.yaml b/e2e/configs/minio-tenant-override.yaml
new file mode 100644
index 00000000..34deeb6d
--- /dev/null
+++ b/e2e/configs/minio-tenant-override.yaml
@@ -0,0 +1,6 @@
+tenant:
+ pools:
+ - servers: 1
+ volumesPerServer: 1
+ certificate:
+ requestAutoCert: false
diff --git a/e2e/e2e.sh b/e2e/e2e.sh
new file mode 100755
index 00000000..43f10efa
--- /dev/null
+++ b/e2e/e2e.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+set -o errexit
+set -x
+# Get Kind
+go install sigs.k8s.io/kind@v0.17.0
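+# assumes $(go env GOPATH)/bin is on PATH so the kind binary resolves below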
+# minio statefulset name
+MINIO_STS_NAME=minio1-ss-0
+# druid namespace
+NAMESPACE=druid
+# fmt code
+make fmt
+# vet
+make vet
+# deploy kind
+make kind
+# build local docker druid operator image
+make docker-build-local
+# push to kind registery
+make docker-push-local
+# install druid-operator
+make helm-install-druid-operator
+# install minio-operator and tenant
+make helm-minio-install
+# give the minio operator time to create the tenant statefulset before waiting on it
+sleep 60
+# wait for minio pods
+kubectl rollout status sts $MINIO_STS_NAME -n ${NAMESPACE} --timeout=60s
+# output pods
+kubectl get pods -n ${NAMESPACE}
+# apply druid cr
+kubectl apply -f e2e/configs/druid-cr.yaml -n ${NAMESPACE}
+# give the druid operator time to create the druid statefulsets
+sleep 30
+# wait for druid pods
+declare -a sts=($(kubectl get sts -n ${NAMESPACE} -l app=${NAMESPACE} | awk '{ print $1 }' | awk '(NR>1)'))
+for s in ${sts[@]}; do
+ echo $s
+ kubectl rollout status sts $s -n ${NAMESPACE} --timeout=60s
+done
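+# teardown is manual when running locally, e.g.:
+#   kind delete cluster && docker rm -f kind-registry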
diff --git a/e2e/kind.sh b/e2e/kind.sh
new file mode 100755
index 00000000..134c9900
--- /dev/null
+++ b/e2e/kind.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+set -o errexit
+
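+# Create a kind cluster with a local Docker registry wired in via containerd,
+# following the kind "local registry" recipe.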
+# create registry container unless it already exists
+reg_name='kind-registry'
+reg_port='5001'
+if [ "$(docker inspect -f '{{.State.Running}}' "${reg_name}" 2>/dev/null || true)" != 'true' ]; then
+ docker run \
+ -d --restart=always -p "127.0.0.1:${reg_port}:5000" --name "${reg_name}" \
+ registry:2
+fi
+
+# create a cluster with the local registry enabled in containerd
+cat <<EOF | kind create cluster --config=-