diff --git a/cmd/monitor/monitor_backup_retention.go b/cmd/monitor/monitor_backup_retention.go index b5411f3f..fffae850 100644 --- a/cmd/monitor/monitor_backup_retention.go +++ b/cmd/monitor/monitor_backup_retention.go @@ -18,14 +18,20 @@ func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg log.Println("Shutting down backup retention monitor") return case <-ticker.C: - result, err := barman.WALArchiveDelete(ctx) + primary, err := isPrimary(ctx, node) if err != nil { - log.Printf("Backup retention failed with: %s", err) + log.Printf("Failed to resolve primary when evaluating retention: %s", err) + continue } - if len(result) > 0 { - log.Printf("Backup retention response: %s", result) + if !primary { + continue } + + if _, err := barman.WALArchiveDelete(ctx); err != nil { + log.Printf("WAL archive retention failed with: %s", err) + } + } } } diff --git a/cmd/monitor/monitor_backup_schedule.go b/cmd/monitor/monitor_backup_schedule.go index 3779e694..29f5b722 100644 --- a/cmd/monitor/monitor_backup_schedule.go +++ b/cmd/monitor/monitor_backup_schedule.go @@ -10,26 +10,37 @@ import ( ) func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) { - nextScheduledBackup, err := calculateNextBackupTime(ctx, barman) + lastBackupTime, err := barman.LastCompletedBackup(ctx) + if err != nil { + log.Printf("Failed to resolve the last backup taken: %s", err) + } + + // Calculate when the next backup is due. + nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime) if err != nil { log.Printf("Failed to calculate the next scheduled backup time: %s", err) } + // Determine if the node is the primary. primary, err := isPrimary(ctx, node) if err != nil { log.Printf("Failed to resolve primary status: %s", err) } if primary { - if nextScheduledBackup < 0 { log.Println("No backups found! Performing the initial base backup.") - if err := performBaseBackup(ctx, barman, true); err != nil { - log.Printf("Failed to perform full backup: %s", err) + err := performBaseBackup(ctx, barman, true) + switch { + case err != nil: + log.Printf("Failed to perform initial base backup: %s", err) + default: + log.Println("Initial base backup completed successfully") + lastBackupTime = time.Now() } // Recalculate the next scheduled backup time after the initial backup. - nextScheduledBackup, err = calculateNextBackupTime(ctx, barman) + nextScheduledBackup, err = calculateNextBackupTime(barman, lastBackupTime) if err != nil { log.Printf("Failed to calculate the next scheduled backup time: %s", err) } @@ -41,105 +52,61 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg. ticker := time.NewTicker(nextScheduledBackup) defer ticker.Stop() - for range ticker.C { - primary, err := isPrimary(ctx, node) - if err != nil { - log.Printf("Failed to resolve primary status: %s", err) - } - - // Short-circuit if the node is not the primary. - if !primary { - continue - } + // Monitor the backup schedule even if we are not the primary. This is to ensure backups will + // continue to be taken in the event of a failover. + for { + select { + case <-ctx.Done(): + log.Println("Shutting down backup schedule monitor") + return + case <-ticker.C: + // Check to see if we are the Primary. + primary, err := isPrimary(ctx, node) + if err != nil { + log.Printf("Failed to resolve primary status: %s", err) + } - // Calculate when the next backup is due. This needs to be calculated per-tick - // in case the backup frequency has changed or the primary status has changed. - nextScheduledBackup, err := calculateNextBackupTime(ctx, barman) - if err != nil { - log.Printf("Failed to calculate the next scheduled backup time: %s", err) - } + // Noop if we are not the primary. + if !primary { + continue + } - if err := monitorBackupScheduleTick(ctx, node, barman); err != nil { - log.Printf("monitorBackupScheduleTick failed with: %s", err) - } + lastBackupTime, err := barman.LastCompletedBackup(ctx) + if err != nil { + log.Printf("Failed to resolve the last backup taken: %s", err) + } - // Reset the ticker frequency in case the backup frequency has changed. - ticker.Reset(backupFrequency(barman)) - } -} + // Recalculate the next scheduled backup time. + nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime) + if err != nil { + log.Printf("Failed to calculate the next scheduled backup time: %s", err) + } -// func performInitialBackup(ctx context.Context, node *flypg.Node, barman *flypg.Barman) error { -// primary, err := isPrimary(ctx, node) -// if err != nil { -// log.Printf("Failed to resolve primary status: %s", err) -// } - -// // Short-circuit if the node is not the primary. -// if !primary { -// return nil -// } - -// // Determine when the last backup was taken. -// lastBackupTime, err := barman.LastCompletedBackup(ctx) -// if err != nil { -// log.Printf("Failed to resolve the last backup taken: %s", err) -// } - -// // Perform the initial backup if the node is the primary. -// if lastBackupTime.IsZero() { -// log.Println("No backups found! Performing the initial base backup.") - -// if err := performBaseBackup(ctx, barman); err != nil { -// log.Printf("Failed to perform the initial full backup: %s", err) -// log.Printf("Backup scheduler will re-attempt in %s.", backupFrequency(barman)) -// } -// } - -// return nil -// } - -func monitorBackupScheduleTick(ctx context.Context, node *flypg.Node, barman *flypg.Barman) error { - primary, err := isPrimary(ctx, node) - if err != nil { - return fmt.Errorf("failed to resolve primary status: %s", err) - } + log.Printf("Next full backup due in: %s", nextScheduledBackup) - // Short-circuit if the node is not the primary. - if !primary { - return nil - } + // Perform a full backup if the next scheduled backup time is less than 0. + if nextScheduledBackup < 0 { + log.Println("Performing full backup now") + if err := performBaseBackup(ctx, barman, false); err != nil { + log.Printf("Failed to perform full backup: %s", err) + } - timeUntilNextBackup, err := calculateNextBackupTime(ctx, barman) - if err != nil { - return fmt.Errorf("failed to calculate the next scheduled backup time: %s", err) - } + nextScheduledBackup = backupFrequency(barman) + } - // Perform backup immediately if the time until the next backup is negative. - if timeUntilNextBackup < 0 { - log.Println("Performing full backup now") - if err := performBaseBackup(ctx, barman); err != nil { - return fmt.Errorf("failed to perform full backup: %s", err) + // Reset the ticker frequency in case the backup frequency has changed. + ticker.Reset(nextScheduledBackup) } } - - log.Printf("Next full backup due in: %s", timeUntilNextBackup) - } -func calculateNextBackupTime(ctx context.Context, barman *flypg.Barman) (time.Duration, error) { - frequency := backupFrequency(barman) - - lastBackupTime, err := barman.LastCompletedBackup(ctx) - if err != nil { - log.Printf("Failed to resolve the last backup taken: %s", err) - } - +func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) (time.Duration, error) { + // If there was no backup, return a negative duration to trigger an immediate backup. if lastBackupTime.IsZero() { - lastBackupTime = time.Now() + return -1, nil } - // Calculate the time until the next backup is due. - return time.Until(lastBackupTime.Add(frequency)), nil + return time.Until(lastBackupTime.Add(backupFrequency(barman))), nil } func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) { @@ -175,10 +142,9 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck for { select { case <-ctx.Done(): - return nil + return ctx.Err() default: - _, err := barman.Backup(ctx, immediateCheckpoint) - if err != nil { + if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil { log.Printf("failed to perform full backup: %s. Retrying in 30 seconds.", err) // If we've exceeded the maximum number of retries, we should return an error. diff --git a/cmd/monitor/monitor_backup_schedule_test.go b/cmd/monitor/monitor_backup_schedule_test.go new file mode 100644 index 00000000..05b8c988 --- /dev/null +++ b/cmd/monitor/monitor_backup_schedule_test.go @@ -0,0 +1,128 @@ +package main + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/fly-apps/postgres-flex/internal/flypg" + "github.com/fly-apps/postgres-flex/internal/flypg/state" +) + +const ( + testBarmanConfigDir = "./test_results/barman" + pgTestDirectory = "./test_results/" +) + +func TestBackupFrequency(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + setDefaultEnv(t) + + store, _ := state.NewStore() + + barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + frequency := backupFrequency(barman) + expected := time.Hour * 24 + if frequency != expected { + t.Fatalf("expected frequency to be %s, but got %s", expected, frequency) + } +} + +func TestCalculateNextBackupTime(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + setDefaultEnv(t) + + store, _ := state.NewStore() + barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + t.Run("no backups", func(t *testing.T) { + nextBackupTime, err := calculateNextBackupTime(barman, time.Time{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if nextBackupTime.Hours() > 0 { + t.Fatalf("expected next backup time duration to be less than 0, but got %s", nextBackupTime) + } + }) + + t.Run("recent backup", func(t *testing.T) { + lastBackup := time.Now().Add(-1 * time.Hour) + + nextBackupTime, err := calculateNextBackupTime(barman, lastBackup) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := 22.0 + if nextBackupTime.Hours() == expected { + t.Fatalf("expected next backup time duration to be %f, but got %f", expected, nextBackupTime.Hours()) + } + }) + + t.Run("old backup", func(t *testing.T) { + lastBackup := time.Now().Add(-25 * time.Hour) + + nextBackupTime, err := calculateNextBackupTime(barman, lastBackup) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := -1.0 + if nextBackupTime.Hours() == -1.0 { + t.Fatalf("expected next backup time duration to be %f, but got %f", expected, nextBackupTime.Hours()) + } + }) + +} + +func setDefaultEnv(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + t.Setenv("FLY_APP_NAME", "postgres-flex") +} + +func setup(t *testing.T) error { + t.Setenv("FLY_VM_MEMORY_MB", fmt.Sprint(256*(1024*1024))) + t.Setenv("UNIT_TESTING", "true") + + if _, err := os.Stat(pgTestDirectory); err != nil { + if os.IsNotExist(err) { + if err := os.Mkdir(pgTestDirectory, 0750); err != nil { + return err + } + } else { + return err + } + } + return nil +} + +func cleanup() { + if err := os.RemoveAll(pgTestDirectory); err != nil { + fmt.Printf("failed to remove testing dir: %s\n", err) + } +}