From a15a7e04ec07fe3f943308419cd7e5ccd48148ad Mon Sep 17 00:00:00 2001 From: James Fennell Date: Mon, 13 May 2024 09:45:43 -0400 Subject: [PATCH] Support zstd compression After the recent xz attack it seems reasonable to add another good compression format and potentially offboard from xz. --- config/compression.go | 32 +++++-- config/compression_test.go | 5 +- go.mod | 1 + go.sum | 2 + internal/actions/audit/audit.go | 6 +- internal/storage/astore/astore.go | 3 +- tests/main_test.go | 141 +++++++++++++++++------------- 7 files changed, 118 insertions(+), 72 deletions(-) diff --git a/config/compression.go b/config/compression.go index d22853e..bcc1f33 100644 --- a/config/compression.go +++ b/config/compression.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/DataDog/zstd" "github.com/jamespfennell/xz" ) @@ -14,13 +15,17 @@ type CompressionFormat int const ( Gzip CompressionFormat = 0 Xz CompressionFormat = 1 + Zstd CompressionFormat = 2 ) -const ExtensionRegex = `gz|xz` +const ExtensionRegex = `gz|xz|zstd` -var allFormats = []CompressionFormat{ - Gzip, - Xz, +func AllCompressionFormats() []CompressionFormat { + return []CompressionFormat{ + Gzip, + Xz, + Zstd, + } } type formatImpl struct { @@ -63,9 +68,24 @@ var xzImpl = formatImpl{ }, } +var zstdImpl = formatImpl{ + id: "zstd", + extension: "zstd", + minLevel: zstd.BestSpeed, + maxLevel: zstd.BestCompression, + defaultLevel: zstd.DefaultCompression, + newReader: func(r io.Reader) (io.ReadCloser, error) { + return zstd.NewReader(r), nil + }, + newWriter: func(w io.Writer, level int) io.WriteCloser { + return zstd.NewWriterLevel(w, level) + }, +} + var formatToImpl = map[CompressionFormat]formatImpl{ Gzip: gzipImpl, Xz: xzImpl, + Zstd: zstdImpl, } func (format *CompressionFormat) impl() formatImpl { @@ -102,7 +122,7 @@ func (format CompressionFormat) MarshalYAML() (interface{}, error) { } func NewFormatFromId(id string) (CompressionFormat, bool) { - for _, format := range allFormats { + for _, format := range AllCompressionFormats() { if format.impl().id == id { return format, true } @@ -111,7 +131,7 @@ func NewFormatFromId(id string) (CompressionFormat, bool) { } func NewFormatFromExtension(extension string) (CompressionFormat, bool) { - for _, format := range allFormats { + for _, format := range AllCompressionFormats() { if format.impl().extension == extension { return format, true } diff --git a/config/compression_test.go b/config/compression_test.go index 618eaaa..a9b636b 100644 --- a/config/compression_test.go +++ b/config/compression_test.go @@ -2,15 +2,16 @@ package config import ( "fmt" - "gopkg.in/yaml.v2" "regexp" "strings" "testing" + + "gopkg.in/yaml.v2" ) func Test_ExtensionRegex(t *testing.T) { var extensionMatcher = regexp.MustCompile(ExtensionRegex) - for _, format := range allFormats { + for _, format := range AllCompressionFormats() { if extensionMatcher.FindStringSubmatch(format.Extension()) == nil { t.Errorf("Extension regex does not support format %v", format) } diff --git a/go.mod b/go.mod index 300fb63..228d84c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/jamespfennell/hoard go 1.22 require ( + github.com/DataDog/zstd v1.5.5 github.com/jamespfennell/xz v0.1.2 github.com/minio/minio v0.0.0-20210313185243-afbd3e41ebfc github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78 diff --git a/go.sum b/go.sum index 4bdbf18..06a1177 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzS github.com/BurntSushi/toml 
v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= +github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.27.2 h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc= diff --git a/internal/actions/audit/audit.go b/internal/actions/audit/audit.go index e868933..c3516cf 100644 --- a/internal/actions/audit/audit.go +++ b/internal/actions/audit/audit.go @@ -2,9 +2,9 @@ // // This actions searches for data problems in remote object storage. // Currently, it looks for the following problems: -// - Hours for which there a multiple archive files. These need to be merged. -// - Data stored in one remote replica but not another. This data needs to be copied -// to all replicas. +// - Hours for which there a multiple archive files. These need to be merged. +// - Data stored in one remote replica but not another. This data needs to be copied +// to all replicas. // // The action optionally fixes the problems it encounters. package audit diff --git a/internal/storage/astore/astore.go b/internal/storage/astore/astore.go index c798c41..dee4127 100644 --- a/internal/storage/astore/astore.go +++ b/internal/storage/astore/astore.go @@ -93,7 +93,8 @@ const maxPerMonthPrefixesInSearch = 6 // TODO: document this // TODO: do some very elementary benchmarking -// downloading all data is faster than downloading 7 hours of data :( +// +// downloading all data is faster than downloading 7 hours of data :( func generatePrefixesForSearch(startOpt *hour.Hour, end hour.Hour) []persistence.Prefix { if startOpt == nil { return []persistence.Prefix{persistence.EmptyPrefix()} diff --git a/tests/main_test.go b/tests/main_test.go index 7cbba17..c2def2d 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -5,13 +5,14 @@ import ( "bytes" "compress/gzip" "fmt" - "github.com/jamespfennell/hoard" - "github.com/jamespfennell/hoard/config" - "github.com/jamespfennell/hoard/tests/deps" "io" "os" "reflect" "testing" + + "github.com/jamespfennell/hoard" + "github.com/jamespfennell/hoard/config" + "github.com/jamespfennell/hoard/tests/deps" ) var minioServer1 = &deps.InProcessMinioServer{ @@ -73,73 +74,93 @@ func Test_DownloadPackMerge(t *testing.T) { } func Test_DownloadUploadRetrieve(t *testing.T) { - workspace := newFilesystem(t) - server := newFeedServer(t) - bucketName := newBucket(t, minioServer1) + for _, compressionFormat := range config.AllCompressionFormats() { + t.Run(compressionFormat.String(), func(t *testing.T) { + workspace := newFilesystem(t) + server := newFeedServer(t) + bucketName := newBucket(t, minioServer1) - c := &config.Config{ - WorkspacePath: workspace.String(), - Feeds: []config.Feed{ - { - ID: "feed1_", - Postfix: ".txt", - URL: fmt.Sprintf("http://localhost:%d", server.Port()), - }, - }, - ObjectStorage: []config.ObjectStorage{ - minioServer1.Config(bucketName), - }, - } + c := &config.Config{ + WorkspacePath: workspace.String(), + Feeds: []config.Feed{ + { + ID: "feed1_", + Postfix: ".txt", + URL: fmt.Sprintf("http://localhost:%d", server.Port()), + 
Compression: config.Compression{ + Format: compressionFormat, + }, + }, + }, + ObjectStorage: []config.ObjectStorage{ + minioServer1.Config(bucketName), + }, + } - retrievePath := newFilesystem(t) - actions := []Action{ - Download, - Pack, - Upload, - Retrieve(retrievePath.String()), - } - requireNilErr(t, ExecuteMany(actions, c)) + retrievePath := newFilesystem(t) + actions := []Action{ + Download, + Pack, + Upload, + Retrieve(retrievePath.String()), + } + requireNilErr(t, ExecuteMany(actions, c)) - verifyLocalFiles(t, retrievePath, server, false) + verifyLocalFiles(t, retrievePath, server, false) + }) + } } func TestDifferentCompressionFormats(t *testing.T) { - server := newFeedServer(t) - bucketName := newBucket(t, minioServer1) + for _, compressionFormat1 := range config.AllCompressionFormats() { + for _, compressionFormat2 := range config.AllCompressionFormats() { + if compressionFormat1 == compressionFormat2 { + continue + } + t.Run(fmt.Sprintf("%s_to_%s", &compressionFormat1, &compressionFormat2), func(t *testing.T) { - config1 := &config.Config{ - WorkspacePath: newFilesystem(t).String(), - Feeds: []config.Feed{ - { - ID: "feed1_", - Postfix: ".txt", - URL: fmt.Sprintf("http://localhost:%d", server.Port()), - }, - }, - ObjectStorage: []config.ObjectStorage{ - minioServer1.Config(bucketName), - }, - } - config2 := replaceCompressionFormat(*config1, config.NewSpecWithLevel(config.Xz, 9)) - config2.WorkspacePath = newFilesystem(t).String() + server := newFeedServer(t) + bucketName := newBucket(t, minioServer1) - actions := []Action{ - Download, - Pack, - Upload, - } - requireNilErr(t, ExecuteMany(actions, config1)) - requireNilErr(t, ExecuteMany(actions, config2)) - requireNilErr(t, ExecuteMany(actions, config1)) + config1 := &config.Config{ + WorkspacePath: newFilesystem(t).String(), + Feeds: []config.Feed{ + { + ID: "feed1_", + Postfix: ".txt", + URL: fmt.Sprintf("http://localhost:%d", server.Port()), + Compression: config.Compression{ + Format: compressionFormat1, + }, + }, + }, + ObjectStorage: []config.ObjectStorage{ + minioServer1.Config(bucketName), + }, + } + config2 := replaceCompressionFormat(*config1, config.Compression{Format: compressionFormat2}) + config2.WorkspacePath = newFilesystem(t).String() - for _, c := range []*config.Config{config1, config2} { - retrievePath := newFilesystem(t) - actions = []Action{ - Retrieve(retrievePath.String()), - } - requireNilErr(t, ExecuteMany(actions, c)) + actions := []Action{ + Download, + Pack, + Upload, + } + requireNilErr(t, ExecuteMany(actions, config1)) + requireNilErr(t, ExecuteMany(actions, config2)) + requireNilErr(t, ExecuteMany(actions, config1)) - verifyLocalFiles(t, retrievePath, server, false) + for _, c := range []*config.Config{config1, config2} { + retrievePath := newFilesystem(t) + actions = []Action{ + Retrieve(retrievePath.String()), + } + requireNilErr(t, ExecuteMany(actions, c)) + + verifyLocalFiles(t, retrievePath, server, false) + } + }) + } } }
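
Note (commentary, not part of the diff): the core of the patch is the new zstdImpl entry in config/compression.go. Below is a minimal, self-contained sketch of how that entry round-trips bytes through github.com/DataDog/zstd, using the same NewWriterLevel/NewReader calls and level constants the patch registers in formatImpl. The package main harness and the sample payload are illustrative assumptions, not code from the repository.

    package main

    import (
    	"bytes"
    	"fmt"
    	"io"

    	"github.com/DataDog/zstd"
    )

    func main() {
    	// Compress a payload at the default level, mirroring zstdImpl.newWriter.
    	var buf bytes.Buffer
    	w := zstd.NewWriterLevel(&buf, zstd.DefaultCompression)
    	if _, err := w.Write([]byte("hello hoard")); err != nil {
    		panic(err)
    	}
    	// Close flushes the final zstd frame; without it the reader below would fail.
    	if err := w.Close(); err != nil {
    		panic(err)
    	}

    	// Decompress, mirroring zstdImpl.newReader.
    	r := zstd.NewReader(&buf)
    	defer r.Close()
    	out, err := io.ReadAll(r)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("%s\n", out) // hello hoard
    }

Closing the writer is what finalizes the compressed stream, which is why formatImpl's newWriter hands back an io.WriteCloser rather than a plain io.Writer.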
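
Also commentary, not part of the diff: a short sketch of how callers can discover the new format through the helpers this patch touches (AllCompressionFormats, NewFormatFromId, NewFormatFromExtension, and the widened ExtensionRegex). The package main wrapper and printed output are assumptions for illustration only.

    package main

    import (
    	"fmt"
    	"regexp"

    	"github.com/jamespfennell/hoard/config"
    )

    func main() {
    	// The new format is discoverable both by id and by file extension.
    	if format, ok := config.NewFormatFromId("zstd"); ok {
    		fmt.Println("extension:", format.Extension())
    	}
    	if _, ok := config.NewFormatFromExtension("zstd"); !ok {
    		panic("zstd extension not recognized")
    	}

    	// ExtensionRegex is expected to match the extension of every format
    	// returned by AllCompressionFormats, which is what Test_ExtensionRegex checks.
    	matcher := regexp.MustCompile(config.ExtensionRegex)
    	for _, format := range config.AllCompressionFormats() {
    		fmt.Println(format.Extension(), matcher.MatchString(format.Extension()))
    	}
    }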