Skip to content

Commit

Permalink
Retention work
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jun 26, 2024
1 parent c7e764a commit 479dd0f
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 32 deletions.
15 changes: 15 additions & 0 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
Expand All @@ -16,6 +17,8 @@ var (

defaultDeadMemberRemovalThreshold = time.Hour * 12
defaultInactiveSlotRemovalThreshold = time.Hour * 12

defaultBackupRetentionEvaluationThreshold = time.Hour * 1
)

func main() {
Expand All @@ -36,6 +39,18 @@ func main() {
}
}()

if os.Getenv("CLOUD_ARCHIVING_ENABLED") == "true" {
barman, err := flypg.NewBarman()
if err != nil {
panic(err)
}

log.Println("Monitoring backup retention")
barman.PrintRetentionPolicy()

go monitorBackupRetention(ctx, barman)
}

// Readonly monitor
log.Println("Monitoring cluster state")
go monitorClusterState(ctx, node)
Expand Down
24 changes: 24 additions & 0 deletions cmd/monitor/monitor_backup_retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"context"
"log"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
)

// monitorBackupRetention periodically enforces the Barman backup retention
// policy by deleting WAL archives/backups that fall outside the retention
// window. It runs until ctx is cancelled.
func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
	ticker := time.NewTicker(defaultBackupRetentionEvaluationThreshold)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Original `for range ticker.C` never observed ctx, so this
			// goroutine could not be stopped; honor cancellation here.
			return
		case <-ticker.C:
			result, err := barman.WALArchiveDelete(ctx)
			if err != nil {
				log.Printf("Backup retention failed with: %s", err)
			}

			// barman prints diagnostics on stdout/stderr; surface them.
			if len(result) > 0 {
				log.Printf("Backup retention response: %s", result)
			}
		}
	}
}
2 changes: 0 additions & 2 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
ticker := time.NewTicker(deadMemberMonitorFrequency)
defer ticker.Stop()

log.Printf("Pruning every %s...\n", removalThreshold)

