Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAL-Archiving + PITR #231

Merged
merged 51 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
261167d
Allow cloud archiving to run next to the primary
davissp14 Jun 25, 2024
c7e764a
fix test
davissp14 Jun 25, 2024
479dd0f
Retention work
davissp14 Jun 26, 2024
8d497ea
Cleanup
davissp14 Jun 26, 2024
2b69519
fix log
davissp14 Jun 26, 2024
2cc0b45
Fix deletion call
davissp14 Jun 26, 2024
ba33e59
Move the log under the debug flag
davissp14 Jun 26, 2024
39bc76a
Adding tests
davissp14 Jun 26, 2024
78c3ac0
Checkin
davissp14 Jun 27, 2024
47a6f21
Checkin
davissp14 Jun 27, 2024
d91ea1a
Add todo
davissp14 Jun 27, 2024
96190c4
Update AWS_ENDPOINT_URL -> AWS_ENDPOINT_URL_S3
davissp14 Jun 27, 2024
1033d55
Refactor
davissp14 Jun 27, 2024
645e07d
Checkin
davissp14 Jun 27, 2024
49784fe
Working PITR, but there's quite a bit of cleanup to do
davissp14 Jun 29, 2024
f5e1353
cleanup
davissp14 Jun 29, 2024
c1b27e5
cleanup
davissp14 Jun 30, 2024
a3e3ced
Didn't mean to remove this
davissp14 Jun 30, 2024
f431edb
Set archive_timeout
davissp14 Jun 30, 2024
383da38
More cleanup
davissp14 Jul 1, 2024
b17193f
fixes and cleanup
davissp14 Jul 1, 2024
4d6c412
Fixing up recovery time logic
davissp14 Jul 1, 2024
900b8e6
Adding todo's and removing unused code
davissp14 Jul 1, 2024
8fa510c
Unexport functions
davissp14 Jul 1, 2024
4ddcecc
Separate credentials into separate profiles
davissp14 Jul 1, 2024
61eedd0
Fix errcheck
davissp14 Jul 1, 2024
ea0c59e
Add todo
davissp14 Jul 1, 2024
fd001e5
static check fixes
davissp14 Jul 1, 2024
dc6c13c
Make archive settings configurable
davissp14 Jul 2, 2024
eba8aa7
Removing unnecessary change
davissp14 Jul 2, 2024
644c0d1
Cleanup
davissp14 Jul 2, 2024
d19644c
Adding tests
davissp14 Jul 2, 2024
2ac4327
Bug fix
davissp14 Jul 2, 2024
6284d76
Wait for PG to be up before monitoring. Also removing some logging t…
davissp14 Jul 2, 2024
6a76b7b
Fix compile error
benwaffle Jul 2, 2024
c37df40
Adding log indicating the monitor is starting
davissp14 Jul 2, 2024
b7802e3
Rename the env vars to something more appropriate
davissp14 Jul 2, 2024
06c4695
Cleanup
davissp14 Jul 2, 2024
2029c2c
Fix that ensures the user configured full_backup_frequency is honored
davissp14 Jul 2, 2024
589866b
Cleanup
davissp14 Jul 3, 2024
5eaa2c2
Fixes issue where a failed backup was being selected as a base restor…
davissp14 Jul 3, 2024
5734e7c
Remove support for 'latest' string, since this should be controlled w…
davissp14 Jul 3, 2024
010ffb0
Don't allow full_backup_frequencies lower than 1h
davissp14 Jul 3, 2024
884a782
Cleaning up the config validations
davissp14 Jul 3, 2024
53c1112
Adding support for targetTimeline
davissp14 Jul 3, 2024
b8fdd5c
Do not perform a remote restore if there's an existing postgresql dir…
davissp14 Jul 3, 2024
096e3ad
Revert back to RFC3339
davissp14 Jul 3, 2024
fdbbc12
Postgres doesn't support 'Z', so trim it and convert to -00:00
davissp14 Jul 3, 2024
6e3aa73
Moving back to RFC3339, however, 'Z' needs to be stripped and replace…
davissp14 Jul 3, 2024
0f58fd5
Allow target to be left out
davissp14 Jul 5, 2024
5a6eae6
Cleanup
davissp14 Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
Expand All @@ -16,6 +17,8 @@ var (

defaultDeadMemberRemovalThreshold = time.Hour * 12
defaultInactiveSlotRemovalThreshold = time.Hour * 12

defaultBackupRetentionEvaluationThreshold = time.Hour * 1
)

func main() {
Expand All @@ -36,6 +39,18 @@ func main() {
}
}()

