Skip to content

Commit

Permalink
Merge pull request #2371 from opengovern/fix-queries
Browse files Browse the repository at this point in the history
fix: add logs to enqueue describe jobs
  • Loading branch information
artaasadi authored Jan 6, 2025
2 parents 0e31641 + 4fa8754 commit 3c94a8d
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions services/scheduler/scheduler_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ func (s *Scheduler) RunDescribeResourceJobCycle(ctx context.Context, manuals boo
cred: credential,
}
wp.AddJob(func() (interface{}, error) {
err := s.enqueueCloudNativeDescribeJob(ctx, c.dc, c.cred.Secret, c.src)
err, natsPayload := s.enqueueCloudNativeDescribeJob(ctx, c.dc, c.cred.Secret, c.src)
if err != nil {
s.logger.Error("Failed to enqueueCloudNativeDescribeConnectionJob", zap.Error(err), zap.Uint("jobID", dc.ID))
DescribeResourceJobsCount.WithLabelValues("failure", "enqueue").Inc()
return nil, err
return natsPayload, err
}
DescribeResourceJobsCount.WithLabelValues("successful", "").Inc()
return nil, nil
Expand All @@ -173,6 +173,7 @@ func (s *Scheduler) RunDescribeResourceJobCycle(ctx context.Context, manuals boo
res := wp.Run()
for _, r := range res {
if r.Error != nil {
s.logger.Error("failure on calling cloudNative describer", zap.Error(r.Error), zap.String("payload", string(r.Value.([]byte))))
s.logger.Error("failure on calling cloudNative describer", zap.Error(r.Error))
}
}
Expand Down Expand Up @@ -383,15 +384,15 @@ func newDescribeConnectionJob(a integrationapi.Integration, resourceType string,
}

func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.DescribeIntegrationJob, cipherText string,
integration *integrationapi.Integration) error {
integration *integrationapi.Integration) (error, []byte) {
var err error

ctx, span := otel.Tracer(opengovernanceTrace.JaegerTracerName).Start(ctx, opengovernanceTrace.GetCurrentFuncName())
defer span.End()

integrationType, ok := integration_type.IntegrationTypes[dc.IntegrationType]
if !ok {
return fmt.Errorf("integration type not found")
return fmt.Errorf("integration type not found"), nil
}

s.logger.Debug("enqueueCloudNativeDescribeJob",
Expand All @@ -405,7 +406,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
var parameters map[string]string
if dc.Parameters.Status == pgtype.Present {
if err := json.Unmarshal(dc.Parameters.Bytes, &parameters); err != nil {
return err
return err, nil
}
}

Expand Down Expand Up @@ -469,7 +470,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
if err != nil {
s.logger.Error("failed to marshal cloud native req", zap.Uint("jobID", dc.ID), zap.String("IntegrationID", dc.IntegrationID), zap.String("resourceType", dc.ResourceType), zap.Error(err))
isFailed = true
return fmt.Errorf("failed to marshal cloud native req due to %w", err)
return fmt.Errorf("failed to marshal cloud native req due to %w", err), natsPayload
}

describerConfig := integrationType.GetConfiguration()
Expand All @@ -484,7 +485,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
err = s.SetupNats(ctx)
if err != nil {
s.logger.Error("Failed to setup nats streams", zap.Error(err))
return err
return err, natsPayload
}
seqNum, err = s.jq.Produce(ctx, topic, natsPayload, fmt.Sprintf("%s-%d-%d", dc.IntegrationType, input.DescribeJob.JobID, input.DescribeJob.RetryCounter))
if err != nil {
Expand All @@ -495,7 +496,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
zap.Error(err),
)
isFailed = true
return fmt.Errorf("failed to produce message to jetstream due to %v", err)
return fmt.Errorf("failed to produce message to jetstream due to %v", err), natsPayload
}
} else {
s.logger.Error("failed to produce message to jetstream",
Expand All @@ -506,7 +507,7 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
zap.String("error message", err.Error()),
)
isFailed = true
return fmt.Errorf("failed to produce message to jetstream due to %v", err)
return fmt.Errorf("failed to produce message to jetstream due to %v", err), natsPayload
}
}
if seqNum != nil {
Expand All @@ -525,5 +526,5 @@ func (s *Scheduler) enqueueCloudNativeDescribeJob(ctx context.Context, dc model.
zap.String("resourceType", dc.ResourceType),
)

return nil
return nil, natsPayload
}

0 comments on commit 3c94a8d

Please sign in to comment.