Skip to content

Commit

Permalink
fix error-handling in go-routines of backup
Browse files Browse the repository at this point in the history
  • Loading branch information
ostempel committed Dec 3, 2024
1 parent 244c193 commit 148ce9b
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 22 deletions.
58 changes: 54 additions & 4 deletions cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backup

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -105,48 +106,95 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {

filename := path.Base(backupFilePath) + b.comp.Extension()

// pipe to compress and buffer compressed data
reader1, writer1 := io.Pipe()
compressErr := make(chan error, 1)
compressBuffer := &bytes.Buffer{}
go func() {
defer writer1.Close()
defer close(compressErr)

err := b.comp.Compress(ctx, backupFilePath, writer1)
if err != nil {
b.metrics.CountError("compress")
b.log.Error("error compressing backup", "error", err)
compressErr <- err
} else {
compressErr <- nil
}
}()

// buffer compressed data in order to prevent deadlock of pipe and error-handling
go func() {
_, err := io.Copy(compressBuffer, reader1)
if err != nil {
b.metrics.CountError("buffering")
b.log.Error("error buffering compressed data", "error", err)
}
}()

err = <-compressErr
if err != nil {
return fmt.Errorf("error compressing backup: %w", err)
}

b.log.Info("compressed backup")

if b.encrypter != nil {
filename = filename + encryption.Suffix
}

// pipe to encrypt and buffer encrypted data
reader2, writer2 := io.Pipe()
encryptErr := make(chan error)
encryptBuffer := &bytes.Buffer{}
go func() {
defer writer2.Close()
defer close(encryptErr)

if b.encrypter != nil {
err = b.encrypter.Encrypt(reader1, writer2)
err = b.encrypter.Encrypt(compressBuffer, writer2)
if err != nil {
b.metrics.CountError("encrypt")
b.log.Error("error encrypting backup", "error", err)
encryptErr <- err
} else {
encryptErr <- nil
}
} else {
_, err = io.Copy(writer2, reader1)
_, err = io.Copy(writer2, compressBuffer)
if err != nil {
b.metrics.CountError("streaming")
b.log.Error("error copying backup", "error", err)
encryptErr <- err
} else {
encryptErr <- nil
}
}
}()

countingReader := &CountingReader{Reader: reader2}
// buffer compressed data in order to prevent deadlock of pipe and error-handling
go func() {
_, err := io.Copy(encryptBuffer, reader2)
if err != nil {
b.metrics.CountError("buffering")
b.log.Error("error buffering compressed data", "error", err)
}
}()

err = <-encryptErr
if err != nil {
return fmt.Errorf("error encrypting backup: %w", err)
}

countingReader := &CountingReader{Reader: encryptBuffer}
err = b.bp.UploadBackup(ctx, countingReader, filename)
if err != nil {
b.metrics.CountError("upload")
return fmt.Errorf("error uploading backup: %w", err)
}

b.log.Info("uploaded backup to backup provider bucket")
b.log.Info("uploaded backup to backup provider bucket", "size", countingReader.BytesRead)

b.metrics.CountBackup(countingReader.BytesRead)

Expand All @@ -161,11 +209,13 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
return nil
}

// CountingReader is a wrapper around io.Reader that counts the number of bytes read
type CountingReader struct {
io.Reader
BytesRead float64
}

// Read reads from the underlying reader and counts the number of bytes read
func (r *CountingReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
r.BytesRead += float64(n)
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *Compressor) Compress(ctx context.Context, backupFilePath string, output
}
err = c.compressor.Archive(ctx, outputWriter, files)
if err != nil {
fmt.Printf("error while compressing file in compressor: %v", err)
return fmt.Errorf("error while compressing file in compressor: %v", err)
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/internal/encryption/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func (e *Encrypter) Encrypt(inputReader io.Reader, outputWriter io.Writer) error

// Decrypt input file with key and store decrypted result with suffix removed
// if input does not end with suffix, it is assumed that the file was not encrypted.
func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) (io.Writer, error) {
func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) error {

block, err := e.createCipher()
if err != nil {
return nil, err
return err
}

// Erstelle einen Puffer, der den gesamten Input speichert
Expand All @@ -87,18 +87,18 @@ func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) (io.W
// Kopiere die Daten aus inputReader in den Puffer
_, err = io.Copy(&buf, inputReader)
if err != nil {
return nil, err
return err
}

iv, msgLen, err := e.readIVAndMessageLength(buf, block)
if err != nil {
return nil, err
return err
}

if err := e.decryptFile(bytes.NewReader(buf.Bytes()), outputWriter, block, iv, msgLen); err != nil {
return nil, err
return err
}
return outputWriter, nil
return nil
}

