Skip to content

Commit

Permalink
Adding cloud archiving support that runs along-side the primary
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jun 25, 2024
1 parent d5c08ce commit d7afc0a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 2 deletions.
18 changes: 18 additions & 0 deletions bin/barman-cron
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

# Identify the process id of the cron supervisor process
cron_pid=$(pgrep -f '/usr/sbin/cron')
if [ -z "$cron_pid" ]; then
# Send the error message to main process's stderr
echo "Failed to resolve cron process id" >> /proc/1/fd/2
exit 1
fi

barman_output=$(barman cron 2>&1)
barman_exit_code=$?

# Log the result of barman cron
if [ $barman_exit_code -ne 0 ]; then
echo "Barman cron failed with exit code $barman_exit_code: $barman_output" >> /proc/$cron_pid/fd/2
exit 1
fi
4 changes: 2 additions & 2 deletions internal/flybarman/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ wal_retention_policy = main
}

if _, err := os.Stat(n.BarmanCronFile); os.IsNotExist(err) {
barmanCronFileContent := `* * * * * . $HOME/.profile; /usr/bin/barman cron >> /proc/1/fd/1 2>/proc/1/fd/2
`
barmanCronFileContent := `* * * * * . $HOME/.profile; /usr/bin/barman-cron\n`

if err := os.WriteFile(n.BarmanCronFile, []byte(barmanCronFileContent), 0644); err != nil {
return fmt.Errorf("failed write %s: %s", n.BarmanCronFile, err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewNode() (*Node, error) {
node.RepMgr.Witness = present

node.PGConfig = PGConfig{
AppName: node.AppName,
DataDir: node.DataDir,
Port: node.Port,
ConfigFilePath: fmt.Sprintf("%s/postgresql.conf", node.DataDir),
Expand Down
36 changes: 36 additions & 0 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

type PGConfig struct {
AppName string
ConfigFilePath string
InternalConfigFilePath string
UserConfigFilePath string
Expand Down Expand Up @@ -171,6 +172,20 @@ func (c *PGConfig) SetDefaults() error {
"shared_preload_libraries": fmt.Sprintf("'%s'", strings.Join(sharedPreloadLibraries, ",")),
}

// Works to configure archiving to object storage if enabled
if os.Getenv("CLOUD_ARCHIVING_ENABLED") == "true" {
if err := validateCloudArchiver(); err != nil {
return fmt.Errorf("failed to validate s3 archiver: %s", err)
}

bucket := strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME"))
endpoint := strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL"))

archiveCommand := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --endpoint-url %s s3://%s %s %%p'", endpoint, bucket, c.AppName)
c.internalConfig["archive_mode"] = "on"
c.internalConfig["archive_command"] = archiveCommand
}

return nil
}

Expand Down Expand Up @@ -336,6 +351,7 @@ func (c *PGConfig) validateCompatibility(requested ConfigMap) (ConfigMap, error)
}

archiveMode := resolveConfigValue(requested, current, "archive_mode", "off")

if archiveMode.(string) != "off" {
return requested, errors.New("archive_mode must be set to `off` before wal_level can be set to `minimal`")
}
Expand Down Expand Up @@ -537,3 +553,23 @@ func diskSizeInBytes(dir string) (uint64, error) {
}
return stat.Blocks * uint64(stat.Bsize), nil
}

func validateCloudArchiver() error {
if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
return fmt.Errorf("AWS_ACCESS_KEY_ID secret must be set")
}

if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
return fmt.Errorf("AWS_SECRET_ACCESS_KEY secret must be set")
}

if os.Getenv("AWS_BUCKET_NAME") == "" {
return fmt.Errorf("AWS_BUCKET_NAME envvar must be set")
}

if os.Getenv("AWS_ENDPOINT_URL") == "" {
return fmt.Errorf("AWS_ENDPOINT_URL envvar must be set")
}

return nil
}
31 changes: 31 additions & 0 deletions internal/flypg/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,36 @@ func TestPGConfigInitialization(t *testing.T) {
if cfg["shared_preload_libraries"] != expected {
t.Fatalf("expected %s, got %s", expected, cfg["shared_preload_libraries"])
}
})

t.Run("cloud-archiving", func(t *testing.T) {
t.Setenv("CLOUD_ARCHIVING_ENABLED", "true")
t.Setenv("AWS_ACCESS_KEY_ID", "my-key")
t.Setenv("AWS_SECRET_ACCESS_KEY", "my-secret")
t.Setenv("AWS_BUCKET_NAME", "my-bucket")

store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
t.Fatal(err)
}

cfg, err := pgConf.CurrentConfig()
if err != nil {
t.Fatal(err)
}

if cfg["archive_mode"] != "on" {
t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"])
}

expected := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --endpoint-url https://fly.storage.tigris.dev s3://%s %s %%p'",
os.Getenv("AWS_BUCKET_NAME"),
pgConf.AppName)

if cfg["archive_command"] != expected {
t.Fatalf("expected %s, got %s", expected, cfg["archive_command"])
}
})
}

Expand Down Expand Up @@ -208,6 +237,7 @@ func TestPGDefaultPassword(t *testing.T) {
defer cleanup()

pgConf := &PGConfig{
AppName: "my-app",
DataDir: pgTestDirectory,
Port: 5433,
ConfigFilePath: pgConfigFilePath,
Expand Down Expand Up @@ -262,6 +292,7 @@ func TestValidateCompatibility(t *testing.T) {
defer cleanup()

pgConf := &PGConfig{
AppName: "my-app",
DataDir: pgTestDirectory,
Port: 5433,
ConfigFilePath: pgConfigFilePath,
Expand Down

0 comments on commit d7afc0a

Please sign in to comment.