From ea8b8f1493777f9e303cdad3ad96e84efd0be36c Mon Sep 17 00:00:00 2001
From: Utku Ozdemir
Date: Fri, 10 May 2024 12:08:16 +0200
Subject: [PATCH] feat: implement state storage
On a best-effort basis, store the state to disk periodically and on shutdown, and restore it from disk on startup.
Additionally, bump the Go version and dependencies, and rekres.
Closes siderolabs/discovery-service#54.
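For illustration, a minimal sketch of the best-effort persistence loop this patch introduces. The Storage type, its fields, and the Save/Load stubs below are hypothetical; the actual implementation lives in internal/state/storage/storage.go and marshals the state snapshot via the new protobuf messages:

    package storage

    import (
        "context"
        "log"
        "time"
    )

    // Storage periodically persists the service state to a file on disk.
    // Save and Load are stubs here; the real versions marshal the state
    // snapshot to protobuf and write it to Path.
    type Storage struct {
        Path     string
        Interval time.Duration
    }

    func (s *Storage) Save() error { return nil } // marshal + write (stub)
    func (s *Storage) Load() error { return nil } // read + unmarshal (stub)

    // Run restores state on startup, saves it periodically, and performs
    // a final save on shutdown. Errors are logged and ignored, since
    // persistence is best-effort and must not take the service down.
    func (s *Storage) Run(ctx context.Context) {
        if err := s.Load(); err != nil {
            log.Printf("state restore failed (continuing with empty state): %v", err)
        }

        ticker := time.NewTicker(s.Interval)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                if err := s.Save(); err != nil {
                    log.Printf("periodic state save failed: %v", err)
                }
            case <-ctx.Done():
                if err := s.Save(); err != nil {
                    log.Printf("final state save failed: %v", err)
                }

                return
            }
        }
    }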
Signed-off-by: Utku Ozdemir
---
.dockerignore | 3 +-
.github/workflows/ci.yaml | 35 +-
.golangci.yml | 83 +-
.kres.yaml | 14 +
Dockerfile | 117 +-
Makefile | 58 +-
api/storage/storage.pb.go | 408 +++++++
api/storage/storage.proto | 31 +
api/storage/storage_vtproto.pb.go | 1064 ++++++++++++++++++
cmd/discovery-service/main.go | 27 +
cmd/snapshot-decoder/main.go | 64 ++
go.mod | 13 +-
go.sum | 18 +-
internal/landing/html/index.html | 3 +-
internal/state/affiliate_test.go | 2 +-
internal/state/cluster_test.go | 2 +-
internal/state/snapshot.go | 130 +++
internal/state/storage/protobuf.go | 135 +++
internal/state/storage/storage.go | 397 +++++++
internal/state/storage/storage_bench_test.go | 62 +
internal/state/storage/storage_test.go | 398 +++++++
pkg/server/client_test.go | 6 +-
pkg/server/server_test.go | 6 +-
pkg/server/version_test.go | 2 -
24 files changed, 2928 insertions(+), 150 deletions(-)
create mode 100644 api/storage/storage.pb.go
create mode 100644 api/storage/storage.proto
create mode 100644 api/storage/storage_vtproto.pb.go
create mode 100644 cmd/snapshot-decoder/main.go
create mode 100644 internal/state/snapshot.go
create mode 100644 internal/state/storage/protobuf.go
create mode 100644 internal/state/storage/storage.go
create mode 100644 internal/state/storage/storage_bench_test.go
create mode 100644 internal/state/storage/storage_test.go
diff --git a/.dockerignore b/.dockerignore
index 8e27148..81972d2 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,8 +1,9 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
-# Generated on 2023-01-30T15:45:28Z by kres latest.
+# Generated on 2024-05-09T09:59:51Z by kres 1e986af.
*
+!api
!cmd
!internal
!pkg
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 9782318..2d70afe 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
-# Generated on 2024-03-12T11:34:19Z by kres latest.
+# Generated on 2024-05-21T10:07:54Z by kres 0290180.
name: default
concurrency:
@@ -29,15 +29,6 @@ jobs:
- self-hosted
- generic
if: (!startsWith(github.head_ref, 'renovate/') && !startsWith(github.head_ref, 'dependabot/'))
- services:
- buildkitd:
- image: moby/buildkit:v0.12.5
- options: --privileged
- ports:
- - 1234:1234
- volumes:
- - /var/lib/buildkit/${{ github.repository }}:/var/lib/buildkit
- - /usr/etc/buildkit/buildkitd.toml:/etc/buildkit/buildkitd.toml
steps:
- name: checkout
uses: actions/checkout@v4
@@ -45,11 +36,12 @@ jobs:
run: |
git fetch --prune --unshallow
- name: Set up Docker Buildx
+ id: setup-buildx
uses: docker/setup-buildx-action@v3
with:
driver: remote
- endpoint: tcp://127.0.0.1:1234
- timeout-minutes: 1
+ endpoint: tcp://buildkit-amd64.ci.svc.cluster.local:1234
+ timeout-minutes: 10
- name: base
run: |
make base
@@ -60,8 +52,11 @@ jobs:
run: |
make unit-tests-race
- name: coverage
- run: |
- make coverage
+ uses: codecov/codecov-action@v4
+ with:
+ files: _out/coverage-unit-tests.txt
+ token: ${{ secrets.CODECOV_TOKEN }}
+ timeout-minutes: 3
- name: discovery-service
run: |
make discovery-service
@@ -86,17 +81,20 @@ jobs:
run: |
make image-discovery-service
- name: push-discovery-service-latest
- if: github.event_name != 'pull_request'
+ if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main'
env:
PLATFORM: linux/amd64,linux/arm64
PUSH: "true"
run: |
- make image-discovery-service TAG=latest
+ make image-discovery-service IMAGE_TAG=latest
+ - name: snapshot-decoder
+ run: |
+ make snapshot-decoder
- name: Generate Checksums
if: startsWith(github.ref, 'refs/tags/')
run: |
- sha256sum _out/discovery-service-* > _out/sha256sum.txt
- sha512sum _out/discovery-service-* > _out/sha512sum.txt
+ sha256sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha256sum.txt
+ sha512sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha512sum.txt
- name: release-notes
if: startsWith(github.ref, 'refs/tags/')
run: |
@@ -109,4 +107,5 @@ jobs:
draft: "true"
files: |-
_out/discovery-service-*
+ _out/snapshot-decoder-*
_out/sha*.txt
diff --git a/.golangci.yml b/.golangci.yml
index f20e168..13ce6b9 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -1,21 +1,20 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
-# Generated on 2024-03-12T11:34:19Z by kres latest.
+# Generated on 2024-05-19T19:39:10Z by kres dccd292.
# options for analysis running
run:
timeout: 10m
issues-exit-code: 1
tests: true
- build-tags: []
- skip-dirs: []
- skip-dirs-use-default: true
- skip-files: []
+ build-tags: [ ]
modules-download-mode: readonly
# output configuration options
output:
- format: colored-line-number
+ formats:
+ - format: colored-line-number
+ path: stdout
print-issued-lines: true
print-linter-name: true
uniq-by-line: true
@@ -32,54 +31,35 @@ linters-settings:
check-blank: true
exhaustive:
default-signifies-exhaustive: false
- funlen:
- lines: 60
- statements: 40
gci:
- local-prefixes: github.com/siderolabs/discovery-service/
+ sections:
+ - standard # Standard section: captures all standard packages.
+ - default # Default section: contains all imports that could not be matched to another section type.
+ - localmodule # Imports from the same module.
gocognit:
min-complexity: 30
- ireturn:
- allow:
- - anon
- - error
- - empty
- - stdlib
- - github.com\/talos-systems\/kres\/internal\/dag.Node
nestif:
min-complexity: 5
goconst:
min-len: 3
min-occurrences: 3
gocritic:
- disabled-checks: []
+ disabled-checks: [ ]
gocyclo:
min-complexity: 20
godot:
- check-all: false
- godox:
- keywords: # default keywords are TODO, BUG, and FIXME, these can be overwritten by this setting
- - NOTE
- - OPTIMIZE # marks code that should be optimized before merging
- - HACK # marks hack-arounds that should be removed before merging
+ scope: declarations
gofmt:
simplify: true
- goimports:
- local-prefixes: github.com/siderolabs/discovery-service/
- golint:
- min-confidence: 0.8
- gomnd:
- settings: {}
- gomodguard: {}
+ gomodguard: { }
govet:
- check-shadowing: true
enable-all: true
lll:
line-length: 200
tab-width: 4
misspell:
locale: US
- ignore-words: []
+ ignore-words: [ ]
nakedret:
max-func-lines: 30
prealloc:
@@ -88,16 +68,15 @@ linters-settings:
for-loops: false # Report preallocation suggestions on for loops, false by default
nolintlint:
allow-unused: false
- allow-leading-space: false
- allow-no-explanation: []
+ allow-no-explanation: [ ]
require-explanation: false
require-specific: true
- rowserrcheck: {}
- testpackage: {}
+ rowserrcheck: { }
+ testpackage: { }
unparam:
check-exported: false
unused:
- check-exported: false
+ local-variables-are-used: false
whitespace:
multi-if: false # Enforces newlines (or comments) after every multi-line if statement
multi-func: false # Enforces newlines (or comments) after every multi-line function signature
@@ -113,8 +92,8 @@ linters-settings:
gofumpt:
extra-rules: false
cyclop:
- # the maximal code complexity to report
- max-complexity: 20
+ # the maximal code complexity to report
+ max-complexity: 20
# depguard:
# Main:
# deny:
@@ -125,48 +104,50 @@ linters:
disable-all: false
fast: false
disable:
- - exhaustruct
- exhaustivestruct
+ - exhaustruct
+ - err113
- forbidigo
- funlen
- - gas
- gochecknoglobals
- gochecknoinits
- godox
- - goerr113
- gomnd
- gomoddirectives
+ - gosec
+ - inamedparam
- ireturn
+ - mnd
- nestif
- nonamedreturns
- nosnakecase
- paralleltest
+ - tagalign
- tagliatelle
- thelper
- typecheck
- varnamelen
- wrapcheck
- depguard # Disabled because starting with golangci-lint 1.53.0 it doesn't allow denylist alone anymore
- - tagalign
- - inamedparam
- testifylint # complains about our assert recorder and has a number of false positives for assert.Greater(t, thing, 1)
- protogetter # complains about us using Value field on typed spec, instead of GetValue which has a different signature
- perfsprint # complains about us using fmt.Sprintf in non-performance critical code, updating just kres took too long
# abandoned linters for which golangci shows the warning that the repo is archived by the owner
+ - deadcode
+ - golint
+ - ifshort
- interfacer
- maligned
- - golint
- scopelint
- - varcheck
- - deadcode
- structcheck
- - ifshort
+ - varcheck
# disabled as it seems to be broken - goes into imported libraries and reports issues there
- musttag
+ - goimports # same as gci
issues:
- exclude: []
- exclude-rules: []
+ exclude: [ ]
+ exclude-rules: [ ]
exclude-use-default: false
exclude-case-sensitive: false
max-issues-per-linter: 10
diff --git a/.kres.yaml b/.kres.yaml
index 3900766..aaec2d2 100644
--- a/.kres.yaml
+++ b/.kres.yaml
@@ -1,4 +1,9 @@
---
+kind: auto.CommandConfig
+name: snapshot-decoder
+spec:
+ disableImage: true
+---
kind: common.Image
name: image-discovery-service
spec:
@@ -15,6 +20,15 @@ spec:
GOOS: linux
GOARCH: arm64
---
+kind: golang.Generate
+spec:
+ baseSpecPath: /api
+ vtProtobufEnabled: true
+ specs:
+ - source: api/storage/storage.proto
+ subdirectory: storage
+ genGateway: false
+---
kind: service.CodeCov
spec:
targetThreshold: 30
diff --git a/Dockerfile b/Dockerfile
index fbced5d..04d85ae 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,27 +1,28 @@
-# syntax = docker/dockerfile-upstream:1.7.0-labs
+# syntax = docker/dockerfile-upstream:1.7.1-labs
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
-# Generated on 2024-03-12T11:34:19Z by kres latest.
+# Generated on 2024-05-21T10:07:54Z by kres 0290180.
ARG TOOLCHAIN
-# cleaned up specs and compiled versions
-FROM scratch AS generate
+FROM ghcr.io/siderolabs/ca-certificates:v1.7.0 AS image-ca-certificates
-FROM ghcr.io/siderolabs/ca-certificates:v1.6.0 AS image-ca-certificates
-
-FROM ghcr.io/siderolabs/fhs:v1.6.0 AS image-fhs
+FROM ghcr.io/siderolabs/fhs:v1.7.0 AS image-fhs
# runs markdownlint
-FROM docker.io/node:21.7.1-alpine3.19 AS lint-markdown
+FROM docker.io/node:22.2.0-alpine3.19 AS lint-markdown
WORKDIR /src
-RUN npm i -g markdownlint-cli@0.39.0
+RUN npm i -g markdownlint-cli@0.40.0
RUN npm i sentences-per-line@0.2.1
COPY .markdownlint.json .
COPY ./README.md ./README.md
RUN markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ignore '**/hack/chglog/**' --rules node_modules/sentences-per-line/index.js .
+# collects proto specs
+FROM scratch AS proto-specs
+ADD api/storage/storage.proto /api/storage/
+
# base toolchain image
FROM ${TOOLCHAIN} AS toolchain
RUN apk --update --no-cache add bash curl build-base protoc protobuf-dev
@@ -36,6 +37,21 @@ ENV GOTOOLCHAIN ${GOTOOLCHAIN}
ARG GOEXPERIMENT
ENV GOEXPERIMENT ${GOEXPERIMENT}
ENV GOPATH /go
+ARG PROTOBUF_GO_VERSION
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/protobuf/cmd/protoc-gen-go@v${PROTOBUF_GO_VERSION}
+RUN mv /go/bin/protoc-gen-go /bin
+ARG GRPC_GO_VERSION
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v${GRPC_GO_VERSION}
+RUN mv /go/bin/protoc-gen-go-grpc /bin
+ARG GRPC_GATEWAY_VERSION
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v${GRPC_GATEWAY_VERSION}
+RUN mv /go/bin/protoc-gen-grpc-gateway /bin
+ARG GOIMPORTS_VERSION
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@v${GOIMPORTS_VERSION}
+RUN mv /go/bin/goimports /bin
+ARG VTPROTOBUF_VERSION
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto@v${VTPROTOBUF_VERSION}
+RUN mv /go/bin/protoc-gen-go-vtproto /bin
ARG DEEPCOPY_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/siderolabs/deep-copy@${DEEPCOPY_VERSION} \
&& mv /go/bin/deep-copy /bin/deep-copy
@@ -44,9 +60,6 @@ RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/g
&& mv /go/bin/golangci-lint /bin/golangci-lint
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/vuln/cmd/govulncheck@latest \
&& mv /go/bin/govulncheck /bin/govulncheck
-ARG GOIMPORTS_VERSION
-RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@${GOIMPORTS_VERSION} \
- && mv /go/bin/goimports /bin/goimports
ARG GOFUMPT_VERSION
RUN go install mvdan.cc/gofumpt@${GOFUMPT_VERSION} \
&& mv /go/bin/gofumpt /bin/gofumpt
@@ -59,40 +72,30 @@ COPY go.sum go.sum
RUN cd .
RUN --mount=type=cache,target=/go/pkg go mod download
RUN --mount=type=cache,target=/go/pkg go mod verify
+COPY ./api ./api
COPY ./cmd ./cmd
COPY ./internal ./internal
COPY ./pkg ./pkg
RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null
-# builds discovery-service-linux-amd64
-FROM base AS discovery-service-linux-amd64-build
-COPY --from=generate / /
-WORKDIR /src/cmd/discovery-service
-ARG GO_BUILDFLAGS
-ARG GO_LDFLAGS
-RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64
-
-# builds discovery-service-linux-arm64
-FROM base AS discovery-service-linux-arm64-build
-COPY --from=generate / /
-WORKDIR /src/cmd/discovery-service
-ARG GO_BUILDFLAGS
-ARG GO_LDFLAGS
-RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64
+# runs protobuf compiler
+FROM tools AS proto-compile
+COPY --from=proto-specs / /
+RUN protoc -I/api --go_out=paths=source_relative:/api --go-grpc_out=paths=source_relative:/api --go-vtproto_out=paths=source_relative:/api --go-vtproto_opt=features=marshal+unmarshal+size+equal+clone /api/storage/storage.proto
+RUN rm /api/storage/storage.proto
+RUN goimports -w -local github.com/siderolabs/discovery-service /api
+RUN gofumpt -w /api
# runs gofumpt
FROM base AS lint-gofumpt
RUN FILES="$(gofumpt -l .)" && test -z "${FILES}" || (echo -e "Source code is not formatted with 'gofumpt -w .':\n${FILES}"; exit 1)
-# runs goimports
-FROM base AS lint-goimports
-RUN FILES="$(goimports -l -local github.com/siderolabs/discovery-service/ .)" && test -z "${FILES}" || (echo -e "Source code is not formatted with 'goimports -w -local github.com/siderolabs/discovery-service/ .':\n${FILES}"; exit 1)
-
# runs golangci-lint
FROM base AS lint-golangci-lint
WORKDIR /src
COPY .golangci.yml .
ENV GOGC 50
+RUN golangci-lint config verify --config .golangci.yml
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/.cache/golangci-lint --mount=type=cache,target=/go/pkg golangci-lint run --config .golangci.yml
# runs govulncheck
@@ -112,14 +115,56 @@ WORKDIR /src
ARG TESTPKGS
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg --mount=type=cache,target=/tmp go test -v -covermode=atomic -coverprofile=coverage.txt -coverpkg=${TESTPKGS} -count 1 ${TESTPKGS}
+# cleaned up specs and compiled versions
+FROM scratch AS generate
+COPY --from=proto-compile /api/ /api/
+
+FROM scratch AS unit-tests
+COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt
+
+# builds discovery-service-linux-amd64
+FROM base AS discovery-service-linux-amd64-build
+COPY --from=generate / /
+WORKDIR /src/cmd/discovery-service
+ARG GO_BUILDFLAGS
+ARG GO_LDFLAGS
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64
+
+# builds discovery-service-linux-arm64
+FROM base AS discovery-service-linux-arm64-build
+COPY --from=generate / /
+WORKDIR /src/cmd/discovery-service
+ARG GO_BUILDFLAGS
+ARG GO_LDFLAGS
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64
+
+# builds snapshot-decoder-linux-amd64
+FROM base AS snapshot-decoder-linux-amd64-build
+COPY --from=generate / /
+WORKDIR /src/cmd/snapshot-decoder
+ARG GO_BUILDFLAGS
+ARG GO_LDFLAGS
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-amd64
+
+# builds snapshot-decoder-linux-arm64
+FROM base AS snapshot-decoder-linux-arm64-build
+COPY --from=generate / /
+WORKDIR /src/cmd/snapshot-decoder
+ARG GO_BUILDFLAGS
+ARG GO_LDFLAGS
+RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-arm64
+
FROM scratch AS discovery-service-linux-amd64
COPY --from=discovery-service-linux-amd64-build /discovery-service-linux-amd64 /discovery-service-linux-amd64
FROM scratch AS discovery-service-linux-arm64
COPY --from=discovery-service-linux-arm64-build /discovery-service-linux-arm64 /discovery-service-linux-arm64
-FROM scratch AS unit-tests
-COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt
+FROM scratch AS snapshot-decoder-linux-amd64
+COPY --from=snapshot-decoder-linux-amd64-build /snapshot-decoder-linux-amd64 /snapshot-decoder-linux-amd64
+
+FROM scratch AS snapshot-decoder-linux-arm64
+COPY --from=snapshot-decoder-linux-arm64-build /snapshot-decoder-linux-arm64 /snapshot-decoder-linux-arm64
FROM discovery-service-linux-${TARGETARCH} AS discovery-service
@@ -127,6 +172,12 @@ FROM scratch AS discovery-service-all
COPY --from=discovery-service-linux-amd64 / /
COPY --from=discovery-service-linux-arm64 / /
+FROM snapshot-decoder-linux-${TARGETARCH} AS snapshot-decoder
+
+FROM scratch AS snapshot-decoder-all
+COPY --from=snapshot-decoder-linux-amd64 / /
+COPY --from=snapshot-decoder-linux-arm64 / /
+
FROM scratch AS image-discovery-service
ARG TARGETARCH
COPY --from=discovery-service discovery-service-linux-${TARGETARCH} /discovery-service
diff --git a/Makefile b/Makefile
index 5f44ed7..584c25c 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
-# Generated on 2024-03-12T11:34:19Z by kres latest.
+# Generated on 2024-05-21T10:07:54Z by kres 0290180.
# common variables
@@ -9,20 +9,23 @@ TAG := $(shell git describe --tag --always --dirty --match v[0-9]\*)
ABBREV_TAG := $(shell git describe --tags >/dev/null 2>/dev/null && git describe --tag --always --match v[0-9]\* --abbrev=0 || echo 'undefined')
BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
ARTIFACTS := _out
+IMAGE_TAG ?= $(TAG)
+OPERATING_SYSTEM := $(shell uname -s | tr '[:upper:]' '[:lower:]')
+GOARCH := $(shell uname -m | sed 's/x86_64/amd64/' | sed 's/aarch64/arm64/')
WITH_DEBUG ?= false
WITH_RACE ?= false
REGISTRY ?= ghcr.io
USERNAME ?= siderolabs
REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME)
-PROTOBUF_GO_VERSION ?= 1.33.0
+PROTOBUF_GO_VERSION ?= 1.34.1
GRPC_GO_VERSION ?= 1.3.0
-GRPC_GATEWAY_VERSION ?= 2.19.1
+GRPC_GATEWAY_VERSION ?= 2.20.0
VTPROTOBUF_VERSION ?= 0.6.0
+GOIMPORTS_VERSION ?= 0.21.0
DEEPCOPY_VERSION ?= v0.5.6
-GOLANGCILINT_VERSION ?= v1.56.2
+GOLANGCILINT_VERSION ?= v1.58.2
GOFUMPT_VERSION ?= v0.6.0
-GO_VERSION ?= 1.22.1
-GOIMPORTS_VERSION ?= v0.19.0
+GO_VERSION ?= 1.22.3
GO_BUILDFLAGS ?=
GO_LDFLAGS ?=
CGO_ENABLED ?= 0
@@ -59,9 +62,9 @@ COMMON_ARGS += --build-arg=PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)"
COMMON_ARGS += --build-arg=GRPC_GO_VERSION="$(GRPC_GO_VERSION)"
COMMON_ARGS += --build-arg=GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)"
COMMON_ARGS += --build-arg=VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)"
+COMMON_ARGS += --build-arg=GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)"
COMMON_ARGS += --build-arg=DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)"
COMMON_ARGS += --build-arg=GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)"
-COMMON_ARGS += --build-arg=GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)"
COMMON_ARGS += --build-arg=GOFUMPT_VERSION="$(GOFUMPT_VERSION)"
COMMON_ARGS += --build-arg=TESTPKGS="$(TESTPKGS)"
TOOLCHAIN ?= docker.io/golang:1.22-alpine
@@ -110,7 +113,7 @@ If you already have a compatible builder instance, you may use that instead.
## Artifacts
All artifacts will be output to ./$(ARTIFACTS). Images will be tagged with the
-registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(TAG)).
+registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(IMAGE_TAG)).
The registry and username can be overridden by exporting REGISTRY, and USERNAME
respectively.
@@ -128,7 +131,10 @@ else
GO_LDFLAGS += -s
endif
-all: unit-tests discovery-service image-discovery-service lint
+all: unit-tests discovery-service image-discovery-service snapshot-decoder lint
+
+$(ARTIFACTS): ## Creates artifacts directory.
+ @mkdir -p $(ARTIFACTS)
.PHONY: clean
clean: ## Cleans up all artifacts.
@@ -140,6 +146,9 @@ target-%: ## Builds the specified target defined in the Dockerfile. The build r
local-%: ## Builds the specified target defined in the Dockerfile using the local output type. The build result will be output to the specified local destination.
@$(MAKE) target-$* TARGET_ARGS="--output=type=local,dest=$(DEST) $(TARGET_ARGS)"
+generate: ## Generate .proto definitions.
+ @$(MAKE) local-$@ DEST=./
+
lint-golangci-lint: ## Runs golangci-lint linter.
@$(MAKE) target-$@
@@ -157,9 +166,6 @@ fmt: ## Formats the source code
lint-govulncheck: ## Runs govulncheck linter.
@$(MAKE) target-$@
-lint-goimports: ## Runs goimports linter.
- @$(MAKE) target-$@
-
.PHONY: base
base: ## Prepare base toolchain
@$(MAKE) target-$@
@@ -172,10 +178,6 @@ unit-tests: ## Performs unit tests
unit-tests-race: ## Performs unit tests with race detection enabled.
@$(MAKE) target-$@
-.PHONY: coverage
-coverage: ## Upload coverage data to codecov.io.
- bash -c "bash <(curl -s https://codecov.io/bash) -f $(ARTIFACTS)/coverage-unit-tests.txt -X fix"
-
.PHONY: $(ARTIFACTS)/discovery-service-linux-amd64
$(ARTIFACTS)/discovery-service-linux-amd64:
@$(MAKE) local-discovery-service-linux-amd64 DEST=$(ARTIFACTS)
@@ -198,11 +200,28 @@ lint-markdown: ## Runs markdownlint.
@$(MAKE) target-$@
.PHONY: lint
-lint: lint-golangci-lint lint-gofumpt lint-govulncheck lint-goimports lint-markdown ## Run all linters for the project.
+lint: lint-golangci-lint lint-gofumpt lint-govulncheck lint-markdown ## Run all linters for the project.
.PHONY: image-discovery-service
image-discovery-service: ## Builds image for discovery-service.
- @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(TAG)"
+ @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(IMAGE_TAG)"
+
+.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-amd64
+$(ARTIFACTS)/snapshot-decoder-linux-amd64:
+ @$(MAKE) local-snapshot-decoder-linux-amd64 DEST=$(ARTIFACTS)
+
+.PHONY: snapshot-decoder-linux-amd64
+snapshot-decoder-linux-amd64: $(ARTIFACTS)/snapshot-decoder-linux-amd64 ## Builds executable for snapshot-decoder-linux-amd64.
+
+.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-arm64
+$(ARTIFACTS)/snapshot-decoder-linux-arm64:
+ @$(MAKE) local-snapshot-decoder-linux-arm64 DEST=$(ARTIFACTS)
+
+.PHONY: snapshot-decoder-linux-arm64
+snapshot-decoder-linux-arm64: $(ARTIFACTS)/snapshot-decoder-linux-arm64 ## Builds executable for snapshot-decoder-linux-arm64.
+
+.PHONY: snapshot-decoder
+snapshot-decoder: snapshot-decoder-linux-amd64 snapshot-decoder-linux-arm64 ## Builds executables for snapshot-decoder.
.PHONY: rekres
rekres:
@@ -215,8 +234,7 @@ help: ## This help menu.
@grep -E '^[a-zA-Z%_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.PHONY: release-notes
-release-notes:
- mkdir -p $(ARTIFACTS)
+release-notes: $(ARTIFACTS)
@ARTIFACTS=$(ARTIFACTS) ./hack/release.sh $@ $(ARTIFACTS)/RELEASE_NOTES.md $(TAG)
.PHONY: conformance
diff --git a/api/storage/storage.pb.go b/api/storage/storage.pb.go
new file mode 100644
index 0000000..1d7ecce
--- /dev/null
+++ b/api/storage/storage.pb.go
@@ -0,0 +1,408 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.33.0
+// protoc v4.24.4
+// source: storage/storage.proto
+
+package storagepb
+
+import (
+ reflect "reflect"
+ sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// StateSnapshot is the snapshot of the discovery service state.
+type StateSnapshot struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Clusters []*ClusterSnapshot `protobuf:"bytes,1,rep,name=clusters,proto3" json:"clusters,omitempty"`
+}
+
+func (x *StateSnapshot) Reset() {
+ *x = StateSnapshot{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_storage_storage_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *StateSnapshot) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StateSnapshot) ProtoMessage() {}
+
+func (x *StateSnapshot) ProtoReflect() protoreflect.Message {
+ mi := &file_storage_storage_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use StateSnapshot.ProtoReflect.Descriptor instead.
+func (*StateSnapshot) Descriptor() ([]byte, []int) {
+ return file_storage_storage_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *StateSnapshot) GetClusters() []*ClusterSnapshot {
+ if x != nil {
+ return x.Clusters
+ }
+ return nil
+}
+
+// ClusterSnapshot is the snapshot of a cluster with a set of affiliates.
+type ClusterSnapshot struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+ Affiliates []*AffiliateSnapshot `protobuf:"bytes,2,rep,name=affiliates,proto3" json:"affiliates,omitempty"`
+}
+
+func (x *ClusterSnapshot) Reset() {
+ *x = ClusterSnapshot{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_storage_storage_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ClusterSnapshot) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ClusterSnapshot) ProtoMessage() {}
+
+func (x *ClusterSnapshot) ProtoReflect() protoreflect.Message {
+ mi := &file_storage_storage_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ClusterSnapshot.ProtoReflect.Descriptor instead.
+func (*ClusterSnapshot) Descriptor() ([]byte, []int) {
+ return file_storage_storage_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *ClusterSnapshot) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *ClusterSnapshot) GetAffiliates() []*AffiliateSnapshot {
+ if x != nil {
+ return x.Affiliates
+ }
+ return nil
+}
+
+// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints.
+type AffiliateSnapshot struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+ Expiration *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=expiration,proto3" json:"expiration,omitempty"`
+ Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
+ Endpoints []*EndpointSnapshot `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
+}
+
+func (x *AffiliateSnapshot) Reset() {
+ *x = AffiliateSnapshot{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_storage_storage_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *AffiliateSnapshot) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*AffiliateSnapshot) ProtoMessage() {}
+
+func (x *AffiliateSnapshot) ProtoReflect() protoreflect.Message {
+ mi := &file_storage_storage_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use AffiliateSnapshot.ProtoReflect.Descriptor instead.
+func (*AffiliateSnapshot) Descriptor() ([]byte, []int) {
+ return file_storage_storage_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *AffiliateSnapshot) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *AffiliateSnapshot) GetExpiration() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Expiration
+ }
+ return nil
+}
+
+func (x *AffiliateSnapshot) GetData() []byte {
+ if x != nil {
+ return x.Data
+ }
+ return nil
+}
+
+func (x *AffiliateSnapshot) GetEndpoints() []*EndpointSnapshot {
+ if x != nil {
+ return x.Endpoints
+ }
+ return nil
+}
+
+// EndpointSnapshot is the snapshot of an endpoint of an affiliate.
+type EndpointSnapshot struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Expiration *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=expiration,proto3" json:"expiration,omitempty"`
+ Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (x *EndpointSnapshot) Reset() {
+ *x = EndpointSnapshot{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_storage_storage_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *EndpointSnapshot) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*EndpointSnapshot) ProtoMessage() {}
+
+func (x *EndpointSnapshot) ProtoReflect() protoreflect.Message {
+ mi := &file_storage_storage_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use EndpointSnapshot.ProtoReflect.Descriptor instead.
+func (*EndpointSnapshot) Descriptor() ([]byte, []int) {
+ return file_storage_storage_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *EndpointSnapshot) GetExpiration() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Expiration
+ }
+ return nil
+}
+
+func (x *EndpointSnapshot) GetData() []byte {
+ if x != nil {
+ return x.Data
+ }
+ return nil
+}
+
+var File_storage_storage_proto protoreflect.FileDescriptor
+
+var file_storage_storage_proto_rawDesc = []byte{
+ 0x0a, 0x15, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67,
+ 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e,
+ 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67,
+ 0x65, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
+ 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73,
+ 0x68, 0x6f, 0x74, 0x12, 0x45, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18,
+ 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64,
+ 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65,
+ 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74,
+ 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x22, 0x6e, 0x0a, 0x0f, 0x43, 0x6c,
+ 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x0e, 0x0a,
+ 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x4b, 0x0a,
+ 0x0a, 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
+ 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f,
+ 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x66, 0x66,
+ 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x0a,
+ 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, 0x11, 0x41,
+ 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74,
+ 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
+ 0x12, 0x3a, 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02,
+ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
+ 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04,
+ 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
+ 0x12, 0x48, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73,
+ 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x45,
+ 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52,
+ 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0x62, 0x0a, 0x10, 0x45, 0x6e,
+ 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x3a,
+ 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a,
+ 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61,
+ 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x37,
+ 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x64,
+ 0x65, 0x72, 0x6f, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72,
+ 0x79, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74,
+ 0x6f, 0x72, 0x61, 0x67, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_storage_storage_proto_rawDescOnce sync.Once
+ file_storage_storage_proto_rawDescData = file_storage_storage_proto_rawDesc
+)
+
+func file_storage_storage_proto_rawDescGZIP() []byte {
+ file_storage_storage_proto_rawDescOnce.Do(func() {
+ file_storage_storage_proto_rawDescData = protoimpl.X.CompressGZIP(file_storage_storage_proto_rawDescData)
+ })
+ return file_storage_storage_proto_rawDescData
+}
+
+var file_storage_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_storage_storage_proto_goTypes = []interface{}{
+ (*StateSnapshot)(nil), // 0: sidero.discovery.storage.StateSnapshot
+ (*ClusterSnapshot)(nil), // 1: sidero.discovery.storage.ClusterSnapshot
+ (*AffiliateSnapshot)(nil), // 2: sidero.discovery.storage.AffiliateSnapshot
+ (*EndpointSnapshot)(nil), // 3: sidero.discovery.storage.EndpointSnapshot
+ (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp
+}
+var file_storage_storage_proto_depIdxs = []int32{
+ 1, // 0: sidero.discovery.storage.StateSnapshot.clusters:type_name -> sidero.discovery.storage.ClusterSnapshot
+ 2, // 1: sidero.discovery.storage.ClusterSnapshot.affiliates:type_name -> sidero.discovery.storage.AffiliateSnapshot
+ 4, // 2: sidero.discovery.storage.AffiliateSnapshot.expiration:type_name -> google.protobuf.Timestamp
+ 3, // 3: sidero.discovery.storage.AffiliateSnapshot.endpoints:type_name -> sidero.discovery.storage.EndpointSnapshot
+ 4, // 4: sidero.discovery.storage.EndpointSnapshot.expiration:type_name -> google.protobuf.Timestamp
+ 5, // [5:5] is the sub-list for method output_type
+ 5, // [5:5] is the sub-list for method input_type
+ 5, // [5:5] is the sub-list for extension type_name
+ 5, // [5:5] is the sub-list for extension extendee
+ 0, // [0:5] is the sub-list for field type_name
+}
+
+func init() { file_storage_storage_proto_init() }
+func file_storage_storage_proto_init() {
+ if File_storage_storage_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_storage_storage_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*StateSnapshot); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_storage_storage_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ClusterSnapshot); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_storage_storage_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*AffiliateSnapshot); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_storage_storage_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*EndpointSnapshot); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_storage_storage_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_storage_storage_proto_goTypes,
+ DependencyIndexes: file_storage_storage_proto_depIdxs,
+ MessageInfos: file_storage_storage_proto_msgTypes,
+ }.Build()
+ File_storage_storage_proto = out.File
+ file_storage_storage_proto_rawDesc = nil
+ file_storage_storage_proto_goTypes = nil
+ file_storage_storage_proto_depIdxs = nil
+}
diff --git a/api/storage/storage.proto b/api/storage/storage.proto
new file mode 100644
index 0000000..cafcc9b
--- /dev/null
+++ b/api/storage/storage.proto
@@ -0,0 +1,31 @@
+syntax = "proto3";
+
+package sidero.discovery.storage;
+option go_package = "github.com/siderolabs/discovery-service/api/storagepb";
+
+import "google/protobuf/timestamp.proto";
+
+// StateSnapshot is the snapshot of the discovery service state.
+message StateSnapshot {
+ repeated ClusterSnapshot clusters = 1;
+}
+
+// ClusterSnapshot is the snapshot of a cluster with a set of affiliates.
+message ClusterSnapshot {
+ string id = 1;
+ repeated AffiliateSnapshot affiliates = 2;
+}
+
+// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints.
+message AffiliateSnapshot {
+ string id = 1;
+ google.protobuf.Timestamp expiration = 2;
+ bytes data = 3;
+ repeated EndpointSnapshot endpoints = 4;
+}
+
+// EndpointSnapshot is the snapshot of an endpoint of an affiliate.
+message EndpointSnapshot {
+ google.protobuf.Timestamp expiration = 1;
+ bytes data = 2;
+}
diff --git a/api/storage/storage_vtproto.pb.go b/api/storage/storage_vtproto.pb.go
new file mode 100644
index 0000000..4ae0b8c
--- /dev/null
+++ b/api/storage/storage_vtproto.pb.go
@@ -0,0 +1,1064 @@
+// Code generated by protoc-gen-go-vtproto. DO NOT EDIT.
+// protoc-gen-go-vtproto version: v0.6.0
+// source: storage/storage.proto
+
+package storagepb
+
+import (
+ fmt "fmt"
+ io "io"
+
+ protohelpers "github.com/planetscale/vtprotobuf/protohelpers"
+ timestamppb1 "github.com/planetscale/vtprotobuf/types/known/timestamppb"
+ proto "google.golang.org/protobuf/proto"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+func (m *StateSnapshot) CloneVT() *StateSnapshot {
+ if m == nil {
+ return (*StateSnapshot)(nil)
+ }
+ r := new(StateSnapshot)
+ if rhs := m.Clusters; rhs != nil {
+ tmpContainer := make([]*ClusterSnapshot, len(rhs))
+ for k, v := range rhs {
+ tmpContainer[k] = v.CloneVT()
+ }
+ r.Clusters = tmpContainer
+ }
+ if len(m.unknownFields) > 0 {
+ r.unknownFields = make([]byte, len(m.unknownFields))
+ copy(r.unknownFields, m.unknownFields)
+ }
+ return r
+}
+
+func (m *StateSnapshot) CloneMessageVT() proto.Message {
+ return m.CloneVT()
+}
+
+func (m *ClusterSnapshot) CloneVT() *ClusterSnapshot {
+ if m == nil {
+ return (*ClusterSnapshot)(nil)
+ }
+ r := new(ClusterSnapshot)
+ r.Id = m.Id
+ if rhs := m.Affiliates; rhs != nil {
+ tmpContainer := make([]*AffiliateSnapshot, len(rhs))
+ for k, v := range rhs {
+ tmpContainer[k] = v.CloneVT()
+ }
+ r.Affiliates = tmpContainer
+ }
+ if len(m.unknownFields) > 0 {
+ r.unknownFields = make([]byte, len(m.unknownFields))
+ copy(r.unknownFields, m.unknownFields)
+ }
+ return r
+}
+
+func (m *ClusterSnapshot) CloneMessageVT() proto.Message {
+ return m.CloneVT()
+}
+
+func (m *AffiliateSnapshot) CloneVT() *AffiliateSnapshot {
+ if m == nil {
+ return (*AffiliateSnapshot)(nil)
+ }
+ r := new(AffiliateSnapshot)
+ r.Id = m.Id
+ r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT())
+ if rhs := m.Data; rhs != nil {
+ tmpBytes := make([]byte, len(rhs))
+ copy(tmpBytes, rhs)
+ r.Data = tmpBytes
+ }
+ if rhs := m.Endpoints; rhs != nil {
+ tmpContainer := make([]*EndpointSnapshot, len(rhs))
+ for k, v := range rhs {
+ tmpContainer[k] = v.CloneVT()
+ }
+ r.Endpoints = tmpContainer
+ }
+ if len(m.unknownFields) > 0 {
+ r.unknownFields = make([]byte, len(m.unknownFields))
+ copy(r.unknownFields, m.unknownFields)
+ }
+ return r
+}
+
+func (m *AffiliateSnapshot) CloneMessageVT() proto.Message {
+ return m.CloneVT()
+}
+
+func (m *EndpointSnapshot) CloneVT() *EndpointSnapshot {
+ if m == nil {
+ return (*EndpointSnapshot)(nil)
+ }
+ r := new(EndpointSnapshot)
+ r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT())
+ if rhs := m.Data; rhs != nil {
+ tmpBytes := make([]byte, len(rhs))
+ copy(tmpBytes, rhs)
+ r.Data = tmpBytes
+ }
+ if len(m.unknownFields) > 0 {
+ r.unknownFields = make([]byte, len(m.unknownFields))
+ copy(r.unknownFields, m.unknownFields)
+ }
+ return r
+}
+
+func (m *EndpointSnapshot) CloneMessageVT() proto.Message {
+ return m.CloneVT()
+}
+
+func (this *StateSnapshot) EqualVT(that *StateSnapshot) bool {
+ if this == that {
+ return true
+ } else if this == nil || that == nil {
+ return false
+ }
+ if len(this.Clusters) != len(that.Clusters) {
+ return false
+ }
+ for i, vx := range this.Clusters {
+ vy := that.Clusters[i]
+ if p, q := vx, vy; p != q {
+ if p == nil {
+ p = &ClusterSnapshot{}
+ }
+ if q == nil {
+ q = &ClusterSnapshot{}
+ }
+ if !p.EqualVT(q) {
+ return false
+ }
+ }
+ }
+ return string(this.unknownFields) == string(that.unknownFields)
+}
+
+func (this *StateSnapshot) EqualMessageVT(thatMsg proto.Message) bool {
+ that, ok := thatMsg.(*StateSnapshot)
+ if !ok {
+ return false
+ }
+ return this.EqualVT(that)
+}
+func (this *ClusterSnapshot) EqualVT(that *ClusterSnapshot) bool {
+ if this == that {
+ return true
+ } else if this == nil || that == nil {
+ return false
+ }
+ if this.Id != that.Id {
+ return false
+ }
+ if len(this.Affiliates) != len(that.Affiliates) {
+ return false
+ }
+ for i, vx := range this.Affiliates {
+ vy := that.Affiliates[i]
+ if p, q := vx, vy; p != q {
+ if p == nil {
+ p = &AffiliateSnapshot{}
+ }
+ if q == nil {
+ q = &AffiliateSnapshot{}
+ }
+ if !p.EqualVT(q) {
+ return false
+ }
+ }
+ }
+ return string(this.unknownFields) == string(that.unknownFields)
+}
+
+func (this *ClusterSnapshot) EqualMessageVT(thatMsg proto.Message) bool {
+ that, ok := thatMsg.(*ClusterSnapshot)
+ if !ok {
+ return false
+ }
+ return this.EqualVT(that)
+}
+func (this *AffiliateSnapshot) EqualVT(that *AffiliateSnapshot) bool {
+ if this == that {
+ return true
+ } else if this == nil || that == nil {
+ return false
+ }
+ if this.Id != that.Id {
+ return false
+ }
+ if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) {
+ return false
+ }
+ if string(this.Data) != string(that.Data) {
+ return false
+ }
+ if len(this.Endpoints) != len(that.Endpoints) {
+ return false
+ }
+ for i, vx := range this.Endpoints {
+ vy := that.Endpoints[i]
+ if p, q := vx, vy; p != q {
+ if p == nil {
+ p = &EndpointSnapshot{}
+ }
+ if q == nil {
+ q = &EndpointSnapshot{}
+ }
+ if !p.EqualVT(q) {
+ return false
+ }
+ }
+ }
+ return string(this.unknownFields) == string(that.unknownFields)
+}
+
+func (this *AffiliateSnapshot) EqualMessageVT(thatMsg proto.Message) bool {
+ that, ok := thatMsg.(*AffiliateSnapshot)
+ if !ok {
+ return false
+ }
+ return this.EqualVT(that)
+}
+func (this *EndpointSnapshot) EqualVT(that *EndpointSnapshot) bool {
+ if this == that {
+ return true
+ } else if this == nil || that == nil {
+ return false
+ }
+ if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) {
+ return false
+ }
+ if string(this.Data) != string(that.Data) {
+ return false
+ }
+ return string(this.unknownFields) == string(that.unknownFields)
+}
+
+func (this *EndpointSnapshot) EqualMessageVT(thatMsg proto.Message) bool {
+ that, ok := thatMsg.(*EndpointSnapshot)
+ if !ok {
+ return false
+ }
+ return this.EqualVT(that)
+}
+func (m *StateSnapshot) MarshalVT() (dAtA []byte, err error) {
+ if m == nil {
+ return nil, nil
+ }
+ size := m.SizeVT()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBufferVT(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *StateSnapshot) MarshalToVT(dAtA []byte) (int, error) {
+ size := m.SizeVT()
+ return m.MarshalToSizedBufferVT(dAtA[:size])
+}
+
+func (m *StateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
+ if m == nil {
+ return 0, nil
+ }
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.unknownFields != nil {
+ i -= len(m.unknownFields)
+ copy(dAtA[i:], m.unknownFields)
+ }
+ if len(m.Clusters) > 0 {
+ for iNdEx := len(m.Clusters) - 1; iNdEx >= 0; iNdEx-- {
+ size, err := m.Clusters[iNdEx].MarshalToSizedBufferVT(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
+ i--
+ dAtA[i] = 0xa
+ }
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *ClusterSnapshot) MarshalVT() (dAtA []byte, err error) {
+ if m == nil {
+ return nil, nil
+ }
+ size := m.SizeVT()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBufferVT(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ClusterSnapshot) MarshalToVT(dAtA []byte) (int, error) {
+ size := m.SizeVT()
+ return m.MarshalToSizedBufferVT(dAtA[:size])
+}
+
+func (m *ClusterSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
+ if m == nil {
+ return 0, nil
+ }
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.unknownFields != nil {
+ i -= len(m.unknownFields)
+ copy(dAtA[i:], m.unknownFields)
+ }
+ if len(m.Affiliates) > 0 {
+ for iNdEx := len(m.Affiliates) - 1; iNdEx >= 0; iNdEx-- {
+ size, err := m.Affiliates[iNdEx].MarshalToSizedBufferVT(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
+ i--
+ dAtA[i] = 0x12
+ }
+ }
+ if len(m.Id) > 0 {
+ i -= len(m.Id)
+ copy(dAtA[i:], m.Id)
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id)))
+ i--
+ dAtA[i] = 0xa
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *AffiliateSnapshot) MarshalVT() (dAtA []byte, err error) {
+ if m == nil {
+ return nil, nil
+ }
+ size := m.SizeVT()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBufferVT(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *AffiliateSnapshot) MarshalToVT(dAtA []byte) (int, error) {
+ size := m.SizeVT()
+ return m.MarshalToSizedBufferVT(dAtA[:size])
+}
+
+func (m *AffiliateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
+ if m == nil {
+ return 0, nil
+ }
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.unknownFields != nil {
+ i -= len(m.unknownFields)
+ copy(dAtA[i:], m.unknownFields)
+ }
+ if len(m.Endpoints) > 0 {
+ for iNdEx := len(m.Endpoints) - 1; iNdEx >= 0; iNdEx-- {
+ size, err := m.Endpoints[iNdEx].MarshalToSizedBufferVT(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
+ i--
+ dAtA[i] = 0x22
+ }
+ }
+ if len(m.Data) > 0 {
+ i -= len(m.Data)
+ copy(dAtA[i:], m.Data)
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data)))
+ i--
+ dAtA[i] = 0x1a
+ }
+ if m.Expiration != nil {
+ size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
+ i--
+ dAtA[i] = 0x12
+ }
+ if len(m.Id) > 0 {
+ i -= len(m.Id)
+ copy(dAtA[i:], m.Id)
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id)))
+ i--
+ dAtA[i] = 0xa
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *EndpointSnapshot) MarshalVT() (dAtA []byte, err error) {
+ if m == nil {
+ return nil, nil
+ }
+ size := m.SizeVT()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBufferVT(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *EndpointSnapshot) MarshalToVT(dAtA []byte) (int, error) {
+ size := m.SizeVT()
+ return m.MarshalToSizedBufferVT(dAtA[:size])
+}
+
+func (m *EndpointSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
+ if m == nil {
+ return 0, nil
+ }
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.unknownFields != nil {
+ i -= len(m.unknownFields)
+ copy(dAtA[i:], m.unknownFields)
+ }
+ if len(m.Data) > 0 {
+ i -= len(m.Data)
+ copy(dAtA[i:], m.Data)
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data)))
+ i--
+ dAtA[i] = 0x12
+ }
+ if m.Expiration != nil {
+ size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = protohelpers.EncodeVarint(dAtA, i, uint64(size))
+ i--
+ dAtA[i] = 0xa
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *StateSnapshot) SizeVT() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ if len(m.Clusters) > 0 {
+ for _, e := range m.Clusters {
+ l = e.SizeVT()
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ }
+ n += len(m.unknownFields)
+ return n
+}
+
+func (m *ClusterSnapshot) SizeVT() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ l = len(m.Id)
+ if l > 0 {
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ if len(m.Affiliates) > 0 {
+ for _, e := range m.Affiliates {
+ l = e.SizeVT()
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ }
+ n += len(m.unknownFields)
+ return n
+}
+
+func (m *AffiliateSnapshot) SizeVT() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ l = len(m.Id)
+ if l > 0 {
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ if m.Expiration != nil {
+ l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT()
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ l = len(m.Data)
+ if l > 0 {
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ if len(m.Endpoints) > 0 {
+ for _, e := range m.Endpoints {
+ l = e.SizeVT()
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ }
+ n += len(m.unknownFields)
+ return n
+}
+
+func (m *EndpointSnapshot) SizeVT() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ if m.Expiration != nil {
+ l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT()
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ l = len(m.Data)
+ if l > 0 {
+ n += 1 + l + protohelpers.SizeOfVarint(uint64(l))
+ }
+ n += len(m.unknownFields)
+ return n
+}
+
+func (m *StateSnapshot) UnmarshalVT(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: StateSnapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: StateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Clusters", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Clusters = append(m.Clusters, &ClusterSnapshot{})
+ if err := m.Clusters[len(m.Clusters)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := protohelpers.Skip(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if (skippy < 0) || (iNdEx+skippy) < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *ClusterSnapshot) UnmarshalVT(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ClusterSnapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ClusterSnapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
+ }
+ var stringLen uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ stringLen |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ intStringLen := int(stringLen)
+ if intStringLen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + intStringLen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Id = string(dAtA[iNdEx:postIndex])
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Affiliates", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Affiliates = append(m.Affiliates, &AffiliateSnapshot{})
+ if err := m.Affiliates[len(m.Affiliates)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := protohelpers.Skip(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if (skippy < 0) || (iNdEx+skippy) < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *AffiliateSnapshot) UnmarshalVT(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: AffiliateSnapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: AffiliateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
+ }
+ var stringLen uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ stringLen |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ intStringLen := int(stringLen)
+ if intStringLen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + intStringLen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Id = string(dAtA[iNdEx:postIndex])
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if m.Expiration == nil {
+ m.Expiration = &timestamppb.Timestamp{}
+ }
+ if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 3:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ case 4:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Endpoints", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Endpoints = append(m.Endpoints, &EndpointSnapshot{})
+ if err := m.Endpoints[len(m.Endpoints)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := protohelpers.Skip(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if (skippy < 0) || (iNdEx+skippy) < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *EndpointSnapshot) UnmarshalVT(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: EndpointSnapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: EndpointSnapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if m.Expiration == nil {
+ m.Expiration = &timestamppb.Timestamp{}
+ }
+ if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return protohelpers.ErrIntOverflow
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := protohelpers.Skip(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if (skippy < 0) || (iNdEx+skippy) < 0 {
+ return protohelpers.ErrInvalidLength
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
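
All of the generated decoders above share the same base-128 varint loop: the low seven bits of each byte carry payload, and a set high bit signals that another byte follows. As a standalone illustration (a sketch; the helper name and sample bytes are not part of this patch, and 300 encodes as 0xAC 0x02):

package main

import "fmt"

// decodeVarint mirrors the loop emitted by the generated UnmarshalVT methods:
// accumulate 7 payload bits per byte until a byte with the high bit clear.
// (The generated code additionally guards against overflow and truncation.)
func decodeVarint(dAtA []byte) (v uint64, n int) {
	for shift, i := uint(0), 0; ; shift += 7 {
		b := dAtA[i]
		i++
		v |= uint64(b&0x7F) << shift

		if b < 0x80 {
			return v, i
		}
	}
}

func main() {
	v, n := decodeVarint([]byte{0xAC, 0x02})
	fmt.Println(v, n) // prints: 300 2
}
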
diff --git a/cmd/discovery-service/main.go b/cmd/discovery-service/main.go
index 23d6097..b9dce91 100644
--- a/cmd/discovery-service/main.go
+++ b/cmd/discovery-service/main.go
@@ -22,6 +22,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+ "github.com/jonboulle/clockwork"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/siderolabs/discovery-api/api/v1alpha1/server/pb"
@@ -37,6 +38,7 @@ import (
"github.com/siderolabs/discovery-service/internal/limiter"
_ "github.com/siderolabs/discovery-service/internal/proto"
"github.com/siderolabs/discovery-service/internal/state"
+ "github.com/siderolabs/discovery-service/internal/state/storage"
"github.com/siderolabs/discovery-service/pkg/limits"
"github.com/siderolabs/discovery-service/pkg/server"
)
@@ -49,6 +51,9 @@ var (
devMode = false
gcInterval = time.Minute
redirectEndpoint = ""
+ snapshotsEnabled = true
+ snapshotPath = "/var/discovery-service/state.binpb"
+ snapshotInterval = 10 * time.Minute
)
func init() {
@@ -58,6 +63,9 @@ func init() {
flag.BoolVar(&devMode, "debug", devMode, "enable debug mode")
flag.DurationVar(&gcInterval, "gc-interval", gcInterval, "garbage collection interval")
flag.StringVar(&redirectEndpoint, "redirect-endpoint", redirectEndpoint, "redirect all clients to a new endpoint (gRPC endpoint, e.g. 'example.com:443'")
+ flag.BoolVar(&snapshotsEnabled, "snapshots-enabled", snapshotsEnabled, "enable snapshots")
+ flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file")
+ flag.DurationVar(&snapshotInterval, "snapshot-interval", snapshotInterval, "interval to save the snapshot")
if debug.Enabled {
flag.StringVar(&debugAddr, "debug-addr", debugAddr, "debug (pprof, trace, expvar) listen addr")
@@ -189,6 +197,19 @@ func run(ctx context.Context, logger *zap.Logger) error {
state := state.NewState(logger)
prom.MustRegister(state)
+ var stateStorage *storage.Storage
+
+ if snapshotsEnabled {
+ stateStorage = storage.New(snapshotPath, state, logger)
+ prom.MustRegister(stateStorage)
+
+ if err := stateStorage.Load(); err != nil {
+ logger.Warn("failed to load state from storage", zap.Error(err))
+ }
+ } else {
+ logger.Info("snapshots are disabled")
+ }
+
srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint)
prom.MustRegister(srv)
@@ -226,6 +247,12 @@ func run(ctx context.Context, logger *zap.Logger) error {
eg, ctx := errgroup.WithContext(ctx)
+ if snapshotsEnabled {
+ eg.Go(func() error {
+ return stateStorage.Start(ctx, clockwork.NewRealClock(), snapshotInterval)
+ })
+ }
+
eg.Go(func() error {
logger.Info("gRPC server starting", zap.Stringer("address", lis.Addr()))
diff --git a/cmd/snapshot-decoder/main.go b/cmd/snapshot-decoder/main.go
new file mode 100644
index 0000000..38f5686
--- /dev/null
+++ b/cmd/snapshot-decoder/main.go
@@ -0,0 +1,64 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+// Package main implements a simple tool to decode a snapshot file.
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "os"
+
+ "google.golang.org/protobuf/encoding/protojson"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+)
+
+var snapshotPath = "/var/discovery-service/state.binpb"
+
+func init() {
+ flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file")
+}
+
+func main() {
+ flag.Parse()
+
+ if err := run(); err != nil {
+ log.Fatalf("error: %v", err)
+ }
+}
+
+func run() error {
+ data, err := os.ReadFile(snapshotPath)
+ if err != nil {
+ return fmt.Errorf("failed to read snapshot: %w", err)
+ }
+
+ snapshot := &storagepb.StateSnapshot{}
+
+ if err = snapshot.UnmarshalVT(data); err != nil {
+ return fmt.Errorf("failed to unmarshal snapshot: %w", err)
+ }
+
+ opts := protojson.MarshalOptions{
+ Indent: " ",
+ }
+
+ json, err := opts.Marshal(snapshot)
+ if err != nil {
+ return fmt.Errorf("failed to marshal snapshot: %w", err)
+ }
+
+ if _, err = io.Copy(os.Stdout, bytes.NewReader(json)); err != nil {
+ return fmt.Errorf("failed to write snapshot: %w", err)
+ }
+
+ fmt.Println()
+
+ return nil
+}
diff --git a/go.mod b/go.mod
index b3c64d2..db562ea 100644
--- a/go.mod
+++ b/go.mod
@@ -1,10 +1,12 @@
module github.com/siderolabs/discovery-service
-go 1.22.1
+go 1.22.3
require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1
+ github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293
+ github.com/planetscale/vtprotobuf v0.6.0
github.com/prometheus/client_golang v1.19.0
github.com/siderolabs/discovery-api v0.1.4
github.com/siderolabs/discovery-client v0.1.8
@@ -16,7 +18,7 @@ require (
golang.org/x/sync v0.6.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.62.1
- google.golang.org/protobuf v1.33.0
+ google.golang.org/protobuf v1.34.1
)
require (
@@ -25,15 +27,14 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
- github.com/planetscale/vtprotobuf v0.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
- golang.org/x/net v0.21.0 // indirect
- golang.org/x/sys v0.17.0 // indirect
- golang.org/x/text v0.14.0 // indirect
+ golang.org/x/net v0.25.0 // indirect
+ golang.org/x/sys v0.20.0 // indirect
+ golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/go.sum b/go.sum
index fefb714..bf91970 100644
--- a/go.sum
+++ b/go.sum
@@ -16,6 +16,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tg
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
+github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293 h1:l3TVsYI+QxIp0CW7YCizx9WG26Lj7DXnc1pdlBKk3gY=
+github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -52,14 +54,14 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y=
-golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
-golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
+golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
+golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
-golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
-golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
-golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
+golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
+golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -69,8 +71,8 @@ google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
-google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
+google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
diff --git a/internal/landing/html/index.html b/internal/landing/html/index.html
index a55d952..029fa66 100644
--- a/internal/landing/html/index.html
+++ b/internal/landing/html/index.html
@@ -35,8 +35,7 @@ Details
- Moreover, the discovery service has no peristence.
- Data is stored in memory only with a TTL set by the clients (i.e. Talos).
+ Data is stored in memory with a TTL set by the clients (i.e., Talos) and is periodically snapshotted to disk on a best-effort basis.
The cluster ID is used as a key to select the affiliates (so that different clusters see different affiliates).
diff --git a/internal/state/affiliate_test.go b/internal/state/affiliate_test.go
index ec0ff56..c8f3985 100644
--- a/internal/state/affiliate_test.go
+++ b/internal/state/affiliate_test.go
@@ -106,7 +106,7 @@ func TestAffiliateTooManyEndpoints(t *testing.T) {
affiliate := state.NewAffiliate("id1")
- for i := 0; i < limits.AffiliateEndpointsMax; i++ {
+ for i := range limits.AffiliateEndpointsMax {
assert.NoError(t, affiliate.MergeEndpoints([][]byte{[]byte(fmt.Sprintf("endpoint%d", i))}, now))
}
diff --git a/internal/state/cluster_test.go b/internal/state/cluster_test.go
index a21f7de..742d945 100644
--- a/internal/state/cluster_test.go
+++ b/internal/state/cluster_test.go
@@ -272,7 +272,7 @@ func TestClusterTooManyAffiliates(t *testing.T) {
cluster := state.NewCluster("cluster3")
- for i := 0; i < limits.ClusterAffiliatesMax; i++ {
+ for i := range limits.ClusterAffiliatesMax {
assert.NoError(t, cluster.WithAffiliate(fmt.Sprintf("af%d", i), func(*state.Affiliate) error {
return nil
}))
diff --git a/internal/state/snapshot.go b/internal/state/snapshot.go
new file mode 100644
index 0000000..a2d938d
--- /dev/null
+++ b/internal/state/snapshot.go
@@ -0,0 +1,130 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+package state
+
+import (
+ "slices"
+
+ "github.com/siderolabs/gen/xslices"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+)
+
+// ExportClusterSnapshots exports all cluster snapshots and calls the provided function for each one.
+//
+// Implements storage.Snapshotter interface.
+func (state *State) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error {
+ var err error
+
+ // reuse the same snapshot in each iteration
+ clusterSnapshot := &storagepb.ClusterSnapshot{}
+
+ state.clusters.Range(func(_ string, cluster *Cluster) bool {
+ snapshotCluster(cluster, clusterSnapshot)
+
+ err = f(clusterSnapshot)
+
+ return err == nil
+ })
+
+ return err
+}
+
+// ImportClusterSnapshots imports cluster snapshots by calling the provided function until it returns false.
+//
+// Implements storage.Snapshotter interface.
+func (state *State) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error {
+ for {
+ clusterSnapshot, ok, err := f()
+ if err != nil {
+ return err
+ }
+
+ if !ok {
+ break
+ }
+
+ cluster := clusterFromSnapshot(clusterSnapshot)
+
+ state.clusters.Store(cluster.id, cluster)
+ }
+
+ return nil
+}
+
+func snapshotCluster(cluster *Cluster, snapshot *storagepb.ClusterSnapshot) {
+ cluster.affiliatesMu.Lock()
+ defer cluster.affiliatesMu.Unlock()
+
+ snapshot.Id = cluster.id
+
+ // reuse the same slice, resize it as needed
+ snapshot.Affiliates = slices.Grow(snapshot.Affiliates, len(cluster.affiliates))
+ snapshot.Affiliates = snapshot.Affiliates[:len(cluster.affiliates)]
+
+ i := 0
+ for _, affiliate := range cluster.affiliates {
+ if snapshot.Affiliates[i] == nil {
+ snapshot.Affiliates[i] = &storagepb.AffiliateSnapshot{}
+ }
+
+ snapshot.Affiliates[i].Id = affiliate.id
+
+ if snapshot.Affiliates[i].Expiration == nil {
+ snapshot.Affiliates[i].Expiration = &timestamppb.Timestamp{}
+ }
+
+ snapshot.Affiliates[i].Expiration.Seconds = affiliate.expiration.Unix()
+ snapshot.Affiliates[i].Expiration.Nanos = int32(affiliate.expiration.Nanosecond())
+
+ snapshot.Affiliates[i].Data = affiliate.data
+
+ // reuse the same slice, resize it as needed
+ snapshot.Affiliates[i].Endpoints = slices.Grow(snapshot.Affiliates[i].Endpoints, len(affiliate.endpoints))
+ snapshot.Affiliates[i].Endpoints = snapshot.Affiliates[i].Endpoints[:len(affiliate.endpoints)]
+
+ for j, endpoint := range affiliate.endpoints {
+ if snapshot.Affiliates[i].Endpoints[j] == nil {
+ snapshot.Affiliates[i].Endpoints[j] = &storagepb.EndpointSnapshot{}
+ }
+
+ snapshot.Affiliates[i].Endpoints[j].Data = endpoint.data
+
+ if snapshot.Affiliates[i].Endpoints[j].Expiration == nil {
+ snapshot.Affiliates[i].Endpoints[j].Expiration = &timestamppb.Timestamp{}
+ }
+
+ snapshot.Affiliates[i].Endpoints[j].Expiration.Seconds = endpoint.expiration.Unix()
+ snapshot.Affiliates[i].Endpoints[j].Expiration.Nanos = int32(endpoint.expiration.Nanosecond())
+ }
+
+ i++
+ }
+}
+
+func clusterFromSnapshot(snapshot *storagepb.ClusterSnapshot) *Cluster {
+ return &Cluster{
+ id: snapshot.Id,
+ affiliates: xslices.ToMap(snapshot.Affiliates, affiliateFromSnapshot),
+ }
+}
+
+func affiliateFromSnapshot(snapshot *storagepb.AffiliateSnapshot) (string, *Affiliate) {
+ return snapshot.Id, &Affiliate{
+ id: snapshot.Id,
+ expiration: snapshot.Expiration.AsTime(),
+ data: snapshot.Data,
+ endpoints: xslices.Map(snapshot.Endpoints, endpointFromSnapshot),
+ }
+}
+
+func endpointFromSnapshot(snapshot *storagepb.EndpointSnapshot) Endpoint {
+ return Endpoint{
+ data: snapshot.Data,
+ expiration: snapshot.Expiration.AsTime(),
+ }
+}
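
Note that snapshotCluster reuses one ClusterSnapshot (and its nested slices) across iterations, so ExportClusterSnapshots hands the callback the same instance every time: the callback must consume or copy whatever it needs before returning. A minimal consumer sketch (assumes it compiles within this module, since internal/state is an internal package):

package main

import (
	"log"

	"go.uber.org/zap"

	storagepb "github.com/siderolabs/discovery-service/api/storage"
	"github.com/siderolabs/discovery-service/internal/state"
)

func main() {
	st := state.NewState(zap.NewNop())

	// the same *ClusterSnapshot instance is reused on every invocation,
	// so no field of it may be retained after the callback returns
	var clusters, affiliates int

	if err := st.ExportClusterSnapshots(func(cs *storagepb.ClusterSnapshot) error {
		clusters++
		affiliates += len(cs.Affiliates)

		return nil
	}); err != nil {
		log.Fatal(err)
	}

	log.Printf("clusters: %d, affiliates: %d", clusters, affiliates)
}
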
diff --git a/internal/state/storage/protobuf.go b/internal/state/storage/protobuf.go
new file mode 100644
index 0000000..a10ec30
--- /dev/null
+++ b/internal/state/storage/protobuf.go
@@ -0,0 +1,135 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+package storage
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "slices"
+
+ "google.golang.org/protobuf/encoding/protowire"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+)
+
+const (
+ clustersFieldNum = 1
+ clustersFieldType = protowire.BytesType
+
+ // MaxClusterSize is the maximum allowed size of a cluster snapshot.
+ MaxClusterSize = 138578179
+)
+
+// ErrClusterSnapshotTooLarge is returned when a cluster snapshot is above the maximum size of MaxClusterSize.
+var ErrClusterSnapshotTooLarge = fmt.Errorf("cluster snapshot is above the maximum size of %dB", MaxClusterSize)
+
+// encodeClusterSnapshot encodes a ClusterSnapshot into the given buffer, resizing it as needed.
+//
+// It returns the buffer with the encoded ClusterSnapshot and an error if the encoding fails.
+func encodeClusterSnapshot(buffer []byte, snapshot *storagepb.ClusterSnapshot) ([]byte, error) {
+ buffer = protowire.AppendTag(buffer, clustersFieldNum, clustersFieldType)
+ size := snapshot.SizeVT()
+ buffer = protowire.AppendVarint(buffer, uint64(size))
+
+ startIdx := len(buffer)
+ clusterSize := size
+
+ buffer = slices.Grow(buffer, clusterSize)
+ buffer = buffer[:startIdx+clusterSize]
+
+ if _, err := snapshot.MarshalToSizedBufferVT(buffer[startIdx:]); err != nil {
+ return nil, fmt.Errorf("failed to marshal cluster: %w", err)
+ }
+
+ return buffer, nil
+}
+
+// decodeClusterSnapshot decodes a ClusterSnapshot from the given buffer.
+func decodeClusterSnapshot(buffer []byte) (*storagepb.ClusterSnapshot, error) {
+ var clusterSnapshot storagepb.ClusterSnapshot
+
+ if err := clusterSnapshot.UnmarshalVT(buffer); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal cluster snapshot: %w", err)
+ }
+
+ return &clusterSnapshot, nil
+}
+
+// decodeClusterSnapshotHeader reads and decodes the header of a ClusterSnapshot from the given io.ByteReader.
+//
+// It returns the size of the header and the size of the snapshot.
+func decodeClusterSnapshotHeader(r io.ByteReader) (headerSize, clusterSize int, err error) {
+ tagNum, tagType, tagEncodedLen, err := consumeTag(r)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if tagNum != clustersFieldNum {
+ return 0, 0, fmt.Errorf("unexpected number: %v", tagNum)
+ }
+
+ if tagType != clustersFieldType {
+ return 0, 0, fmt.Errorf("unexpected type: %v", tagType)
+ }
+
+ clusterSizeVal, clusterSizeEncodedLen, err := consumeVarint(r)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if clusterSizeVal > MaxClusterSize {
+ return 0, 0, fmt.Errorf("%w: %v", ErrClusterSnapshotTooLarge, clusterSizeVal)
+ }
+
+ return tagEncodedLen + clusterSizeEncodedLen, int(clusterSizeVal), nil
+}
+
+// consumeTag reads a varint-encoded tag from the given io.ByteReader, reporting its length.
+//
+// It is an adaptation of protowire.ConsumeTag to work with io.ByteReader.
+//
+// It returns the tag number, the tag type, the length of the tag, and an error if the tag is invalid.
+func consumeTag(r io.ByteReader) (protowire.Number, protowire.Type, int, error) {
+ v, n, err := consumeVarint(r)
+ if err != nil {
+ return 0, 0, 0, err
+ }
+
+ num, typ := protowire.DecodeTag(v)
+ if num < protowire.MinValidNumber {
+ return 0, 0, 0, errors.New("invalid field number")
+ }
+
+ return num, typ, n, nil
+}
+
+// consumeVarint parses a varint-encoded uint64 from the given io.ByteReader, reporting its length.
+//
+// It is an adaptation of protowire.ConsumeVarint to work with io.ByteReader.
+//
+// It returns the parsed value, the length of the varint, and an error if the varint is invalid.
+func consumeVarint(r io.ByteReader) (uint64, int, error) {
+ var v uint64
+
+ for i := range 10 {
+ b, err := r.ReadByte()
+ if err != nil {
+ return 0, 0, err
+ }
+
+ y := uint64(b)
+ v += y << uint(i*7)
+
+ if y < 0x80 {
+ return v, i + 1, nil
+ }
+
+ v -= 0x80 << uint(i*7)
+ }
+
+ return 0, 0, errors.New("variable length integer overflow")
+}
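
The helpers above define the on-disk framing: each cluster is written as field 1 of StateSnapshot (a tag varint, a length varint, then the marshaled ClusterSnapshot), which is what lets Import stream clusters without materializing the whole state. A round-trip sketch (it would have to live in this package, since the helpers are unexported; the snapshot contents are illustrative):

package storage

import (
	"bytes"
	"fmt"
	"io"

	storagepb "github.com/siderolabs/discovery-service/api/storage"
)

func framingRoundTrip() error {
	snap := &storagepb.ClusterSnapshot{Id: "cluster1"}

	// append the tag (field 1, bytes type), the length varint and the payload
	buf, err := encodeClusterSnapshot(nil, snap)
	if err != nil {
		return err
	}

	r := bytes.NewReader(buf)

	// consume the tag and length varints back
	_, clusterSize, err := decodeClusterSnapshotHeader(r)
	if err != nil {
		return err
	}

	// read exactly clusterSize payload bytes, then unmarshal them
	payload := make([]byte, clusterSize)
	if _, err := io.ReadFull(r, payload); err != nil {
		return err
	}

	decoded, err := decodeClusterSnapshot(payload)
	if err != nil {
		return err
	}

	fmt.Println(decoded.Id) // prints: cluster1

	return nil
}
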
diff --git a/internal/state/storage/storage.go b/internal/state/storage/storage.go
new file mode 100644
index 0000000..bdeed0a
--- /dev/null
+++ b/internal/state/storage/storage.go
@@ -0,0 +1,397 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+// Package storage implements persistent storage for the state of the discovery service.
+package storage
+
+import (
+ "bufio"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "slices"
+ "sync"
+ "time"
+
+ "github.com/jonboulle/clockwork"
+ prom "github.com/prometheus/client_golang/prometheus"
+ "go.uber.org/zap"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+)
+
+const (
+ labelOperation = "operation"
+ labelStatus = "status"
+
+ operationSave = "save"
+ operationLoad = "load"
+
+ statusSuccess = "success"
+ statusError = "error"
+)
+
+// Storage is a persistent storage for the state of the discovery service.
+type Storage struct {
+ state Snapshotter
+ logger *zap.Logger
+
+ operationsMetric *prom.CounterVec
+ lastSnapshotSizeMetric *prom.GaugeVec
+ lastOperationClustersMetric *prom.GaugeVec
+ lastOperationAffiliatesMetric *prom.GaugeVec
+ lastOperationEndpointsMetric *prom.GaugeVec
+ lastOperationDurationMetric *prom.GaugeVec
+
+ path string
+}
+
+// Describe implements prometheus.Collector interface.
+func (storage *Storage) Describe(descs chan<- *prom.Desc) {
+ prom.DescribeByCollect(storage, descs)
+}
+
+// Collect implements prometheus.Collector interface.
+func (storage *Storage) Collect(metrics chan<- prom.Metric) {
+ storage.operationsMetric.Collect(metrics)
+ storage.lastSnapshotSizeMetric.Collect(metrics)
+ storage.lastOperationClustersMetric.Collect(metrics)
+ storage.lastOperationAffiliatesMetric.Collect(metrics)
+ storage.lastOperationEndpointsMetric.Collect(metrics)
+ storage.lastOperationDurationMetric.Collect(metrics)
+}
+
+// Snapshotter is an interface for exporting and importing cluster state.
+type Snapshotter interface {
+ // ExportClusterSnapshots exports cluster snapshots to the given function.
+ ExportClusterSnapshots(f func(*storagepb.ClusterSnapshot) error) error
+
+ // ImportClusterSnapshots imports cluster snapshots from the given function.
+ ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error
+}
+
+// New creates a new instance of Storage.
+func New(path string, state Snapshotter, logger *zap.Logger) *Storage {
+ return &Storage{
+ state: state,
+ logger: logger.With(zap.String("component", "storage"), zap.String("path", path)),
+ path: path,
+
+ operationsMetric: prom.NewCounterVec(prom.CounterOpts{
+ Name: "discovery_storage_operations_total",
+ Help: "The total number of storage operations.",
+ }, []string{labelOperation, labelStatus}),
+ lastSnapshotSizeMetric: prom.NewGaugeVec(prom.GaugeOpts{
+ Name: "discovery_storage_last_snapshot_size_bytes",
+ Help: "The size of the last processed snapshot in bytes.",
+ }, []string{labelOperation}),
+ lastOperationClustersMetric: prom.NewGaugeVec(prom.GaugeOpts{
+ Name: "discovery_storage_last_operation_clusters",
+ Help: "The number of clusters in the snapshot of the last operation.",
+ }, []string{labelOperation}),
+ lastOperationAffiliatesMetric: prom.NewGaugeVec(prom.GaugeOpts{
+ Name: "discovery_storage_last_operation_affiliates",
+ Help: "The number of affiliates in the snapshot of the last operation.",
+ }, []string{labelOperation}),
+ lastOperationEndpointsMetric: prom.NewGaugeVec(prom.GaugeOpts{
+ Name: "discovery_storage_last_operation_endpoints",
+ Help: "The number of endpoints in the snapshot of the last operation.",
+ }, []string{labelOperation}),
+ lastOperationDurationMetric: prom.NewGaugeVec(prom.GaugeOpts{
+ Name: "discovery_storage_last_operation_duration_seconds",
+ Help: "The duration of the last operation in seconds.",
+ }, []string{labelOperation}),
+ }
+}
+
+// Start starts the storage loop that periodically saves the state.
+func (storage *Storage) Start(ctx context.Context, clock clockwork.Clock, interval time.Duration) error {
+ storage.logger.Info("start storage loop", zap.Duration("interval", interval))
+
+ ticker := clock.NewTicker(interval)
+
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ storage.logger.Info("received shutdown signal")
+
+ if err := storage.Save(); err != nil {
+ return fmt.Errorf("failed to save state on shutdown: %w", err)
+ }
+
+ if errors.Is(ctx.Err(), context.Canceled) {
+ return nil
+ }
+
+ return ctx.Err()
+ case <-ticker.Chan():
+ if err := storage.Save(); err != nil {
+ storage.logger.Error("failed to save state", zap.Error(err))
+ }
+ }
+ }
+}
+
+// Save saves all clusters' states into the persistent storage.
+func (storage *Storage) Save() (err error) {
+ start := time.Now()
+
+ defer func() {
+ if err != nil {
+ storage.operationsMetric.WithLabelValues(operationSave, statusError).Inc()
+ }
+ }()
+
+ if err = os.MkdirAll(filepath.Dir(storage.path), 0o755); err != nil {
+ return fmt.Errorf("failed to create directory path: %w", err)
+ }
+
+ tmpFile, err := getTempFile(storage.path)
+ if err != nil {
+ return fmt.Errorf("failed to create temporary file: %w", err)
+ }
+
+ defer func() {
+ tmpFile.Close() //nolint:errcheck
+ os.Remove(tmpFile.Name()) //nolint:errcheck
+ }()
+
+ stats, err := storage.Export(tmpFile)
+ if err != nil {
+ return fmt.Errorf("failed to write snapshot: %w", err)
+ }
+
+ if err = commitTempFile(tmpFile, storage.path); err != nil {
+ return fmt.Errorf("failed to commit temporary file: %w", err)
+ }
+
+ duration := time.Since(start)
+
+ storage.logger.Info("state saved", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates),
+ zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration))
+
+ storage.operationsMetric.WithLabelValues(operationSave, statusSuccess).Inc()
+ storage.lastSnapshotSizeMetric.WithLabelValues(operationSave).Set(float64(stats.Size))
+ storage.lastOperationClustersMetric.WithLabelValues(operationSave).Set(float64(stats.NumClusters))
+ storage.lastOperationAffiliatesMetric.WithLabelValues(operationSave).Set(float64(stats.NumAffiliates))
+ storage.lastOperationEndpointsMetric.WithLabelValues(operationSave).Set(float64(stats.NumEndpoints))
+ storage.lastOperationDurationMetric.WithLabelValues(operationSave).Set(duration.Seconds())
+
+ return nil
+}
+
+// Load loads all clusters' states from the persistent storage.
+func (storage *Storage) Load() (err error) {
+ defer func() {
+ if err != nil {
+ storage.operationsMetric.WithLabelValues(operationLoad, statusError).Inc()
+ }
+ }()
+
+ start := time.Now()
+
+ // open file for reading
+ file, err := os.Open(storage.path)
+ if err != nil {
+ return fmt.Errorf("failed to open file: %w", err)
+ }
+
+ defer file.Close() //nolint:errcheck
+
+ stats, err := storage.Import(file)
+ if err != nil {
+ return fmt.Errorf("failed to read snapshot: %w", err)
+ }
+
+ if err = file.Close(); err != nil {
+ return fmt.Errorf("failed to close file: %w", err)
+ }
+
+ duration := time.Since(start)
+
+ storage.logger.Info("state loaded", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates),
+ zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration))
+
+ storage.operationsMetric.WithLabelValues(operationLoad, statusSuccess).Inc()
+ storage.lastSnapshotSizeMetric.WithLabelValues(operationLoad).Set(float64(stats.Size))
+ storage.lastOperationClustersMetric.WithLabelValues(operationLoad).Set(float64(stats.NumClusters))
+ storage.lastOperationAffiliatesMetric.WithLabelValues(operationLoad).Set(float64(stats.NumAffiliates))
+ storage.lastOperationEndpointsMetric.WithLabelValues(operationLoad).Set(float64(stats.NumEndpoints))
+ storage.lastOperationDurationMetric.WithLabelValues(operationLoad).Set(duration.Seconds())
+
+ return nil
+}
+
+// Import imports all clusters' states from the given reader.
+//
+// When importing, we avoid unmarshaling to the storagepb.StateSnapshot type directly, as it causes an allocation of all the cluster snapshots at once.
+// Instead, we process clusters in a streaming manner, unmarshaling them one by one and importing them into the state.
+func (storage *Storage) Import(reader io.Reader) (SnapshotStats, error) {
+ size := 0
+ numClusters := 0
+ numAffiliates := 0
+ numEndpoints := 0
+
+ buffer := make([]byte, 0, 256)
+ bufferedReader := bufio.NewReader(reader)
+
+ // unmarshal the clusters in a streaming manner and import them into the state
+ if err := storage.state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) {
+ headerSize, clusterSize, err := decodeClusterSnapshotHeader(bufferedReader)
+ if err != nil {
+ if err == io.EOF { //nolint:errorlint
+ return nil, false, nil
+ }
+
+ return nil, false, fmt.Errorf("failed to decode cluster header: %w", err)
+ }
+
+ if clusterSize > cap(buffer) {
+ buffer = slices.Grow(buffer, clusterSize)
+ }
+
+ buffer = buffer[:clusterSize]
+
+ if _, err = io.ReadFull(bufferedReader, buffer); err != nil {
+ return nil, false, fmt.Errorf("failed to read bytes: %w", err)
+ }
+
+ clusterSnapshot, err := decodeClusterSnapshot(buffer)
+ if err != nil {
+ return nil, false, fmt.Errorf("failed to decode cluster: %w", err)
+ }
+
+ buffer = buffer[:0]
+
+ // update stats
+ size += headerSize + clusterSize
+ numClusters++
+ numAffiliates += len(clusterSnapshot.Affiliates)
+
+ for _, affiliate := range clusterSnapshot.Affiliates {
+ numEndpoints += len(affiliate.Endpoints)
+ }
+
+ return clusterSnapshot, true, nil
+ }); err != nil {
+ return SnapshotStats{}, fmt.Errorf("failed to import clusters: %w", err)
+ }
+
+ return SnapshotStats{
+ Size: size,
+ NumClusters: numClusters,
+ NumAffiliates: numAffiliates,
+ NumEndpoints: numEndpoints,
+ }, nil
+}
+
+// Export exports all clusters' states into the given writer.
+//
+// When exporting, we avoid marshaling to the storagepb.StateSnapshot type directly, as it causes an allocation of all the cluster snapshots at once.
+// Instead, we process clusters in a streaming manner, marshaling them one by one and exporting them into the writer.
+func (storage *Storage) Export(writer io.Writer) (SnapshotStats, error) {
+ numClusters := 0
+ numAffiliates := 0
+ numEndpoints := 0
+ size := 0
+
+ var buffer []byte
+
+ bufferedWriter := bufio.NewWriter(writer)
+
+ // marshal the clusters in a streaming manner and export them into the writer
+ if err := storage.state.ExportClusterSnapshots(func(snapshot *storagepb.ClusterSnapshot) error {
+ var err error
+
+ buffer, err = encodeClusterSnapshot(buffer, snapshot)
+ if err != nil {
+ return fmt.Errorf("failed to encode cluster: %w", err)
+ }
+
+ written, err := bufferedWriter.Write(buffer)
+ if err != nil {
+ return fmt.Errorf("failed to write cluster: %w", err)
+ }
+
+ // prepare the buffer for the next iteration - reset it
+ buffer = buffer[:0]
+
+ // update stats
+ size += written
+ numClusters++
+ numAffiliates += len(snapshot.Affiliates)
+
+ for _, affiliate := range snapshot.Affiliates {
+ numEndpoints += len(affiliate.Endpoints)
+ }
+
+ return nil
+ }); err != nil {
+ return SnapshotStats{}, fmt.Errorf("failed to snapshot clusters: %w", err)
+ }
+
+ if err := bufferedWriter.Flush(); err != nil {
+ return SnapshotStats{}, fmt.Errorf("failed to flush writer: %w", err)
+ }
+
+ return SnapshotStats{
+ Size: size,
+ NumClusters: numClusters,
+ NumAffiliates: numAffiliates,
+ NumEndpoints: numEndpoints,
+ }, nil
+}
+
+func getTempFile(dst string) (*os.File, error) {
+ tmpFile, err := os.OpenFile(dst+".tmp", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o666)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create file: %w", err)
+ }
+
+ return tmpFile, nil
+}
+
+// commitTempFile commits the temporary file to the destination and removes it.
+func commitTempFile(tmpFile *os.File, dst string) error {
+ renamed := false
+ closer := sync.OnceValue(tmpFile.Close)
+
+ defer func() {
+ closer() //nolint:errcheck
+
+ if !renamed {
+ os.Remove(tmpFile.Name()) //nolint:errcheck
+ }
+ }()
+
+ if err := tmpFile.Sync(); err != nil {
+ return fmt.Errorf("failed to sync data: %w", err)
+ }
+
+ if err := closer(); err != nil {
+ return fmt.Errorf("failed to close file: %w", err)
+ }
+
+ if err := os.Rename(tmpFile.Name(), dst); err != nil {
+ return fmt.Errorf("failed to rename file: %w", err)
+ }
+
+ renamed = true
+
+ return nil
+}
+
+// SnapshotStats contains statistics about a snapshot.
+type SnapshotStats struct {
+ Size int
+ NumClusters int
+ NumAffiliates int
+ NumEndpoints int
+}
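
End to end, the package is used as in the main.go changes earlier in this patch: construct, best-effort Load, then Start for periodic saves plus a final save on shutdown. A minimal wiring sketch (as an internal package this only compiles within the module; the no-op logger is for brevity):

package main

import (
	"context"
	"log"
	"time"

	"github.com/jonboulle/clockwork"
	"go.uber.org/zap"

	"github.com/siderolabs/discovery-service/internal/state"
	"github.com/siderolabs/discovery-service/internal/state/storage"
)

func main() {
	logger := zap.NewNop()
	st := state.NewState(logger)

	// Save writes atomically: temp file, fsync, then rename over the target
	stateStorage := storage.New("/var/discovery-service/state.binpb", st, logger)

	// best-effort restore: a missing or unreadable snapshot only warrants a warning
	if err := stateStorage.Load(); err != nil {
		logger.Warn("failed to load state", zap.Error(err))
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// blocks: saves every 10 minutes, and once more when ctx is canceled
	// (cancellation itself is not reported as an error)
	if err := stateStorage.Start(ctx, clockwork.NewRealClock(), 10*time.Minute); err != nil {
		log.Fatal(err)
	}
}
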
diff --git a/internal/state/storage/storage_bench_test.go b/internal/state/storage/storage_bench_test.go
new file mode 100644
index 0000000..445bd68
--- /dev/null
+++ b/internal/state/storage/storage_bench_test.go
@@ -0,0 +1,62 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+package storage_test
+
+import (
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+ "github.com/siderolabs/discovery-service/internal/state"
+ "github.com/siderolabs/discovery-service/internal/state/storage"
+)
+
+func BenchmarkExport(b *testing.B) {
+ logger := zap.NewNop()
+ state := buildState(b, buildTestSnapshot(b.N), logger)
+ storage := storage.New("", state, logger)
+
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ _, err := storage.Export(io.Discard)
+ require.NoError(b, err)
+}
+
+func testBenchmarkAllocs(t *testing.T, f func(b *testing.B), threshold int64) {
+ res := testing.Benchmark(f)
+
+ allocs := res.AllocsPerOp()
+ if allocs > threshold {
+ t.Fatalf("Expected AllocsPerOp <= %d, got %d", threshold, allocs)
+ }
+}
+
+func TestBenchmarkExportAllocs(t *testing.T) {
+ testBenchmarkAllocs(t, BenchmarkExport, 0)
+}
+
+func buildState(tb testing.TB, data *storagepb.StateSnapshot, logger *zap.Logger) *state.State {
+ i := 0
+ state := state.NewState(logger)
+
+ err := state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) {
+ if i >= len(data.Clusters) {
+ return nil, false, nil
+ }
+
+ clusterSnapshot := data.Clusters[i]
+ i++
+
+ return clusterSnapshot, true, nil
+ })
+ require.NoError(tb, err)
+
+ return state
+}
diff --git a/internal/state/storage/storage_test.go b/internal/state/storage/storage_test.go
new file mode 100644
index 0000000..78c243a
--- /dev/null
+++ b/internal/state/storage/storage_test.go
@@ -0,0 +1,398 @@
+// Copyright (c) 2024 Sidero Labs, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+
+package storage_test
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "math"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/jonboulle/clockwork"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap/zaptest"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ storagepb "github.com/siderolabs/discovery-service/api/storage"
+ "github.com/siderolabs/discovery-service/internal/state/storage"
+ "github.com/siderolabs/discovery-service/pkg/limits"
+)
+
+func TestExport(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct { //nolint:govet
+ name string
+ snapshot *storagepb.StateSnapshot
+ }{
+ {
+ "empty state",
+ &storagepb.StateSnapshot{},
+ },
+ {
+ "small state",
+ &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{{Id: "a"}, {Id: "b"}}},
+ },
+ {
+ "large state",
+ buildTestSnapshot(100),
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ snapshot := tc.snapshot
+ tempDir := t.TempDir()
+ path := filepath.Join(tempDir, "test.binpb")
+ state := &mockSnapshotter{data: snapshot}
+ logger := zaptest.NewLogger(t)
+
+ stateStorage := storage.New(path, state, logger)
+
+ var buffer bytes.Buffer
+
+ exportStats, err := stateStorage.Export(&buffer)
+ require.NoError(t, err)
+
+ assert.Equal(t, statsForSnapshot(snapshot), exportStats)
+
+ expected, err := snapshot.MarshalVT()
+ require.NoError(t, err)
+
+ require.Equal(t, expected, buffer.Bytes())
+ })
+ }
+}
+
+func TestImport(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct { //nolint:govet
+ name string
+ snapshot *storagepb.StateSnapshot
+ }{
+ {
+ "empty state",
+ &storagepb.StateSnapshot{},
+ },
+ {
+ "small state",
+ &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{{Id: "a"}, {Id: "b"}}},
+ },
+ {
+ "large state",
+ buildTestSnapshot(100),
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ path := filepath.Join(t.TempDir(), "test.binpb")
+ state := &mockSnapshotter{data: tc.snapshot}
+ logger := zaptest.NewLogger(t)
+
+ stateStorage := storage.New(path, state, logger)
+
+ data, err := tc.snapshot.MarshalVT()
+ require.NoError(t, err)
+
+ importStats, err := stateStorage.Import(bytes.NewReader(data))
+ require.NoError(t, err)
+
+ require.Equal(t, statsForSnapshot(tc.snapshot), importStats)
+
+ loads := state.getLoads()
+
+ require.Len(t, loads, 1)
+ require.True(t, loads[0].EqualVT(tc.snapshot))
+ })
+ }
+}
+
+func TestImportMaxSize(t *testing.T) {
+ t.Parallel()
+
+ cluster := buildMaxSizeCluster()
+ stateSnapshot := &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{cluster}}
+ path := filepath.Join(t.TempDir(), "test.binpb")
+ state := &mockSnapshotter{data: stateSnapshot}
+ logger := zaptest.NewLogger(t)
+
+ stateStorage := storage.New(path, state, logger)
+
+ clusterData, err := cluster.MarshalVT()
+ require.NoError(t, err)
+
+ require.Equal(t, len(clusterData), storage.MaxClusterSize)
+
+ data, err := stateSnapshot.MarshalVT()
+ require.NoError(t, err)
+
+ t.Logf("max cluster marshaled size: %d", len(data))
+
+ _, err = stateStorage.Import(bytes.NewReader(data))
+ require.NoError(t, err)
+
+ // add one more affiliate to trigger an overflow
+ cluster.Affiliates = append(cluster.Affiliates, &storagepb.AffiliateSnapshot{
+ Id: "overflow",
+ })
+
+ data, err = stateSnapshot.MarshalVT()
+ require.NoError(t, err)
+
+ _, err = stateStorage.Import(bytes.NewReader(data))
+ require.ErrorIs(t, err, storage.ErrClusterSnapshotTooLarge)
+}
+
+func TestStorage(t *testing.T) {
+ t.Parallel()
+
+ snapshot := buildTestSnapshot(10)
+ tempDir := t.TempDir()
+ path := filepath.Join(tempDir, "test.binpb")
+ state := &mockSnapshotter{data: snapshot}
+ logger := zaptest.NewLogger(t)
+
+ stateStorage := storage.New(path, state, logger)
+
+ // test save
+
+ require.NoError(t, stateStorage.Save())
+
+ expectedData, err := snapshot.MarshalVT()
+ require.NoError(t, err)
+
+ actualData, err := os.ReadFile(path)
+ require.NoError(t, err)
+
+ require.Equal(t, expectedData, actualData)
+
+ // test load
+
+ require.NoError(t, stateStorage.Load())
+ require.Len(t, state.getLoads(), 1)
+ require.True(t, snapshot.EqualVT(state.getLoads()[0]))
+
+ // modify, save & load again to assert that the file content gets overwritten
+
+ snapshot.Clusters[1].Affiliates[0].Data = []byte("new aff1 data")
+
+ require.NoError(t, stateStorage.Save())
+ require.NoError(t, stateStorage.Load())
+ require.Len(t, state.getLoads(), 2)
+ require.True(t, snapshot.EqualVT(state.getLoads()[1]))
+}
+
+func TestSchedule(t *testing.T) {
+ t.Parallel()
+
+ clock := clockwork.NewFakeClock()
+ snapshot := buildTestSnapshot(10)
+ tempDir := t.TempDir()
+ path := filepath.Join(tempDir, "test.binpb")
+ state := &mockSnapshotter{data: snapshot}
+ logger := zaptest.NewLogger(t)
+
+ stateStorage := storage.New(path, state, logger)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ t.Cleanup(cancel)
+
+ // start the periodic storage and wait for it to block on the timer
+
+ errCh := make(chan error)
+
+ go func() {
+ errCh <- stateStorage.Start(ctx, clock, 10*time.Minute)
+ }()
+
+ require.NoError(t, clock.BlockUntilContext(ctx, 1))
+
+ // advance time to trigger the first snapshot and assert it
+
+ clock.Advance(13 * time.Minute)
+
+ require.EventuallyWithT(t, func(collect *assert.CollectT) {
+ assert.Equal(collect, 1, state.getSnapshots())
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // advance time to trigger the second snapshot and assert it
+
+ clock.Advance(10 * time.Minute)
+ require.EventuallyWithT(t, func(collect *assert.CollectT) {
+ assert.Equal(collect, 2, state.getSnapshots())
+ }, 2*time.Second, 100*time.Millisecond)
+
+ // cancel the context to stop the storage loop and wait for it to exit
+ cancel()
+
+ require.NoError(t, <-errCh)
+
+ // assert that the state was saved on shutdown
+
+ require.EventuallyWithT(t, func(collect *assert.CollectT) {
+ assert.Equal(collect, 3, state.getSnapshots())
+ }, 2*time.Second, 100*time.Millisecond)
+}
+
+type mockSnapshotter struct {
+ data *storagepb.StateSnapshot
+ loads []*storagepb.StateSnapshot
+
+ snapshots int
+
+ lock sync.Mutex
+}
+
+func (m *mockSnapshotter) getSnapshots() int {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ return m.snapshots
+}
+
+func (m *mockSnapshotter) getLoads() []*storagepb.StateSnapshot {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ return append([]*storagepb.StateSnapshot(nil), m.loads...)
+}
+
+// ExportClusterSnapshots implements storage.Snapshotter interface.
+func (m *mockSnapshotter) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ m.snapshots++
+
+ for _, cluster := range m.data.Clusters {
+ if err := f(cluster); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// ImportClusterSnapshots implements storage.Snapshotter interface.
+func (m *mockSnapshotter) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ var clusters []*storagepb.ClusterSnapshot
+
+ for {
+ cluster, ok, err := f()
+ if err != nil {
+ return err
+ }
+
+ if !ok {
+ break
+ }
+
+ clusters = append(clusters, cluster)
+ }
+
+ m.loads = append(m.loads, &storagepb.StateSnapshot{Clusters: clusters})
+
+ return nil
+}
+
+func statsForSnapshot(snapshot *storagepb.StateSnapshot) storage.SnapshotStats {
+ numAffiliates := 0
+ numEndpoints := 0
+
+ for _, cluster := range snapshot.Clusters {
+ numAffiliates += len(cluster.Affiliates)
+
+ for _, affiliate := range cluster.Affiliates {
+ numEndpoints += len(affiliate.Endpoints)
+ }
+ }
+
+ return storage.SnapshotStats{
+ NumClusters: len(snapshot.Clusters),
+ NumAffiliates: numAffiliates,
+ NumEndpoints: numEndpoints,
+ Size: snapshot.SizeVT(),
+ }
+}
+
+func buildTestSnapshot(numClusters int) *storagepb.StateSnapshot {
+ clusters := make([]*storagepb.ClusterSnapshot, 0, numClusters)
+
+ for i := range numClusters {
+ affiliates := make([]*storagepb.AffiliateSnapshot, 0, 5)
+
+ for j := range 5 {
+ affiliates = append(affiliates, &storagepb.AffiliateSnapshot{
+ Id: fmt.Sprintf("aff%d", j),
+ Expiration: timestamppb.New(time.Now().Add(time.Hour)),
+ Data: []byte(fmt.Sprintf("aff%d data", j)),
+ })
+ }
+
+ if i%2 == 0 {
+ affiliates[0].Endpoints = []*storagepb.EndpointSnapshot{
+ {
+ Expiration: timestamppb.New(time.Now().Add(time.Hour)),
+ Data: []byte(fmt.Sprintf("endpoint%d data", i)),
+ },
+ }
+ }
+
+ clusters = append(clusters, &storagepb.ClusterSnapshot{
+ Id: fmt.Sprintf("cluster%d", i),
+ Affiliates: affiliates,
+ })
+ }
+
+ return &storagepb.StateSnapshot{
+ Clusters: clusters,
+ }
+}
+
+// buildMaxSizeCluster creates a cluster snapshot with the maximum possible marshaled size within the limits of the discovery service.
+func buildMaxSizeCluster() *storagepb.ClusterSnapshot {
+ largestTTL := &timestamppb.Timestamp{
+ Seconds: math.MinInt64,
+ Nanos: math.MinInt32,
+ } // the timestamp with the maximum possible marshaled size
+
+ affiliates := make([]*storagepb.AffiliateSnapshot, 0, limits.ClusterAffiliatesMax)
+
+ for range limits.ClusterAffiliatesMax {
+ endpoints := make([]*storagepb.EndpointSnapshot, 0, limits.AffiliateEndpointsMax)
+
+ for range limits.AffiliateEndpointsMax {
+ endpoints = append(endpoints, &storagepb.EndpointSnapshot{
+ Expiration: largestTTL,
+ Data: bytes.Repeat([]byte("a"), limits.AffiliateDataMax),
+ })
+ }
+
+ affiliates = append(affiliates, &storagepb.AffiliateSnapshot{
+ Id: strings.Repeat("a", limits.AffiliateIDMax),
+ Expiration: largestTTL,
+ Data: bytes.Repeat([]byte("a"), limits.AffiliateDataMax),
+ Endpoints: endpoints,
+ })
+ }
+
+ return &storagepb.ClusterSnapshot{
+ Id: strings.Repeat("c", limits.ClusterIDMax),
+ Affiliates: affiliates,
+ }
+}
diff --git a/pkg/server/client_test.go b/pkg/server/client_test.go
index 6b6892a..697efd0 100644
--- a/pkg/server/client_test.go
+++ b/pkg/server/client_test.go
@@ -406,8 +406,6 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi
eg, ctx := errgroup.WithContext(ctx)
for i := range affiliates {
- i := i
-
eg.Go(func() error {
return affiliates[i].Run(ctx, logger, notifyCh[i])
})
@@ -446,7 +444,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi
expected := make(map[int]struct{})
- for i := 0; i < numAffiliates; i++ {
+ for i := range numAffiliates {
if i != affiliateID {
expected[i] = struct{}{}
}
@@ -493,7 +491,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi
// eventually all affiliates should see discovered state
const NumAttempts = 50 // 50 * 100ms = 5s
- for j := 0; j < NumAttempts; j++ {
+ for j := range NumAttempts {
matches := true
for i := range affiliates {
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 90dc673..89c24f4 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -407,7 +407,7 @@ func TestValidation(t *testing.T) {
t.Run("AffiliateUpdateTooMany", func(t *testing.T) {
t.Parallel()
- for i := 0; i < limits.ClusterAffiliatesMax; i++ {
+ for i := range limits.ClusterAffiliatesMax {
_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
ClusterId: "fatcluster",
AffiliateId: fmt.Sprintf("af%d", i),
@@ -428,7 +428,7 @@ func TestValidation(t *testing.T) {
t.Run("AffiliateUpdateTooManyEndpoints", func(t *testing.T) {
t.Parallel()
- for i := 0; i < limits.AffiliateEndpointsMax; i++ {
+ for i := range limits.AffiliateEndpointsMax {
_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
ClusterId: "smallcluster",
AffiliateId: "af",
@@ -514,7 +514,7 @@ func testHitRateLimit(client pb.ClusterClient, ip string) func(t *testing.T) {
ctx = metadata.AppendToOutgoingContext(ctx, "X-Real-IP", ip)
- for i := 0; i < limits.IPRateBurstSizeMax; i++ {
+ for range limits.IPRateBurstSizeMax {
_, err := client.Hello(ctx, &pb.HelloRequest{
ClusterId: "fake",
ClientVersion: "v0.12.0",
diff --git a/pkg/server/version_test.go b/pkg/server/version_test.go
index 003d522..3f1554c 100644
--- a/pkg/server/version_test.go
+++ b/pkg/server/version_test.go
@@ -22,8 +22,6 @@ func TestParseVersion(t *testing.T) {
"v0.14.0-alpha.0-7-gf7d9f211": "v0.14.0-alpha.0-dev",
"v0.14.0-alpha.0-7-gf7d9f211-dirty": "v0.14.0-alpha.0-dev",
} {
- v, expected := v, expected
-
t.Run(v, func(t *testing.T) {
t.Parallel()