diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml
index dbb655071..cd0d55cab 100644
--- a/.github/workflows/go.yaml
+++ b/.github/workflows/go.yaml
@@ -80,6 +80,7 @@ jobs:
       wastage-service: ${{ steps.build_services.outputs.wastage-service }}
       information-service: ${{ steps.build_services.outputs.information-service }}
       query-runner-worker: ${{ steps.build_services.outputs.query-runner-worker }}
+      query-validator-worker: ${{ steps.build_services.outputs.query-validator-worker }}
       demo-importer-worker: ${{ steps.build_services.outputs.demo-importer-worker }}
 
     env:
@@ -822,6 +823,48 @@ jobs:
             PLUGIN_REGISTRY=ghcr.io/opengovern
           context: .
 
+  # Builds the query-validator-worker image and pushes it to ghcr.io. Runs only when the
+  # build job's query-validator-worker output is 'true' and the event is not a pull request.
+  deploy-query-validator-worker:
+    runs-on: ubuntu-latest
+    needs:
+      - build
+      - tag
+      - deploy-steampipe-plugin-opengovernance
+      - deploy-steampipe
+    permissions:
+      id-token: write
+      contents: read
+    environment: docker
+    if: needs.build.outputs.query-validator-worker == 'true' && github.event_name != 'pull_request'
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+      - name: Download artifact
+        uses: actions/download-artifact@v3
+        with:
+          name: build
+          path: .
+      - name: Unpack artifact
+        run: |
+          tar -xvf build.tar.gz
+      - name: Log in to the Container registry
+        uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GHCR_PAT }}
+      - name: Build and push Docker images
+        uses: docker/build-push-action@v4
+        with:
+          push: true
+          tags: |
+            ghcr.io/${{ github.repository_owner }}/query-validator-worker:${{ needs.tag.outputs.latest_tag }}
+          file: docker/QueryValidatorWorkerDockerfile
+          build-args: |
+            PLUGIN_REGISTRY=ghcr.io/opengovern
+          context: .
+
   deploy-import-data-script:
     runs-on: ubuntu-latest
     needs:
diff --git a/cmd/query-validator-worker/main.go b/cmd/query-validator-worker/main.go
new file mode 100644
index 000000000..0c0cedd7b
--- /dev/null
+++ b/cmd/query-validator-worker/main.go
@@ -0,0 +1,24 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+)
+
+// main runs the query-validator worker command with a context that is
+// cancelled when the process receives SIGINT or SIGTERM.
+func main() {
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+	defer stop()
+
+	if err := queryvalidator.WorkerCommand().ExecuteContext(ctx); err != nil {
+		fmt.Println(err)
+		// os.Exit skips deferred calls; the process is terminating anyway.
+		os.Exit(1)
+	}
+}
diff --git a/docker/QueryValidatorWorkerDockerfile b/docker/QueryValidatorWorkerDockerfile
new file mode 100644
index 000000000..ab8482a40
--- /dev/null
+++ b/docker/QueryValidatorWorkerDockerfile
@@ -0,0 +1,36 @@
+ARG PLUGIN_REGISTRY
+FROM ${PLUGIN_REGISTRY}/steampipe-plugin-aws:0.0.1 as aws
+FROM ${PLUGIN_REGISTRY}/steampipe-plugin-azure:0.0.1 as azure
+FROM ${PLUGIN_REGISTRY}/steampipe-plugin-azuread:0.0.1 as azuread
+FROM ${PLUGIN_REGISTRY}/steampipe-plugin-opengovernance:0.0.1 as opengovernance
+
+FROM ubuntu:20.04 AS base
+RUN apt-get update && apt-get install -y \
+    curl \
+    sudo \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN sudo /bin/sh -c "$(curl -fsSL https://steampipe.io/install/steampipe.sh)"
+
+COPY --from=aws /steampipe-plugin-aws.plugin /home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin
+COPY --from=azure /steampipe-plugin-azure.plugin /home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin
+COPY --from=azuread /steampipe-plugin-azuread.plugin
/home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin
+COPY --from=opengovernance /steampipe-plugin-opengovernance.plugin /home/steampipe/.steampipe/plugins/local/opengovernance/opengovernance.plugin
+
+USER root
+RUN useradd -ms /bin/bash steampipe
+RUN mkdir -p /home/steampipe/.steampipe/config
+RUN mkdir -p /home/steampipe/.steampipe/db
+RUN mkdir -p /home/steampipe/.steampipe/db/14.2.0
+RUN chown -R steampipe:steampipe /home/steampipe
+RUN chmod -R 755 /home/steampipe
+RUN apt update
+RUN apt install -y procps htop
+USER steampipe
+
+RUN steampipe plugin list
+
+COPY ./build/query-validator-worker /
+
+ENTRYPOINT [ "/query-validator-worker" ]
+CMD [ "/query-validator-worker" ]
\ No newline at end of file
diff --git a/pkg/describe/db/model/query_validator.go b/pkg/describe/db/model/query_validator.go
new file mode 100644
index 000000000..6d99bd2fc
--- /dev/null
+++ b/pkg/describe/db/model/query_validator.go
@@ -0,0 +1,17 @@
+package model
+
+import (
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+	"gorm.io/gorm"
+)
+
+// QueryValidatorJob is the persisted record of one query validation run: the
+// ID and type of the query being validated, the job's current status, and the
+// failure message recorded for it, if any.
+type QueryValidatorJob struct {
+	gorm.Model
+	QueryId        string
+	QueryType      queryvalidator.QueryType
+	Status         queryvalidator.QueryValidatorStatus
+	FailureMessage string
+}
diff --git a/pkg/describe/db/query_validator.go b/pkg/describe/db/query_validator.go
new file mode 100644
index 000000000..ff613bf61
--- /dev/null
+++ b/pkg/describe/db/query_validator.go
@@ -0,0 +1,171 @@
+package db
+
+import (
+	"errors"
+
+	"github.com/opengovern/opengovernance/pkg/describe/db/model"
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+	"gorm.io/gorm"
+)
+
+// CreateQueryValidatorJob inserts job and returns the ID assigned to it.
+func (db Database) CreateQueryValidatorJob(job *model.QueryValidatorJob) (uint, error) {
+	tx := db.ORM.Create(job)
+	if tx.Error != nil {
+		return 0, tx.Error
+	}
+
+	return job.ID, nil
+}
+
+// GetQueryValidatorJob returns the job with the given ID, or the error
+// reported by First when no such row exists.
+func (db Database) GetQueryValidatorJob(id uint) (*model.QueryValidatorJob, error) {
+	var job model.QueryValidatorJob
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id = ?", id).First(&job)
+	if tx.Error != nil {
+		return nil, tx.Error
+	}
+	return &job, nil
+}
+
+// ListQueryValidatorJobsById returns the jobs whose IDs are in ids.
+func (db Database) ListQueryValidatorJobsById(ids []string) ([]model.QueryValidatorJob, error) {
+	var jobs []model.QueryValidatorJob
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id IN ?", ids).Find(&jobs)
+	if tx.Error != nil {
+		return nil, tx.Error
+	}
+	return jobs, nil
+}
+
+// ListQueryValidatorJobs returns every query validator job.
+func (db Database) ListQueryValidatorJobs() ([]model.QueryValidatorJob, error) {
+	var jobs []model.QueryValidatorJob
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Find(&jobs)
+	if tx.Error != nil {
+		return nil, tx.Error
+	}
+	return jobs, nil
+}
+
+// FetchCreatedQueryValidatorJobs returns up to limit jobs in the CREATED state.
+// A non-positive limit returns no jobs: GORM treats a negative limit as "no
+// limit", which would otherwise hand back every CREATED job at once.
+func (db Database) FetchCreatedQueryValidatorJobs(limit int64) ([]model.QueryValidatorJob, error) {
+	if limit <= 0 {
+		return nil, nil
+	}
+
+	var jobs []model.QueryValidatorJob
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("status = ?", queryvalidator.QueryValidatorCreated).Limit(int(limit)).Find(&jobs)
+	if tx.Error != nil {
+		return nil, tx.Error
+	}
+	return jobs, nil
+}
+
+// GetInProgressJobsCount returns the number of jobs that are QUEUED or IN_PROGRESS.
+func (db Database) GetInProgressJobsCount() (int64, error) {
+	var count int64
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("status IN ?", []string{string(queryvalidator.QueryValidatorInProgress),
+		string(queryvalidator.QueryValidatorQueued)}).Count(&count)
+	if tx.Error != nil {
+		return 0, tx.Error
+	}
+	return count, nil
+}
+
+// DeleteQueryValidatorJob deletes the job with the given ID.
+func (db Database) DeleteQueryValidatorJob(id uint) error {
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Delete(&model.QueryValidatorJob{}, id)
+	if tx.Error != nil {
+		return tx.Error
+	}
+	return nil
+}
+
+// UpdateQueryValidatorJobStatus sets the status and failure message of a job.
+// The update uses a map because GORM skips zero-value fields of a struct, which
+// would leave a stale failure message in place when failureReason is empty.
+func (db Database) UpdateQueryValidatorJobStatus(jobId uint, status queryvalidator.QueryValidatorStatus, failureReason string) error {
+	tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id = ?", jobId).
+		Updates(map[string]any{"status": status, "failure_message": failureReason})
+	if tx.Error != nil {
+		return tx.Error
+	}
+	return nil
+}
+
+// UpdateTimedOutInProgressQueryValidators marks jobs that have been IN_PROGRESS
+// for more than five minutes without an update as timed out.
+func (db Database) UpdateTimedOutInProgressQueryValidators() error {
+	tx := db.ORM.
+		Model(&model.QueryValidatorJob{}).
+		Where("status = ?", queryvalidator.QueryValidatorInProgress).
+		Where("updated_at < NOW() - INTERVAL '5 MINUTES'").
+		Updates(model.QueryValidatorJob{Status: queryvalidator.QueryValidatorTimeOut, FailureMessage: "Job timed out"})
+	if tx.Error != nil {
+		return tx.Error
+	}
+
+	return nil
+}
+
+// UpdateTimedOutQueuedQueryValidators marks jobs that have been QUEUED for more
+// than twelve hours without an update as timed out.
+func (db Database) UpdateTimedOutQueuedQueryValidators() error {
+	tx := db.ORM.
+		Model(&model.QueryValidatorJob{}).
+		Where("status = ?", queryvalidator.QueryValidatorQueued).
+		Where("updated_at < NOW() - INTERVAL '12 HOURS'").
+		Updates(model.QueryValidatorJob{Status: queryvalidator.QueryValidatorTimeOut, FailureMessage: "Job timed out"})
+	if tx.Error != nil {
+		return tx.Error
+	}
+
+	return nil
+}
+
+// ListQueryValidatorJobForInterval returns jobs filtered by the non-empty
+// arguments. interval is a PostgreSQL interval such as "90 minutes" and is
+// passed as a bound parameter rather than formatted into the SQL text.
+//
+// NOTE(review): trigger_type and created_by are not fields of
+// model.QueryValidatorJob, so those filters presumably fail unless the columns
+// are added elsewhere - confirm before relying on them.
+func (db Database) ListQueryValidatorJobForInterval(interval, triggerType, createdBy string) ([]model.QueryValidatorJob, error) {
+	var job []model.QueryValidatorJob
+
+	tx := db.ORM.Model(&model.QueryValidatorJob{})
+
+	if interval != "" {
+		tx = tx.Where("NOW() - updated_at < CAST(? AS INTERVAL)", interval)
+	}
+	if triggerType != "" {
+		tx = tx.Where("trigger_type = ?", triggerType)
+	}
+	if createdBy != "" {
+		tx = tx.Where("created_by = ?", createdBy)
+	}
+
+	tx = tx.Find(&job)
+
+	if tx.Error != nil {
+		if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
+			return nil, nil
+		}
+		return nil, tx.Error
+	}
+	return job, nil
+}
+
+// CleanupAllQueryValidatorJobs permanently deletes every query validator job,
+// including soft-deleted rows.
+func (db Database) CleanupAllQueryValidatorJobs() error {
+	tx := db.ORM.Where("1 = 1").Unscoped().Delete(&model.QueryValidatorJob{})
+	if tx.Error != nil {
+		return tx.Error
+	}
+	return nil
+}
diff --git a/pkg/describe/scheduler.go b/pkg/describe/scheduler.go
index 3743c9aa3..a612ef0b9 100644
--- a/pkg/describe/scheduler.go
+++ b/pkg/describe/scheduler.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
 	"net"
 	"net/http"
 	"strconv"
