From 8c3d0b0bcf669f8b3f450ee3351287143786571d Mon Sep 17 00:00:00 2001 From: artaasadi Date: Wed, 18 Dec 2024 20:30:45 +0100 Subject: [PATCH] fix: put data directly into es --- services/tasks/worker/consts/es.go | 11 +++++ services/tasks/worker/worker.go | 77 +++++++++++++++++++++--------- 2 files changed, 66 insertions(+), 22 deletions(-) create mode 100644 services/tasks/worker/consts/es.go diff --git a/services/tasks/worker/consts/es.go b/services/tasks/worker/consts/es.go new file mode 100644 index 000000000..5ae69ac18 --- /dev/null +++ b/services/tasks/worker/consts/es.go @@ -0,0 +1,11 @@ +package consts + +const ( + ElasticSearchAddressEnv = "ELASTICSEARCH_ADDRESS" + ElasticSearchUsernameEnv = "ELASTICSEARCH_USERNAME" + ElasticSearchPasswordEnv = "ELASTICSEARCH_PASSWORD" + ElasticSearchIsOnAksNameEnv = "ELASTICSEARCH_IS_ON_AKS" + ElasticSearchIsOpenSearch = "ELASTICSEARCH_IS_OPENSEARCH" + ElasticSearchAwsRegionEnv = "ELASTICSEARCH_AWS_REGION" + ElasticSearchAssumeRoleArnEnv = "ELASTICSEARCH_ASSUME_ROLE_ARN" +) diff --git a/services/tasks/worker/worker.go b/services/tasks/worker/worker.go index b10da0241..18f7bca84 100644 --- a/services/tasks/worker/worker.go +++ b/services/tasks/worker/worker.go @@ -12,6 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" ) func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Client, taskConfig *Task, namespace string) error { @@ -24,28 +25,7 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie Value: v, }) } - env = append(env, []corev1.EnvVar{ - { - Name: consts.NatsURLEnv, - Value: cfg.NATS.URL, - }, - { - Name: consts.NatsConsumerEnv, - Value: taskConfig.NatsConfig.Consumer, - }, - { - Name: consts.NatsStreamNameEnv, - Value: taskConfig.NatsConfig.Stream, - }, - { - Name: consts.NatsTopicNameEnv, - Value: taskConfig.NatsConfig.Topic, - }, - { - Name: consts.NatsResultTopicNameEnv, - Value: taskConfig.NatsConfig.ResultTopic, - }, - }...) + env = append(env, defaultEnvs(cfg, taskConfig)...) switch taskConfig.WorkloadType { case WorkloadTypeDeployment: // deployment @@ -150,3 +130,56 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie return nil } + +func defaultEnvs(cfg config.Config, taskConfig *Task) []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: consts.NatsURLEnv, + Value: cfg.NATS.URL, + }, + { + Name: consts.NatsConsumerEnv, + Value: taskConfig.NatsConfig.Consumer, + }, + { + Name: consts.NatsStreamNameEnv, + Value: taskConfig.NatsConfig.Stream, + }, + { + Name: consts.NatsTopicNameEnv, + Value: taskConfig.NatsConfig.Topic, + }, + { + Name: consts.NatsResultTopicNameEnv, + Value: taskConfig.NatsConfig.ResultTopic, + }, + { + Name: consts.ElasticSearchAddressEnv, + Value: cfg.ElasticSearch.Address, + }, + { + Name: consts.ElasticSearchUsernameEnv, + Value: cfg.ElasticSearch.Username, + }, + { + Name: consts.ElasticSearchPasswordEnv, + Value: cfg.ElasticSearch.Password, + }, + { + Name: consts.ElasticSearchIsOnAksNameEnv, + Value: strconv.FormatBool(cfg.ElasticSearch.IsOnAks), + }, + { + Name: consts.ElasticSearchIsOpenSearch, + Value: strconv.FormatBool(cfg.ElasticSearch.IsOpenSearch), + }, + { + Name: consts.ElasticSearchAwsRegionEnv, + Value: cfg.ElasticSearch.AwsRegion, + }, + { + Name: consts.ElasticSearchAssumeRoleArnEnv, + Value: cfg.ElasticSearch.AssumeRoleArn, + }, + } +}