Skip to content

Commit

Permalink
Ensure backups and retention are only evaluated on the primary
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jul 8, 2024
1 parent 140abbc commit ae95fd5
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 97 deletions.
14 changes: 10 additions & 4 deletions cmd/monitor/monitor_backup_retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg
log.Println("Shutting down backup retention monitor")
return
case <-ticker.C:
result, err := barman.WALArchiveDelete(ctx)
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Backup retention failed with: %s", err)
log.Printf("Failed to resolve primary when evaluating retention: %s", err)
continue
}

if len(result) > 0 {
log.Printf("Backup retention response: %s", result)
if !primary {
continue
}

if _, err := barman.WALArchiveDelete(ctx); err != nil {
log.Printf("WAL archive retention failed with: %s", err)
}

}
}
}
152 changes: 59 additions & 93 deletions cmd/monitor/monitor_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,37 @@ import (
)

func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
nextScheduledBackup, err := calculateNextBackupTime(ctx, barman)
lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
}

// Calculate when the next backup is due.
nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}

// Determine if the node is the primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary status: %s", err)
}

if primary {

if nextScheduledBackup < 0 {
log.Println("No backups found! Performing the initial base backup.")
if err := performBaseBackup(ctx, barman, true); err != nil {
log.Printf("Failed to perform full backup: %s", err)
err := performBaseBackup(ctx, barman, true)
switch {
case err != nil:
log.Printf("Failed to perform initial base backup: %s", err)
default:
log.Println("Initial base backup completed successfully")
lastBackupTime = time.Now()
}

// Recalculate the next scheduled backup time after the initial backup.
nextScheduledBackup, err = calculateNextBackupTime(ctx, barman)
nextScheduledBackup, err = calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}
Expand All @@ -41,105 +52,61 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.
ticker := time.NewTicker(nextScheduledBackup)
defer ticker.Stop()

for range ticker.C {
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary status: %s", err)
}

// Short-circuit if the node is not the primary.
if !primary {
continue
}
// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
// continue to be taken in the event of a failover.
for {
select {
case <-ctx.Done():
log.Println("Shutting down backup schedule monitor")
return
case <-ticker.C:
// Check to see if we are the Primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary status: %s", err)
}

// Calculate when the next backup is due. This needs to be calculated per-tick
// in case the backup frequency has changed or the primary status has changed.
nextScheduledBackup, err := calculateNextBackupTime(ctx, barman)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}
// Noop if we are not the primary.
if !primary {
continue
}

if err := monitorBackupScheduleTick(ctx, node, barman); err != nil {
log.Printf("monitorBackupScheduleTick failed with: %s", err)
}
lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
}

// Reset the ticker frequency in case the backup frequency has changed.
ticker.Reset(backupFrequency(barman))
}
}
// Recalculate the next scheduled backup time.
nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}

// func performInitialBackup(ctx context.Context, node *flypg.Node, barman *flypg.Barman) error {
// primary, err := isPrimary(ctx, node)
// if err != nil {
// log.Printf("Failed to resolve primary status: %s", err)
// }

// // Short-circuit if the node is not the primary.
// if !primary {
// return nil
// }

// // Determine when the last backup was taken.
// lastBackupTime, err := barman.LastCompletedBackup(ctx)
// if err != nil {
// log.Printf("Failed to resolve the last backup taken: %s", err)
// }

// // Perform the initial backup if the node is the primary.
// if lastBackupTime.IsZero() {
// log.Println("No backups found! Performing the initial base backup.")

// if err := performBaseBackup(ctx, barman); err != nil {
// log.Printf("Failed to perform the initial full backup: %s", err)
// log.Printf("Backup scheduler will re-attempt in %s.", backupFrequency(barman))
// }
// }

// return nil
// }

func monitorBackupScheduleTick(ctx context.Context, node *flypg.Node, barman *flypg.Barman) error {
primary, err := isPrimary(ctx, node)
if err != nil {
return fmt.Errorf("failed to resolve primary status: %s", err)
}
log.Printf("Next full backup due in: %s", nextScheduledBackup)

// Short-circuit if the node is not the primary.
if !primary {
return nil
}
// Perform a full backup if the next scheduled backup time is less than 0.
if nextScheduledBackup < 0 {
log.Println("Performing full backup now")
if err := performBaseBackup(ctx, barman, false); err != nil {
log.Printf("Failed to perform full backup: %s", err)
}

timeUntilNextBackup, err := calculateNextBackupTime(ctx, barman)
if err != nil {
return fmt.Errorf("failed to calculate the next scheduled backup time: %s", err)
}
nextScheduledBackup = backupFrequency(barman)
}

// Perform backup immediately if the time until the next backup is negative.
if timeUntilNextBackup < 0 {
log.Println("Performing full backup now")
if err := performBaseBackup(ctx, barman); err != nil {
return fmt.Errorf("failed to perform full backup: %s", err)
// Reset the ticker frequency in case the backup frequency has changed.
ticker.Reset(nextScheduledBackup)
}
}

log.Printf("Next full backup due in: %s", timeUntilNextBackup)

}

