Skip to content

Commit

Permalink
Add MySQL support to backfill script (#2081)
Browse files Browse the repository at this point in the history
* Add functional tests for backfill-redis script

Signed-off-by: Colleen Murphy <[email protected]>

* Add MySQL support to backfill script

Rename backfill-redis to backfill-index and add support for when MySQL
is used as the index storage backend.

Some Redis-specific parameters are renamed to clearly differentiate them
from MySQL parameters. MySQL connection parameters mirror the
rekor-server paramters, using a single --dsn flag instead of separate
host, port, password, etc flags.

Signed-off-by: Colleen Murphy <[email protected]>

* Update backfill test for MySQL backend

Signed-off-by: Colleen Murphy <[email protected]>

---------

Signed-off-by: Colleen Murphy <[email protected]>
  • Loading branch information
cmurphy authored Apr 30, 2024
1 parent a29c120 commit 5d0bb6e
Show file tree
Hide file tree
Showing 9 changed files with 544 additions and 76 deletions.
33 changes: 32 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
make ko-local
docker run --rm $(cat rekorImagerefs) version
docker run --rm $(cat cliImagerefs) version
docker run --rm $(cat redisImagerefs) --version
docker run --rm $(cat indexImagerefs) --version
e2e:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -118,6 +118,37 @@ jobs:
files: /tmp/rekor-merged.cov,/tmp/pkg-rekor-merged.cov
flags: e2etests

backfill:
runs-on: ubuntu-latest
needs: build

steps:
- uses: actions/checkout@1d96c772d19495a3b5c517cd2bc0cb401ea0529f # v4.1.3
- name: Extract version of Go to use
run: echo "GOVERSION=$(cat Dockerfile|grep golang | awk ' { print $2 } ' | cut -d '@' -f 1 | cut -d ':' -f 2 | uniq)" >> $GITHUB_ENV
- uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
with:
go-version: ${{ env.GOVERSION }}
- name: Install backfill test dependencies
run: |
go install ./cmd/rekor-cli
sudo add-apt-repository ppa:savoury1/minisign && sudo apt-get update && sudo apt-get install minisign
sudo apt install redis-tools default-mysql-client -y
- name: Backfill test redis
run: ./tests/backfill-test.sh
env:
INDEX_BACKEND: redis
- name: Backfill test mysql
run: ./tests/backfill-test.sh
env:
INDEX_BACKEND: mysql
- name: Upload logs if they exist
uses: actions/upload-artifact@1746f4ab65b179e0ea60a494b83293b640dd5bba # v4.3.2
if: failure()
with:
name: E2E Docker Compose logs
path: /tmp/docker-compose.log

sharding-e2e:
runs-on: ubuntu-latest
needs: build
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trillianServerImagerefs
trillianSignerImagerefs
rekorImagerefs
cliImagerefs
redisImagerefs
indexImagerefs
cosign.*
signature
rekor.pub
Expand Down
4 changes: 2 additions & 2 deletions .ko.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ builds:
- -extldflags "-static"
- "{{ .Env.LDFLAGS }}"

- id: backfill-redis
- id: backfill-index
dir: .
main: ./cmd/backfill-redis
main: ./cmd/backfill-index
env:
- CGO_ENABLED=0
flags:
Expand Down
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ rekor-cli: $(SRCS)
rekor-server: $(SRCS)
CGO_ENABLED=0 go build -trimpath -ldflags "$(SERVER_LDFLAGS)" -o rekor-server ./cmd/rekor-server

backfill-redis: $(SRCS)
CGO_ENABLED=0 go build -trimpath -ldflags "$(SERVER_LDFLAGS)" -o backfill-redis ./cmd/backfill-redis
backfill-index: $(SRCS)
CGO_ENABLED=0 go build -trimpath -ldflags "$(SERVER_LDFLAGS)" -o backfill-index ./cmd/backfill-index

test:
go test ./...
Expand Down Expand Up @@ -120,11 +120,11 @@ ko:
--platform=all --tags $(GIT_VERSION) --tags $(GIT_HASH) \
--image-refs rekorCliImagerefs github.com/sigstore/rekor/cmd/rekor-cli

# backfill-redis
# backfill-index
LDFLAGS="$(SERVER_LDFLAGS)" GIT_HASH=$(GIT_HASH) GIT_VERSION=$(GIT_VERSION) \
ko publish --base-import-paths \
--platform=all --tags $(GIT_VERSION) --tags $(GIT_HASH) \
--image-refs bRedisImagerefs github.com/sigstore/rekor/cmd/backfill-redis
--image-refs bIndexImagerefs github.com/sigstore/rekor/cmd/backfill-index

deploy:
LDFLAGS="$(SERVER_LDFLAGS)" GIT_HASH=$(GIT_HASH) GIT_VERSION=$(GIT_VERSION) ko apply -f config/
Expand Down Expand Up @@ -154,8 +154,8 @@ ko-local:

KO_DOCKER_REPO=ko.local LDFLAGS="$(SERVER_LDFLAGS)" GIT_HASH=$(GIT_HASH) GIT_VERSION=$(GIT_VERSION) \
ko publish --base-import-paths \
--tags $(GIT_VERSION) --tags $(GIT_HASH) --image-refs redisImagerefs \
github.com/sigstore/rekor/cmd/backfill-redis
--tags $(GIT_VERSION) --tags $(GIT_HASH) --image-refs indexImagerefs \
github.com/sigstore/rekor/cmd/backfill-index

.PHONY: fuzz
# This runs the fuzz tests for a short period of time to ensure they don't crash.
Expand Down
182 changes: 124 additions & 58 deletions cmd/backfill-redis/main.go → cmd/backfill-index/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The Sigstore Authors.
// Copyright 2024 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,13 +13,14 @@
// limitations under the License.

/*
backfill-redis is a script to populate the Redis index with entries
from Rekor. This is sometimes necessary because Redis caching is best
effort. If Redis returns an error, Rekor will not, and so sometimes
we need to backfill missing entries into Redis for the search API.
backfill-index is a script to populate the index storage database with
entries from Rekor. This is sometimes necessary because caching is
best effort. If Redis returns an error, the database will not, and so sometimes
we need to backfill missing entries into the database for the search API.
It can also be used to populate an index storage backend from scratch.
To run:
go run cmd/backfill-redis/main.go --rekor-address <address> \
go run cmd/backfill-index/main.go --rekor-address <address> \
--hostname <redis-hostname> --port <redis-port> --concurrency <num-of-workers> \
--start <first index to backfill> --end <last index to backfill> [--dry-run]
*/
Expand All @@ -40,11 +41,14 @@ import (
"syscall"

"github.com/go-openapi/runtime"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/release-utils/version"

"github.com/sigstore/rekor/pkg/client"
rekorclient "github.com/sigstore/rekor/pkg/generated/client"
"github.com/sigstore/rekor/pkg/generated/client/entries"
"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/types"
Expand All @@ -64,18 +68,43 @@ import (
_ "github.com/sigstore/rekor/pkg/types/tuf/v0.0.1"
)

const (
mysqlWriteStmt = "INSERT IGNORE INTO EntryIndex (EntryKey, EntryUUID) VALUES (:key, :uuid)"
)

type provider int

const (
providerUnset provider = iota
providerRedis
providerMySQL
)

type indexClient interface {
idempotentAddToIndex(ctx context.Context, key, value string) error
}

type redisClient struct {
client *redis.Client
}

type mysqlClient struct {
client *sqlx.DB
}

var (
redisHostname = flag.String("hostname", "", "Hostname for Redis application")
redisPort = flag.String("port", "", "Port to Redis application")
redisPassword = flag.String("password", "", "Password for Redis authentication")
startIndex = flag.Int("start", -1, "First index to backfill")
endIndex = flag.Int("end", -1, "Last index to backfill")
enableTLS = flag.Bool("enable-tls", false, "Enable TLS for Redis client")
insecureSkipVerify = flag.Bool("insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not")
rekorAddress = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
versionFlag = flag.Bool("version", false, "Print the current version of Backfill Redis")
concurrency = flag.Int("concurrency", 1, "Number of workers to use for backfill")
dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into Redis")
redisHostname = flag.String("redis-hostname", "", "Hostname for Redis application")
redisPort = flag.String("redis-port", "", "Port to Redis application")
redisPassword = flag.String("redis-password", "", "Password for Redis authentication")
redisEnableTLS = flag.Bool("redis-enable-tls", false, "Enable TLS for Redis client")
redisInsecureSkipVerify = flag.Bool("redis-insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not")
mysqlDSN = flag.String("mysql-dsn", "", "MySQL Data Source Name")
startIndex = flag.Int("start", -1, "First index to backfill")
endIndex = flag.Int("end", -1, "Last index to backfill")
rekorAddress = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
versionFlag = flag.Bool("version", false, "Print the current version of Backfill MySQL")
concurrency = flag.Int("concurrency", 1, "Number of workers to use for backfill")
dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into MySQL")
)

func main() {
Expand All @@ -87,11 +116,26 @@ func main() {
os.Exit(0)
}

if *redisHostname == "" {
log.Fatal("address must be set")
provider := providerUnset
if *mysqlDSN != "" && *redisHostname != "" {
log.Fatal("Ambiguous backend address: either mysql-dsn or redis-hostname must be set, but not both")
}
if *redisPort == "" {
log.Fatal("port must be set")
if *mysqlDSN != "" {
provider = providerMySQL
}
if *redisHostname != "" || *redisPort != "" || *redisPassword != "" {
provider = providerRedis
}
if provider == providerUnset {
log.Fatal("Must set mysql-dsn for MySQL or redis-hostname and redis-port for Redis")
}
if provider == providerRedis {
if *redisHostname == "" {
log.Fatal("Redis address must be set")
}
if *redisPort == "" {
log.Fatal("Redis port must be set")
}
}
if *startIndex == -1 {
log.Fatal("start must be set to >=0")
Expand All @@ -103,15 +147,57 @@ func main() {
log.Fatal("rekor-address must be set")
}

log.Printf("running backfill redis Version: %s GitCommit: %s BuildDate: %s", versionInfo.GitVersion, versionInfo.GitCommit, versionInfo.BuildDate)
log.Printf("running backfill index Version: %s GitCommit: %s BuildDate: %s", versionInfo.GitVersion, versionInfo.GitCommit, versionInfo.BuildDate)

redisClient := redisClient()
indexClient, err := getIndexClient(provider)
if err != nil {
log.Fatalf("creating index client: %v", err)
}

rekorClient, err := client.GetRekorClient(*rekorAddress)
if err != nil {
log.Fatalf("creating rekor client: %v", err)
}

err = populate(indexClient, rekorClient)
if err != nil {
log.Fatalf("populating index: %v", err)
}
}

// getIndexClient creates a client for the provided index backend.
func getIndexClient(backend provider) (indexClient, error) {
switch backend {
case providerRedis:
opts := &redis.Options{
Addr: fmt.Sprintf("%s:%s", *redisHostname, *redisPort),
Password: *redisPassword,
Network: "tcp",
DB: 0, // default DB
}
// #nosec G402
if *redisEnableTLS {
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: *redisInsecureSkipVerify, //nolint: gosec
}
}
return &redisClient{client: redis.NewClient(opts)}, nil
case providerMySQL:
dbClient, err := sqlx.Open("mysql", *mysqlDSN)
if err != nil {
return nil, err
}
if err = dbClient.Ping(); err != nil {
return nil, err
}
return &mysqlClient{client: dbClient}, nil
default:
return nil, fmt.Errorf("could not create client for unexpected provider")
}
}

// populate does the heavy lifting of populating the index storage for whichever client is passed in.
func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(*concurrency)
Expand Down Expand Up @@ -164,14 +250,10 @@ func main() {
continue
}
for _, key := range keys {
// remove the key-value pair from the index in case it already exists
if err := removeFromIndex(ctx, redisClient, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %w", uuid, key, err))
}
if err := addToIndex(ctx, redisClient, key, uuid); err != nil {
if err := indexClient.idempotentAddToIndex(ctx, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %w", uuid, key, err))
}
fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, index, key)
fmt.Printf("Uploaded entry %s, index %d, key %s\n", uuid, index, key)
}
}
if len(insertErrs) != 0 || len(parseErrs) != 0 {
Expand All @@ -194,37 +276,19 @@ func main() {
return nil
})
}
err = group.Wait()
err := group.Wait()
if err != nil {
log.Fatalf("error running backfill: %v", err)
}
close(resultChan)
fmt.Println("Backfill complete")
if len(parseErrs) > 0 {
fmt.Printf("Failed to parse %d entries: %v\n", len(parseErrs), parseErrs)
return fmt.Errorf("failed to parse %d entries: %v", len(parseErrs), parseErrs)
}
if len(insertErrs) > 0 {
fmt.Printf("Failed to insert/remove %d entries: %v\n", len(insertErrs), insertErrs)
return fmt.Errorf("failed to insert/remove %d entries: %v", len(insertErrs), insertErrs)
}
}

func redisClient() *redis.Client {

opts := &redis.Options{
Addr: fmt.Sprintf("%s:%s", *redisHostname, *redisPort),
Password: *redisPassword,
Network: "tcp",
DB: 0, // default DB
}

// #nosec G402
if *enableTLS {
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: *insecureSkipVerify, //nolint: gosec
}
}

return redis.NewClient(opts)
return nil
}

