Skip to content

Commit

Permalink
bug fixes, actually try out gcp/azure flows
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucretius committed May 1, 2020
1 parent e6bcdf5 commit 6b9d14a
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 35 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,45 @@ In this way, the daemon will always run on the leader Raft node.

Another way to do this, which would allow us to run the snapshot agent anywhere, is to simply have the daemons form their own Raft cluster, but this approach seemed much more cumbersome.

## Running

The recommended way of running this daemon is using systemctl, since it handles restarts and failure scenarios quite well. To learn more about systemctl, checkout [this article](https://www.digitalocean.com/community/tutorials/how-to-use-systemctl-to-manage-systemd-services-and-units). begin, create the following file at `/etc/systemd/system/snapshot.service`:

```
[Unit]
Description="An Open Source Snapshot Service for Raft"
Documentation=https://github.com/Lucretius/vault_raft_snapshot_agent/
Requires=network-online.target
After=network-online.target
ConditionFileNotEmpty=/etc/vault.d/snapshot.json
[Service]
Type=simple
User=vault
Group=vault
ExecStart=/usr/local/bin/vault_raft_snapshot_agent
ExecReload=/usr/local/bin/vault_raft_snapshot_agent
KillMode=process
Restart=on-failure
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
```

Your configuration is assumed to exist at `/etc/vault.d/snapshot.json` and the actual daemon binary at `/usr/local/bin/vault_raft_snapshot_agent`.

Then just run:

```
sudo systemctl enable snapshot
sudo systemctl start snapshot
```

If your configuration is right and Vault is running on the same host as the agent you will see one of the following:

`Not running on leader node, skipping.` or `Successfully created <type> snapshot to <location>`, depending on if the daemon runs on the leader's host or not.

## Configuration

`addr` The address of the Vault cluster. This is used to check the Vault cluster leader IP, as well as generate snapshots.
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Configuration struct {
AWS S3Config `json:"aws_storage"`
Local LocalConfig `json:"local_storage"`
GCP GCPConfig `json:"google_storage"`
Azure AzureConfig `json:"azure_blob_storage"`
Azure AzureConfig `json:"azure_storage"`
RoleID string `json:"role_id"`
SecretID string `json:"secret_id"`
}
Expand Down
8 changes: 7 additions & 1 deletion snapshot_agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func NewSnapshotter(config *config.Configuration) (*Snapshotter, error) {
}
}
if config.GCP.Bucket != "" {
err = snapshotter.ConfigureS3(config)
err = snapshotter.ConfigureGCP(config)
if err != nil {
return nil, err
}
}
if config.Azure.ContainerName != "" {
err = snapshotter.ConfigureAzure(config)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion snapshot_agent/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func (s *Snapshotter) CreateAzureSnapshot(reader io.ReadWriter, config *config.C
return o1.Properties.LastModified.Before(o2.Properties.LastModified)
}
AzureBy(timestamp).Sort(blobs)

if len(blobs)-int(config.Retain) <= 0 {
return url, nil
}
blobsToDelete := blobs[0 : len(blobs)-int(config.Retain)]

for _, b := range blobsToDelete {
Expand Down
72 changes: 40 additions & 32 deletions snapshot_agent/gcp.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package snapshot_agent

import (
"bytes"
"context"
"fmt"
"io"
"log"
"sort"

Expand All @@ -13,48 +13,56 @@ import (
)

// CreateGCPSnapshot writes snapshot to google storage
func (s *Snapshotter) CreateGCPSnapshot(reader io.ReadWriter, config *config.Configuration, currentTs int64) (string, error) {
func (s *Snapshotter) CreateGCPSnapshot(b *bytes.Buffer, config *config.Configuration, currentTs int64) (string, error) {
fileName := fmt.Sprintf("raft_snapshot-%d.snap", currentTs)
obj := s.GCPBucket.Object(fileName)
w := obj.NewWriter(context.Background())
if _, err := io.Copy(w, reader); err != nil {

if _, err := w.Write(b.Bytes()); err != nil {
return "", err
} else {
if config.Retain > 0 {
deleteCtx := context.Background()
query := &storage.Query{Prefix: "raft_snapshot-"}
it := s.GCPBucket.Objects(deleteCtx, query)
var files []storage.ObjectAttrs
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Println("Unable to iterate through bucket to find old snapshots to delete")
return fileName, err
}
files = append(files, *attrs)
}
}

if err := w.Close(); err != nil {
return "", err
}

timestamp := func(o1, o2 *storage.ObjectAttrs) bool {
return o1.Updated.Before(o2.Updated)
if config.Retain > 0 {
deleteCtx := context.Background()
query := &storage.Query{Prefix: "raft_snapshot-"}
it := s.GCPBucket.Objects(deleteCtx, query)
var files []storage.ObjectAttrs
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Println("Unable to iterate through bucket to find old snapshots to delete")
return fileName, err
}
files = append(files, *attrs)
}

GCPBy(timestamp).Sort(files)
snapshotsToDelete := files[0 : len(files)-int(config.Retain)]
timestamp := func(o1, o2 *storage.ObjectAttrs) bool {
return o1.Updated.Before(o2.Updated)
}

GCPBy(timestamp).Sort(files)
if len(files)-int(config.Retain) <= 0 {
return fileName, nil
}
snapshotsToDelete := files[0 : len(files)-int(config.Retain)]

for _, ss := range snapshotsToDelete {
obj := s.GCPBucket.Object(ss.Name)
err = obj.Delete(deleteCtx)
if err != nil {
log.Println("Cannot delete old snapshot")
return fileName, err
}
for _, ss := range snapshotsToDelete {
obj := s.GCPBucket.Object(ss.Name)
err := obj.Delete(deleteCtx)
if err != nil {
log.Println("Cannot delete old snapshot")
return fileName, err
}
}
return fileName, nil
}
return fileName, nil
}

// implementation of Sort interface for s3 objects
Expand Down
3 changes: 3 additions & 0 deletions snapshot_agent/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (s *Snapshotter) CreateS3Snapshot(reader io.ReadWriter, config *config.Conf
return o1.LastModified.Before(*o2.LastModified)
}
S3By(timestamp).Sort(existingSnapshots)
if len(existingSnapshots)-int(config.Retain) <= 0 {
return o.Location, nil
}
snapshotsToDelete := existingSnapshots[0 : len(existingSnapshots)-int(config.Retain)]

for i := range snapshotsToDelete {
Expand Down

0 comments on commit 6b9d14a

Please sign in to comment.