Skip to content

Commit

Permalink
Support zstd compression
Browse files Browse the repository at this point in the history
After the recent xz attack, it seems reasonable to add another good compression
format and potentially move away from xz.
  • Loading branch information
jamespfennell committed May 13, 2024
1 parent cf488bc commit a15a7e0
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 72 deletions.
32 changes: 26 additions & 6 deletions config/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"sync"

"github.com/DataDog/zstd"
"github.com/jamespfennell/xz"
)

Expand All @@ -14,13 +15,17 @@ type CompressionFormat int
// Identifiers for the supported compression formats. Each value is assigned
// explicitly rather than derived with iota.
const (
Gzip CompressionFormat = 0
Xz CompressionFormat = 1
Zstd CompressionFormat = 2
)

const ExtensionRegex = `gz|xz`
const ExtensionRegex = `gz|xz|zstd`

var allFormats = []CompressionFormat{
Gzip,
Xz,
// AllCompressionFormats returns every supported compression format, in the
// order Gzip, Xz, Zstd.
//
// A new slice is built on each call, so callers may modify the result without
// affecting later calls.
func AllCompressionFormats() []CompressionFormat {
	supported := []CompressionFormat{Gzip, Xz, Zstd}
	return supported
}

type formatImpl struct {
Expand Down Expand Up @@ -63,9 +68,24 @@ var xzImpl = formatImpl{
},
}

// zstdImpl is the formatImpl describing the zstd compression format. Its
// levels, reader and writer all come from the github.com/DataDog/zstd package.
var zstdImpl = formatImpl{
id: "zstd",
// NOTE(review): the conventional file extension for Zstandard is "zst";
// "zstd" is used here and in ExtensionRegex - confirm this is intentional.
extension: "zstd",
// The level bounds and the default are taken directly from the zstd
// package constants.
minLevel: zstd.BestSpeed,
maxLevel: zstd.BestCompression,
defaultLevel: zstd.DefaultCompression,
newReader: func(r io.Reader) (io.ReadCloser, error) {
// zstd.NewReader yields only a reader, so the error result is always nil.
return zstd.NewReader(r), nil
},
newWriter: func(w io.Writer, level int) io.WriteCloser {
return zstd.NewWriterLevel(w, level)
},
}

// formatToImpl maps each CompressionFormat identifier to the formatImpl that
// describes it. Every supported format needs an entry here.
var formatToImpl = map[CompressionFormat]formatImpl{
Gzip: gzipImpl,
Xz: xzImpl,
Zstd: zstdImpl,
}

func (format *CompressionFormat) impl() formatImpl {
Expand Down Expand Up @@ -102,7 +122,7 @@ func (format CompressionFormat) MarshalYAML() (interface{}, error) {
}

func NewFormatFromId(id string) (CompressionFormat, bool) {
for _, format := range allFormats {
for _, format := range AllCompressionFormats() {
if format.impl().id == id {
return format, true
}
Expand All @@ -111,7 +131,7 @@ func NewFormatFromId(id string) (CompressionFormat, bool) {
}

func NewFormatFromExtension(extension string) (CompressionFormat, bool) {
for _, format := range allFormats {
for _, format := range AllCompressionFormats() {
if format.impl().extension == extension {
return format, true
}
Expand Down
5 changes: 3 additions & 2 deletions config/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package config

import (
"fmt"
"gopkg.in/yaml.v2"
"regexp"
"strings"
"testing"

"gopkg.in/yaml.v2"
)

func Test_ExtensionRegex(t *testing.T) {
var extensionMatcher = regexp.MustCompile(ExtensionRegex)
for _, format := range allFormats {
for _, format := range AllCompressionFormats() {
if extensionMatcher.FindStringSubmatch(format.Extension()) == nil {
t.Errorf("Extension regex does not support format %v", format)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/jamespfennell/hoard
go 1.22

require (
github.com/DataDog/zstd v1.5.5
github.com/jamespfennell/xz v0.1.2
github.com/minio/minio v0.0.0-20210313185243-afbd3e41ebfc
github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzS
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.27.2 h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc=
Expand Down
6 changes: 3 additions & 3 deletions internal/actions/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
//
// This action searches for data problems in remote object storage.
// Currently, it looks for the following problems:
// - Hours for which there are multiple archive files. These need to be merged.
// - Data stored in one remote replica but not another. This data needs to be copied
// to all replicas.
// - Hours for which there are multiple archive files. These need to be merged.
// - Data stored in one remote replica but not another. This data needs to be copied
// to all replicas.
//
// The action optionally fixes the problems it encounters.
package audit
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/astore/astore.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ const maxPerMonthPrefixesInSearch = 6

// TODO: document this
// TODO: do some very elementary benchmarking
// downloading all data is faster than downloading 7 hours of data :(
//
// downloading all data is faster than downloading 7 hours of data :(
func generatePrefixesForSearch(startOpt *hour.Hour, end hour.Hour) []persistence.Prefix {
if startOpt == nil {
return []persistence.Prefix{persistence.EmptyPrefix()}
Expand Down
141 changes: 81 additions & 60 deletions tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"bytes"
"compress/gzip"
"fmt"
"github.com/jamespfennell/hoard"
"github.com/jamespfennell/hoard/config"
"github.com/jamespfennell/hoard/tests/deps"
"io"
"os"
"reflect"
"testing"

"github.com/jamespfennell/hoard"
"github.com/jamespfennell/hoard/config"
"github.com/jamespfennell/hoard/tests/deps"
)

var minioServer1 = &deps.InProcessMinioServer{
Expand Down Expand Up @@ -73,73 +74,93 @@ func Test_DownloadPackMerge(t *testing.T) {
}

func Test_DownloadUploadRetrieve(t *testing.T) {
workspace := newFilesystem(t)
server := newFeedServer(t)
bucketName := newBucket(t, minioServer1)
for _, compressionFormat := range config.AllCompressionFormats() {
t.Run(compressionFormat.String(), func(t *testing.T) {
workspace := newFilesystem(t)
server := newFeedServer(t)
bucketName := newBucket(t, minioServer1)

c := &config.Config{
WorkspacePath: workspace.String(),
Feeds: []config.Feed{
{
ID: "feed1_",
Postfix: ".txt",
URL: fmt.Sprintf("http://localhost:%d", server.Port()),
},
},
ObjectStorage: []config.ObjectStorage{
minioServer1.Config(bucketName),
},
}
c := &config.Config{
WorkspacePath: workspace.String(),
Feeds: []config.Feed{
{
ID: "feed1_",
Postfix: ".txt",
URL: fmt.Sprintf("http://localhost:%d", server.Port()),
Compression: config.Compression{
Format: compressionFormat,
},
},
},
ObjectStorage: []config.ObjectStorage{
minioServer1.Config(bucketName),
},
}

retrievePath := newFilesystem(t)
actions := []Action{
Download,
Pack,
Upload,
Retrieve(retrievePath.String()),
}
requireNilErr(t, ExecuteMany(actions, c))
retrievePath := newFilesystem(t)
actions := []Action{
Download,
Pack,
Upload,
Retrieve(retrievePath.String()),
}
requireNilErr(t, ExecuteMany(actions, c))

verifyLocalFiles(t, retrievePath, server, false)
verifyLocalFiles(t, retrievePath, server, false)
})
}
}

func TestDifferentCompressionFormats(t *testing.T) {
server := newFeedServer(t)
bucketName := newBucket(t, minioServer1)
for _, compressionFormat1 := range config.AllCompressionFormats() {
for _, compressionFormat2 := range config.AllCompressionFormats() {
if compressionFormat1 == compressionFormat2 {
continue
}
t.Run(fmt.Sprintf("%s_to_%s", &compressionFormat1, &compressionFormat2), func(t *testing.T) {

config1 := &config.Config{
WorkspacePath: newFilesystem(t).String(),
Feeds: []config.Feed{
{
ID: "feed1_",
Postfix: ".txt",
URL: fmt.Sprintf("http://localhost:%d", server.Port()),
},
},
ObjectStorage: []config.ObjectStorage{
minioServer1.Config(bucketName),
},
}
config2 := replaceCompressionFormat(*config1, config.NewSpecWithLevel(config.Xz, 9))
config2.WorkspacePath = newFilesystem(t).String()
server := newFeedServer(t)
bucketName := newBucket(t, minioServer1)

actions := []Action{
Download,
Pack,
Upload,
}
requireNilErr(t, ExecuteMany(actions, config1))
requireNilErr(t, ExecuteMany(actions, config2))
requireNilErr(t, ExecuteMany(actions, config1))
config1 := &config.Config{
WorkspacePath: newFilesystem(t).String(),
Feeds: []config.Feed{
{
ID: "feed1_",
Postfix: ".txt",
URL: fmt.Sprintf("http://localhost:%d", server.Port()),
Compression: config.Compression{
Format: compressionFormat1,
},
},
},
ObjectStorage: []config.ObjectStorage{
minioServer1.Config(bucketName),
},
}
config2 := replaceCompressionFormat(*config1, config.Compression{Format: compressionFormat2})
config2.WorkspacePath = newFilesystem(t).String()

for _, c := range []*config.Config{config1, config2} {
retrievePath := newFilesystem(t)
actions = []Action{
Retrieve(retrievePath.String()),
}
requireNilErr(t, ExecuteMany(actions, c))
actions := []Action{
Download,
Pack,
Upload,
}
requireNilErr(t, ExecuteMany(actions, config1))
requireNilErr(t, ExecuteMany(actions, config2))
requireNilErr(t, ExecuteMany(actions, config1))

verifyLocalFiles(t, retrievePath, server, false)
for _, c := range []*config.Config{config1, config2} {
retrievePath := newFilesystem(t)
actions = []Action{
Retrieve(retrievePath.String()),
}
requireNilErr(t, ExecuteMany(actions, c))

verifyLocalFiles(t, retrievePath, server, false)
}
})
}
}
}

Expand Down

0 comments on commit a15a7e0

Please sign in to comment.