Skip to content

Commit

Permalink
Merge pull request #2288 from opengovern/fix-tasks
Browse files Browse the repository at this point in the history
Fix tasks
  • Loading branch information
artaasadi authored Dec 19, 2024
2 parents d33ceef + 422ab93 commit 54de292
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 14 deletions.
2 changes: 1 addition & 1 deletion services/describe/api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ type RunBenchmarkResponse struct {

type RunDiscoveryRequest struct {
ResourceTypes []string `json:"resource_types"`
ForceFull bool `json:"force_full"` // force full discovery. only matters if ResourceTypes is empty
IntegrationInfo []IntegrationInfoFilter `json:"integration_info"`
Parameters map[string][]string `json:"parameters"`
}

type RunDiscoveryJob struct {
Expand Down
3 changes: 3 additions & 0 deletions services/describe/db/model/describe.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package model

import (
"github.com/jackc/pgtype"
"time"

"github.com/lib/pq"
Expand Down Expand Up @@ -50,6 +51,8 @@ type DescribeIntegrationJob struct {
ProviderID string
TriggerType enums.DescribeTriggerType

Parameters pgtype.JSONB // map[string][]string

ResourceType string `gorm:"index:idx_resource_type_status;index"`
Status api.DescribeResourceJobStatus `gorm:"index:idx_resource_type_status;index"`
RetryCount int
Expand Down
30 changes: 25 additions & 5 deletions services/describe/scheduler_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/jackc/pgtype"
"github.com/opengovern/opencomply/services/integration/api/models"
"math/rand"
"time"
Expand Down Expand Up @@ -225,7 +226,7 @@ func (s *Scheduler) scheduleDescribeJob(ctx context.Context) {
zap.String("integration_type", string(integration.IntegrationType)),
zap.String("resource_types", fmt.Sprintf("%v", len(resourceTypes))))
for resourceType, _ := range resourceTypes {
_, err = s.describe(integration, resourceType, true, false, false, nil, "system")
_, err = s.describe(integration, resourceType, true, false, false, nil, "system", nil)
if err != nil {
s.logger.Error("failed to describe connection", zap.String("integration_id", integration.IntegrationID), zap.String("resource_type", resourceType), zap.Error(err))
}
Expand Down Expand Up @@ -268,7 +269,7 @@ func (s *Scheduler) retryFailedJobs(ctx context.Context) error {
}

func (s *Scheduler) describe(integration integrationapi.Integration, resourceType string, scheduled bool, costFullDiscovery bool,
removeResources bool, parentId *uint, createdBy string) (*model.DescribeIntegrationJob, error) {
removeResources bool, parentId *uint, createdBy string, parameters map[string][]string) (*model.DescribeIntegrationJob, error) {

integrationType, ok := integration_type.IntegrationTypes[integration.IntegrationType]
if !ok {
Expand Down Expand Up @@ -338,8 +339,19 @@ func (s *Scheduler) describe(integration integrationapi.Integration, resourceTyp
if costFullDiscovery {
triggerType = enums.DescribeTriggerTypeCostFullDiscovery
}

if parameters == nil {
parameters = make(map[string][]string)
}
parametersJsonData, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
parametersJsonb := pgtype.JSONB{}
err = parametersJsonb.Set(parametersJsonData)

s.logger.Debug("Connection is due for a describe. Creating a job now", zap.String("IntegrationID", integration.IntegrationID), zap.String("resourceType", resourceType))
daj := newDescribeConnectionJob(integration, resourceType, triggerType, parentId, createdBy)
daj := newDescribeConnectionJob(integration, resourceType, triggerType, parentId, createdBy, parametersJsonb)
if removeResources {
daj.Status = apiDescribe.DescribeResourceJobRemovingResources
}
Expand All @@ -355,7 +367,7 @@ func (s *Scheduler) describe(integration integrationapi.Integration, resourceTyp
}

func newDescribeConnectionJob(a integrationapi.Integration, resourceType string, triggerType enums.DescribeTriggerType,
parentId *uint, createdBy string) model.DescribeIntegrationJob {
parentId *uint, createdBy string, parameters pgtype.JSONB) model.DescribeIntegrationJob {
return model.DescribeIntegrationJob{
CreatedBy: createdBy,
ParentID: parentId,
Expand All @@ -365,6 +377,7 @@ func newDescribeConnectionJob(a integrationapi.Integration, resourceType string,
TriggerType: triggerType,
ResourceType: resourceType,
Status: apiDescribe.DescribeResourceJobCreated,
Parameters: parameters,
}
}

Expand All @@ -386,6 +399,13 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
zap.String("resourceType", dc.ResourceType),
)

var parameters map[string][]string
if dc.Parameters.Status == pgtype.Present {
if err := json.Unmarshal(dc.Parameters.Bytes, &parameters); err != nil {
return err
}
}

input := describe.DescribeWorkerInput{
JobEndpoint: s.describeExternalEndpoint,
DeliverEndpoint: s.describeExternalEndpoint,
Expand All @@ -409,7 +429,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
RetryCounter: 0,
},

ExtraInputs: nil,
ExtraInputs: parameters,
}

if err := s.db.QueueDescribeIntegrationJob(dc.ID); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/describe/scheduler_quickscan_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (s *DescribeDependencies) Do(ctx context.Context) error {
if _, ok := validResourceTypes[resourceType]; !ok {
continue
}
_, err = s.s.describe(integration, resourceType, false, false, false, &s.job.ID, "QuickScanSequencer")
_, err = s.s.describe(integration, resourceType, false, false, false, &s.job.ID, "QuickScanSequencer", nil)
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions services/describe/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (h HttpServer) TriggerPerConnectionDescribeJob(ctx echo.Context) error {
}

for _, resourceType := range resourceTypes {
daj, err := h.Scheduler.describe(src, resourceType, false, costFullDiscovery, false, nil, userID)
daj, err := h.Scheduler.describe(src, resourceType, false, costFullDiscovery, false, nil, userID, nil)
if err != nil && errors.Is(err, ErrJobInProgress) {
return echo.NewHTTPError(http.StatusConflict, err.Error())
}
Expand Down Expand Up @@ -461,7 +461,7 @@ func (h HttpServer) TriggerDescribeJob(ctx echo.Context) error {
}

for _, resourceType := range rtToDescribe {
_, err = h.Scheduler.describe(integration, resourceType, false, false, false, nil, userID)
_, err = h.Scheduler.describe(integration, resourceType, false, false, false, nil, userID, nil)
if err != nil {
h.Scheduler.logger.Error("failed to describe connection", zap.String("integration_id", integration.IntegrationID), zap.Error(err))
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func (h HttpServer) ReEvaluateComplianceJob(ctx echo.Context) error {

var dependencyIDs []int64
for _, describeJob := range describeJobs {
daj, err := h.Scheduler.describe(describeJob.Integration, describeJob.ResourceType, false, false, false, nil, userID)
daj, err := h.Scheduler.describe(describeJob.Integration, describeJob.ResourceType, false, false, false, nil, userID, nil)
if err != nil {
h.Scheduler.logger.Error("failed to describe connection", zap.String("integration_id", describeJob.Integration.IntegrationID), zap.Error(err))
continue
Expand Down Expand Up @@ -1503,9 +1503,6 @@ func (h HttpServer) RunDiscovery(ctx echo.Context) error {
}
rtToDescribe := request.ResourceTypes
discoveryType := model2.DiscoveryType_Fast
if request.ForceFull {
discoveryType = model2.DiscoveryType_Full
}
integrationDiscovery := &model2.IntegrationDiscovery{
TriggerID: uint(triggerId),
ConnectionID: integration.IntegrationID,
Expand Down Expand Up @@ -1550,7 +1547,7 @@ func (h HttpServer) RunDiscovery(ctx echo.Context) error {
continue
}
var status, failureReason string
job, err := h.Scheduler.describe(integration, resourceType, false, false, false, &integrationDiscovery.ID, userID)
job, err := h.Scheduler.describe(integration, resourceType, false, false, false, &integrationDiscovery.ID, userID, request.Parameters)
if err != nil {
if err.Error() == "job already in progress" {
tmpJob, err := h.Scheduler.db.GetLastDescribeIntegrationJob(integration.IntegrationID, resourceType)
Expand Down

0 comments on commit 54de292

Please sign in to comment.