if os.Getenv("CLOUD_ARCHIVING_ENABLED") == "true" {
barman, err := flypg.NewBarman()
if err != nil {
panic(err)
}

log.Println("Monitoring backup retention")
barman.PrintRetentionPolicy()

go monitorBackupRetention(ctx, barman)
}

// Readonly monitor
log.Println("Monitoring cluster state")
go monitorClusterState(ctx, node)
Expand Down
24 changes: 24 additions & 0 deletions cmd/monitor/monitor_backup_retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"context"
"log"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
)

// monitorBackupRetention applies the barman retention policy on a fixed
// interval by calling barman.WALArchiveDelete once per
// defaultBackupRetentionEvaluationThreshold.
//
// A failed run is logged and does not stop the loop; any non-empty command
// output is logged as well. The function blocks until ctx is cancelled, so it
// is meant to be started in its own goroutine.
func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
	ticker := time.NewTicker(defaultBackupRetentionEvaluationThreshold)
	defer ticker.Stop()

	for {
		// Ranging over ticker.C alone would never return: Stop does not close
		// the channel. Watching ctx lets the goroutine exit on shutdown.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		result, err := barman.WALArchiveDelete(ctx)
		if err != nil {
			log.Printf("Backup retention failed with: %s", err)
		}

		// Output is logged even after a failure; it may explain the error.
		if len(result) > 0 {
			log.Printf("Backup retention response: %s", result)
		}
	}
}
2 changes: 0 additions & 2 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
ticker := time.NewTicker(deadMemberMonitorFrequency)
defer ticker.Stop()

log.Printf("Pruning every %s...\n", removalThreshold)