// unmarshalEntryImpl decodes the base64-encoded entry to a specific entry type (types.EntryImpl).
Expand All @@ -247,21 +311,23 @@ func unmarshalEntryImpl(e string) (types.EntryImpl, string, string, error) {
return entry, pe.Kind(), entry.APIVersion(), nil
}

// removeFromIndex removes all occurrences of a value from a given key. This guards against
// multiple invocations of backfilling creating duplicates.
func removeFromIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
func (c *redisClient) idempotentAddToIndex(ctx context.Context, key, value string) error {
if *dryRun {
return nil
}
_, err := redisClient.LRem(ctx, key, 0, value).Result()
// remove the key-value pair from the index in case it already exists
_, err := c.client.LRem(ctx, key, 0, value).Result()
if err != nil {
return err
}
_, err = c.client.LPush(ctx, key, value).Result()
return err
}

// addToIndex pushes a value onto a key of type list.
func addToIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
func (c *mysqlClient) idempotentAddToIndex(ctx context.Context, key, value string) error {
if *dryRun {
return nil
}
_, err := redisClient.LPush(ctx, key, value).Result()
_, err := c.client.NamedExecContext(ctx, mysqlWriteStmt, map[string]any{"key": key, "uuid": value})
return err
}
Loading

0 comments on commit 5d0bb6e

Please sign in to comment.