Skip to content

Commit

Permalink
Added cannot load field protection from executions due to change in v…
Browse files Browse the repository at this point in the history
…alues in struct
  • Loading branch information
frikky committed Nov 6, 2024
1 parent 169a0ef commit 48c15b0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
47 changes: 38 additions & 9 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,7 @@ func GetWorkflowExecutionByAuth(ctx context.Context, authId string) (*WorkflowEx
if err == nil {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &workflowExecution)
if err == nil {
if err == nil || len(workflowExecution.ExecutionId) > 0 {
return workflowExecution, nil
}
}
Expand All @@ -2024,7 +2024,11 @@ func GetWorkflowExecutionByAuth(ctx context.Context, authId string) (*WorkflowEx
_, err := project.Dbclient.GetAll(ctx, q, &allExecutions)
if err != nil {
log.Printf("[WARNING] Failed getting workflow execution by auth: %s", err)
return nil, err
if strings.Contains(err.Error(), `cannot load field`) {
err = nil
} else {
return nil, err
}
} else {
if len(allExecutions) > 0 {
workflowExecution = allExecutions[0]
Expand Down Expand Up @@ -2056,7 +2060,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
if err == nil {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &workflowExecution)
if err == nil {
if err == nil || len(workflowExecution.ExecutionId) > 0 {
//log.Printf("[DEBUG] Checking individual execution cache with %d results", len(workflowExecution.Results))
if strings.Contains(workflowExecution.ExecutionArgument, "Result too large to handle") {
baseArgument := &ActionResult{
Expand Down Expand Up @@ -2128,7 +2132,11 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
} else {
key := datastore.NameKey(nameKey, strings.ToLower(id), nil)
if err := project.Dbclient.Get(ctx, key, workflowExecution); err != nil {
return workflowExecution, err
if strings.Contains(err.Error(), `cannot load field`) {
err = nil
} else {
return workflowExecution, err
}
}

// A workaround for large bits of information for execution argument
Expand Down Expand Up @@ -10834,11 +10842,20 @@ func GetAllWorkflowExecutionsV2(ctx context.Context, workflowId string, amount i
_, err := it.Next(&innerWorkflow)
if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
log.Printf("[WARNING] Error getting workflow executions: %s", err)
log.Printf("[WARNING] Error getting workflow executions (1): %s", err)
breakOuter = true
} else {
if strings.Contains(err.Error(), `cannot load field`) {
// Bug with moving types
err = nil
} else if strings.Contains(err.Error(), `no more items`) {
//breakOuter = true
break
} else {
log.Printf("[WARNING] Error getting workflow executions (2): %s", err)
break
}
}

break
}

executions = append(executions, innerWorkflow)
Expand Down Expand Up @@ -13451,8 +13468,20 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
innerWorkflow := WorkflowExecution{}
_, err := it.Next(&innerWorkflow)
if err != nil {
//log.Printf("[WARNING] Error getting workflow executions: %s", err)
break
if strings.Contains(err.Error(), "context deadline exceeded") {
log.Printf("[WARNING] Error getting workflow search executions (1): %s", err)
} else {
if strings.Contains(err.Error(), `cannot load field`) {
// Bug with moving types
err = nil
} else if strings.Contains(err.Error(), `no more items`) {
//breakOuter = true
break
} else {
log.Printf("[WARNING] Error getting workflow search executions (2): %s", err)
break
}
}
}

executions = append(executions, innerWorkflow)
Expand Down
15 changes: 4 additions & 11 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4184,7 +4184,7 @@ func GetWorkflowExecutionsV2(resp http.ResponseWriter, request *http.Request) {
}

// Add timeout of 6 seconds to the ctx
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

cursor := ""
Expand Down Expand Up @@ -29142,16 +29142,9 @@ func checkExecutionStatus(ctx context.Context, exec *WorkflowExecution) *Workflo
workflow.Actions = originalActions

// This causes too many writes and can't be handled at scale. Removing for now. Only setting cache.
cacheKey := fmt.Sprintf("workflow_%s", workflow.ID)
data, err := json.Marshal(workflow)
if err != nil {
log.Printf("[ERROR] Failed marshalling validation of %s: %s", workflow.ID, err)
} else {
err = SetCache(ctx, cacheKey, data, 30)
if err != nil {
log.Printf("[ERROR] Failed setting cache for workflow '%s': %s", cacheKey, err)
}
}
/*
// FIXME: Even removing cache due to possibility of workflow override if an execution is finishing after a users' save. Also fails with delays. For now, using validation_workflow_%s to handle it all
*/
}

exec.Workflow.Validation = workflow.Validation
Expand Down

0 comments on commit 48c15b0

Please sign in to comment.