Skip to content

Commit

Permalink
feat: maintenance mode enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Nov 14, 2024
1 parent 42b0208 commit ddce22b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
35 changes: 32 additions & 3 deletions flow/cmd/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"os"
"time"

"github.com/aws/smithy-go/ptr"
"go.temporal.io/sdk/client"
Expand All @@ -33,6 +34,7 @@ type MaintenanceCLIParams struct {
FlowTlsEnabled bool
SkipOnApiVersionMatch bool
SkipOnNoMirrors bool
SkipOnNoSnapshotMirrors bool
UseMaintenanceTaskQueue bool
AssumeSkippedMaintenanceWorkflows bool
}
Expand Down Expand Up @@ -140,7 +142,7 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam
})
}
}
if args.SkipOnApiVersionMatch || args.SkipOnNoMirrors {
if args.SkipOnApiVersionMatch || args.SkipOnNoMirrors || args.SkipOnNoSnapshotMirrors {
if args.FlowGrpcAddress == "" {
return false, errors.New("flow address is required when skipping based on API")
}
Expand Down Expand Up @@ -176,20 +178,47 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam
})
}
}
if args.SkipOnNoMirrors {
if args.SkipOnNoMirrors || args.SkipOnNoSnapshotMirrors {
slog.Info("Checking if there are any mirrors")
startTime := time.Now()
mirrors, err := peerFlowClient.ListMirrors(ctx, &protos.ListMirrorsRequest{})
if err != nil {
return false, err
}
slog.Info("Got mirrors from flow", "mirrors", mirrors.Mirrors)
if len(mirrors.Mirrors) == 0 {
if args.SkipOnNoMirrors && len(mirrors.Mirrors) == 0 {
slog.Info("Skipping maintenance workflow due to no mirrors")
return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("No mirrors found"),
})
}
if args.SkipOnNoSnapshotMirrors {
for _, mirror := range mirrors.Mirrors {
if time.Now().Sub(startTime) > 30*time.Second {

Check failure on line 198 in flow/cmd/maintenance.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
slog.Warn("Mirrors list was fetched more than 30 seconds ago, assuming mirrors list is stale and not skipping maintenance workflow")
return false, nil
}
mirrorInfo, err := peerFlowClient.MirrorStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: mirror.Name,
IncludeFlowInfo: false,
})
if err != nil {
return false, err
}
if mirrorInfo.CurrentFlowState == protos.FlowStatus_STATUS_SNAPSHOT ||
mirrorInfo.CurrentFlowState == protos.FlowStatus_STATUS_SETUP {
slog.Info("Found an active snapshot mirror, not skipping maintenance workflow", "mirror", mirror.Name,
"state", mirrorInfo.CurrentFlowState)
return false, nil
}
}
slog.Info("Skipping maintenance workflow due to no snapshot mirrors")
return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("No snapshot mirrors found"),
})
}
}
}
return false, nil
Expand Down
9 changes: 9 additions & 0 deletions flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func main() {
Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_MIRRORS"),
}

maintenanceSkipOnNoSnapshotMirrorsFlag := &cli.BoolFlag{
Name: "skip-on-no-snapshot-mirrors",
Value: false,
Usage: "Skip maintenance flow if there are no snapshot mirrors",
Sources: cli.EnvVars("MAINTENANCE_SKIP_ON_NO_SNAPSHOT_MIRRORS"),
}

flowGrpcAddressFlag := &cli.StringFlag{
Name: "flow-grpc-address",
Value: "",
Expand Down Expand Up @@ -211,6 +218,7 @@ func main() {
temporalNamespaceFlag,
maintenanceModeWorkflowFlag,
maintenanceSkipOnApiVersionMatchFlag,
maintenanceSkipOnNoSnapshotMirrorsFlag,
maintenanceSkipOnNoMirrorsFlag,
flowGrpcAddressFlag,
flowTlsEnabledFlag,
Expand All @@ -227,6 +235,7 @@ func main() {
Mode: clicmd.String(maintenanceModeWorkflowFlag.Name),
SkipOnApiVersionMatch: clicmd.Bool(maintenanceSkipOnApiVersionMatchFlag.Name),
SkipOnNoMirrors: clicmd.Bool(maintenanceSkipOnNoMirrorsFlag.Name),
SkipOnNoSnapshotMirrors: clicmd.Bool(maintenanceSkipOnNoSnapshotMirrorsFlag.Name),
FlowGrpcAddress: clicmd.String(flowGrpcAddressFlag.Name),
FlowTlsEnabled: clicmd.Bool(flowTlsEnabledFlag.Name),
UseMaintenanceTaskQueue: clicmd.Bool(useMaintenanceTaskQueueFlag.Name),
Expand Down

0 comments on commit ddce22b

Please sign in to comment.