Skip to content

Commit

Permalink
Merge pull request #1995 from opengovern/feat-query-validator
Browse files Browse the repository at this point in the history
fix: add query validator scheduled
  • Loading branch information
artaasadi authored Nov 15, 2024
2 parents bd3023e + f54e5d1 commit ccc43d7
Show file tree
Hide file tree
Showing 19 changed files with 1,163 additions and 11 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
wastage-service: ${{ steps.build_services.outputs.wastage-service }}
information-service: ${{ steps.build_services.outputs.information-service }}
query-runner-worker: ${{ steps.build_services.outputs.query-runner-worker }}
query-validator-worker: ${{ steps.build_services.outputs.query-validator-worker }}
demo-importer-worker: ${{ steps.build_services.outputs.demo-importer-worker }}

env:
Expand Down Expand Up @@ -822,6 +823,46 @@ jobs:
PLUGIN_REGISTRY=ghcr.io/opengovern
context: .

deploy-query-validator-worker:
runs-on: ubuntu-latest
needs:
- build
- tag
- deploy-steampipe-plugin-opengovernance
- deploy-steampipe
permissions:
id-token: write
contents: read
environment: docker
if: needs.build.outputs.query-validator-worker == 'true' && github.event_name != 'pull_request'
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Download artifact
uses: actions/download-artifact@v3
with:
name: build
path: .
- name: Unpack artifact
run: |
tar -xvf build.tar.gz
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GHCR_PAT }}
- name: Build and push Docker images
uses: docker/build-push-action@v4
with:
push: true
tags: |
ghcr.io/${{ github.repository_owner }}/query-validator-worker:${{ needs.tag.outputs.latest_tag }}
file: docker/QueryValidatorWorkerDockerfile
build-args: |
PLUGIN_REGISTRY=ghcr.io/opengovern
context: .

deploy-import-data-script:
runs-on: ubuntu-latest
needs:
Expand Down
36 changes: 36 additions & 0 deletions cmd/query-validator-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
)

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(c)
cancel()
}()

go func() {
select {
case <-c:
cancel()
case <-ctx.Done():
}
}()

if err := queryvalidator.WorkerCommand().ExecuteContext(ctx); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
36 changes: 36 additions & 0 deletions docker/QueryValidatorWorkerDockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
ARG PLUGIN_REGISTRY
FROM ${PLUGIN_REGISTRY}/steampipe-plugin-aws:0.0.1 as aws
FROM ${PLUGIN_REGISTRY}/steampipe-plugin-azure:0.0.1 as azure
FROM ${PLUGIN_REGISTRY}/steampipe-plugin-azuread:0.0.1 as azuread
FROM ${PLUGIN_REGISTRY}/steampipe-plugin-opengovernance:0.0.1 as opengovernance

