diff --git a/services/describe/api/jobs.go b/services/describe/api/jobs.go index e84a9b26b..e84e9e208 100644 --- a/services/describe/api/jobs.go +++ b/services/describe/api/jobs.go @@ -183,6 +183,7 @@ type RunDiscoveryRequest struct { ResourceTypes []string `json:"resource_types"` ForceFull bool `json:"force_full"` // force full discovery. only matters if ResourceTypes is empty IntegrationInfo []IntegrationInfoFilter `json:"integration_info"` + Parameters map[string][]string `json:"parameters"` } type RunDiscoveryJob struct { diff --git a/services/describe/db/model/describe.go b/services/describe/db/model/describe.go index 7a9bfde41..b1437038c 100644 --- a/services/describe/db/model/describe.go +++ b/services/describe/db/model/describe.go @@ -1,6 +1,7 @@ package model import ( + "github.com/jackc/pgtype" "time" "github.com/lib/pq" @@ -50,6 +51,8 @@ type DescribeIntegrationJob struct { ProviderID string TriggerType enums.DescribeTriggerType + Parameters pgtype.JSONB // map[string][]string + ResourceType string `gorm:"index:idx_resource_type_status;index"` Status api.DescribeResourceJobStatus `gorm:"index:idx_resource_type_status;index"` RetryCount int diff --git a/services/describe/scheduler_describe.go b/services/describe/scheduler_describe.go index 0b81acc68..94938d2f4 100644 --- a/services/describe/scheduler_describe.go +++ b/services/describe/scheduler_describe.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/jackc/pgtype" "github.com/opengovern/opencomply/services/integration/api/models" "math/rand" "time" @@ -225,7 +226,7 @@ func (s *Scheduler) scheduleDescribeJob(ctx context.Context) { zap.String("integration_type", string(integration.IntegrationType)), zap.String("resource_types", fmt.Sprintf("%v", len(resourceTypes)))) for resourceType, _ := range resourceTypes { - _, err = s.describe(integration, resourceType, true, false, false, nil, "system") + _, err = s.describe(integration, resourceType, true, false, false, nil, "system", nil) if err != nil { s.logger.Error("failed to describe connection", zap.String("integration_id", integration.IntegrationID), zap.String("resource_type", resourceType), zap.Error(err)) } @@ -268,7 +269,7 @@ func (s *Scheduler) retryFailedJobs(ctx context.Context) error { } func (s *Scheduler) describe(integration integrationapi.Integration, resourceType string, scheduled bool, costFullDiscovery bool, - removeResources bool, parentId *uint, createdBy string) (*model.DescribeIntegrationJob, error) { + removeResources bool, parentId *uint, createdBy string, parameters map[string][]string) (*model.DescribeIntegrationJob, error) { integrationType, ok := integration_type.IntegrationTypes[integration.IntegrationType] if !ok { @@ -338,8 +339,19 @@ func (s *Scheduler) describe(integration integrationapi.Integration, resourceTyp if costFullDiscovery { triggerType = enums.DescribeTriggerTypeCostFullDiscovery } + + if parameters == nil { + parameters = make(map[string][]string) + } + parametersJsonData, err := json.Marshal(parameters) + if err != nil { + return nil, err + } + parametersJsonb := pgtype.JSONB{} + err = parametersJsonb.Set(parametersJsonData) + s.logger.Debug("Connection is due for a describe. Creating a job now", zap.String("IntegrationID", integration.IntegrationID), zap.String("resourceType", resourceType)) - daj := newDescribeConnectionJob(integration, resourceType, triggerType, parentId, createdBy) + daj := newDescribeConnectionJob(integration, resourceType, triggerType, parentId, createdBy, parametersJsonb) if removeResources { daj.Status = apiDescribe.DescribeResourceJobRemovingResources } @@ -355,7 +367,7 @@ func (s *Scheduler) describe(integration integrationapi.Integration, resourceTyp } func newDescribeConnectionJob(a integrationapi.Integration, resourceType string, triggerType enums.DescribeTriggerType, - parentId *uint, createdBy string) model.DescribeIntegrationJob { + parentId *uint, createdBy string, parameters pgtype.JSONB) model.DescribeIntegrationJob { return model.DescribeIntegrationJob{ CreatedBy: createdBy, ParentID: parentId, @@ -365,6 +377,7 @@ func newDescribeConnectionJob(a integrationapi.Integration, resourceType string, TriggerType: triggerType, ResourceType: resourceType, Status: apiDescribe.DescribeResourceJobCreated, + Parameters: parameters, } } @@ -386,6 +399,13 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model. zap.String("resourceType", dc.ResourceType), ) + var parameters map[string][]string + if dc.Parameters.Status == pgtype.Present { + if err := json.Unmarshal(dc.Parameters.Bytes, ¶meters); err != nil { + return err + } + } + input := describe.DescribeWorkerInput{ JobEndpoint: s.describeExternalEndpoint, DeliverEndpoint: s.describeExternalEndpoint, @@ -409,7 +429,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model. RetryCounter: 0, }, - ExtraInputs: nil, + ExtraInputs: parameters, } if err := s.db.QueueDescribeIntegrationJob(dc.ID); err != nil { diff --git a/services/describe/scheduler_quickscan_sequence.go b/services/describe/scheduler_quickscan_sequence.go index ab1976721..3f1c81b5b 100644 --- a/services/describe/scheduler_quickscan_sequence.go +++ b/services/describe/scheduler_quickscan_sequence.go @@ -162,7 +162,7 @@ func (s *DescribeDependencies) Do(ctx context.Context) error { if _, ok := validResourceTypes[resourceType]; !ok { continue } - _, err = s.s.describe(integration, resourceType, false, false, false, &s.job.ID, "QuickScanSequencer") + _, err = s.s.describe(integration, resourceType, false, false, false, &s.job.ID, "QuickScanSequencer", nil) if err != nil { return err } diff --git a/services/describe/server.go b/services/describe/server.go index 24270045f..2b24b5bdc 100644 --- a/services/describe/server.go +++ b/services/describe/server.go @@ -401,7 +401,7 @@ func (h HttpServer) TriggerPerConnectionDescribeJob(ctx echo.Context) error { } for _, resourceType := range resourceTypes { - daj, err := h.Scheduler.describe(src, resourceType, false, costFullDiscovery, false, nil, userID) + daj, err := h.Scheduler.describe(src, resourceType, false, costFullDiscovery, false, nil, userID, nil) if err != nil && errors.Is(err, ErrJobInProgress) { return echo.NewHTTPError(http.StatusConflict, err.Error()) } @@ -461,7 +461,7 @@ func (h HttpServer) TriggerDescribeJob(ctx echo.Context) error { } for _, resourceType := range rtToDescribe { - _, err = h.Scheduler.describe(integration, resourceType, false, false, false, nil, userID) + _, err = h.Scheduler.describe(integration, resourceType, false, false, false, nil, userID, nil) if err != nil { h.Scheduler.logger.Error("failed to describe connection", zap.String("integration_id", integration.IntegrationID), zap.Error(err)) } @@ -732,7 +732,7 @@ func (h HttpServer) ReEvaluateComplianceJob(ctx echo.Context) error { var dependencyIDs []int64 for _, describeJob := range describeJobs { - daj, err := h.Scheduler.describe(describeJob.Integration, describeJob.ResourceType, false, false, false, nil, userID) + daj, err := h.Scheduler.describe(describeJob.Integration, describeJob.ResourceType, false, false, false, nil, userID, nil) if err != nil { h.Scheduler.logger.Error("failed to describe connection", zap.String("integration_id", describeJob.Integration.IntegrationID), zap.Error(err)) continue @@ -1550,7 +1550,7 @@ func (h HttpServer) RunDiscovery(ctx echo.Context) error { continue } var status, failureReason string - job, err := h.Scheduler.describe(integration, resourceType, false, false, false, &integrationDiscovery.ID, userID) + job, err := h.Scheduler.describe(integration, resourceType, false, false, false, &integrationDiscovery.ID, userID, request.Parameters) if err != nil { if err.Error() == "job already in progress" { tmpJob, err := h.Scheduler.db.GetLastDescribeIntegrationJob(integration.IntegrationID, resourceType)