From 479dd0f79665334b1421cf9ae8927b46dccebfb0 Mon Sep 17 00:00:00 2001
From: Shaun Davis
Date: Wed, 26 Jun 2024 11:13:25 -0500
Subject: [PATCH] Retention work

---
 cmd/monitor/main.go                     |  15 +++
 cmd/monitor/monitor_backup_retention.go |  24 +++++
 cmd/monitor/monitor_dead_members.go     |   2 -
 cmd/start/main.go                       |   1 -
 internal/flypg/barman.go                | 125 ++++++++++++++++++++++++
 internal/flypg/pg.go                    |  34 +------
 6 files changed, 169 insertions(+), 32 deletions(-)
 create mode 100644 cmd/monitor/monitor_backup_retention.go
 create mode 100644 internal/flypg/barman.go

diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go
index 402e8717..9eb185aa 100644
--- a/cmd/monitor/main.go
+++ b/cmd/monitor/main.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"os"
 	"time"
 
 	"github.com/fly-apps/postgres-flex/internal/flypg"
@@ -16,6 +17,8 @@ var (
 	defaultDeadMemberRemovalThreshold = time.Hour * 12
 
 	defaultInactiveSlotRemovalThreshold = time.Hour * 12
+
+	defaultBackupRetentionEvaluationThreshold = time.Hour * 1
 )
 
 func main() {
@@ -36,6 +39,18 @@ func main() {
 		}
 	}()
 
+	if os.Getenv("CLOUD_ARCHIVING_ENABLED") == "true" {
+		barman, err := flypg.NewBarman()
+		if err != nil {
+			panic(err)
+		}
+
+		log.Println("Monitoring backup retention")
+		barman.PrintRetentionPolicy()
+
+		go monitorBackupRetention(ctx, barman)
+	}
+
 	// Readonly monitor
 	log.Println("Monitoring cluster state")
 	go monitorClusterState(ctx, node)
diff --git a/cmd/monitor/monitor_backup_retention.go b/cmd/monitor/monitor_backup_retention.go
new file mode 100644
index 00000000..06e8818a
--- /dev/null
+++ b/cmd/monitor/monitor_backup_retention.go
@@ -0,0 +1,24 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+)
+
+func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
+	ticker := time.NewTicker(defaultBackupRetentionEvaluationThreshold)
+	defer ticker.Stop()
+	for range ticker.C {
+		result, err := barman.WALArchiveDelete(ctx)
+		if err != nil {
+			log.Printf("Backup retention failed with: %s", err)
+		}
+
+		if len(result) > 0 {
+			log.Printf("Backup retention response: %s", result)
+		}
+	}
+}
diff --git a/cmd/monitor/monitor_dead_members.go b/cmd/monitor/monitor_dead_members.go
index 78370ff9..5602cc06 100644
--- a/cmd/monitor/monitor_dead_members.go
+++ b/cmd/monitor/monitor_dead_members.go
@@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
 	ticker := time.NewTicker(deadMemberMonitorFrequency)
 	defer ticker.Stop()
 
-	log.Printf("Pruning every %s...\n", removalThreshold)
-
 	for range ticker.C {
 		err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
 		if err != nil {
diff --git a/cmd/start/main.go b/cmd/start/main.go
index abe8a8c3..382ebbae 100644
--- a/cmd/start/main.go
+++ b/cmd/start/main.go
@@ -38,7 +38,6 @@ func main() {
 	}
 
 	svisor := supervisor.New("flybarman", 1*time.Minute)
-	svisor.AddProcess("cron", "/usr/sbin/cron -f", supervisor.WithRestart(0, 5*time.Second))
 	svisor.AddProcess("barman", fmt.Sprintf("tail -f %s", node.LogFile))
 	svisor.AddProcess("admin", "/usr/local/bin/start_admin_server",
 		supervisor.WithRestart(0, 5*time.Second),
diff --git a/internal/flypg/barman.go b/internal/flypg/barman.go
new file mode 100644
index 00000000..c23500c6
--- /dev/null
+++ b/internal/flypg/barman.go
@@ -0,0 +1,125 @@
+package flypg
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"strings"
+)
+
+const (
+	cronDirectory         = "/data/cron"
+	archivePrunerCronFile = "/data/cron/barman.cron"
+	archivePrunerBinary   = "/usr/local/bin/archive_pruner"
+)
+
+// # TODO - make this configurable
+// retention="RECOVERY WINDOW OF 7 DAYS"
+// # TODO - make this configurable
+// minimum_redundancy=3
+
+// provider="aws-s3"
+// endpoint=$AWS_ENDPOINT_URL
+// bucket=$AWS_BUCKET_NAME
+// name=$FLY_APP_NAME
+
+// barman-cloud-backup-delete \
+//   --cloud-provider "$provider" \
+//   --endpoint-url "$endpoint" \
+//   --retention "$retention" \
+//   --minimum-redundancy "$minimum_redundancy" \
+//   "s3://$bucket" "$name"
+
+type Barman struct {
+	appName           string
+	provider          string
+	endpoint          string
+	bucket            string
+	minimumRedundancy string
+	retentionDays     string
+}
+
+func NewBarman() (*Barman, error) {
+	if err := validateBarmanRequirements(); err != nil {
+		return nil, err
+	}
+
+	// TODO - Validate minimum and retention day values
+
+	return &Barman{
+		appName:           os.Getenv("FLY_APP_NAME"),
+		provider:          "aws-s3",
+		endpoint:          strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL")),
+		bucket:            strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME")),
+		minimumRedundancy: getenv("CLOUD_ARCHIVING_MINIMUM_REDUNDANCY", "3"),
+		retentionDays:     getenv("CLOUD_ARCHIVING_RETENTION_DAYS", "7"),
+	}, nil
+}
+
+func (b *Barman) RetentionPolicy() string {
+	return fmt.Sprintf("'RECOVERY WINDOW OF %s days'", b.retentionDays)
+}
+
+func (b *Barman) WALArchiveDelete(ctx context.Context) ([]byte, error) {
+	cmd := exec.CommandContext(ctx, "sh", "-c", b.walArchiveDeleteCommandString()) // run via a shell so the quoted retention value stays a single argument
+	return cmd.CombinedOutput()
+}
+
+func (b *Barman) PrintRetentionPolicy() {
+	str := `Retention Policy
+	-----------------
+	RECOVERY WINDOW OF %s days
+	MINIMUM BACKUP REDUNDANCY: %s
+`
+	log.Printf(str, b.retentionDays, b.minimumRedundancy)
+}
+
+func (b *Barman) walArchiveDeleteCommandString() string {
+	return fmt.Sprintf("barman-cloud-backup-delete --cloud-provider %s --endpoint-url %s --retention %s --minimum-redundancy %s s3://%s %s",
+		b.provider,
+		b.endpoint,
+		b.RetentionPolicy(),
+		b.minimumRedundancy,
+		b.bucket,
+		b.appName,
+	)
+}
+
+func (b *Barman) walArchiveCommandString() string {
+	return fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider %s --gzip --endpoint-url %s s3://%s %s %%p'",
+		b.provider,
+		b.endpoint,
+		b.bucket,
+		b.appName,
+	)
+}
+
+func validateBarmanRequirements() error {
+	if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
+		return fmt.Errorf("AWS_ACCESS_KEY_ID secret must be set")
+	}
+
+	if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
+		return fmt.Errorf("AWS_SECRET_ACCESS_KEY secret must be set")
+	}
+
+	if os.Getenv("AWS_BUCKET_NAME") == "" {
+		return fmt.Errorf("AWS_BUCKET_NAME envvar must be set")
+	}
+
+	if os.Getenv("AWS_ENDPOINT_URL") == "" {
+		return fmt.Errorf("AWS_ENDPOINT_URL envvar must be set")
+	}
+
+	return nil
+}
+
+func getenv(key, fallback string) string {
+	value := os.Getenv(key)
+	if len(value) == 0 {
+		return fallback
+	}
+	return value
+}
diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go
index 574ee8a1..a0dd5d69 100644
--- a/internal/flypg/pg.go
+++ b/internal/flypg/pg.go
@@ -175,21 +175,17 @@ func (c *PGConfig) SetDefaults() error {
 	// Works to configure archiving to object storage if enabled
 	switch strings.ToLower(os.Getenv("CLOUD_ARCHIVING_ENABLED")) {
 	case "true":
-		if err := validateCloudArchiver(); err != nil {
-			return fmt.Errorf("failed to validate s3 archiver: %s", err)
+		barman, err := NewBarman()
+		if err != nil {
+			return err
 		}
 
-		bucket := strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME"))
-		endpoint := strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL"))
-
-		archiveCommand := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --gzip --endpoint-url %s s3://%s %s %%p'", endpoint, bucket, c.AppName)
 		c.internalConfig["archive_mode"] = "on"
-		c.internalConfig["archive_command"] = archiveCommand
+		c.internalConfig["archive_command"] = barman.walArchiveCommandString()
 	case "false":
 		c.internalConfig["archive_mode"] = "off"
 	default:
-		// Noop to remain backwards compatible with existing setups that may have
-		// manually setup archiving.
+		// Noop
 	}
 
 	return nil
@@ -558,23 +554,3 @@ func diskSizeInBytes(dir string) (uint64, error) {
 	}
 	return stat.Blocks * uint64(stat.Bsize), nil
 }
-
-func validateCloudArchiver() error {
-	if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
-		return fmt.Errorf("AWS_ACCESS_KEY_ID secret must be set")
-	}
-
-	if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
-		return fmt.Errorf("AWS_SECRET_ACCESS_KEY secret must be set")
-	}
-
-	if os.Getenv("AWS_BUCKET_NAME") == "" {
-		return fmt.Errorf("AWS_BUCKET_NAME envvar must be set")
-	}
-
-	if os.Getenv("AWS_ENDPOINT_URL") == "" {
-		return fmt.Errorf("AWS_ENDPOINT_URL envvar must be set")
-	}
-
-	return nil
-}
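
Note for reviewers: with the defaults above (CLOUD_ARCHIVING_RETENTION_DAYS=7, CLOUD_ARCHIVING_MINIMUM_REDUNDANCY=3) and illustrative placeholder values such as AWS_ENDPOINT_URL=https://storage.example.com, AWS_BUCKET_NAME=example-bucket, and FLY_APP_NAME=example-app (not values taken from this patch), the generated command strings expand to roughly:

    barman-cloud-backup-delete --cloud-provider aws-s3 --endpoint-url https://storage.example.com --retention 'RECOVERY WINDOW OF 7 days' --minimum-redundancy 3 s3://example-bucket example-app

    archive_command = 'barman-cloud-wal-archive --cloud-provider aws-s3 --gzip --endpoint-url https://storage.example.com s3://example-bucket example-app %p'

The first is the prune command monitorBackupRetention runs hourly via WALArchiveDelete; the second is the value pg.go writes to archive_command when CLOUD_ARCHIVING_ENABLED is true.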