Skip to content
This repository has been archived by the owner on Jun 21, 2022. It is now read-only.

Commit

Permalink
inits backup strategy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mhausenblas committed Jul 7, 2017
1 parent d5fbf8e commit a5095b6
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 107 deletions.
112 changes: 27 additions & 85 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/mhausenblas/reshifter/pkg/remotes"
"github.com/mhausenblas/reshifter/pkg/types"
"github.com/mhausenblas/reshifter/pkg/util"
"github.com/pierrre/archivefile/zip"
)

// Backup iterates over well-known Kubernetes (distro) keys in an etcd server
Expand All @@ -36,6 +35,9 @@ func Backup(endpoint, target, remote, bucket string) (string, error) {
if err != nil {
return "", fmt.Errorf("Can't determine Kubernetes distro: %s", err)
}

strategyName, strategy := pickStrategy()

// deal with etcd3 servers:
if strings.HasPrefix(version, "3") {
c3, cerr := util.NewClient3(endpoint, secure)
Expand All @@ -44,24 +46,12 @@ func Backup(endpoint, target, remote, bucket string) (string, error) {
}
defer func() { _ = c3.Close() }()
log.WithFields(log.Fields{"func": "backup.Backup"}).Debug(fmt.Sprintf("Got etcd3 cluster with endpoints %v", c3.Endpoints()))
err = discovery.Visit3(c3, types.KubernetesPrefix, types.Vanilla, func(path string, val string) error {
_, err = store(target, path, val)
if err != nil {
return fmt.Errorf("Can't store backup locally: %s", err)
}
return nil
})
err = discovery.Visit3(c3, types.KubernetesPrefix, target, types.Vanilla, strategy, strategyName)
if err != nil {
return "", err
}
if distrotype == types.OpenShift {
err = discovery.Visit3(c3, types.OpenShiftPrefix, types.OpenShift, func(path string, val string) error {
_, err = store(target, path, val)
if err != nil {
return fmt.Errorf("Can't store backup locally: %s", err)
}
return nil
})
err = discovery.Visit3(c3, types.OpenShiftPrefix, target, types.OpenShift, strategy, strategyName)
if err != nil {
return "", err
}
Expand All @@ -75,91 +65,43 @@ func Backup(endpoint, target, remote, bucket string) (string, error) {
}
kapi := client.NewKeysAPI(c2)
log.WithFields(log.Fields{"func": "backup.Backup"}).Debug(fmt.Sprintf("Got etcd2 cluster with %v", c2.Endpoints()))
err = discovery.Visit2(kapi, types.KubernetesPrefix, func(path string, val string) error {
_, err = store(target, path, val)
if err != nil {
return fmt.Errorf("Can't store backup locally: %s", err)
}
return nil
})
err = discovery.Visit2(kapi, types.KubernetesPrefix, target, strategy, strategyName)
if err != nil {
return "", err
}
if distrotype == types.OpenShift {
err = discovery.Visit2(kapi, types.OpenShiftPrefix, func(path string, val string) error {
_, err = store(target, path, val)
if err != nil {
return fmt.Errorf("Can't store backup locally: %s", err)
}
return nil
})
err = discovery.Visit2(kapi, types.OpenShiftPrefix, target, strategy, strategyName)
if err != nil {
return "", err
}
}
}
// create ZIP file of the reaped content:
_, err = arch(target)
if err != nil {
return "", err
}
if remote != "" {
err = remotes.StoreInS3(remote, bucket, target, based)

if strategyName == types.ReapFunctionRaw {
// create ZIP file of the reaped content:
_, err = arch(target)
if err != nil {
return "", err
}
if remote != "" {
err = remotes.StoreInS3(remote, bucket, target, based)
if err != nil {
return "", err
}
}
}
return based, nil
}

// store creates a file at based+path with val as its content.
// based is the output directory to use and path can be
// any valid etcd key (with ':'' characters being escaped automatically).
func store(based string, path string, val string) (string, error) {
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Trying to store %s with value=%s in %s", path, val, based))
// make sure we're dealing with a valid path
// that is, non-empty and has to start with /:
if path == "" || (strings.Index(path, "/") != 0) {
return "", fmt.Errorf("Path has to be non-empty")
}
// escape ":" in the path so that we have no issues storing it in the filesystem:
fpath, _ := filepath.Abs(filepath.Join(based, strings.Replace(path, ":", types.EscapeColon, -1)))
if path == "/" {
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Rewriting root"))
fpath = based
}
err := os.MkdirAll(fpath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("%s", err)
}
cpath, _ := filepath.Abs(filepath.Join(fpath, types.ContentFile))
c, err := os.Create(cpath)
if err != nil {
return "", fmt.Errorf("%s", err)
}
defer func() {
_ = c.Close()
}()
nbytes, err := c.WriteString(val)
if err != nil {
return "", fmt.Errorf("%s", err)
}
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Stored %s in %s with %d bytes", path, fpath, nbytes))
return cpath, nil
return based, nil
}

// arch creates a ZIP archive of the content store() has generated
func arch(based string) (string, error) {
defer func() {
_ = os.RemoveAll(based)
}()
log.WithFields(log.Fields{"func": "backup.arch"}).Debug(fmt.Sprintf("Trying to pack backup into %s.zip", based))
opath := based + ".zip"
err := zip.ArchiveFile(based, opath, func(apath string) {
log.WithFields(log.Fields{"func": "backup.arch"}).Debug(fmt.Sprintf("%s", apath))
})
if err != nil {
return "", fmt.Errorf("Can't create archive or no content to back up: %s", err)
func pickStrategy() (string, types.Reap) {
backupstrategy := os.Getenv("RS_BACKUP_STRATEGY")
switch backupstrategy {
case "raw":
return types.ReapFunctionRaw, raw
case "render":
return types.ReapFunctionRender, render
default:
return types.ReapFunctionRaw, raw
}
return opath, nil
}
91 changes: 91 additions & 0 deletions pkg/backup/strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package backup

import (
"fmt"
"io"
"os"
"path/filepath"
"strings"

log "github.com/Sirupsen/logrus"
"github.com/mhausenblas/reshifter/pkg/types"
"github.com/pierrre/archivefile/zip"
)

// raw is a reap strategy that stores val in a path in directory target.
func raw(path, val string, target interface{}) error {
t, ok := target.(string)
if !ok {
return fmt.Errorf("Can't use target %v, it should be a string!", target)
}
_, err := store(t, path, val)
if err != nil {
return fmt.Errorf("Can't store %s%s in %s: %s", t, path, val, err)
}
return nil
}

// render is a reap strategy that writes path/val to a writer.
func render(path, val string, writer interface{}) error {
w, ok := writer.(io.Writer)
if !ok {
return fmt.Errorf("Can't use writer %v, it should be an io.Writer!", writer)
}
_, err := w.Write([]byte(fmt.Sprintf("%s = %s\n", path, val)))
if err != nil {
return fmt.Errorf("Can't write %s%s to %v: %s", path, val, w, err)
}
return nil
}

// store creates a file at based+path with val as its content.
// based is the output directory to use and path can be
// any valid etcd key (with ':'' characters being escaped automatically).
func store(based string, path string, val string) (string, error) {
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Trying to store %s with value=%s in %s", path, val, based))
// make sure we're dealing with a valid path
// that is, non-empty and has to start with /:
if path == "" || (strings.Index(path, "/") != 0) {
return "", fmt.Errorf("Path has to be non-empty")
}
// escape ":" in the path so that we have no issues storing it in the filesystem:
fpath, _ := filepath.Abs(filepath.Join(based, strings.Replace(path, ":", types.EscapeColon, -1)))
if path == "/" {
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Rewriting root"))
fpath = based
}
err := os.MkdirAll(fpath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("%s", err)
}
cpath, _ := filepath.Abs(filepath.Join(fpath, types.ContentFile))
c, err := os.Create(cpath)
if err != nil {
return "", fmt.Errorf("%s", err)
}
defer func() {
_ = c.Close()
}()
nbytes, err := c.WriteString(val)
if err != nil {
return "", fmt.Errorf("%s", err)
}
log.WithFields(log.Fields{"func": "backup.store"}).Debug(fmt.Sprintf("Stored %s in %s with %d bytes", path, fpath, nbytes))
return cpath, nil
}

// arch creates a ZIP archive of the content store() has generated
func arch(based string) (string, error) {
defer func() {
_ = os.RemoveAll(based)
}()
log.WithFields(log.Fields{"func": "backup.arch"}).Debug(fmt.Sprintf("Trying to pack backup into %s.zip", based))
opath := based + ".zip"
err := zip.ArchiveFile(based, opath, func(apath string) {
log.WithFields(log.Fields{"func": "backup.arch"}).Debug(fmt.Sprintf("%s", apath))
})
if err != nil {
return "", fmt.Errorf("Can't create archive or no content to back up: %s", err)
}
return opath, nil
}
16 changes: 8 additions & 8 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,20 @@ func CountKeysFor(endpoint string, distro types.KubernetesDistro) (int, int, err
log.WithFields(log.Fields{"func": "discovery.CountKeysFor"}).Debug(fmt.Sprintf("Got etcd3 cluster with endpoints %v", c3.Endpoints()))
switch distro {
case types.Vanilla:
err = Visit3(c3, types.KubernetesPrefix, types.Vanilla, func(path string, val string) error {
err = Visit3(c3, types.KubernetesPrefix, "", types.Vanilla, func(path, val string, arg interface{}) error {
numkeys++
totalsize += len(val)
return nil
})
}, "")
if err != nil {
return 0, 0, err
}
case types.OpenShift:
err = Visit3(c3, types.OpenShiftPrefix, types.OpenShift, func(path string, val string) error {
err = Visit3(c3, types.OpenShiftPrefix, "", types.OpenShift, func(path, val string, arg interface{}) error {
numkeys++
totalsize += len(val)
return nil
})
}, "")
if err != nil {
return 0, 0, err
}
Expand All @@ -140,20 +140,20 @@ func CountKeysFor(endpoint string, distro types.KubernetesDistro) (int, int, err
log.WithFields(log.Fields{"func": "discovery.CountKeysFor"}).Debug(fmt.Sprintf("Got etcd2 cluster with %v", c2.Endpoints()))
switch distro {
case types.Vanilla:
err = Visit2(kapi, types.KubernetesPrefix, func(path string, val string) error {
err = Visit2(kapi, types.KubernetesPrefix, "", func(path, val string, arg interface{}) error {
numkeys++
totalsize += len(val)
return nil
})
}, "")
if err != nil {
return 0, 0, err
}
case types.OpenShift:
err = Visit2(kapi, types.OpenShiftPrefix, func(path string, val string) error {
err = Visit2(kapi, types.OpenShiftPrefix, "", func(path, val string, arg interface{}) error {
numkeys++
totalsize += len(val)
return nil
})
}, "")
if err != nil {
return 0, 0, err
}
Expand Down
32 changes: 25 additions & 7 deletions pkg/discovery/visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"fmt"
"os"

log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/client"
Expand All @@ -13,7 +14,7 @@ import (
// Visit2 recursively visits an etcd2 server from the path given and applies
// the reap function on leaf nodes, that is, keys that don't have sub-keys,
// otherwise descents the tree.
func Visit2(kapi client.KeysAPI, path string, reapfn types.Reap) error {
func Visit2(kapi client.KeysAPI, path, target string, reapfn types.Reap, reapfnName string) error {
log.WithFields(log.Fields{"func": "discovery.Visit2"}).Debug(fmt.Sprintf("Processing %s", path))
res, err := kapi.Get(context.Background(),
path,
Expand All @@ -29,17 +30,23 @@ func Visit2(kapi client.KeysAPI, path string, reapfn types.Reap) error {
log.WithFields(log.Fields{"func": "discovery.Visit2"}).Debug(fmt.Sprintf("%s has %d children", path, len(res.Node.Nodes)))
for _, node := range res.Node.Nodes {
log.WithFields(log.Fields{"func": "discovery.Visit2"}).Debug(fmt.Sprintf("Next I'm going to visit child %s", node.Key))
_ = Visit2(kapi, node.Key, reapfn)
_ = Visit2(kapi, node.Key, target, reapfn, reapfnName)
}
return nil
}
// we're on a leaf node, so apply the reap function:
return reapfn(res.Node.Key, string(res.Node.Value))
switch reapfnName {
case types.ReapFunctionRaw:
return reapfn(res.Node.Key, string(res.Node.Value), target)
case types.ReapFunctionRender:
return reapfn(res.Node.Key, string(res.Node.Value), os.Stdout)
}
return nil
}

// Visit3 visits the given path of an etcd3 server and applies the reap function
// on the keys in the respective range, depending on the Kubernetes distro.
func Visit3(c3 *clientv3.Client, path string, distro types.KubernetesDistro, reapfn types.Reap) error {
func Visit3(c3 *clientv3.Client, path, target string, distro types.KubernetesDistro, reapfn types.Reap, reapfnName string) error {
log.WithFields(log.Fields{"func": "discovery.Visit3"}).Debug(fmt.Sprintf("Processing %s", path))
endkey := ""
if distro == types.Vanilla {
Expand All @@ -55,10 +62,21 @@ func Visit3(c3 *clientv3.Client, path string, distro types.KubernetesDistro, rea
log.WithFields(log.Fields{"func": "discovery.Visit3"}).Debug(fmt.Sprintf("Got %v", res))
for _, ev := range res.Kvs {
log.WithFields(log.Fields{"func": "discovery.Visit3"}).Debug(fmt.Sprintf("key: %s, value: %s", ev.Key, ev.Value))
err = reapfn(string(ev.Key), string(ev.Value))
if err != nil {
return err

// we're on a leaf node, so apply the reap function:
switch reapfnName {
case types.ReapFunctionRaw:
err = reapfn(string(ev.Key), string(ev.Value), target)
if err != nil {
return err
}
case types.ReapFunctionRender:
err = reapfn(string(ev.Key), string(ev.Value), os.Stdout)
if err != nil {
return err
}
}
return nil
}
return nil
}
13 changes: 10 additions & 3 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const (
OpenShiftPrefixLast = "/openshift.io/zzzzzzzzzz"
// ContentTypeZip represents the content type for a ZIP file
ContentTypeZip = "application/zip"
// ReapFunctionRaw represents the reap function that dumps the values of all keys to disk.
ReapFunctionRaw = "raw"
// ReapFunctionRender represents the reap function that dumps the values of all keys to stdout.
ReapFunctionRender = "render"
// NotADistro represents the fact that no Kubernetes distro-related prefixes exit in etcd
NotADistro KubernetesDistro = iota
// Vanilla represents the vanilla, upstream Kubernetes distribution.
Expand Down Expand Up @@ -67,6 +71,9 @@ type EtcdResponse struct {
EtcdClusterVersion string `json:"etcdcluster"`
}

// Reap function types take a node path and a value as parameters and performs
// some side effect, such as storing, on the node
type Reap func(string, string) error
// Reap function types take a path and a value and perform
// some action on it, for example, storing it to disk or
// writing it to stdout. The arg parameter is optional
// and can be used by the function in a context-dependent way,
// for example, it can specify a directory to write to.
type Reap func(path, value string, arg interface{}) error
Loading

1 comment on commit a5095b6

@mhausenblas
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.