diff --git a/cmd/internal/backup/backup.go b/cmd/internal/backup/backup.go index 6f533bd..10c182f 100644 --- a/cmd/internal/backup/backup.go +++ b/cmd/internal/backup/backup.go @@ -1,6 +1,7 @@ package backup import ( + "bytes" "context" "fmt" "io" @@ -105,48 +106,95 @@ func (b *Backuper) CreateBackup(ctx context.Context) error { filename := path.Base(backupFilePath) + b.comp.Extension() + // pipe to compress and buffer compressed data reader1, writer1 := io.Pipe() + compressErr := make(chan error, 1) + compressBuffer := &bytes.Buffer{} go func() { defer writer1.Close() + defer close(compressErr) + err := b.comp.Compress(ctx, backupFilePath, writer1) if err != nil { b.metrics.CountError("compress") b.log.Error("error compressing backup", "error", err) + compressErr <- err + } else { + compressErr <- nil + } + }() + + // buffer compressed data in order to prevent deadlock of pipe and error-handling + go func() { + _, err := io.Copy(compressBuffer, reader1) + if err != nil { + b.metrics.CountError("buffering") + b.log.Error("error buffering compressed data", "error", err) } }() + err = <-compressErr + if err != nil { + return fmt.Errorf("error compressing backup: %w", err) + } + b.log.Info("compressed backup") if b.encrypter != nil { filename = filename + encryption.Suffix } + // pipe to encrypt and buffer encrypted data reader2, writer2 := io.Pipe() + encryptErr := make(chan error) + encryptBuffer := &bytes.Buffer{} go func() { defer writer2.Close() + defer close(encryptErr) + if b.encrypter != nil { - err = b.encrypter.Encrypt(reader1, writer2) + err = b.encrypter.Encrypt(compressBuffer, writer2) if err != nil { b.metrics.CountError("encrypt") b.log.Error("error encrypting backup", "error", err) + encryptErr <- err + } else { + encryptErr <- nil } } else { - _, err = io.Copy(writer2, reader1) + _, err = io.Copy(writer2, compressBuffer) if err != nil { b.metrics.CountError("streaming") b.log.Error("error copying backup", "error", err) + encryptErr <- err + } else { + encryptErr <- nil } } }() - countingReader := &CountingReader{Reader: reader2} + // buffer compressed data in order to prevent deadlock of pipe and error-handling + go func() { + _, err := io.Copy(encryptBuffer, reader2) + if err != nil { + b.metrics.CountError("buffering") + b.log.Error("error buffering compressed data", "error", err) + } + }() + + err = <-encryptErr + if err != nil { + return fmt.Errorf("error encrypting backup: %w", err) + } + + countingReader := &CountingReader{Reader: encryptBuffer} err = b.bp.UploadBackup(ctx, countingReader, filename) if err != nil { b.metrics.CountError("upload") return fmt.Errorf("error uploading backup: %w", err) } - b.log.Info("uploaded backup to backup provider bucket") + b.log.Info("uploaded backup to backup provider bucket", "size", countingReader.BytesRead) b.metrics.CountBackup(countingReader.BytesRead) @@ -161,11 +209,13 @@ func (b *Backuper) CreateBackup(ctx context.Context) error { return nil } +// CountingReader is a wrapper around io.Reader that counts the number of bytes read type CountingReader struct { io.Reader BytesRead float64 } +// Read reads from the underlying reader and counts the number of bytes read func (r *CountingReader) Read(p []byte) (int, error) { n, err := r.Reader.Read(p) r.BytesRead += float64(n) diff --git a/cmd/internal/compress/compress.go b/cmd/internal/compress/compress.go index a4378b4..f85e965 100644 --- a/cmd/internal/compress/compress.go +++ b/cmd/internal/compress/compress.go @@ -49,7 +49,7 @@ func (c *Compressor) Compress(ctx context.Context, backupFilePath string, output } err = c.compressor.Archive(ctx, outputWriter, files) if err != nil { - fmt.Printf("error while compressing file in compressor: %v", err) + return fmt.Errorf("error while compressing file in compressor: %v", err) } return nil } diff --git a/cmd/internal/encryption/encryption.go b/cmd/internal/encryption/encryption.go index 55a087c..3d3ff82 100644 --- a/cmd/internal/encryption/encryption.go +++ b/cmd/internal/encryption/encryption.go @@ -74,11 +74,11 @@ func (e *Encrypter) Encrypt(inputReader io.Reader, outputWriter io.Writer) error // Decrypt input file with key and store decrypted result with suffix removed // if input does not end with suffix, it is assumed that the file was not encrypted. -func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) (io.Writer, error) { +func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) error { block, err := e.createCipher() if err != nil { - return nil, err + return err } // Erstelle einen Puffer, der den gesamten Input speichert @@ -87,18 +87,18 @@ func (e *Encrypter) Decrypt(inputReader io.Reader, outputWriter io.Writer) (io.W // Kopiere die Daten aus inputReader in den Puffer _, err = io.Copy(&buf, inputReader) if err != nil { - return nil, err + return err } iv, msgLen, err := e.readIVAndMessageLength(buf, block) if err != nil { - return nil, err + return err } if err := e.decryptFile(bytes.NewReader(buf.Bytes()), outputWriter, block, iv, msgLen); err != nil { - return nil, err + return err } - return outputWriter, nil + return nil } func isASCII(s string) bool { diff --git a/cmd/internal/encryption/encryption_test.go b/cmd/internal/encryption/encryption_test.go index 2bf4b16..186f1af 100644 --- a/cmd/internal/encryption/encryption_test.go +++ b/cmd/internal/encryption/encryption_test.go @@ -44,7 +44,7 @@ func TestEncrypter(t *testing.T) { outputDecrypted, err := fs.Create("decrypted") //Decrypt files - _, err = e.Decrypt(inputDecrypted, outputDecrypted) + err = e.Decrypt(inputDecrypted, outputDecrypted) require.NoError(t, err) cleartext, err := afero.ReadFile(fs, outputDecrypted.Name()) @@ -68,7 +68,7 @@ func TestEncrypter(t *testing.T) { require.NoError(t, err) outputBigDec, err := fs.Create("decrypted_big.test.aes") require.NoError(t, err) - _, err = e.Decrypt(inputBigDec, outputBigDec) + err = e.Decrypt(inputBigDec, outputBigDec) require.NoError(t, err) fs.Remove(input.Name()) diff --git a/cmd/internal/initializer/initializer.go b/cmd/internal/initializer/initializer.go index 2cc6585..59c72e8 100644 --- a/cmd/internal/initializer/initializer.go +++ b/cmd/internal/initializer/initializer.go @@ -1,6 +1,7 @@ package initializer import ( + "bytes" "context" "fmt" "io" @@ -213,26 +214,77 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers return fmt.Errorf("could not delete priorly downloaded file: %w", err) } - backupFile, err := os.Create(backupFilePath) + // pipe to download and buffer downloaded data reader1, writer1 := io.Pipe() - err = i.bp.DownloadBackup(ctx, version, writer1) + downloadErr := make(chan error, 1) + downloadBuffer := &bytes.Buffer{} + go func() { + defer writer1.Close() + defer close(downloadErr) + + err := i.bp.DownloadBackup(ctx, version, writer1) + if err != nil { + i.metrics.CountError("download") + i.log.Error("error downloading backup", "error", err) + downloadErr <- err + } else { + downloadErr <- nil + } + + }() + go func() { + _, err := io.Copy(downloadBuffer, reader1) + if err != nil { + i.metrics.CountError("buffering") + i.log.Error("error buffering downloaded data", "error", err) + } + }() + + err := <-downloadErr if err != nil { - return fmt.Errorf("unable to download backup: %w", err) + return fmt.Errorf("error downloading backup: %w", err) } - if i.encrypter != nil { - if encryption.IsEncrypted(backupFilePath) { - _, err = i.encrypter.Decrypt(reader1, backupFile) + i.currentStatus.Message = "decrypting backup" + + // pipe to decrypt and buffer decrypted data + reader2, writer2 := io.Pipe() + decryptErr := make(chan error, 1) + decryptBuffer := &bytes.Buffer{} + go func() { + defer writer2.Close() + defer close(decryptErr) + + if i.encrypter != nil { + err = i.encrypter.Decrypt(downloadBuffer, writer2) if err != nil { - return fmt.Errorf("unable to decrypt backup: %w", err) + i.metrics.CountError("decrypt") + i.log.Error("error decrypting backup", "error", err) + decryptErr <- err + } else { + decryptErr <- nil } } else { + io.Copy(writer2, downloadBuffer) i.log.Info("restoring unencrypted backup with configured encryption - skipping decryption...") + decryptErr <- nil } + }() + go func() { + _, err := io.Copy(decryptBuffer, reader2) + if err != nil { + i.metrics.CountError("streaming") + i.log.Error("error streaming decrypted data", "error", err) + } + }() + + err = <-decryptErr + if err != nil { + return fmt.Errorf("error decrypting backup: %w", err) } i.currentStatus.Message = "uncompressing backup" - err = i.comp.Decompress(ctx, reader1) + err = i.comp.Decompress(ctx, decryptBuffer) if err != nil { return fmt.Errorf("unable to uncompress backup: %w", err) } diff --git a/cmd/main.go b/cmd/main.go index cd42ec0..c1b8a89 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -154,6 +154,7 @@ var startCmd = &cobra.Command{ logger.Info("starting backup-restore-sidecar", "version", v.V, "bind-addr", addr) comp, err := compress.New(viper.GetString(compressionMethod)) + logger.Info(comp.Extension()) if err != nil { return err } @@ -318,7 +319,7 @@ var downloadBackupCmd = &cobra.Command{ if encrypter != nil { if encryption.IsEncrypted(backup.GetBackup().GetName()) { - _, err = encrypter.Decrypt(reader1, outputFile) + err = encrypter.Decrypt(reader1, outputFile) if err != nil { return fmt.Errorf("unable to decrypt backup: %w", err) }