FROM ubuntu:20.04 AS base
RUN apt-get update && apt-get install -y \
curl \
sudo \
&& rm -rf /var/lib/apt/lists/*

RUN sudo /bin/sh -c "$(curl -fsSL https://steampipe.io/install/steampipe.sh)"

COPY --from=aws /steampipe-plugin-aws.plugin /home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin
COPY --from=azure /steampipe-plugin-azure.plugin /home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin
COPY --from=azuread /steampipe-plugin-azuread.plugin /home/steampipe/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin
COPY --from=opengovernance /steampipe-plugin-opengovernance.plugin /home/steampipe/.steampipe/plugins/local/opengovernance/opengovernance.plugin

USER root
RUN useradd -ms /bin/bash steampipe
RUN mkdir -p /home/steampipe/.steampipe/config
RUN mkdir -p /home/steampipe/.steampipe/db
RUN mkdir -p /home/steampipe/.steampipe/db/14.2.0
RUN chown -R steampipe:steampipe /home/steampipe
RUN chmod -R 755 /home/steampipe
RUN apt update
RUN apt install -y procps htop
USER steampipe

RUN steampipe plugin list

COPY ./build/query-validator-worker /

ENTRYPOINT [ "/query-validator-worker" ]
CMD [ "/query-validator-worker" ]
14 changes: 14 additions & 0 deletions pkg/describe/db/model/query_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package model

import (
queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
"gorm.io/gorm"
)

type QueryValidatorJob struct {
gorm.Model
QueryId string
QueryType queryvalidator.QueryType
Status queryvalidator.QueryValidatorStatus
FailureMessage string
}
143 changes: 143 additions & 0 deletions pkg/describe/db/query_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package db

import (
"errors"
"fmt"
queryrunner "github.com/opengovern/opengovernance/services/inventory/query-runner"

"github.com/opengovern/opengovernance/pkg/describe/db/model"
queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
"gorm.io/gorm"
)

func (db Database) CreateQueryValidatorJob(job *model.QueryValidatorJob) (uint, error) {
tx := db.ORM.Create(job)
if tx.Error != nil {
return 0, tx.Error
}

return job.ID, nil
}

func (db Database) GetQueryValidatorJob(id uint) (*model.QueryValidatorJob, error) {
var job model.QueryValidatorJob
tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id = ?", id).First(&job)
if tx.Error != nil {
return nil, tx.Error
}
return &job, nil
}

func (db Database) ListQueryValidatorJobsById(ids []string) ([]model.QueryValidatorJob, error) {
var jobs []model.QueryValidatorJob
tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id IN ?", ids).Find(&jobs)
if tx.Error != nil {
return nil, tx.Error
}
return jobs, nil
}

func (db Database) ListQueryValidatorJobs() ([]model.QueryValidatorJob, error) {
var jobs []model.QueryValidatorJob
tx := db.ORM.Model(&model.QueryValidatorJob{}).Find(&jobs)
if tx.Error != nil {
return nil, tx.Error
}
return jobs, nil
}

func (db Database) FetchCreatedQueryValidatorJobs(limit int64) ([]model.QueryValidatorJob, error) {
var jobs []model.QueryValidatorJob
tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("status = ?", queryvalidator.QueryValidatorCreated).Limit(int(limit)).Find(&jobs)
if tx.Error != nil {
return nil, tx.Error
}
return jobs, nil
}

func (db Database) GetInProgressJobsCount() (int64, error) {
var count int64
tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("status IN ?", []string{string(queryvalidator.QueryValidatorInProgress),
string(queryvalidator.QueryValidatorQueued)}).Count(&count)
if tx.Error != nil {
return 0, tx.Error
}
return count, nil
}

func (db Database) DeleteQueryValidatorJob(id uint) error {
tx := db.ORM.Model(&model.QueryValidatorJob{}).Delete(&model.QueryValidatorJob{}, id)
if tx.Error != nil {
return tx.Error
}
return nil
}

func (db Database) UpdateQueryValidatorJobStatus(jobId uint, status queryvalidator.QueryValidatorStatus, failureReason string) error {
tx := db.ORM.Model(&model.QueryValidatorJob{}).Where("id = ?", jobId).
Updates(model.QueryValidatorJob{Status: status, FailureMessage: failureReason})
if tx.Error != nil {
return tx.Error
}
return nil
}

func (db Database) UpdateTimedOutInProgressQueryValidators() error {
tx := db.ORM.
Model(&model.QueryValidatorJob{}).
Where("status = ?", queryrunner.QueryRunnerInProgress).
Where("updated_at < NOW() - INTERVAL '5 MINUTES'").
Updates(model.QueryValidatorJob{Status: queryvalidator.QueryValidatorTimeOut, FailureMessage: "Job timed out"})
if tx.Error != nil {
return tx.Error
}

return nil
}

func (db Database) UpdateTimedOutQueuedQueryValidators() error {
tx := db.ORM.
Model(&model.QueryValidatorJob{}).
Where("status = ?", queryrunner.QueryRunnerQueued).
Where("updated_at < NOW() - INTERVAL '12 HOURS'").
Updates(model.QueryValidatorJob{Status: queryvalidator.QueryValidatorTimeOut, FailureMessage: "Job timed out"})
if tx.Error != nil {
return tx.Error
}

return nil
}

func (db Database) ListQueryValidatorJobForInterval(interval, triggerType, createdBy string) ([]model.QueryValidatorJob, error) {
var job []model.QueryValidatorJob

tx := db.ORM.Model(&model.QueryValidatorJob{})

if interval != "" {
tx = tx.Where(fmt.Sprintf("NOW() - updated_at < INTERVAL '%s'", interval))
}
if triggerType != "" {
tx = tx.Where("trigger_type = ?", triggerType)
}
if createdBy != "" {
tx = tx.Where("created_by = ?", createdBy)
}

tx = tx.Find(&job)

if tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, tx.Error
}
return job, nil
}

func (db Database) CleanupAllQueryValidatorJobs() error {
tx := db.ORM.Where("1 = 1").Unscoped().Delete(&model.QueryValidatorJob{})
if tx.Error != nil {
return tx.Error
}
return nil
}
32 changes: 28 additions & 4 deletions pkg/describe/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
"net"
"net/http"
"strconv"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
queryrunnerscheduler "github.com/opengovern/opengovernance/pkg/describe/schedulers/query-runner"
queryrvalidatorscheduler "github.com/opengovern/opengovernance/pkg/describe/schedulers/query-validator"
integration_type "github.com/opengovern/opengovernance/services/integration/integration-type"
queryrunner "github.com/opengovern/opengovernance/services/inventory/query-runner"

Expand Down Expand Up @@ -135,10 +137,11 @@ type Scheduler struct {
OperationMode OperationMode
MaxConcurrentCall int64

complianceScheduler *compliance.JobScheduler
discoveryScheduler *discovery.Scheduler
queryRunnerScheduler *queryrunnerscheduler.JobScheduler
conf config.SchedulerConfig
complianceScheduler *compliance.JobScheduler
discoveryScheduler *discovery.Scheduler
queryRunnerScheduler *queryrunnerscheduler.JobScheduler
queryValidatorScheduler *queryrvalidatorscheduler.JobScheduler
conf config.SchedulerConfig
}

func InitializeScheduler(
Expand Down Expand Up @@ -316,6 +319,11 @@ func (s *Scheduler) SetupNatsStreams(ctx context.Context) error {
return err
}

if err := s.jq.Stream(ctx, queryvalidator.StreamName, "Query Validator job queues", []string{queryvalidator.JobQueueTopic, queryvalidator.JobResultQueueTopic}, 1000); err != nil {
s.logger.Error("Failed to stream to Query Validator queue", zap.Error(err))
return err
}

if err := s.jq.Stream(ctx, summarizer.StreamName, "compliance summarizer job queues", []string{summarizer.JobQueueTopic, summarizer.JobQueueTopicManuals, summarizer.ResultQueueTopic}, 1000); err != nil {
s.logger.Error("Failed to stream to compliance summarizer queue", zap.Error(err))
return err
Expand Down Expand Up @@ -454,6 +462,22 @@ func (s *Scheduler) Run(ctx context.Context) error {
)
s.queryRunnerScheduler.Run(ctx)

// Query Validator
s.queryValidatorScheduler = queryrvalidatorscheduler.New(
func(ctx context.Context) error {
return s.SetupNatsStreams(ctx)
},
s.conf,
s.logger,
s.db,
s.jq,
s.es,
s.inventoryClient,
s.complianceClient,
s.metadataClient,
)
s.queryValidatorScheduler.Run(ctx)

// Compliance
s.complianceScheduler = compliance.New(
func(ctx context.Context) error {
Expand Down
40 changes: 40 additions & 0 deletions pkg/describe/schedulers/query-validator/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package query_runner

import (
"context"
"encoding/json"
"github.com/nats-io/nats.go/jetstream"
queryvalidator "github.com/opengovern/opengovernance/pkg/query-validator"
"go.uber.org/zap"
)

func (s *JobScheduler) RunQueryRunnerReportJobResultsConsumer(ctx context.Context) error {
if _, err := s.jq.Consume(ctx, "scheduler-query-validator", queryvalidator.StreamName, []string{queryvalidator.JobResultQueueTopic}, "scheduler-query-validator", func(msg jetstream.Msg) {
if err := msg.Ack(); err != nil {
s.logger.Error("Failed committing message", zap.Error(err))
}

var result queryvalidator.JobResult
if err := json.Unmarshal(msg.Data(), &result); err != nil {
s.logger.Error("Failed to unmarshal ComplianceReportJob results", zap.Error(err))
return
}

s.logger.Info("Processing ReportJobResult for Job",
zap.Uint("jobId", result.ID),
zap.String("status", string(result.Status)),
)
err := s.db.UpdateQueryValidatorJobStatus(result.ID, result.Status, result.FailureMessage)
if err != nil {
s.logger.Error("Failed to update the status of QueryRunnerReportJob",
zap.Uint("jobId", result.ID),
zap.Error(err))
return
}
}); err != nil {
return err
}

<-ctx.Done()
return nil
}
Loading

0 comments on commit ccc43d7

Please sign in to comment.