diff --git a/cmd/monitor/monitor_backup_retention.go b/cmd/monitor/monitor_backup_retention.go index fffae850..f53dd34f 100644 --- a/cmd/monitor/monitor_backup_retention.go +++ b/cmd/monitor/monitor_backup_retention.go @@ -9,18 +9,19 @@ import ( ) func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg.Barman) { + ticker := time.NewTicker(defaultBackupRetentionEvalFrequency) defer ticker.Stop() for { select { case <-ctx.Done(): - log.Println("Shutting down backup retention monitor") + log.Println("[WARN] Shutting down backup retention monitor...") return case <-ticker.C: primary, err := isPrimary(ctx, node) if err != nil { - log.Printf("Failed to resolve primary when evaluating retention: %s", err) + log.Printf("[WARN] Failed to resolve primary when evaluating retention: %s", err) continue } @@ -29,7 +30,7 @@ func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg } if _, err := barman.WALArchiveDelete(ctx); err != nil { - log.Printf("WAL archive retention failed with: %s", err) + log.Printf("[WARN] Failed to prune WAL Archive: %s", err) } } diff --git a/cmd/monitor/monitor_backup_schedule.go b/cmd/monitor/monitor_backup_schedule.go index 29f5b722..e0c5f173 100644 --- a/cmd/monitor/monitor_backup_schedule.go +++ b/cmd/monitor/monitor_backup_schedule.go @@ -9,61 +9,60 @@ import ( "github.com/fly-apps/postgres-flex/internal/flypg" ) +const ( + backupRetryInterval = time.Second * 30 +) + func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) { lastBackupTime, err := barman.LastCompletedBackup(ctx) if err != nil { - log.Printf("Failed to resolve the last backup taken: %s", err) + log.Printf("[WARN] Failed to resolve the last backup taken: %s", err) } // Calculate when the next backup is due. - nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime) - if err != nil { - log.Printf("Failed to calculate the next scheduled backup time: %s", err) - } + nextScheduledBackup := calculateNextBackupTime(barman, lastBackupTime) // Determine if the node is the primary. primary, err := isPrimary(ctx, node) if err != nil { - log.Printf("Failed to resolve primary status: %s", err) + log.Printf("[WARN] Failed to resolve primary status: %s", err) } if primary { if nextScheduledBackup < 0 { - log.Println("No backups found! Performing the initial base backup.") + log.Println("[INFO] No backups found! Performing the initial base backup.") err := performBaseBackup(ctx, barman, true) switch { case err != nil: - log.Printf("Failed to perform initial base backup: %s", err) + log.Printf("[WARN] Failed to perform initial base backup: %s", err) + lastBackupTime = time.Now().Add(-backupFrequency(barman) + time.Hour) default: - log.Println("Initial base backup completed successfully") + log.Println("[INFO] Initial base backup completed successfully") lastBackupTime = time.Now() } // Recalculate the next scheduled backup time after the initial backup. - nextScheduledBackup, err = calculateNextBackupTime(barman, lastBackupTime) - if err != nil { - log.Printf("Failed to calculate the next scheduled backup time: %s", err) - } + nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime) } - log.Printf("Next full backup due in: %s", nextScheduledBackup) + log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup) } + // Monitor the backup schedule even if we are not the primary. This is to ensure backups will + // continue to be taken in the event of a failover. ticker := time.NewTicker(nextScheduledBackup) defer ticker.Stop() - // Monitor the backup schedule even if we are not the primary. This is to ensure backups will - // continue to be taken in the event of a failover. for { select { case <-ctx.Done(): - log.Println("Shutting down backup schedule monitor") + log.Println("[WARN] Shutting down backup schedule monitor...") return case <-ticker.C: // Check to see if we are the Primary. primary, err := isPrimary(ctx, node) if err != nil { - log.Printf("Failed to resolve primary status: %s", err) + log.Printf("[WARN] Failed to resolve primary status: %s", err) } // Noop if we are not the primary. @@ -73,25 +72,27 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg. lastBackupTime, err := barman.LastCompletedBackup(ctx) if err != nil { - log.Printf("Failed to resolve the last backup taken: %s", err) + log.Printf("[WARN] Failed to determine when the last backup was taken: %s", err) } // Recalculate the next scheduled backup time. - nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime) - if err != nil { - log.Printf("Failed to calculate the next scheduled backup time: %s", err) - } + nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime) - log.Printf("Next full backup due in: %s", nextScheduledBackup) + log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup) // Perform a full backup if the next scheduled backup time is less than 0. if nextScheduledBackup < 0 { - log.Println("Performing full backup now") - if err := performBaseBackup(ctx, barman, false); err != nil { - log.Printf("Failed to perform full backup: %s", err) + log.Println("[INFO] Performing full backup now") + err := performBaseBackup(ctx, barman, false) + switch { + case err != nil: + log.Printf("[WARN] Failed to perform full backup: %s", err) + // Retry the backup in 1 hour. + nextScheduledBackup = time.Hour + default: + log.Println("[INFO] Full backup completed successfully") + nextScheduledBackup = backupFrequency(barman) } - - nextScheduledBackup = backupFrequency(barman) } // Reset the ticker frequency in case the backup frequency has changed. @@ -100,13 +101,12 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg. } } -func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) (time.Duration, error) { +func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) time.Duration { // If there was no backup, return a negative duration to trigger an immediate backup. if lastBackupTime.IsZero() { - return -1, nil + return -1 } - - return time.Until(lastBackupTime.Add(backupFrequency(barman))), nil + return time.Until(lastBackupTime.Add(backupFrequency(barman))) } func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) { @@ -127,7 +127,7 @@ func backupFrequency(barman *flypg.Barman) time.Duration { fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency) switch { case err != nil: - log.Printf("Failed to parse full backup frequency: %s", err) + log.Printf("[WARN] Failed to parse full backup frequency: %s", err) default: fullBackupSchedule = fullBackupDur } @@ -145,7 +145,7 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck return ctx.Err() default: if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil { - log.Printf("failed to perform full backup: %s. Retrying in 30 seconds.", err) + log.Printf("[WARN] Failed to perform full backup: %s. Retrying in 30 seconds.", err) // If we've exceeded the maximum number of retries, we should return an error. if retryCount >= maxRetries { @@ -157,12 +157,11 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck select { case <-ctx.Done(): return ctx.Err() - case <-time.After(time.Second * 30): + case <-time.After(backupRetryInterval): continue } } - log.Println("full backup completed successfully") return nil } } diff --git a/cmd/monitor/monitor_backup_schedule_test.go b/cmd/monitor/monitor_backup_schedule_test.go index 05b8c988..7965d9e1 100644 --- a/cmd/monitor/monitor_backup_schedule_test.go +++ b/cmd/monitor/monitor_backup_schedule_test.go @@ -60,10 +60,7 @@ func TestCalculateNextBackupTime(t *testing.T) { } t.Run("no backups", func(t *testing.T) { - nextBackupTime, err := calculateNextBackupTime(barman, time.Time{}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + nextBackupTime := calculateNextBackupTime(barman, time.Time{}) if nextBackupTime.Hours() > 0 { t.Fatalf("expected next backup time duration to be less than 0, but got %s", nextBackupTime) @@ -73,10 +70,7 @@ func TestCalculateNextBackupTime(t *testing.T) { t.Run("recent backup", func(t *testing.T) { lastBackup := time.Now().Add(-1 * time.Hour) - nextBackupTime, err := calculateNextBackupTime(barman, lastBackup) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + nextBackupTime := calculateNextBackupTime(barman, lastBackup) expected := 22.0 if nextBackupTime.Hours() == expected { @@ -87,10 +81,7 @@ func TestCalculateNextBackupTime(t *testing.T) { t.Run("old backup", func(t *testing.T) { lastBackup := time.Now().Add(-25 * time.Hour) - nextBackupTime, err := calculateNextBackupTime(barman, lastBackup) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + nextBackupTime := calculateNextBackupTime(barman, lastBackup) expected := -1.0 if nextBackupTime.Hours() == -1.0 {