func calculateNextBackupTime(ctx context.Context, barman *flypg.Barman) (time.Duration, error) {
frequency := backupFrequency(barman)

lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
}

func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) (time.Duration, error) {
// If there was no backup, return a negative duration to trigger an immediate backup.
if lastBackupTime.IsZero() {
lastBackupTime = time.Now()
return -1, nil
}

// Calculate the time until the next backup is due.
return time.Until(lastBackupTime.Add(frequency)), nil
return time.Until(lastBackupTime.Add(backupFrequency(barman))), nil
}

func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) {
Expand Down Expand Up @@ -175,10 +142,9 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
default:
_, err := barman.Backup(ctx, immediateCheckpoint)
if err != nil {
if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil {
log.Printf("failed to perform full backup: %s. Retrying in 30 seconds.", err)

// If we've exceeded the maximum number of retries, we should return an error.
Expand Down
128 changes: 128 additions & 0 deletions cmd/monitor/monitor_backup_schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"fmt"
"os"
"testing"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
"github.com/fly-apps/postgres-flex/internal/flypg/state"
)

const (
testBarmanConfigDir = "./test_results/barman"
pgTestDirectory = "./test_results/"
)

func TestBackupFrequency(t *testing.T) {
if err := setup(t); err != nil {
t.Fatal(err)
}
defer cleanup()

setDefaultEnv(t)

store, _ := state.NewStore()

barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if err := barman.LoadConfig(testBarmanConfigDir); err != nil {
t.Fatalf("unexpected error: %v", err)
}

frequency := backupFrequency(barman)
expected := time.Hour * 24
if frequency != expected {
t.Fatalf("expected frequency to be %s, but got %s", expected, frequency)
}
}

func TestCalculateNextBackupTime(t *testing.T) {
if err := setup(t); err != nil {
t.Fatal(err)
}
defer cleanup()

setDefaultEnv(t)

store, _ := state.NewStore()
barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if err := barman.LoadConfig(testBarmanConfigDir); err != nil {
t.Fatalf("unexpected error: %v", err)
}

t.Run("no backups", func(t *testing.T) {
nextBackupTime, err := calculateNextBackupTime(barman, time.Time{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if nextBackupTime.Hours() > 0 {
t.Fatalf("expected next backup time duration to be less than 0, but got %s", nextBackupTime)
}
})

t.Run("recent backup", func(t *testing.T) {
lastBackup := time.Now().Add(-1 * time.Hour)

nextBackupTime, err := calculateNextBackupTime(barman, lastBackup)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

expected := 22.0
if nextBackupTime.Hours() == expected {
t.Fatalf("expected next backup time duration to be %f, but got %f", expected, nextBackupTime.Hours())
}
})

t.Run("old backup", func(t *testing.T) {
lastBackup := time.Now().Add(-25 * time.Hour)

nextBackupTime, err := calculateNextBackupTime(barman, lastBackup)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

expected := -1.0
if nextBackupTime.Hours() == -1.0 {
t.Fatalf("expected next backup time duration to be %f, but got %f", expected, nextBackupTime.Hours())
}
})

}

func setDefaultEnv(t *testing.T) {
t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory")
t.Setenv("FLY_APP_NAME", "postgres-flex")
}

func setup(t *testing.T) error {
t.Setenv("FLY_VM_MEMORY_MB", fmt.Sprint(256*(1024*1024)))
t.Setenv("UNIT_TESTING", "true")

if _, err := os.Stat(pgTestDirectory); err != nil {
if os.IsNotExist(err) {
if err := os.Mkdir(pgTestDirectory, 0750); err != nil {
return err
}
} else {
return err
}
}
return nil
}

func cleanup() {
if err := os.RemoveAll(pgTestDirectory); err != nil {
fmt.Printf("failed to remove testing dir: %s\n", err)
}
}

0 comments on commit ae95fd5

Please sign in to comment.