@@ -14,6 +15,7 @@ import (
 	"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
 
 	queryrunnerscheduler "github.com/opengovern/opengovernance/pkg/describe/schedulers/query-runner"
+	queryvalidatorscheduler "github.com/opengovern/opengovernance/pkg/describe/schedulers/query-validator"
 	integration_type "github.com/opengovern/opengovernance/services/integration/integration-type"
 	queryrunner "github.com/opengovern/opengovernance/services/inventory/query-runner"
 
@@ -135,10 +137,11 @@ type Scheduler struct {
 	OperationMode     OperationMode
 	MaxConcurrentCall int64
 
-	complianceScheduler  *compliance.JobScheduler
-	discoveryScheduler   *discovery.Scheduler
-	queryRunnerScheduler *queryrunnerscheduler.JobScheduler
-	conf                 config.SchedulerConfig
+	complianceScheduler     *compliance.JobScheduler
+	discoveryScheduler      *discovery.Scheduler
+	queryRunnerScheduler    *queryrunnerscheduler.JobScheduler
+	queryValidatorScheduler *queryvalidatorscheduler.JobScheduler
+	conf                    config.SchedulerConfig
 }
 
 func InitializeScheduler(
@@ -316,6 +319,11 @@ func (s *Scheduler) SetupNatsStreams(ctx context.Context) error {
 		return err
 	}
 
+	if err := s.jq.Stream(ctx, queryvalidator.StreamName, "Query Validator job queues", []string{queryvalidator.JobQueueTopic, queryvalidator.JobResultQueueTopic}, 1000); err != nil {
+		s.logger.Error("Failed to stream to Query Validator queue", zap.Error(err))
+		return err
+	}
+
 	if err := s.jq.Stream(ctx, summarizer.StreamName, "compliance summarizer job queues", []string{summarizer.JobQueueTopic, summarizer.JobQueueTopicManuals, summarizer.ResultQueueTopic}, 1000); err != nil {
 		s.logger.Error("Failed to stream to compliance summarizer queue", zap.Error(err))
 		return err
@@ -454,6 +462,22 @@ func (s *Scheduler) Run(ctx context.Context) error {
 	)
 	s.queryRunnerScheduler.Run(ctx)
 
+	// Query Validator
+	s.queryValidatorScheduler = queryvalidatorscheduler.New(
+		func(ctx context.Context) error {
+			return s.SetupNatsStreams(ctx)
+		},
+		s.conf,
+		s.logger,
+		s.db,
+		s.jq,
+		s.es,
+		s.inventoryClient,
+		s.complianceClient,
+		s.metadataClient,
+	)
+	s.queryValidatorScheduler.Run(ctx)
+
 	// Compliance
 	s.complianceScheduler = compliance.New(
 		func(ctx context.Context) error {
diff --git a/pkg/describe/schedulers/query-validator/consumer.go b/pkg/describe/schedulers/query-validator/consumer.go
new file mode 100644
index 000000000..9918ba449
--- /dev/null
+++ b/pkg/describe/schedulers/query-validator/consumer.go
@@ -0,0 +1,44 @@
+package query_runner
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/nats-io/nats.go/jetstream"
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+	"go.uber.org/zap"
+)
+
+// RunQueryRunnerReportJobResultsConsumer consumes query validator job results
+// and stores each reported status and failure message on the matching job.
+// It blocks until ctx is done. Despite its name, which is kept for existing
+// callers, it handles query validator results only.
+func (s *JobScheduler) RunQueryRunnerReportJobResultsConsumer(ctx context.Context) error {
+	if _, err := s.jq.Consume(ctx, "scheduler-query-validator", queryvalidator.StreamName, []string{queryvalidator.JobResultQueueTopic}, "scheduler-query-validator", func(msg jetstream.Msg) {
+		// The message is acknowledged before it is processed, so a result
+		// that cannot be stored is only logged.
+		if err := msg.Ack(); err != nil {
+			s.logger.Error("Failed committing message", zap.Error(err))
+		}
+
+		var result queryvalidator.JobResult
+		if err := json.Unmarshal(msg.Data(), &result); err != nil {
+			s.logger.Error("Failed to unmarshal query validator job result", zap.Error(err))
+			return
+		}
+
+		s.logger.Info("Processing query validator job result",
+			zap.Uint("jobId", result.ID),
+			zap.String("status", string(result.Status)),
+		)
+		if err := s.db.UpdateQueryValidatorJobStatus(result.ID, result.Status, result.FailureMessage); err != nil {
+			s.logger.Error("Failed to update the status of query validator job",
+				zap.Uint("jobId", result.ID),
+				zap.Error(err))
+		}
+	}); err != nil {
+		return err
+	}
+
+	<-ctx.Done()
+	return nil
+}
diff --git a/pkg/describe/schedulers/query-validator/publisher.go b/pkg/describe/schedulers/query-validator/publisher.go
new file mode 100644
index
000000000..be0b38b33
--- /dev/null
+++ b/pkg/describe/schedulers/query-validator/publisher.go
@@ -0,0 +1,172 @@
+package query_runner
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"text/template"
+
+	"github.com/opengovern/og-util/pkg/api"
+	"github.com/opengovern/og-util/pkg/httpclient"
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+	inventoryApi "github.com/opengovern/opengovernance/services/inventory/api"
+	"go.uber.org/zap"
+)
+
+// runPublisher performs one publishing pass. It times out stale validator
+// jobs, then renders the query of each CREATED job with the configured query
+// parameters and publishes it to the worker queue, keeping at most 200 jobs
+// queued or in progress. A job that cannot be prepared or published is marked
+// FAILED and the pass moves on to the next job; an error is returned only when
+// the pass as a whole cannot proceed.
+func (s *JobScheduler) runPublisher(ctx context.Context) error {
+	ctx2 := &httpclient.Context{UserRole: api.AdminRole}
+	ctx2.Ctx = ctx
+
+	s.logger.Info("Query Validator publisher started")
+
+	err := s.db.UpdateTimedOutQueuedQueryValidators()
+	if err != nil {
+		s.logger.Error("failed to update timed out queued query validators", zap.Error(err))
+	}
+
+	err = s.db.UpdateTimedOutInProgressQueryValidators()
+	if err != nil {
+		s.logger.Error("failed to update timed out in-progress query validators", zap.Error(err))
+	}
+
+	count, err := s.db.GetInProgressJobsCount()
+	if err != nil {
+		s.logger.Error("GetInProgressJobsCount Error", zap.Error(err))
+		return err
+	}
+	// Never pass a non-positive limit down: GORM treats a negative limit as
+	// "no limit", which would publish every CREATED job at once.
+	capacity := 200 - count
+	if capacity <= 0 {
+		return nil
+	}
+	jobs, err := s.db.FetchCreatedQueryValidatorJobs(capacity)
+	if err != nil {
+		s.logger.Error("List Queries Error", zap.Error(err))
+		return err
+	}
+	if len(jobs) == 0 {
+		return nil
+	}
+
+	// The parameter values are the same for every job, so load them once per
+	// pass. On failure the jobs stay CREATED and are retried on the next pass.
+	queryParams, err := s.metadataClient.ListQueryParameters(&httpclient.Context{UserRole: api.AdminRole})
+	if err != nil {
+		s.logger.Error("failed to list query parameters", zap.Error(err))
+		return err
+	}
+	queryParamMap := make(map[string]string, len(queryParams.QueryParameters))
+	for _, qp := range queryParams.QueryParameters {
+		queryParamMap[qp.Key] = qp.Value
+	}
+
+	for _, job := range jobs {
+		jobMsg, err := s.buildJobMessage(ctx2, job.ID, job.QueryType, job.QueryId)
+		if err != nil {
+			s.failJob(job.ID, err.Error())
+			continue
+		}
+
+		queryTemplate, err := template.New("query").Parse(jobMsg.Query)
+		if err != nil {
+			s.failJob(job.ID, fmt.Sprintf("failed to parse query template: %s", err.Error()))
+			continue
+		}
+		var queryOutput bytes.Buffer
+		if err := queryTemplate.Execute(&queryOutput, queryParamMap); err != nil {
+			s.failJob(job.ID, fmt.Sprintf("failed to execute query template: %s", err.Error()))
+			continue
+		}
+		jobMsg.Query = queryOutput.String()
+
+		jobJson, err := json.Marshal(jobMsg)
+		if err != nil {
+			s.failJob(job.ID, "failed to marshal job")
+			continue
+		}
+
+		s.logger.Info("publishing query validator job", zap.Uint("jobId", job.ID))
+		topic := queryvalidator.JobQueueTopic
+		msgID := fmt.Sprintf("job-%d", job.ID)
+		_, err = s.jq.Produce(ctx, topic, jobJson, msgID)
+		if err != nil && err.Error() == "nats: no response from stream" {
+			// The stream did not respond: set the streams up again and retry once.
+			if err = s.runSetupNatsStreams(ctx); err != nil {
+				s.logger.Error("Failed to setup nats streams", zap.Error(err))
+				return err
+			}
+			_, err = s.jq.Produce(ctx, topic, jobJson, msgID)
+		}
+		if err != nil {
+			s.failJob(job.ID, err.Error())
+			continue
+		}
+
+		if err := s.db.UpdateQueryValidatorJobStatus(job.ID, queryvalidator.QueryValidatorQueued, ""); err != nil {
+			s.logger.Error("failed to mark query validator job as queued", zap.Uint("jobId", job.ID), zap.Error(err))
+		}
+	}
+	return nil
+}
+
+// buildJobMessage loads the named query or compliance control behind a job and
+// returns the message to publish for it. It returns an error when the query
+// cannot be loaded or queryType is not a known type.
+func (s *JobScheduler) buildJobMessage(clientCtx *httpclient.Context, jobID uint, queryType queryvalidator.QueryType, queryID string) (*queryvalidator.Job, error) {
+	jobMsg := &queryvalidator.Job{
+		ID: jobID,
+		// The worker keys its control-specific checks on QueryType, so it has
+		// to be set for every job, not only for named queries.
+		QueryType: queryType,
+	}
+
+	switch queryType {
+	case queryvalidator.QueryTypeNamedQuery:
+		namedQuery, err := s.inventoryClient.GetQuery(clientCtx, queryID)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get named query %s: %w", queryID, err)
+		}
+		jobMsg.Query = namedQuery.Query.QueryToExecute
+		jobMsg.Parameters = namedQuery.Query.Parameters
+		jobMsg.ListOfTables = namedQuery.Query.ListOfTables
+		jobMsg.PrimaryTable = namedQuery.Query.PrimaryTable
+		jobMsg.IntegrationType = namedQuery.IntegrationTypes
+	case queryvalidator.QueryTypeComplianceControl:
+		controlQuery, err := s.complianceClient.GetControlDetails(clientCtx, queryID)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get control %s: %w", queryID, err)
+		}
+		jobMsg.Query = controlQuery.Query.QueryToExecute
+		var parameters []inventoryApi.QueryParameter
+		for _, qp := range controlQuery.Query.Parameters {
+			parameters = append(parameters, inventoryApi.QueryParameter{
+				Key:      qp.Key,
+				Required: qp.Required,
+			})
+		}
+		jobMsg.Parameters = parameters
+		jobMsg.ListOfTables = controlQuery.Query.ListOfTables
+		jobMsg.PrimaryTable = controlQuery.Query.PrimaryTable
+		jobMsg.IntegrationType = controlQuery.IntegrationType
+	default:
+		return nil, fmt.Errorf("unknown query type %q", queryType)
+	}
+	return jobMsg, nil
+}
+
+// failJob marks a job FAILED with reason and logs it. A failure to store the
+// status is logged as well instead of being dropped silently.
+func (s *JobScheduler) failJob(jobID uint, reason string) {
+	s.logger.Error("query validator job failed before publishing", zap.Uint("jobId", jobID), zap.String("reason", reason))
+	if err := s.db.UpdateQueryValidatorJobStatus(jobID, queryvalidator.QueryValidatorFailed, reason); err != nil {
+		s.logger.Error("failed to mark query validator job as failed", zap.Uint("jobId", jobID), zap.Error(err))
+	}
+}
diff --git a/pkg/describe/schedulers/query-validator/scheduler.go b/pkg/describe/schedulers/query-validator/scheduler.go
new file mode 100644
index 000000000..58affbfc0
--- /dev/null
+++ b/pkg/describe/schedulers/query-validator/scheduler.go
@@ -0,0 +1,51 @@
+package query_runner
+
+import (
+	"fmt"
+	"github.com/opengovern/og-util/pkg/api"
+	"github.com/opengovern/og-util/pkg/httpclient"
+	"github.com/opengovern/opengovernance/pkg/describe/db/model"
+	queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
+	"go.uber.org/zap"
+)
+
+// runScheduler creates one CREATED validation job for every compliance control
+// and every named query. A job that cannot be stored is logged and skipped; an
+// error is returned only when a listing call fails.
+func (s *JobScheduler) runScheduler() error {
+	clientCtx := &httpclient.Context{UserRole: api.AdminRole}
+
+	controls, err := s.complianceClient.ListControl(clientCtx, nil, nil)
+	if err != nil {
+		s.logger.Error("error while listing controls", zap.Error(err))
+		return fmt.Errorf("error while listing controls: %w", err)
+	}
+	for _, c := range controls {
+		s.createValidatorJob(c.ID, queryvalidator.QueryTypeComplianceControl)
+	}
+
+	namedQueries, err := s.inventoryClient.ListQueriesV2(clientCtx)
+	if err != nil {
+		s.logger.Error("error while listing named queries", zap.Error(err))
+		return fmt.Errorf("error while listing named queries: %w", err)
+	}
+	for _, nq := range namedQueries.Items {
+		s.createValidatorJob(nq.ID, queryvalidator.QueryTypeNamedQuery)
+	}
+
+	return nil
+}
+
+// createValidatorJob stores a CREATED validation job for the given query or
+// control ID and logs the error if the job cannot be stored.
+func (s *JobScheduler) createValidatorJob(queryID string, queryType queryvalidator.QueryType) {
+	_, err := s.db.CreateQueryValidatorJob(&model.QueryValidatorJob{
+		QueryId:        queryID,
+		QueryType:      queryType,
+		Status:         queryvalidator.QueryValidatorCreated,
+		FailureMessage: "",
+	})
+	if err != nil {
+		s.logger.Error("error while creating query-validator job", zap.Error(err))
+	}
+}
diff --git a/pkg/describe/schedulers/query-validator/service.go b/pkg/describe/schedulers/query-validator/service.go
new file mode 100644
index 000000000..1ae1989de
--- /dev/null
+++ b/pkg/describe/schedulers/query-validator/service.go
@@ -0,0 +1,104 @@
+package query_runner
+
+import (
+	"context"
+	"time"
+
+	"github.com/opengovern/og-util/pkg/jq"
+	metadataClient "github.com/opengovern/opengovernance/services/metadata/client"
+
+	"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
+	"github.com/opengovern/og-util/pkg/ticker"
+	"github.com/opengovern/opengovernance/pkg/describe/config"
+	"github.com/opengovern/opengovernance/pkg/describe/db"
+	"github.com/opengovern/opengovernance/pkg/utils"
+	complianceClient "github.com/opengovern/opengovernance/services/compliance/client"
+	inventoryClient "github.com/opengovern/opengovernance/services/inventory/client"
+	"go.uber.org/zap"
+)
+
+// JobSchedulingInterval is the interval passed to the publisher's ticker.
+const JobSchedulingInterval = 10 * time.Second
+
+// JobScheduler creates query validation jobs, publishes them to the worker
+// queue and records the results read from the result queue.
+type JobScheduler struct {
+	runSetupNatsStreams func(context.Context) error
+	conf                config.SchedulerConfig
+	logger              *zap.Logger
+	db                  db.Database
+	jq                  *jq.JobQueue
+	esClient            opengovernance.Client
+	inventoryClient     inventoryClient.InventoryServiceClient
+	complianceClient    complianceClient.ComplianceServiceClient
+	metadataClient      metadataClient.MetadataServiceClient
+}
+
+// New returns a JobScheduler that uses the given dependencies.
+func New(
+	runSetupNatsStreams func(context.Context) error,
+	conf config.SchedulerConfig,
+	logger *zap.Logger,
+	db db.Database,
+	jq *jq.JobQueue,
+	esClient opengovernance.Client,
+	inventoryClient inventoryClient.InventoryServiceClient,
+	complianceClient complianceClient.ComplianceServiceClient,
+	metadataClient metadataClient.MetadataServiceClient,
+) *JobScheduler {
+	return &JobScheduler{
+		runSetupNatsStreams: runSetupNatsStreams,
+		conf:                conf,
+		logger:              logger,
+		db:                  db,
+		jq:                  jq,
+		esClient:            esClient,
+		inventoryClient:     inventoryClient,
+		complianceClient:    complianceClient,
+		metadataClient:      metadataClient,
+	}
+}
+
+// Run hands the scheduler loop, the publisher loop and the result consumer to
+// utils.EnsureRunGoroutine.
+func (s *JobScheduler) Run(ctx context.Context) {
+	utils.EnsureRunGoroutine(func() {
+		s.RunScheduler()
+	})
+	utils.EnsureRunGoroutine(func() {
+		s.RunPublisher(ctx)
+	})
+	utils.EnsureRunGoroutine(func() {
+		s.logger.Fatal("Query validator job result consumer exited", zap.Error(s.RunQueryRunnerReportJobResultsConsumer(ctx)))
+	})
+}
+
+// RunScheduler creates validation jobs immediately and then again on every
+// tick of a ticker created with a 12-hour interval. It never returns.
+func (s *JobScheduler) RunScheduler() {
+	s.logger.Info("Scheduling query validator jobs on a timer")
+
+	t := ticker.NewTicker(12*time.Hour, time.Second*10)
+	defer t.Stop()
+
+	for ; ; <-t.C {
+		if err := s.runScheduler(); err != nil {
+			s.logger.Error("failed to run query validator scheduler", zap.Error(err))
+		}
+	}
+}
+
+// RunPublisher runs a publisher pass immediately and then again on every tick
+// of a ticker created with JobSchedulingInterval. It never returns.
+func (s *JobScheduler) RunPublisher(ctx context.Context) {
+	s.logger.Info("Scheduling publisher on a timer")
+
+	t := ticker.NewTicker(JobSchedulingInterval, time.Second*10)
+	defer t.Stop()
+
+	for ; ; <-t.C {
+		if err := s.runPublisher(ctx); err != nil {
+			s.logger.Error("failed to run query validator publisher", zap.Error(err))
+		}
+	}
+}
diff --git a/pkg/query-validator/command.go b/pkg/query-validator/command.go
new file mode 100644
index 000000000..0d2ee2405
--- /dev/null
+++ b/pkg/query-validator/command.go
@@
-0,0 +1,52 @@
+package query_validator
+
+import (
+	"errors"
+	"github.com/opengovern/og-util/pkg/config"
+	"github.com/spf13/cobra"
+	"go.uber.org/zap"
+)
+
+// WorkerCommand returns the cobra command that runs a query validator worker.
+// The worker configuration is read from the environment when the command is
+// built, and the command refuses to run without a non-empty --id flag.
+func WorkerCommand() *cobra.Command {
+	var (
+		id  string
+		cnf Config
+	)
+	config.ReadFromEnv(&cnf, nil)
+
+	cmd := &cobra.Command{
+		PreRunE: func(cmd *cobra.Command, args []string) error {
+			if id == "" {
+				return errors.New("missing required flag 'id'")
+			}
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			cmd.SilenceUsage = true
+			logger, err := zap.NewProduction()
+			if err != nil {
+				return err
+			}
+
+			w, err := NewWorker(
+				cnf,
+				logger,
+				cmd.Context(),
+			)
+			if err != nil {
+				return err
+			}
+
+			defer w.Stop()
+
+			return w.Run(cmd.Context())
+		},
+	}
+
+	cmd.Flags().StringVar(&id, "id", "", "The worker id")
+
+	return cmd
+}
diff --git a/pkg/query-validator/const.go b/pkg/query-validator/const.go
new file mode 100644
index 000000000..5fdc9c8ac
--- /dev/null
+++ b/pkg/query-validator/const.go
@@ -0,0 +1,24 @@
+package query_validator
+
+import "time"
+
+const (
+	JobQueueTopic       = "query-validator-jobs-queue"
+	JobResultQueueTopic = "query-validator-results-queue"
+	ConsumerGroup       = "query-validator-worker"
+	StreamName          = "query-validator"
+
+	JobTimeoutMinutes = 5
+	JobTimeout        = JobTimeoutMinutes * time.Minute
+)
+
+// QueryError names a reason for which RunJob rejects a query.
+type QueryError string
+
+const (
+	MissingPlatformResourceIDQueryError QueryError = "missing_platform_resource_id"
+	MissingAccountIDQueryError          QueryError = "missing_account_id"
+	MissingResourceQueryError           QueryError = "missing_resource"
+	MissingResourceTypeQueryError       QueryError = "missing_resource_type"
+	ResourceNotFoundQueryError          QueryError = "resource_not_found"
+)
diff --git a/pkg/query-validator/job.go b/pkg/query-validator/job.go
new file mode 100644
index 000000000..d828b2d80
--- /dev/null
+++ b/pkg/query-validator/job.go
@@ -0,0 +1,185 @@
+package query_validator
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/labstack/echo/v4"
+	"github.com/opengovern/og-util/pkg/es"
+	"github.com/opengovern/og-util/pkg/integration"
+	"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
+	integration_type "github.com/opengovern/opengovernance/services/integration/integration-type"
+	"github.com/opengovern/opengovernance/services/inventory/api"
+	"go.uber.org/zap"
+	"net/http"
+	"regexp"
+	"strings"
+)
+
+// QueryType identifies whether a job validates a named query or a compliance control.
+type QueryType string
+
+const (
+	QueryTypeNamedQuery        QueryType = "NAMED_QUERY"
+	QueryTypeComplianceControl QueryType = "COMPLIANCE_CONTROL"
+)
+
+// Job is the JSON message that describes one query to validate.
+type Job struct {
+	ID uint `json:"id"`
+
+	QueryType       QueryType            `json:"query_type"`
+	ControlId       string               `json:"control_id"`
+	QueryId         string               `json:"query_id"`
+	Parameters      []api.QueryParameter `json:"parameters"`
+	Query           string               `json:"query"`
+	IntegrationType []integration.Type   `json:"integration_type"`
+	PrimaryTable    *string              `json:"primary_table"`
+	ListOfTables    []string             `json:"list_of_tables"`
+}
+
+// RunJob executes the job's query within JobTimeout. For a compliance control
+// it additionally requires that the query's resource type can be resolved and
+// that every returned row has string og_resource_id and og_account_id columns,
+// a non-empty resource column, and a matching document in the resource
+// type's index.
+func (w *Worker) RunJob(ctx context.Context, job Job) error {
+	ctx, cancel := context.WithTimeout(ctx, JobTimeout)
+	defer cancel()
+	res, err := w.RunSQLNamedQuery(ctx, job.Query)
+	if err != nil {
+		return err
+	}
+
+	// Only compliance controls get the per-row checks below.
+	if job.QueryType != QueryTypeComplianceControl {
+		return nil
+	}
+
+	tableName := ""
+	switch {
+	case job.PrimaryTable != nil:
+		tableName = *job.PrimaryTable
+	case len(job.ListOfTables) == 1:
+		tableName = job.ListOfTables[0]
+	}
+	queryResourceType := ""
+	if tableName != "" {
+		queryResourceType, _, err = GetResourceTypeFromTableName(tableName, job.IntegrationType)
+		if err != nil {
+			return err
+		}
+	}
+	if queryResourceType == "" {
+		return fmt.Errorf("%s", MissingResourceTypeQueryError)
+	}
+
+	esIndex := ResourceTypeToESIndex(queryResourceType)
+
+	for _, record := range res.Data {
+		if len(record) != len(res.Headers) {
+			return fmt.Errorf("invalid record length, record=%d headers=%d", len(record), len(res.Headers))
+		}
+		recordValue := make(map[string]any, len(res.Headers))
+		for idx, header := range res.Headers {
+			recordValue[header] = record[idx]
+		}
+
+		platformResourceID, ok := recordValue["og_resource_id"].(string)
+		if !ok {
+			return fmt.Errorf("%s", MissingPlatformResourceIDQueryError)
+		}
+		if _, ok := recordValue["og_account_id"].(string); !ok {
+			return fmt.Errorf("%s", MissingAccountIDQueryError)
+		}
+		if v, ok := recordValue["resource"].(string); !ok || v == "" || v == "null" {
+			return fmt.Errorf("%s", MissingResourceQueryError)
+		}
+		if err := w.SearchResourceTypeByPlatformID(ctx, esIndex, platformResourceID); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// GetResourceTypeFromTableName maps tableName to a resource type using the
+// integration registered for the query's integration type. When the query does
+// not have exactly one integration type, the lookup uses the empty type and
+// fails with "unknown integration type" unless one is registered under it.
+func GetResourceTypeFromTableName(tableName string, queryIntegrationType []integration.Type) (string, integration.Type, error) {
+	var integrationType integration.Type
+	if len(queryIntegrationType) == 1 {
+		integrationType = queryIntegrationType[0]
+	}
+	// Named so that it does not shadow the imported integration package.
+	registered, ok := integration_type.IntegrationTypes[integrationType]
+	if !ok {
+		return "", "", echo.NewHTTPError(http.StatusInternalServerError, "unknown integration type")
+	}
+	return registered.GetResourceTypeFromTableName(tableName), integrationType, nil
+}
+
+var stopWordsRe = regexp.MustCompile(`\W+`)
+
+// ResourceTypeToESIndex converts a resource type into an index name by
+// replacing every run of non-word characters with "_" and lower-casing it.
+func ResourceTypeToESIndex(t string) string {
+	t = stopWordsRe.ReplaceAllString(t, "_")
+	return strings.ToLower(t)
+}
+
+// SearchResourceTypeByPlatformID searches index for a document whose
+// platformResourceID equals platformID. It returns nil when at least one hit
+// is found and a ResourceNotFoundQueryError error when there is none.
+func (w *Worker) SearchResourceTypeByPlatformID(ctx context.Context, index string, platformID string) error {
+	filters := []opengovernance.BoolFilter{
+		opengovernance.NewTermsFilter("platformResourceID", []string{platformID}),
+	}
+	root := map[string]any{
+		"query": map[string]any{
+			"bool": map[string]any{
+				"filter": filters,
+			},
+		},
+	}
+
+	queryBytes, err := json.Marshal(root)
+	if err != nil {
+		w.logger.Error("SearchResourceTypeByPlatformID", zap.Error(err), zap.String("query", string(queryBytes)), zap.String("index", index))
+		return err
+	}
+
+	w.logger.Info("SearchResourceTypeByPlatformID", zap.String("query", string(queryBytes)), zap.String("index", index))
+
+	var resp SearchResourceTypeByPlatformIDResponse
+	err = w.esClient.Search(ctx, index, string(queryBytes), &resp)
+	if err != nil {
+		w.logger.Error("SearchResourceTypeByPlatformID", zap.Error(err), zap.String("query", string(queryBytes)), zap.String("index", index))
+		return err
+	}
+	if len(resp.Hits.Hits) == 0 {
+		return fmt.Errorf("%s", ResourceNotFoundQueryError)
+	}
+	w.logger.Info("SearchResourceTypeByPlatformID", zap.String("query", string(queryBytes)), zap.String("index", index),
+		zap.String("platformID", platformID), zap.Any("result", resp.Hits.Hits))
+	return nil
+}
+
+// SearchResourceTypeByPlatformIDHit is one hit of a SearchResourceTypeByPlatformIDResponse.
+type SearchResourceTypeByPlatformIDHit struct {
+	ID      string      `json:"_id"`
+	Score   float64     `json:"_score"`
+	Index   string      `json:"_index"`
+	Type    string      `json:"_type"`
+	Version int64       `json:"_version,omitempty"`
+	Source  es.Resource `json:"_source"`
+	Sort    []any       `json:"sort"`
+}
+
+// SearchResourceTypeByPlatformIDResponse is the decoded body of the search issued by SearchResourceTypeByPlatformID.
+type SearchResourceTypeByPlatformIDResponse struct {
+	Hits struct {
+		Hits []SearchResourceTypeByPlatformIDHit `json:"hits"`
+	} `json:"hits"`
+}
diff --git a/pkg/query-validator/job_result.go b/pkg/query-validator/job_result.go
new file mode 100644
index 000000000..5a273d3b8
--- /dev/null
+++ b/pkg/query-validator/job_result.go
@@ -0,0 +1,23 @@
+package query_validator
+
+// QueryValidatorStatus is the lifecycle state of a query validator job.
+type QueryValidatorStatus string
+
+const (
+	QueryValidatorCreated    QueryValidatorStatus = "CREATED"
+	QueryValidatorQueued     QueryValidatorStatus = "QUEUED"
+	QueryValidatorInProgress QueryValidatorStatus = "IN_PROGRESS"
+	QueryValidatorSucceeded  QueryValidatorStatus = "SUCCEEDED"
+	QueryValidatorFailed     QueryValidatorStatus = "FAILED"
+	QueryValidatorTimeOut    QueryValidatorStatus = "TIMEOUT"
+)
+
+// JobResult is the JSON message that carries the outcome of one job.
+type JobResult struct {
+	ID             uint                 `json:"id"`
+	QueryType      QueryType            `json:"query_type"`
+	ControlId      string               `json:"control_id"`
+	QueryId        string               `json:"query_id"`
+	Status         QueryValidatorStatus `json:"status"`
+	FailureMessage string               `json:"failure_message"`
+}
diff --git a/pkg/query-validator/service.go b/pkg/query-validator/service.go
new file mode 100644
index 000000000..466a326ef
--- /dev/null
+++ b/pkg/query-validator/service.go
@@ -0,0 +1,233 @@
+package query_validator
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/nats-io/nats.go/jetstream"
+	"github.com/opengovern/og-util/pkg/config"
+	"github.com/opengovern/og-util/pkg/jq"
+	"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
+	"github.com/opengovern/og-util/pkg/source"
+	"github.com/opengovern/og-util/pkg/steampipe"
+	"go.uber.org/zap"
+)
+
+// Config is the worker configuration that WorkerCommand reads from the environment.
+type Config struct {
+	ElasticSearch         config.ElasticSearch
+	NATS                  config.NATS
+	Compliance            config.OpenGovernanceService
+	Onboard               config.OpenGovernanceService
+	Inventory             config.OpenGovernanceService
+	Metadata              config.OpenGovernanceService
+	EsSink                config.OpenGovernanceService
+	Steampipe             config.Postgres
+	PrometheusPushAddress string
+}
+
+// Worker consumes query validator jobs and validates them using its
+// Steampipe connection and Elasticsearch client.
+type Worker struct {
+	config        Config
+	logger        *zap.Logger
+	steampipeConn *steampipe.Database
+	esClient      opengovernance.Client
+	jq            *jq.JobQueue
+}
+
+// NewWorker writes the Steampipe configuration for the AWS, Azure and
+// OpenGovernance plugins, starts the Steampipe service, creates the
+// Elasticsearch client and job queue, and declares the query validator stream.
+func NewWorker(
+	config Config,
+	logger *zap.Logger,
+	ctx context.Context,
+) (*Worker, error) {
+	err := steampipe.PopulateSteampipeConfig(config.ElasticSearch, source.CloudAWS)
+	if err != nil {
+		return nil, err
+	}
+	err = steampipe.PopulateSteampipeConfig(config.ElasticSearch, source.CloudAzure)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := steampipe.PopulateOpenGovernancePluginSteampipeConfig(config.ElasticSearch, config.Steampipe); err != nil {
+		return nil, err
+	}
+
+	// NOTE(review): the reason for this fixed two-minute delay is not visible
+	// in this file. The wait ends early when ctx is cancelled.
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-time.After(2 * time.Minute):
+	}
+
+	steampipeConn, err := steampipe.StartSteampipeServiceAndGetConnection(logger)
+	if err != nil {
+		return nil, err
+	}
+
+	esClient, err := opengovernance.NewClient(opengovernance.ClientConfig{
+		Addresses:     []string{config.ElasticSearch.Address},
+		Username:      &config.ElasticSearch.Username,
+		Password:      &config.ElasticSearch.Password,
+		IsOnAks:       &config.ElasticSearch.IsOnAks,
+		IsOpenSearch:  &config.ElasticSearch.IsOpenSearch,
+		AwsRegion:     &config.ElasticSearch.AwsRegion,
+		AssumeRoleArn: &config.ElasticSearch.AssumeRoleArn,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	jobQueue, err := jq.New(config.NATS.URL, logger)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := jobQueue.Stream(ctx, StreamName, "Query Validator job queues", []string{JobQueueTopic, JobResultQueueTopic}, 1000); err != nil {
+		return nil, err
+	}
+
+	w := &Worker{
+		config:        config,
+		logger:        logger,
+		steampipeConn: steampipeConn,
+		esClient:      esClient,
+		jq:            jobQueue,
+	}
+
+	return w, nil
+}
+
+// Run is a blocking function so you may decide to call it in another goroutine.
+// It runs a NATS consumer and it will close it when the given context is closed.
+func (w *Worker) Run(ctx context.Context) error {
+	w.logger.Info("starting to consume")
+
+	queueTopic := JobQueueTopic
+	consumer := ConsumerGroup
+
+	consumeCtx, err := w.jq.ConsumeWithConfig(ctx, consumer, StreamName, []string{queueTopic},
+		jetstream.ConsumerConfig{
+			DeliverPolicy:     jetstream.DeliverAllPolicy,
+			AckPolicy:         jetstream.AckExplicitPolicy,
+			AckWait:           time.Hour,
+			MaxDeliver:        1,
+			InactiveThreshold: time.Hour,
+			Replicas:          1,
+			MemoryStorage:     false,
+		}, nil,
+		func(msg jetstream.Msg) {
+			w.logger.Info("received a new job")
+			w.logger.Info("committing")
+			if err := msg.InProgress(); err != nil {
+				w.logger.Error("failed to send the initial in progress message", zap.Error(err), zap.Any("msg", msg))
+			}
+			ticker := time.NewTicker(15 * time.Second)
+			go func() {
+				for range ticker.C {
+					if err := msg.InProgress(); err != nil {
+						w.logger.Error("failed to send an in progress message", zap.Error(err), zap.Any("msg", msg))
+					}
+				}
+			}()
+
+			err := w.ProcessMessage(ctx, msg)
+			if err != nil {
+				w.logger.Error("failed to process message", zap.Error(err))
+			}
+			ticker.Stop()
+
+			if err := msg.Ack(); err != nil {
+				w.logger.Error("failed to send the ack message", zap.Error(err),
zap.Any("msg", msg)) + } + + w.logger.Info("processing a job completed") + }) + if err != nil { + return err + } + + w.logger.Info("consuming") + + <-ctx.Done() + consumeCtx.Drain() + consumeCtx.Stop() + + return nil +} + +func (w *Worker) ProcessMessage(ctx context.Context, msg jetstream.Msg) (err error) { + var job Job + + if err = json.Unmarshal(msg.Data(), &job); err != nil { + return err + } + + w.logger.Info("job message delivered", zap.String("jobID", strconv.Itoa(int(job.ID)))) + + result := JobResult{ + ID: job.ID, + QueryType: job.QueryType, + QueryId: job.QueryId, + ControlId: job.ControlId, + Status: QueryValidatorInProgress, + FailureMessage: "", + } + + defer func() { + if err != nil { + result.FailureMessage = err.Error() + result.Status = QueryValidatorFailed + } else { + result.Status = QueryValidatorSucceeded + } + + w.logger.Info("job is finished with status", zap.String("ID", strconv.Itoa(int(job.ID))), zap.String("status", string(result.Status))) + + resultJson, err := json.Marshal(result) + if err != nil { + w.logger.Error("failed to create job result json", zap.Error(err)) + return + } + + if _, err := w.jq.Produce(ctx, JobResultQueueTopic, resultJson, fmt.Sprintf("query-runner-result-%d", job.ID)); err != nil { + w.logger.Error("failed to publish job result", zap.String("jobResult", string(resultJson)), zap.Error(err)) + } + }() + + resultJson, err := json.Marshal(result) + if err != nil { + w.logger.Error("failed to create job in progress json", zap.Error(err)) + return err + } + + if _, err := w.jq.Produce(ctx, JobResultQueueTopic, resultJson, fmt.Sprintf("query-validator-inprogress-%d", job.ID)); err != nil { + w.logger.Error("failed to publish job in progress", zap.String("jobInProgress", string(resultJson)), zap.Error(err)) + } + + w.logger.Info("running job", zap.ByteString("job", msg.Data())) + + err = w.RunJob(ctx, job) + if err != nil { + return err + } + + return nil +} + +func (w *Worker) Stop() error { + 
w.steampipeConn.Conn().Close() + err := steampipe.StopSteampipeService(w.logger) + if err != nil { + return err + } + return nil +} diff --git a/pkg/query-validator/sql_runner.go b/pkg/query-validator/sql_runner.go new file mode 100644 index 000000000..07e074251 --- /dev/null +++ b/pkg/query-validator/sql_runner.go @@ -0,0 +1,38 @@ +package query_validator + +import ( + "context" + "net/http" + "time" + + "github.com/labstack/echo/v4" + "github.com/opengovern/og-util/pkg/steampipe" + inventoryApi "github.com/opengovern/opengovernance/services/inventory/api" + "go.uber.org/zap" +) + +func (w *Worker) RunSQLNamedQuery(ctx context.Context, query string) (*steampipe.Result, error) { + var err error + + direction := inventoryApi.DirectionType("") + + for i := 0; i < 10; i++ { + err = w.steampipeConn.Conn().Ping(ctx) + if err == nil { + break + } + time.Sleep(time.Second) + } + if err != nil { + w.logger.Error("failed to ping steampipe", zap.Error(err)) + return nil, echo.NewHTTPError(http.StatusInternalServerError, err.Error()) + } + + w.logger.Info("executing named query", zap.String("query", query)) + res, err := w.steampipeConn.Query(ctx, query, nil, nil, "", steampipe.DirectionType(direction)) + if err != nil { + return nil, echo.NewHTTPError(http.StatusBadRequest, err.Error()) + } + + return res, nil +} diff --git a/pkg/query-validator/types.go b/pkg/query-validator/types.go new file mode 100644 index 000000000..0f39207ae --- /dev/null +++ b/pkg/query-validator/types.go @@ -0,0 +1,6 @@ +package query_validator + +type QueryResult struct { + Headers []string `json:"headers"` // Column names + Result [][]any `json:"result"` // Result of query. 
in order to access a specific cell please use Result[Row][Column] +} diff --git a/services/compliance/runner/job.go b/services/compliance/runner/job.go index 418b151af..c743e7fd5 100644 --- a/services/compliance/runner/job.go +++ b/services/compliance/runner/job.go @@ -77,13 +77,6 @@ func (w *Worker) Initialize(ctx context.Context, j Job) error { } func (w *Worker) RunJob(ctx context.Context, j Job) (int, error) { - //cutOff := time.Now().AddDate(0, -3, 0) - //w.logger.Info("Deleting old complianceResults", zap.Uint("job_id", j.ID), zap.Time("cut_off", cutOff)) - //if err := w.handleOldComplianceResultsStateByTime(ctx, cutOff, false); err != nil { - // w.logger.Error("failed to delete old complianceResults", zap.Error(err), zap.Uint("job_id", j.ID), zap.Time("cut_off", cutOff)) - // return 0, err - //} - w.logger.Info("Running query", zap.Uint("job_id", j.ID), zap.String("query_id", j.ExecutionPlan.Query.ID), diff --git a/services/inventory/client/inventory.go b/services/inventory/client/inventory.go index c3d6a6b3f..1980a41a4 100644 --- a/services/inventory/client/inventory.go +++ b/services/inventory/client/inventory.go @@ -19,6 +19,7 @@ import ( type InventoryServiceClient interface { RunQuery(ctx *httpclient.Context, req api.RunQueryRequest) (*api.RunQueryResponse, error) GetQuery(ctx *httpclient.Context, id string) (*api.NamedQueryItemV2, error) + ListQueriesV2(ctx *httpclient.Context) (*api.ListQueriesV2Response, error) CountResources(ctx *httpclient.Context) (int64, error) ListIntegrationsData(ctx *httpclient.Context, integrationIds []string, resourceCollections []string, startTime, endTime *time.Time, metricIDs []string, needCost, needResourceCount bool) (map[string]api.ConnectionData, error) ListResourceTypesMetadata(ctx *httpclient.Context, integrationTypes []integration.Type, services []string, resourceTypes []string, summarized bool, tags map[string]string, pageSize, pageNumber int) (*api.ListResourceTypeMetadataResponse, error) @@ -84,6 +85,18 @@ func 
(s *inventoryClient) GetQuery(ctx *httpclient.Context, id string) (*api.Nam return &namedQuery, nil } +func (s *inventoryClient) ListQueriesV2(ctx *httpclient.Context) (*api.ListQueriesV2Response, error) { + url := fmt.Sprintf("%s/api/v3/queries", s.baseURL) + + var namedQuery api.ListQueriesV2Response + if statusCode, err := httpclient.DoRequest(ctx.Ctx, http.MethodGet, url, ctx.ToHeaders(), nil, &namedQuery); err != nil { + if statusCode == http.StatusNotFound { + return nil, nil + } + } + return &namedQuery, nil +} + func (s *inventoryClient) ListAnalyticsMetrics(ctx *httpclient.Context, metricType *analyticsDB.MetricType) ([]api.AnalyticsMetric, error) { url := fmt.Sprintf("%s/api/v2/analytics/metrics/list", s.baseURL)