for range ticker.C {
err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func main() {
}

svisor := supervisor.New("flybarman", 1*time.Minute)
svisor.AddProcess("cron", "/usr/sbin/cron -f", supervisor.WithRestart(0, 5*time.Second))
svisor.AddProcess("barman", fmt.Sprintf("tail -f %s", node.LogFile))
svisor.AddProcess("admin", "/usr/local/bin/start_admin_server",
supervisor.WithRestart(0, 5*time.Second),
Expand Down
125 changes: 125 additions & 0 deletions internal/flypg/barman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package flypg

import (
"context"
"fmt"
"log"
"os"
"os/exec"
"strings"
)

const (
cronDirectory = "/data/cron"

Check failure on line 13 in internal/flypg/barman.go

View workflow job for this annotation

GitHub Actions / Staticcheck

const cronDirectory is unused (U1000)
archivePrunerCronFile = "/data/cron/barman.cron"

Check failure on line 14 in internal/flypg/barman.go

View workflow job for this annotation

GitHub Actions / Staticcheck

const archivePrunerCronFile is unused (U1000)
archivePrunerBinary = "/usr/local/bin/archive_pruner"

Check failure on line 15 in internal/flypg/barman.go

View workflow job for this annotation

GitHub Actions / Staticcheck

const archivePrunerBinary is unused (U1000)
)

// # TODO - make this configurable
// retention="RECOVERY WINDOW OF 7 DAYS"
// # TODO - make this configurable
// minimum_redundancy=3

// provider="aws-s3"
// endpoint=$AWS_ENDPOINT_URL
// bucket=$AWS_BUCKET_NAME
// name=$FLY_APP_NAME

// barman-cloud-backup-delete \
// --cloud-provider "$provider" \
// --endpoint-url "$endpoint" \
// --retention "$retention" \
// --minimum-redundancy "$minimum_redundancy" \
// "s3://$bucket" "$name"

// Barman bundles the configuration used to build barman-cloud-* command
// lines for WAL archiving and retention against an S3-compatible bucket.
type Barman struct {
	appName           string // FLY_APP_NAME; used as the server name within the bucket
	provider          string // barman cloud provider flag value (set to "aws-s3" in NewBarman)
	endpoint          string // AWS_ENDPOINT_URL, trimmed of surrounding whitespace
	bucket            string // AWS_BUCKET_NAME, trimmed of surrounding whitespace
	minimumRedundancy string // minimum number of backups to keep (default "3")
	retentionDays     string // recovery-window length in days (default "7")
}

// NewBarman checks that the required AWS environment is present and
// returns a Barman configured from environment variables, falling back
// to defaults for the optional retention settings.
func NewBarman() (*Barman, error) {
	err := validateBarmanRequirements()
	if err != nil {
		return nil, err
	}

	// TODO - Validate minimum and retention day values

	b := Barman{
		appName:  os.Getenv("FLY_APP_NAME"),
		provider: "aws-s3",
	}
	b.endpoint = strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL"))
	b.bucket = strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME"))
	b.minimumRedundancy = getenv("CLOUD_ARCHIVING_MINIMUM_REDUNDANCY", "3")
	b.retentionDays = getenv("CLOUD_ARCHIVING_RETENTION_DAYS", "7")

	return &b, nil
}

// RetentionPolicy renders the single-quoted recovery-window clause passed
// to barman-cloud-backup-delete via --retention.
func (b *Barman) RetentionPolicy() string {
	return "'RECOVERY WINDOW OF " + b.retentionDays + " days'"
}

// WALArchiveDelete runs barman-cloud-backup-delete with the configured
// retention policy and returns its combined stdout/stderr output.
func (b *Barman) WALArchiveDelete(ctx context.Context) ([]byte, error) {
	// The command string contains flags and shell quoting, so it must be
	// interpreted by a shell. Passing the whole string as the first
	// argument of CommandContext would treat it as a single binary path
	// and always fail with "file not found".
	cmd := exec.CommandContext(ctx, "sh", "-c", b.walArchiveDeleteCommandString())
	return cmd.CombinedOutput()
}

// PrintRetentionPolicy logs the configured retention window and minimum
// backup redundancy for operator visibility at startup.
func (b *Barman) PrintRetentionPolicy() {
	// retentionDays and minimumRedundancy are strings, so they must be
	// rendered with %s; the original %d verbs produced output like
	// "%!d(string=7)".
	str := `Retention Policy
-----------------
RECOVERY WINDOW OF %s days
MINIMUM BACKUP REDUNDANCY: %s
`
	log.Printf(str, b.retentionDays, b.minimumRedundancy)
}

// walArchiveDeleteCommandString assembles the barman-cloud-backup-delete
// invocation that enforces the retention policy against the bucket.
func (b *Barman) walArchiveDeleteCommandString() string {
	args := []string{
		"barman-cloud-backup-delete",
		"--cloud-provider", b.provider,
		"--endpoint-url", b.endpoint,
		"--retention", b.RetentionPolicy(),
		"--minimum-redundancy", b.minimumRedundancy,
		"s3://" + b.bucket,
		b.appName,
	}
	return strings.Join(args, " ")
}

// walArchiveCommandString assembles the single-quoted archive_command
// value that ships WAL segments to the bucket ("%p" is expanded by
// Postgres to the WAL file path).
func (b *Barman) walArchiveCommandString() string {
	cmd := strings.Join([]string{
		"barman-cloud-wal-archive",
		"--cloud-provider", b.provider,
		"--gzip",
		"--endpoint-url", b.endpoint,
		"s3://" + b.bucket,
		b.appName,
		"%p",
	}, " ")
	return "'" + cmd + "'"
}

// validateBarmanRequirements confirms the AWS credentials and bucket
// settings barman needs are present, returning an error naming the first
// missing variable.
func validateBarmanRequirements() error {
	required := []struct {
		name string
		kind string // "secret" for credentials, "envvar" for config
	}{
		{"AWS_ACCESS_KEY_ID", "secret"},
		{"AWS_SECRET_ACCESS_KEY", "secret"},
		{"AWS_BUCKET_NAME", "envvar"},
		{"AWS_ENDPOINT_URL", "envvar"},
	}

	for _, r := range required {
		if os.Getenv(r.name) == "" {
			return fmt.Errorf("%s %s must be set", r.name, r.kind)
		}
	}

	return nil
}

// getenv returns the value of key from the environment, or fallback when
// the variable is unset or empty.
func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
34 changes: 5 additions & 29 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,17 @@ func (c *PGConfig) SetDefaults() error {
// Works to configure archiving to object storage if enabled
switch strings.ToLower(os.Getenv("CLOUD_ARCHIVING_ENABLED")) {
case "true":
if err := validateCloudArchiver(); err != nil {
return fmt.Errorf("failed to validate s3 archiver: %s", err)
barman, err := NewBarman()
if err != nil {
return err
}

bucket := strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME"))
endpoint := strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL"))

archiveCommand := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --gzip --endpoint-url %s s3://%s %s %%p'", endpoint, bucket, c.AppName)
c.internalConfig["archive_mode"] = "on"
c.internalConfig["archive_command"] = archiveCommand
c.internalConfig["archive_command"] = barman.walArchiveCommandString()
case "false":
c.internalConfig["archive_mode"] = "off"
default:
// Noop to remain backwards compatible with existing setups that may have
// manually setup archiving.
// Noop
}

return nil
Expand Down Expand Up @@ -558,23 +554,3 @@ func diskSizeInBytes(dir string) (uint64, error) {
}
return stat.Blocks * uint64(stat.Bsize), nil
}

func validateCloudArchiver() error {
if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
return fmt.Errorf("AWS_ACCESS_KEY_ID secret must be set")
}

if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
return fmt.Errorf("AWS_SECRET_ACCESS_KEY secret must be set")
}

if os.Getenv("AWS_BUCKET_NAME") == "" {
return fmt.Errorf("AWS_BUCKET_NAME envvar must be set")
}

if os.Getenv("AWS_ENDPOINT_URL") == "" {
return fmt.Errorf("AWS_ENDPOINT_URL envvar must be set")
}

return nil
}

0 comments on commit 479dd0f

Please sign in to comment.