for range ticker.C {
err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func main() {
}

svisor := supervisor.New("flybarman", 1*time.Minute)
svisor.AddProcess("cron", "/usr/sbin/cron -f", supervisor.WithRestart(0, 5*time.Second))
svisor.AddProcess("barman", fmt.Sprintf("tail -f %s", node.LogFile))
svisor.AddProcess("admin", "/usr/local/bin/start_admin_server",
supervisor.WithRestart(0, 5*time.Second),
Expand Down
4 changes: 2 additions & 2 deletions internal/flybarman/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ wal_retention_policy = main
return fmt.Errorf("failed to write file %s: %s", n.RootPasswordConfigPath, err)
}

barmanCronFileContent := `* * * * * /usr/local/bin/barman_cron
`
barmanCronFileContent := `* * * * * /usr/local/bin/barman_cron`
davissp14 marked this conversation as resolved.
Show resolved Hide resolved

if err := os.WriteFile(n.BarmanCronFile, []byte(barmanCronFileContent), 0644); err != nil {
return fmt.Errorf("failed write %s: %s", n.BarmanCronFile, err)
}
Expand Down
116 changes: 116 additions & 0 deletions internal/flypg/barman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package flypg

import (
"context"
"fmt"
"log"
"os"
"strings"

"github.com/fly-apps/postgres-flex/internal/utils"
)

// barmanRecoveryDirectory is the fixed path "/data/postgresql".
// NOTE(review): presumably the directory a barman recovery restores into; it
// is not referenced in the visible part of this file — confirm against callers.
const barmanRecoveryDirectory = "/data/postgresql"

// Barman carries the settings used to build barman-cloud command lines.
// All values are kept as strings because they are interpolated directly into
// the generated commands.
type Barman struct {
	// Archive destination: appName is the trailing positional argument of the
	// generated commands, provider feeds --cloud-provider, endpoint feeds
	// --endpoint-url and bucket is rendered as s3://<bucket>.
	appName, provider, endpoint, bucket string

	// fullBackupFrequency string

	// Retention settings: minimumRedundancy feeds --minimum-redundancy and
	// retentionDays is the number of days in the recovery window.
	minimumRedundancy, retentionDays string
}

// NewBarman builds a Barman from the process environment.
//
// It first runs validateBarman and returns its error unchanged if a required
// AWS setting is missing. The app name comes from FLY_APP_NAME, the endpoint
// and bucket from AWS_ENDPOINT_URL and AWS_BUCKET_NAME (whitespace-trimmed),
// and the provider is always "aws-s3". Retention settings fall back to a
// minimum redundancy of "3" and a retention window of "7" days when their
// CLOUD_ARCHIVING_* variables are unset or empty.
func NewBarman() (*Barman, error) {
	err := validateBarman()
	if err != nil {
		return nil, err
	}

	b := &Barman{provider: "aws-s3"}
	b.appName = os.Getenv("FLY_APP_NAME")
	b.endpoint = strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL"))
	b.bucket = strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME"))

	// TODO - Validate minimum and retention day values
	// fullBackupFrequency: getenv("CLOUD_ARCHIVING_FULL_BACKUP_FREQUENCY", "1"),
	b.minimumRedundancy = getenv("CLOUD_ARCHIVING_MINIMUM_REDUNDANCY", "3")
	b.retentionDays = getenv("CLOUD_ARCHIVING_RETENTION_DAYS", "7")

	return b, nil
}

// RetentionPolicy renders the configured retention window as the
// single-quoted string 'RECOVERY WINDOW OF <retentionDays> days'. The quotes
// are part of the result so it can be placed into a command line as one
// argument.
func (b *Barman) RetentionPolicy() string {
	const format = "'RECOVERY WINDOW OF %s days'"
	return fmt.Sprintf(format, b.retentionDays)
}

// WALArchiveDelete deletes backups/WAL based on the specified retention
// policy by running the command built by walArchiveDeleteCommand through
// utils.RunCommand, and returns whatever RunCommand returns.
//
// "postgres" is passed as RunCommand's second argument (presumably the OS
// user to run as — confirm against utils). ctx is accepted but not currently
// forwarded to the command.
func (b *Barman) WALArchiveDelete(ctx context.Context) ([]byte, error) {
	deleteCmd := b.walArchiveDeleteCommand()
	return utils.RunCommand(deleteCmd, "postgres")
}

// PrintRetentionPolicy logs the active retention settings: the recovery
// window in days and the minimum backup redundancy.
//
// The summary lists only values Barman actually holds. A "FULL BACKUP
// FREQUENCY" line would need a third operand, and Barman has no live
// fullBackupFrequency field, so printing it produced "%!s(MISSING)" in the log.
func (b *Barman) PrintRetentionPolicy() {
	const summary = `
Retention Policy
-----------------
RECOVERY WINDOW OF %s days
MINIMUM BACKUP REDUNDANCY: %s
`
	log.Printf(summary, b.retentionDays, b.minimumRedundancy)
}

// walArchiveDeleteCommand builds the barman-cloud-backup-delete command line
// from the receiver's provider, endpoint, retention policy (as rendered by
// RetentionPolicy), minimum redundancy, bucket and app name. Values are
// interpolated as-is; no additional quoting or escaping is applied here.
func (b *Barman) walArchiveDeleteCommand() string {
	const format = "barman-cloud-backup-delete" +
		" --cloud-provider %s" +
		" --endpoint-url %s" +
		" --retention %s" +
		" --minimum-redundancy %s" +
		" s3://%s %s"

	policy := b.RetentionPolicy()

	return fmt.Sprintf(format, b.provider, b.endpoint, policy, b.minimumRedundancy, b.bucket, b.appName)
}

// walArchiveCommand builds the barman-cloud-wal-archive command line from the
// receiver's provider, endpoint, bucket and app name, with gzip compression
// enabled. The result ends in a literal "%p" (presumably left for
// PostgreSQL's archive_command to substitute — confirm at the call site).
func (b *Barman) walArchiveCommand() string {
	// TODO - Make compression configurable
	const format = "barman-cloud-wal-archive" +
		" --cloud-provider %s" +
		" --gzip" +
		" --endpoint-url %s" +
		" s3://%s %s %%p"

	return fmt.Sprintf(format, b.provider, b.endpoint, b.bucket, b.appName)
}

// validateBarman checks that the environment variables required for cloud
// archiving are present: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
// AWS_BUCKET_NAME and AWS_ENDPOINT_URL, in that order. It returns an error
// naming the first one that is missing, or nil when all are set.
//
// Values are whitespace-trimmed before the check. NewBarman trims the bucket
// and endpoint before using them, so a whitespace-only value would otherwise
// pass validation and still yield an empty bucket or endpoint in the
// generated commands.
func validateBarman() error {
	required := []struct{ key, kind string }{
		{"AWS_ACCESS_KEY_ID", "secret"},
		{"AWS_SECRET_ACCESS_KEY", "secret"},
		{"AWS_BUCKET_NAME", "envvar"},
		{"AWS_ENDPOINT_URL", "envvar"},
	}

	for _, r := range required {
		if strings.TrimSpace(os.Getenv(r.key)) == "" {
			return fmt.Errorf("%s %s must be set", r.key, r.kind)
		}
	}

	return nil
}

// getenv returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func getenv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
Loading
Loading