Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jul 8, 2024
1 parent ae95fd5 commit 164f6f9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 52 deletions.
7 changes: 4 additions & 3 deletions cmd/monitor/monitor_backup_retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
)

func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {

ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup retention monitor")
log.Println("[WARN] Shutting down backup retention monitor...")
return
case <-ticker.C:
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary when evaluating retention: %s", err)
log.Printf("[WARN] Failed to resolve primary when evaluating retention: %s", err)
continue
}

Expand All @@ -29,7 +30,7 @@ func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg
}

if _, err := barman.WALArchiveDelete(ctx); err != nil {
log.Printf("WAL archive retention failed with: %s", err)
log.Printf("[WARN] Failed to prune WAL Archive: %s", err)
}

}
Expand Down
73 changes: 36 additions & 37 deletions cmd/monitor/monitor_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,60 @@ import (
"github.com/fly-apps/postgres-flex/internal/flypg"
)

const (
backupRetryInterval = time.Second * 30
)

func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
log.Printf("[WARN] Failed to resolve the last backup taken: %s", err)
}

// Calculate when the next backup is due.
nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}
nextScheduledBackup := calculateNextBackupTime(barman, lastBackupTime)

// Determine if the node is the primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary status: %s", err)
log.Printf("[WARN] Failed to resolve primary status: %s", err)
}

if primary {
if nextScheduledBackup < 0 {
log.Println("No backups found! Performing the initial base backup.")
log.Println("[INFO] No backups found! Performing the initial base backup.")
err := performBaseBackup(ctx, barman, true)
switch {
case err != nil:
log.Printf("Failed to perform initial base backup: %s", err)
log.Printf("[WARN] Failed to perform initial base backup: %s", err)
lastBackupTime = time.Now().Add(-backupFrequency(barman) + time.Hour)
default:
log.Println("Initial base backup completed successfully")
log.Println("[INFO] Initial base backup completed successfully")
lastBackupTime = time.Now()
}

// Recalculate the next scheduled backup time after the initial backup.
nextScheduledBackup, err = calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)
}

log.Printf("Next full backup due in: %s", nextScheduledBackup)
log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)
}

// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
// continue to be taken in the event of a failover.
ticker := time.NewTicker(nextScheduledBackup)
defer ticker.Stop()

// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
// continue to be taken in the event of a failover.
for {
select {
case <-ctx.Done():
log.Println("Shutting down backup schedule monitor")
log.Println("[WARN] Shutting down backup schedule monitor...")
return
case <-ticker.C:
// Check to see if we are the Primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Failed to resolve primary status: %s", err)
log.Printf("[WARN] Failed to resolve primary status: %s", err)
}

// Noop if we are not the primary.
Expand All @@ -73,25 +72,27 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.

lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
log.Printf("[WARN] Failed to determine when the last backup was taken: %s", err)
}

// Recalculate the next scheduled backup time.
nextScheduledBackup, err := calculateNextBackupTime(barman, lastBackupTime)
if err != nil {
log.Printf("Failed to calculate the next scheduled backup time: %s", err)
}
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)

log.Printf("Next full backup due in: %s", nextScheduledBackup)
log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)

// Perform a full backup if the next scheduled backup time is less than 0.
if nextScheduledBackup < 0 {
log.Println("Performing full backup now")
if err := performBaseBackup(ctx, barman, false); err != nil {
log.Printf("Failed to perform full backup: %s", err)
log.Println("[INFO] Performing full backup now")
err := performBaseBackup(ctx, barman, false)
switch {
case err != nil:
log.Printf("[WARN] Failed to perform full backup: %s", err)
// Retry the backup in 1 hour.
nextScheduledBackup = time.Hour
default:
log.Println("[INFO] Full backup completed successfully")
nextScheduledBackup = backupFrequency(barman)
}

nextScheduledBackup = backupFrequency(barman)
}

// Reset the ticker frequency in case the backup frequency has changed.
Expand All @@ -100,13 +101,12 @@ func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.
}
}

func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) (time.Duration, error) {
func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) time.Duration {
// If there was no backup, return a negative duration to trigger an immediate backup.
if lastBackupTime.IsZero() {
return -1, nil
return -1
}

return time.Until(lastBackupTime.Add(backupFrequency(barman))), nil
return time.Until(lastBackupTime.Add(backupFrequency(barman)))
}

func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) {
Expand All @@ -127,7 +127,7 @@ func backupFrequency(barman *flypg.Barman) time.Duration {
fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
switch {
case err != nil:
log.Printf("Failed to parse full backup frequency: %s", err)
log.Printf("[WARN] Failed to parse full backup frequency: %s", err)
default:
fullBackupSchedule = fullBackupDur
}
Expand All @@ -145,7 +145,7 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck
return ctx.Err()
default:
if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil {
log.Printf("failed to perform full backup: %s. Retrying in 30 seconds.", err)
log.Printf("[WARN] Failed to perform full backup: %s. Retrying in 30 seconds.", err)

// If we've exceeded the maximum number of retries, we should return an error.
if retryCount >= maxRetries {
Expand All @@ -157,12 +157,11 @@ func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheck
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 30):
case <-time.After(backupRetryInterval):
continue
}
}

log.Println("full backup completed successfully")
return nil
}
}
Expand Down
15 changes: 3 additions & 12 deletions cmd/monitor/monitor_backup_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ func TestCalculateNextBackupTime(t *testing.T) {
}

t.Run("no backups", func(t *testing.T) {
nextBackupTime, err := calculateNextBackupTime(barman, time.Time{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
nextBackupTime := calculateNextBackupTime(barman, time.Time{})

if nextBackupTime.Hours() > 0 {
t.Fatalf("expected next backup time duration to be less than 0, but got %s", nextBackupTime)
Expand All @@ -73,10 +70,7 @@ func TestCalculateNextBackupTime(t *testing.T) {
t.Run("recent backup", func(t *testing.T) {
lastBackup := time.Now().Add(-1 * time.Hour)

nextBackupTime, err := calculateNextBackupTime(barman, lastBackup)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
nextBackupTime := calculateNextBackupTime(barman, lastBackup)

expected := 22.0
if nextBackupTime.Hours() == expected {
Expand All @@ -87,10 +81,7 @@ func TestCalculateNextBackupTime(t *testing.T) {
t.Run("old backup", func(t *testing.T) {
lastBackup := time.Now().Add(-25 * time.Hour)

nextBackupTime, err := calculateNextBackupTime(barman, lastBackup)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
nextBackupTime := calculateNextBackupTime(barman, lastBackup)

expected := -1.0
if nextBackupTime.Hours() == -1.0 {
Expand Down

0 comments on commit 164f6f9

Please sign in to comment.