Skip to content

Commit

Permalink
Merge pull request #2279 from opengovern/fix-tasks
Browse files Browse the repository at this point in the history
fix: put data directly into es
  • Loading branch information
artaasadi authored Dec 18, 2024
2 parents f5fcbb2 + 8c3d0b0 commit a48845e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 22 deletions.
11 changes: 11 additions & 0 deletions services/tasks/worker/consts/es.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package consts

const (
ElasticSearchAddressEnv = "ELASTICSEARCH_ADDRESS"
ElasticSearchUsernameEnv = "ELASTICSEARCH_USERNAME"
ElasticSearchPasswordEnv = "ELASTICSEARCH_PASSWORD"
ElasticSearchIsOnAksNameEnv = "ELASTICSEARCH_IS_ON_AKS"
ElasticSearchIsOpenSearch = "ELASTICSEARCH_IS_OPENSEARCH"
ElasticSearchAwsRegionEnv = "ELASTICSEARCH_AWS_REGION"
ElasticSearchAssumeRoleArnEnv = "ELASTICSEARCH_ASSUME_ROLE_ARN"
)
77 changes: 55 additions & 22 deletions services/tasks/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
)

func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Client, taskConfig *Task, namespace string) error {
Expand All @@ -24,28 +25,7 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie
Value: v,
})
}
env = append(env, []corev1.EnvVar{
{
Name: consts.NatsURLEnv,
Value: cfg.NATS.URL,
},
{
Name: consts.NatsConsumerEnv,
Value: taskConfig.NatsConfig.Consumer,
},
{
Name: consts.NatsStreamNameEnv,
Value: taskConfig.NatsConfig.Stream,
},
{
Name: consts.NatsTopicNameEnv,
Value: taskConfig.NatsConfig.Topic,
},
{
Name: consts.NatsResultTopicNameEnv,
Value: taskConfig.NatsConfig.ResultTopic,
},
}...)
env = append(env, defaultEnvs(cfg, taskConfig)...)
switch taskConfig.WorkloadType {
case WorkloadTypeDeployment:
// deployment
Expand Down Expand Up @@ -150,3 +130,56 @@ func CreateWorker(ctx context.Context, cfg config.Config, kubeClient client.Clie

return nil
}

func defaultEnvs(cfg config.Config, taskConfig *Task) []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: consts.NatsURLEnv,
Value: cfg.NATS.URL,
},
{
Name: consts.NatsConsumerEnv,
Value: taskConfig.NatsConfig.Consumer,
},
{
Name: consts.NatsStreamNameEnv,
Value: taskConfig.NatsConfig.Stream,
},
{
Name: consts.NatsTopicNameEnv,
Value: taskConfig.NatsConfig.Topic,
},
{
Name: consts.NatsResultTopicNameEnv,
Value: taskConfig.NatsConfig.ResultTopic,
},
{
Name: consts.ElasticSearchAddressEnv,
Value: cfg.ElasticSearch.Address,
},
{
Name: consts.ElasticSearchUsernameEnv,
Value: cfg.ElasticSearch.Username,
},
{
Name: consts.ElasticSearchPasswordEnv,
Value: cfg.ElasticSearch.Password,
},
{
Name: consts.ElasticSearchIsOnAksNameEnv,
Value: strconv.FormatBool(cfg.ElasticSearch.IsOnAks),
},
{
Name: consts.ElasticSearchIsOpenSearch,
Value: strconv.FormatBool(cfg.ElasticSearch.IsOpenSearch),
},
{
Name: consts.ElasticSearchAwsRegionEnv,
Value: cfg.ElasticSearch.AwsRegion,
},
{
Name: consts.ElasticSearchAssumeRoleArnEnv,
Value: cfg.ElasticSearch.AssumeRoleArn,
},
}
}

0 comments on commit a48845e

Please sign in to comment.