func isASCII(s string) bool {
Expand Down
4 changes: 2 additions & 2 deletions cmd/internal/encryption/encryption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestEncrypter(t *testing.T) {
outputDecrypted, err := fs.Create("decrypted")

//Decrypt files
_, err = e.Decrypt(inputDecrypted, outputDecrypted)
err = e.Decrypt(inputDecrypted, outputDecrypted)
require.NoError(t, err)
cleartext, err := afero.ReadFile(fs, outputDecrypted.Name())

Expand All @@ -68,7 +68,7 @@ func TestEncrypter(t *testing.T) {
require.NoError(t, err)
outputBigDec, err := fs.Create("decrypted_big.test.aes")
require.NoError(t, err)
_, err = e.Decrypt(inputBigDec, outputBigDec)
err = e.Decrypt(inputBigDec, outputBigDec)
require.NoError(t, err)

fs.Remove(input.Name())
Expand Down
68 changes: 60 additions & 8 deletions cmd/internal/initializer/initializer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package initializer

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -213,26 +214,77 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers
return fmt.Errorf("could not delete priorly downloaded file: %w", err)
}

backupFile, err := os.Create(backupFilePath)
// pipe to download and buffer downloaded data
reader1, writer1 := io.Pipe()
err = i.bp.DownloadBackup(ctx, version, writer1)
downloadErr := make(chan error, 1)
downloadBuffer := &bytes.Buffer{}
go func() {
defer writer1.Close()
defer close(downloadErr)

err := i.bp.DownloadBackup(ctx, version, writer1)
if err != nil {
i.metrics.CountError("download")
i.log.Error("error downloading backup", "error", err)
downloadErr <- err
} else {
downloadErr <- nil
}

}()
go func() {
_, err := io.Copy(downloadBuffer, reader1)
if err != nil {
i.metrics.CountError("buffering")
i.log.Error("error buffering downloaded data", "error", err)
}
}()

err := <-downloadErr
if err != nil {
return fmt.Errorf("unable to download backup: %w", err)
return fmt.Errorf("error downloading backup: %w", err)
}

if i.encrypter != nil {
if encryption.IsEncrypted(backupFilePath) {
_, err = i.encrypter.Decrypt(reader1, backupFile)
i.currentStatus.Message = "decrypting backup"

// pipe to decrypt and buffer decrypted data
reader2, writer2 := io.Pipe()
decryptErr := make(chan error, 1)
decryptBuffer := &bytes.Buffer{}
go func() {
defer writer2.Close()
defer close(decryptErr)

if i.encrypter != nil {
err = i.encrypter.Decrypt(downloadBuffer, writer2)
if err != nil {
return fmt.Errorf("unable to decrypt backup: %w", err)
i.metrics.CountError("decrypt")
i.log.Error("error decrypting backup", "error", err)
decryptErr <- err
} else {
decryptErr <- nil
}
} else {
io.Copy(writer2, downloadBuffer)
i.log.Info("restoring unencrypted backup with configured encryption - skipping decryption...")
decryptErr <- nil
}
}()
go func() {
_, err := io.Copy(decryptBuffer, reader2)
if err != nil {
i.metrics.CountError("streaming")
i.log.Error("error streaming decrypted data", "error", err)
}
}()

err = <-decryptErr
if err != nil {
return fmt.Errorf("error decrypting backup: %w", err)
}

i.currentStatus.Message = "uncompressing backup"
err = i.comp.Decompress(ctx, reader1)
err = i.comp.Decompress(ctx, decryptBuffer)
if err != nil {
return fmt.Errorf("unable to uncompress backup: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ var startCmd = &cobra.Command{
logger.Info("starting backup-restore-sidecar", "version", v.V, "bind-addr", addr)

comp, err := compress.New(viper.GetString(compressionMethod))
logger.Info(comp.Extension())
if err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +319,7 @@ var downloadBackupCmd = &cobra.Command{

if encrypter != nil {
if encryption.IsEncrypted(backup.GetBackup().GetName()) {
_, err = encrypter.Decrypt(reader1, outputFile)
err = encrypter.Decrypt(reader1, outputFile)
if err != nil {
return fmt.Errorf("unable to decrypt backup: %w", err)
}
Expand Down

0 comments on commit 148ce9b

Please sign in to comment.