Skip to content

Commit

Permalink
Merge pull request #2408 from opengovern/fix-queries
Browse files Browse the repository at this point in the history
fix: add validator for quick scan
  • Loading branch information
artaasadi authored Jan 10, 2025
2 parents ee042fa + 91dba20 commit 19f822a
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 3 deletions.
5 changes: 2 additions & 3 deletions services/scheduler/db/compliance_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
)

const (
runnerRetryCount = 1
runnerQueueTimeoutMinutes = 60 // Minutes
runnerRetryCount = 1
)

func (db Database) CreateRunnerJobs(tx *gorm.DB, runners []*model.ComplianceRunner) error {
Expand Down Expand Up @@ -129,7 +128,7 @@ func (db Database) UpdateRunnerJobNatsSeqNum(
func (db Database) UpdateTimeoutQueuedRunnerJobs() error {
tx := db.ORM.
Model(&model.ComplianceRunner{}).
Where("created_at < NOW() - INTERVAL '? MINUTES'", runnerQueueTimeoutMinutes).
Where("created_at < NOW() - INTERVAL '60 MINUTES'").
Where("status IN ?", []string{string(runner.ComplianceRunnerCreated), string(runner.ComplianceRunnerQueued)}).
Updates(model.ComplianceRunner{Status: runner.ComplianceRunnerTimeOut, FailureMessage: "Job timed out"})
if tx.Error != nil {
Expand Down
16 changes: 16 additions & 0 deletions services/scheduler/schedulers/compliance-quick-run/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ func (s *JobScheduler) runPublisher(ctx context.Context) error {
}
s.logger.Info("Fetch Created Policy Runner Jobs", zap.Any("Jobs Count", len(jobs)))
for _, job := range jobs {
if job.Status == model.ComplianceJobCreated {
framework, err := s.complianceClient.GetBenchmark(&httpclient.Context{UserRole: api.AdminRole}, job.FrameworkID)
if err != nil {
s.logger.Error("error while getting framework", zap.String("frameworkID", job.FrameworkID), zap.Error(err))
continue
}
if framework == nil {
s.logger.Error("framework not exist", zap.String("frameworkID", job.FrameworkID))
continue
}
err = s.ValidateComplianceJob(*framework)
if err != nil {
_ = s.db.UpdateComplianceJob(job.ID, model.ComplianceJobFailed, err.Error())
}
}

auditJobMsg := auditjob.AuditJob{
JobID: job.ID,
FrameworkID: job.FrameworkID,
Expand Down
156 changes: 156 additions & 0 deletions services/scheduler/schedulers/compliance-quick-run/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package compliance_quick_run

import (
"fmt"
api2 "github.com/opengovern/og-util/pkg/api"
"github.com/opengovern/og-util/pkg/httpclient"
"github.com/opengovern/og-util/pkg/integration"
"github.com/opengovern/opencomply/services/compliance/api"
integration_type "github.com/opengovern/opencomply/services/integration/integration-type"
"github.com/opengovern/opencomply/services/integration/integration-type/interfaces"
"go.uber.org/zap"
)

func (s *JobScheduler) ValidateComplianceJob(framework api.Benchmark) error {
listOfTables, err := s.getTablesUnderBenchmark(framework, make(map[string]FrameworkTablesCache))
if err != nil {
return err
}

var integrationTypes []interfaces.IntegrationType
for _, itName := range framework.IntegrationTypes {
if it, ok := integration_type.IntegrationTypes[integration.Type(itName)]; ok {
integrationTypes = append(integrationTypes, it)
} else {
return fmt.Errorf("integration type not valid: %s", itName)
}
}

for table := range listOfTables {
for _, it := range integrationTypes {
if rt := it.GetResourceTypeFromTableName(table); rt == "" {
return fmt.Errorf("invalid table name: %s", table)
}
}
}

listOfParameters, err := s.getParametersUnderFramework(framework, make(map[string]FrameworkParametersCache))
if err != nil {
return err
}
queryParams, err := s.coreClient.ListQueryParameters(&httpclient.Context{UserRole: api2.AdminRole})
if err != nil {
s.logger.Error("failed to get query parameters", zap.Error(err))
return err
}
queryParamMap := make(map[string]string)
for _, qp := range queryParams.Items {
if qp.Value != "" {
queryParamMap[qp.Key] = qp.Value
}
}

for param := range listOfParameters {
if _, ok := queryParamMap[param]; !ok {
return fmt.Errorf("query parameter %s not exists", param)
}
}
return nil
}

type FrameworkTablesCache struct {
ListTables map[string]bool
}

type FrameworkParametersCache struct {
ListParameters map[string]bool
}

// getTablesUnderBenchmark ctx context.Context, benchmarkId string -> primaryTables, listOfTables, error
func (s *JobScheduler) getTablesUnderBenchmark(framework api.Benchmark, benchmarkCache map[string]FrameworkTablesCache) (map[string]bool, error) {
listOfTables := make(map[string]bool)

controls, err := s.complianceClient.ListControl(&httpclient.Context{UserRole: api2.AdminRole}, framework.Controls, nil)
if err != nil {
s.logger.Error("failed to fetch controls", zap.Error(err))
return nil, err
}
for _, c := range controls {
if c.Policy != nil {
for _, t := range c.Policy.ListOfResources {
if t == "" {
continue
}
listOfTables[t] = true
}
}
}

children, err := s.complianceClient.ListBenchmarks(&httpclient.Context{UserRole: api2.AdminRole}, framework.Children, nil)
if err != nil {
s.logger.Error("failed to fetch children", zap.Error(err))
return nil, err
}
for _, child := range children {
var childListOfTables map[string]bool
if cache, ok := benchmarkCache[child.ID]; ok {
childListOfTables = cache.ListTables
} else {
childListOfTables, err = s.getTablesUnderBenchmark(child, benchmarkCache)
if err != nil {
return nil, err
}
benchmarkCache[child.ID] = FrameworkTablesCache{
ListTables: childListOfTables,
}
}

for k, _ := range childListOfTables {
childListOfTables[k] = true
}
}
return listOfTables, nil
}

// getParametersUnderFramework ctx context.Context, benchmarkId string -> primaryTables, listOfTables, error
func (s *JobScheduler) getParametersUnderFramework(framework api.Benchmark, frameworkCache map[string]FrameworkParametersCache) (map[string]bool, error) {
listOfParameters := make(map[string]bool)

controls, err := s.complianceClient.ListControl(&httpclient.Context{UserRole: api2.AdminRole}, framework.Controls, nil)
if err != nil {
s.logger.Error("failed to fetch controls", zap.Error(err))
return nil, err
}
for _, c := range controls {
if c.Policy != nil {
for _, t := range c.Policy.Parameters {
listOfParameters[t.Key] = true
}
}
}

children, err := s.complianceClient.ListBenchmarks(&httpclient.Context{UserRole: api2.AdminRole}, framework.Children, nil)
if err != nil {
s.logger.Error("failed to fetch children", zap.Error(err))
return nil, err
}
for _, child := range children {
var childListOfParameters map[string]bool
if cache, ok := frameworkCache[child.ID]; ok {
childListOfParameters = cache.ListParameters
} else {
childListOfParameters, err = s.getParametersUnderFramework(child, frameworkCache)
if err != nil {
return nil, err
}
frameworkCache[child.ID] = FrameworkParametersCache{
ListParameters: childListOfParameters,
}
}

for k, _ := range childListOfParameters {
childListOfParameters[k] = true
}
}
return listOfParameters, nil
}

0 comments on commit 19f822a

Please sign in to comment.