Schedule fix #236

Merged · 8 commits · Jul 9, 2024
cmd/monitor/main.go (7 changes: 4 additions & 3 deletions)
@@ -47,7 +47,8 @@ func main() {
 		}
 	}()
 
-	if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
+	// No need to monitor backups outside of the primary region.
+	if os.Getenv("S3_ARCHIVE_CONFIG") != "" && node.PrimaryRegion == node.RepMgr.Region {
 		store, err := state.NewStore()
 		if err != nil {
 			panic(fmt.Errorf("failed to initialize cluster state store: %s", err))
@@ -63,10 +64,10 @@ func main() {
 		}
 
 		// Backup scheduler
-		go monitorBackupSchedule(ctx, barman)
+		go monitorBackupSchedule(ctx, node, barman)
 
 		// Backup retention monitor
-		go monitorBackupRetention(ctx, barman)
+		go monitorBackupRetention(ctx, node, barman)
 	}
 
 	// Readonly monitor
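To make the new gate in `main.go` concrete, here is a tiny standalone sketch of the same condition; the region strings and bucket value are made up for illustration:

```go
package main

import (
	"fmt"
	"os"
)

// shouldMonitorBackups mirrors the new condition: WAL archiving must be
// configured AND the node must run in the cluster's primary region.
func shouldMonitorBackups(primaryRegion, nodeRegion string) bool {
	return os.Getenv("S3_ARCHIVE_CONFIG") != "" && primaryRegion == nodeRegion
}

func main() {
	os.Setenv("S3_ARCHIVE_CONFIG", "s3://example-bucket") // hypothetical value
	fmt.Println(shouldMonitorBackups("iad", "iad"))       // true: primary region
	fmt.Println(shouldMonitorBackups("iad", "fra"))       // false: other region
}
```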
cmd/monitor/monitor_backup_retention.go (19 changes: 13 additions & 6 deletions)
@@ -8,24 +8,31 @@ import (
"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {

ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup retention monitor")
log.Println("[WARN] Shutting down backup retention monitor...")
return
case <-ticker.C:
result, err := barman.WALArchiveDelete(ctx)
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Backup retention failed with: %s", err)
log.Printf("[WARN] Failed to resolve primary when evaluating retention: %s", err)
continue
}

if !primary {
continue
}

if len(result) > 0 {
log.Printf("Backup retention response: %s", result)
if _, err := barman.WALArchiveDelete(ctx); err != nil {
log.Printf("[WARN] Failed to prune WAL Archive: %s", err)
}

}
}
}
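The retention monitor now follows a common Go shape: a ticker loop that exits on context cancellation and does work only on the primary. A minimal generic sketch of that pattern, with `isPrimary` and `work` as stand-ins for the real repmgr check and `barman.WALArchiveDelete`:

```go
package monitor

import (
	"context"
	"log"
	"time"
)

// runOnPrimary calls work on every tick, but only while isPrimary reports
// true, and returns once ctx is cancelled.
func runOnPrimary(ctx context.Context, every time.Duration,
	isPrimary func(context.Context) (bool, error),
	work func(context.Context) error,
) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			primary, err := isPrimary(ctx)
			if err != nil {
				log.Printf("[WARN] primary check failed: %s", err)
				continue // retry on the next tick
			}
			if !primary {
				continue // replicas stay passive
			}
			if err := work(ctx); err != nil {
				log.Printf("[WARN] %s", err)
			}
		}
	}
}
```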
cmd/monitor/monitor_backup_schedule.go (162 changes: 106 additions & 56 deletions)
@@ -9,114 +9,164 @@ import (
"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
const (
backupRetryInterval = time.Second * 30
)

func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
// Determine when the last backup was taken.
lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
log.Printf("[WARN] %s", err)
}

fullBackupSchedule := defaultFullBackupSchedule
// Calculate the next scheduled backup time.
nextScheduledBackup := calculateNextBackupTime(barman, lastBackupTime)

// Set the full backup schedule if it is defined in the configuration.
if barman.Settings.FullBackupFrequency != "" {
fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
switch {
case err != nil:
log.Printf("Failed to parse full backup frequency: %s", err)
default:
fullBackupSchedule = fullBackupDur
}
// Check to see if we are the Primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("[WARN] Failed to resolve primary status: %s", err)
}

// Ensure we have a least one backup before proceeding.
if lastBackupTime.IsZero() {
log.Println("No backups found! Performing the initial base backup.")
// Perform the initial base backup if we are the primary and either no backups have been taken
// or the next scheduled backup time is less than 0.
if primary {
if nextScheduledBackup < 0 {
err := performBaseBackup(ctx, barman, true)
switch {
case err != nil:
log.Printf("[WARN] Failed to perform initial base backup: %s", err)
log.Printf("[INFO] Retrying in 10 minutes...")
lastBackupTime = time.Now().Add(-backupFrequency(barman) + 10*time.Minute)
default:
log.Println("[INFO] Initial base backup completed successfully")
lastBackupTime = time.Now()
}

if err := performInitialBaseBackup(ctx, barman); err != nil {
log.Printf("Failed to perform the initial full backup: %s", err)
log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
// Recalculate the next scheduled backup time after the initial backup.
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)
}

lastBackupTime = time.Now()
}

log.Printf("Full backup schedule set to: %s", fullBackupSchedule)

// Calculate the time until the next backup is due.
timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))

// Perform backup immediately if the time until the next backup is negative.
if timeUntilNextBackup < 0 {
log.Println("Performing full backup now")
_, err := barman.Backup(ctx, false)
if err != nil {
log.Printf("Full backup failed with: %s", err)
}

timeUntilNextBackup = fullBackupSchedule
// Safety net in case ticker does not have a valid duration.
if nextScheduledBackup < 0 {
nextScheduledBackup = backupFrequency(barman)
}

log.Printf("Next full backup due in: %s", timeUntilNextBackup)
log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)

ticker := time.NewTicker(timeUntilNextBackup)
// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
// continue to be taken in the event of a failover.
ticker := time.NewTicker(nextScheduledBackup)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup scheduler")
log.Println("[WARN] Shutting down backup schedule monitor...")
return
case <-ticker.C:
// Perform a backup while passively waiting for the checkpoint process to complete.
// This could actually take a while, so we should be prepared to wait.
log.Println("Performing full backup")
_, err := barman.Backup(ctx, false)
// Check to see if we are the Primary. This is necessary given failovers can occur at runtime.
primary, err := isPrimary(ctx, node)
if err != nil {
// TODO - Implement a backup-off strategy.
timeUntilNextBackup = time.Hour * 1
ticker.Reset(timeUntilNextBackup)
log.Printf("[WARN] Failed to resolve primary status: %s", err)
continue
}

log.Printf("Backup retention failed with: %s.", err)
log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)
if !primary {
continue
}

lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("[WARN] Failed to determine when the last backup was taken: %s", err)
continue
}

log.Printf("Full backup completed successfully")
ticker.Reset(fullBackupSchedule)
// Recalculate the next scheduled backup time.
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)

// Perform a full backup if the next scheduled backup time is less than 0.
if nextScheduledBackup < 0 {
log.Println("[INFO] Performing full backup...")
if err := performBaseBackup(ctx, barman, false); err != nil {
log.Printf("[WARN] Failed to perform full backup: %v", err)
}

// TODO - We should consider retrying at a shorter interval in the event of a failure.
nextScheduledBackup = backupFrequency(barman)
}

log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)

// Reset the ticker frequency in case the backup frequency has changed.
ticker.Reset(nextScheduledBackup)
}
}
}
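Worth noting: the loop adjusts its cadence with `time.Ticker.Reset` (available since Go 1.15) rather than recreating the ticker. A small self-contained illustration:

```go
package main

import "time"

func main() {
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()

	// Reset changes the period in place: the next tick fires one full
	// new interval (here 30m) after the call.
	ticker.Reset(30 * time.Minute)
}
```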

-func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
+func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) time.Duration {
+	// If there was no backup, return a negative duration to trigger an immediate backup.
+	if lastBackupTime.IsZero() {
+		return -1
+	}
+	return time.Until(lastBackupTime.Add(backupFrequency(barman)))
+}
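The contract of `calculateNextBackupTime` is that both "no backup yet" and "schedule elapsed" come out negative, so callers can treat `< 0` uniformly as "backup now". A quick standalone check, assuming a hypothetical 24h frequency:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const frequency = 24 * time.Hour // hypothetical configured frequency

	// No backups yet: the zero time puts the deadline far in the past.
	var never time.Time
	fmt.Println(time.Until(never.Add(frequency)) < 0) // true

	// Last backup 25h ago: one hour overdue, also negative.
	last := time.Now().Add(-25 * time.Hour)
	fmt.Println(time.Until(last.Add(frequency)) < 0) // true (about -1h)
}
```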

+func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) {
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		return false, fmt.Errorf("failed to open local connection: %s", err)
+	}
+	defer func() { _ = conn.Close(ctx) }()
+
+	return node.RepMgr.IsPrimary(ctx, conn)
+}
+
+func backupFrequency(barman *flypg.Barman) time.Duration {
+	fullBackupSchedule := defaultFullBackupSchedule
+
+	// Set the full backup schedule if it is defined in the configuration.
+	if barman.Settings.FullBackupFrequency != "" {
+		fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
+		switch {
+		case err != nil:
+			log.Printf("[WARN] Failed to parse full backup frequency: %s", err)
+		default:
+			fullBackupSchedule = fullBackupDur
+		}
+	}
+
+	return fullBackupSchedule
+}
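Since `FullBackupFrequency` goes through `time.ParseDuration`, values like `"24h"`, `"90m"`, or `"1h30m"` parse fine, while something like `"daily"` errors out and the default schedule is kept. For example:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d, err := time.ParseDuration("1h30m")
	fmt.Println(d, err) // 1h30m0s <nil>

	_, err = time.ParseDuration("daily")
	fmt.Println(err != nil) // true: invalid, so the default schedule is kept
}
```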

+func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheckpoint bool) error {
 	maxRetries := 10
 	retryCount := 0
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			return ctx.Err()
 		default:
-			_, err := barman.Backup(ctx, true)
-			if err != nil {
-				log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)
+			if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil {
+				log.Printf("[WARN] Failed to perform full backup: %s. Retrying in 30 seconds.", err)
 
 				// If we've exceeded the maximum number of retries, we should return an error.
 				if retryCount >= maxRetries {
-					return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
+					return fmt.Errorf("failed to perform full backup after %d retries", maxRetries)
 				}
 
 				retryCount++
 
 				select {
 				case <-ctx.Done():
 					return ctx.Err()
-				case <-time.After(time.Second * 30):
+				case <-time.After(backupRetryInterval):
 					continue
 				}
 			}
 
-			log.Println("Initial full backup completed successfully")
 			return nil
 		}
 	}
 }
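The retry logic in `performBaseBackup` generalizes to a reusable bounded-retry helper: fixed wait, capped attempts, and cancellation honored both between attempts and while waiting. A sketch of that shape (not the repo's API, just an illustration):

```go
package backup

import (
	"context"
	"fmt"
	"time"
)

// retry runs fn until it succeeds, giving up after maxRetries failed
// attempts; cancellation is honored between attempts and while waiting.
func retry(ctx context.Context, maxRetries int, wait time.Duration, fn func(context.Context) error) error {
	for attempt := 0; ; attempt++ {
		if err := fn(ctx); err == nil {
			return nil
		} else if attempt >= maxRetries {
			return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
}
```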