diff --git a/.dockerignore b/.dockerignore index 8e27148..81972d2 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,8 +1,9 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2023-01-30T15:45:28Z by kres latest. +# Generated on 2024-05-09T09:59:51Z by kres 1e986af. * +!api !cmd !internal !pkg diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9782318..2d70afe 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-21T10:07:54Z by kres 0290180. name: default concurrency: @@ -29,15 +29,6 @@ jobs: - self-hosted - generic if: (!startsWith(github.head_ref, 'renovate/') && !startsWith(github.head_ref, 'dependabot/')) - services: - buildkitd: - image: moby/buildkit:v0.12.5 - options: --privileged - ports: - - 1234:1234 - volumes: - - /var/lib/buildkit/${{ github.repository }}:/var/lib/buildkit - - /usr/etc/buildkit/buildkitd.toml:/etc/buildkit/buildkitd.toml steps: - name: checkout uses: actions/checkout@v4 @@ -45,11 +36,12 @@ jobs: run: | git fetch --prune --unshallow - name: Set up Docker Buildx + id: setup-buildx uses: docker/setup-buildx-action@v3 with: driver: remote - endpoint: tcp://127.0.0.1:1234 - timeout-minutes: 1 + endpoint: tcp://buildkit-amd64.ci.svc.cluster.local:1234 + timeout-minutes: 10 - name: base run: | make base @@ -60,8 +52,11 @@ jobs: run: | make unit-tests-race - name: coverage - run: | - make coverage + uses: codecov/codecov-action@v4 + with: + files: _out/coverage-unit-tests.txt + token: ${{ secrets.CODECOV_TOKEN }} + timeout-minutes: 3 - name: discovery-service run: | make discovery-service @@ -86,17 +81,20 @@ jobs: run: | make image-discovery-service - name: push-discovery-service-latest - if: github.event_name != 'pull_request' + if: github.event_name != 'pull_request' && github.ref == 'refs/heads/main' env: PLATFORM: linux/amd64,linux/arm64 PUSH: "true" run: | - make image-discovery-service TAG=latest + make image-discovery-service IMAGE_TAG=latest + - name: snapshot-decoder + run: | + make snapshot-decoder - name: Generate Checksums if: startsWith(github.ref, 'refs/tags/') run: | - sha256sum _out/discovery-service-* > _out/sha256sum.txt - sha512sum _out/discovery-service-* > _out/sha512sum.txt + sha256sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha256sum.txt + sha512sum _out/discovery-service-* _out/snapshot-decoder-* > _out/sha512sum.txt - name: release-notes if: startsWith(github.ref, 'refs/tags/') run: | @@ -109,4 +107,5 @@ jobs: draft: "true" files: |- _out/discovery-service-* + _out/snapshot-decoder-* _out/sha*.txt diff --git a/.golangci.yml b/.golangci.yml index f20e168..13ce6b9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,21 +1,20 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-19T19:39:10Z by kres dccd292. # options for analysis running run: timeout: 10m issues-exit-code: 1 tests: true - build-tags: [] - skip-dirs: [] - skip-dirs-use-default: true - skip-files: [] + build-tags: [ ] modules-download-mode: readonly # output configuration options output: - format: colored-line-number + formats: + - format: colored-line-number + path: stdout print-issued-lines: true print-linter-name: true uniq-by-line: true @@ -32,54 +31,35 @@ linters-settings: check-blank: true exhaustive: default-signifies-exhaustive: false - funlen: - lines: 60 - statements: 40 gci: - local-prefixes: github.com/siderolabs/discovery-service/ + sections: + - standard # Standard section: captures all standard packages. + - default # Default section: contains all imports that could not be matched to another section type. + - localmodule # Imports from the same module. gocognit: min-complexity: 30 - ireturn: - allow: - - anon - - error - - empty - - stdlib - - github.com\/talos-systems\/kres\/internal\/dag.Node nestif: min-complexity: 5 goconst: min-len: 3 min-occurrences: 3 gocritic: - disabled-checks: [] + disabled-checks: [ ] gocyclo: min-complexity: 20 godot: - check-all: false - godox: - keywords: # default keywords are TODO, BUG, and FIXME, these can be overwritten by this setting - - NOTE - - OPTIMIZE # marks code that should be optimized before merging - - HACK # marks hack-arounds that should be removed before merging + scope: declarations gofmt: simplify: true - goimports: - local-prefixes: github.com/siderolabs/discovery-service/ - golint: - min-confidence: 0.8 - gomnd: - settings: {} - gomodguard: {} + gomodguard: { } govet: - check-shadowing: true enable-all: true lll: line-length: 200 tab-width: 4 misspell: locale: US - ignore-words: [] + ignore-words: [ ] nakedret: max-func-lines: 30 prealloc: @@ -88,16 +68,15 @@ linters-settings: for-loops: false # Report preallocation suggestions on for loops, false by default nolintlint: allow-unused: false - allow-leading-space: false - allow-no-explanation: [] + allow-no-explanation: [ ] require-explanation: false require-specific: true - rowserrcheck: {} - testpackage: {} + rowserrcheck: { } + testpackage: { } unparam: check-exported: false unused: - check-exported: false + local-variables-are-used: false whitespace: multi-if: false # Enforces newlines (or comments) after every multi-line if statement multi-func: false # Enforces newlines (or comments) after every multi-line function signature @@ -113,8 +92,8 @@ linters-settings: gofumpt: extra-rules: false cyclop: - # the maximal code complexity to report - max-complexity: 20 + # the maximal code complexity to report + max-complexity: 20 # depguard: # Main: # deny: @@ -125,48 +104,50 @@ linters: disable-all: false fast: false disable: - - exhaustruct - exhaustivestruct + - exhaustruct + - err113 - forbidigo - funlen - - gas - gochecknoglobals - gochecknoinits - godox - - goerr113 - gomnd - gomoddirectives + - gosec + - inamedparam - ireturn + - mnd - nestif - nonamedreturns - nosnakecase - paralleltest + - tagalign - tagliatelle - thelper - typecheck - varnamelen - wrapcheck - depguard # Disabled because starting with golangci-lint 1.53.0 it doesn't allow denylist alone anymore - - tagalign - - inamedparam - testifylint # complains about our assert recorder and has a number of false positives for assert.Greater(t, thing, 1) - protogetter # complains about us using Value field on typed spec, instead of GetValue which has a different signature - perfsprint # complains about us using fmt.Sprintf in non-performance critical code, updating just kres took too long # abandoned linters for which golangci shows the warning that the repo is archived by the owner + - deadcode + - golint + - ifshort - interfacer - maligned - - golint - scopelint - - varcheck - - deadcode - structcheck - - ifshort + - varcheck # disabled as it seems to be broken - goes into imported libraries and reports issues there - musttag + - goimports # same as gci issues: - exclude: [] - exclude-rules: [] + exclude: [ ] + exclude-rules: [ ] exclude-use-default: false exclude-case-sensitive: false max-issues-per-linter: 10 diff --git a/.kres.yaml b/.kres.yaml index 3900766..aaec2d2 100644 --- a/.kres.yaml +++ b/.kres.yaml @@ -1,4 +1,9 @@ --- +kind: auto.CommandConfig +name: snapshot-decoder +spec: + disableImage: true +--- kind: common.Image name: image-discovery-service spec: @@ -15,6 +20,15 @@ spec: GOOS: linux GOARCH: arm64 --- +kind: golang.Generate +spec: + baseSpecPath: /api + vtProtobufEnabled: true + specs: + - source: api/storage/storage.proto + subdirectory: storage + genGateway: false +--- kind: service.CodeCov spec: targetThreshold: 30 diff --git a/Dockerfile b/Dockerfile index fbced5d..04d85ae 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,27 +1,28 @@ -# syntax = docker/dockerfile-upstream:1.7.0-labs +# syntax = docker/dockerfile-upstream:1.7.1-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-21T10:07:54Z by kres 0290180. ARG TOOLCHAIN -# cleaned up specs and compiled versions -FROM scratch AS generate +FROM ghcr.io/siderolabs/ca-certificates:v1.7.0 AS image-ca-certificates -FROM ghcr.io/siderolabs/ca-certificates:v1.6.0 AS image-ca-certificates - -FROM ghcr.io/siderolabs/fhs:v1.6.0 AS image-fhs +FROM ghcr.io/siderolabs/fhs:v1.7.0 AS image-fhs # runs markdownlint -FROM docker.io/node:21.7.1-alpine3.19 AS lint-markdown +FROM docker.io/node:22.2.0-alpine3.19 AS lint-markdown WORKDIR /src -RUN npm i -g markdownlint-cli@0.39.0 +RUN npm i -g markdownlint-cli@0.40.0 RUN npm i sentences-per-line@0.2.1 COPY .markdownlint.json . COPY ./README.md ./README.md RUN markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ignore '**/hack/chglog/**' --rules node_modules/sentences-per-line/index.js . +# collects proto specs +FROM scratch AS proto-specs +ADD api/storage/storage.proto /api/storage/ + # base toolchain image FROM ${TOOLCHAIN} AS toolchain RUN apk --update --no-cache add bash curl build-base protoc protobuf-dev @@ -36,6 +37,21 @@ ENV GOTOOLCHAIN ${GOTOOLCHAIN} ARG GOEXPERIMENT ENV GOEXPERIMENT ${GOEXPERIMENT} ENV GOPATH /go +ARG PROTOBUF_GO_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/protobuf/cmd/protoc-gen-go@v${PROTOBUF_GO_VERSION} +RUN mv /go/bin/protoc-gen-go /bin +ARG GRPC_GO_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v${GRPC_GO_VERSION} +RUN mv /go/bin/protoc-gen-go-grpc /bin +ARG GRPC_GATEWAY_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v${GRPC_GATEWAY_VERSION} +RUN mv /go/bin/protoc-gen-grpc-gateway /bin +ARG GOIMPORTS_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@v${GOIMPORTS_VERSION} +RUN mv /go/bin/goimports /bin +ARG VTPROTOBUF_VERSION +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto@v${VTPROTOBUF_VERSION} +RUN mv /go/bin/protoc-gen-go-vtproto /bin ARG DEEPCOPY_VERSION RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install github.com/siderolabs/deep-copy@${DEEPCOPY_VERSION} \ && mv /go/bin/deep-copy /bin/deep-copy @@ -44,9 +60,6 @@ RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/g && mv /go/bin/golangci-lint /bin/golangci-lint RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/vuln/cmd/govulncheck@latest \ && mv /go/bin/govulncheck /bin/govulncheck -ARG GOIMPORTS_VERSION -RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@${GOIMPORTS_VERSION} \ - && mv /go/bin/goimports /bin/goimports ARG GOFUMPT_VERSION RUN go install mvdan.cc/gofumpt@${GOFUMPT_VERSION} \ && mv /go/bin/gofumpt /bin/gofumpt @@ -59,40 +72,30 @@ COPY go.sum go.sum RUN cd . RUN --mount=type=cache,target=/go/pkg go mod download RUN --mount=type=cache,target=/go/pkg go mod verify +COPY ./api ./api COPY ./cmd ./cmd COPY ./internal ./internal COPY ./pkg ./pkg RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null -# builds discovery-service-linux-amd64 -FROM base AS discovery-service-linux-amd64-build -COPY --from=generate / / -WORKDIR /src/cmd/discovery-service -ARG GO_BUILDFLAGS -ARG GO_LDFLAGS -RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64 - -# builds discovery-service-linux-arm64 -FROM base AS discovery-service-linux-arm64-build -COPY --from=generate / / -WORKDIR /src/cmd/discovery-service -ARG GO_BUILDFLAGS -ARG GO_LDFLAGS -RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64 +# runs protobuf compiler +FROM tools AS proto-compile +COPY --from=proto-specs / / +RUN protoc -I/api --go_out=paths=source_relative:/api --go-grpc_out=paths=source_relative:/api --go-vtproto_out=paths=source_relative:/api --go-vtproto_opt=features=marshal+unmarshal+size+equal+clone /api/storage/storage.proto +RUN rm /api/storage/storage.proto +RUN goimports -w -local github.com/siderolabs/discovery-service /api +RUN gofumpt -w /api # runs gofumpt FROM base AS lint-gofumpt RUN FILES="$(gofumpt -l .)" && test -z "${FILES}" || (echo -e "Source code is not formatted with 'gofumpt -w .':\n${FILES}"; exit 1) -# runs goimports -FROM base AS lint-goimports -RUN FILES="$(goimports -l -local github.com/siderolabs/discovery-service/ .)" && test -z "${FILES}" || (echo -e "Source code is not formatted with 'goimports -w -local github.com/siderolabs/discovery-service/ .':\n${FILES}"; exit 1) - # runs golangci-lint FROM base AS lint-golangci-lint WORKDIR /src COPY .golangci.yml . ENV GOGC 50 +RUN golangci-lint config verify --config .golangci.yml RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/.cache/golangci-lint --mount=type=cache,target=/go/pkg golangci-lint run --config .golangci.yml # runs govulncheck @@ -112,14 +115,56 @@ WORKDIR /src ARG TESTPKGS RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg --mount=type=cache,target=/tmp go test -v -covermode=atomic -coverprofile=coverage.txt -coverpkg=${TESTPKGS} -count 1 ${TESTPKGS} +# cleaned up specs and compiled versions +FROM scratch AS generate +COPY --from=proto-compile /api/ /api/ + +FROM scratch AS unit-tests +COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt + +# builds discovery-service-linux-amd64 +FROM base AS discovery-service-linux-amd64-build +COPY --from=generate / / +WORKDIR /src/cmd/discovery-service +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-amd64 + +# builds discovery-service-linux-arm64 +FROM base AS discovery-service-linux-arm64-build +COPY --from=generate / / +WORKDIR /src/cmd/discovery-service +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /discovery-service-linux-arm64 + +# builds snapshot-decoder-linux-amd64 +FROM base AS snapshot-decoder-linux-amd64-build +COPY --from=generate / / +WORKDIR /src/cmd/snapshot-decoder +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=amd64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-amd64 + +# builds snapshot-decoder-linux-arm64 +FROM base AS snapshot-decoder-linux-arm64-build +COPY --from=generate / / +WORKDIR /src/cmd/snapshot-decoder +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg GOARCH=arm64 GOOS=linux go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS}" -o /snapshot-decoder-linux-arm64 + FROM scratch AS discovery-service-linux-amd64 COPY --from=discovery-service-linux-amd64-build /discovery-service-linux-amd64 /discovery-service-linux-amd64 FROM scratch AS discovery-service-linux-arm64 COPY --from=discovery-service-linux-arm64-build /discovery-service-linux-arm64 /discovery-service-linux-arm64 -FROM scratch AS unit-tests -COPY --from=unit-tests-run /src/coverage.txt /coverage-unit-tests.txt +FROM scratch AS snapshot-decoder-linux-amd64 +COPY --from=snapshot-decoder-linux-amd64-build /snapshot-decoder-linux-amd64 /snapshot-decoder-linux-amd64 + +FROM scratch AS snapshot-decoder-linux-arm64 +COPY --from=snapshot-decoder-linux-arm64-build /snapshot-decoder-linux-arm64 /snapshot-decoder-linux-arm64 FROM discovery-service-linux-${TARGETARCH} AS discovery-service @@ -127,6 +172,12 @@ FROM scratch AS discovery-service-all COPY --from=discovery-service-linux-amd64 / / COPY --from=discovery-service-linux-arm64 / / +FROM snapshot-decoder-linux-${TARGETARCH} AS snapshot-decoder + +FROM scratch AS snapshot-decoder-all +COPY --from=snapshot-decoder-linux-amd64 / / +COPY --from=snapshot-decoder-linux-arm64 / / + FROM scratch AS image-discovery-service ARG TARGETARCH COPY --from=discovery-service discovery-service-linux-${TARGETARCH} /discovery-service diff --git a/Makefile b/Makefile index 5f44ed7..584c25c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-03-12T11:34:19Z by kres latest. +# Generated on 2024-05-21T10:07:54Z by kres 0290180. # common variables @@ -9,20 +9,23 @@ TAG := $(shell git describe --tag --always --dirty --match v[0-9]\*) ABBREV_TAG := $(shell git describe --tags >/dev/null 2>/dev/null && git describe --tag --always --match v[0-9]\* --abbrev=0 || echo 'undefined') BRANCH := $(shell git rev-parse --abbrev-ref HEAD) ARTIFACTS := _out +IMAGE_TAG ?= $(TAG) +OPERATING_SYSTEM := $(shell uname -s | tr '[:upper:]' '[:lower:]') +GOARCH := $(shell uname -m | sed 's/x86_64/amd64/' | sed 's/aarch64/arm64/') WITH_DEBUG ?= false WITH_RACE ?= false REGISTRY ?= ghcr.io USERNAME ?= siderolabs REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME) -PROTOBUF_GO_VERSION ?= 1.33.0 +PROTOBUF_GO_VERSION ?= 1.34.1 GRPC_GO_VERSION ?= 1.3.0 -GRPC_GATEWAY_VERSION ?= 2.19.1 +GRPC_GATEWAY_VERSION ?= 2.20.0 VTPROTOBUF_VERSION ?= 0.6.0 +GOIMPORTS_VERSION ?= 0.21.0 DEEPCOPY_VERSION ?= v0.5.6 -GOLANGCILINT_VERSION ?= v1.56.2 +GOLANGCILINT_VERSION ?= v1.58.2 GOFUMPT_VERSION ?= v0.6.0 -GO_VERSION ?= 1.22.1 -GOIMPORTS_VERSION ?= v0.19.0 +GO_VERSION ?= 1.22.3 GO_BUILDFLAGS ?= GO_LDFLAGS ?= CGO_ENABLED ?= 0 @@ -59,9 +62,9 @@ COMMON_ARGS += --build-arg=PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" COMMON_ARGS += --build-arg=GRPC_GO_VERSION="$(GRPC_GO_VERSION)" COMMON_ARGS += --build-arg=GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" COMMON_ARGS += --build-arg=VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" +COMMON_ARGS += --build-arg=GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" COMMON_ARGS += --build-arg=DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" COMMON_ARGS += --build-arg=GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" -COMMON_ARGS += --build-arg=GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" COMMON_ARGS += --build-arg=GOFUMPT_VERSION="$(GOFUMPT_VERSION)" COMMON_ARGS += --build-arg=TESTPKGS="$(TESTPKGS)" TOOLCHAIN ?= docker.io/golang:1.22-alpine @@ -110,7 +113,7 @@ If you already have a compatible builder instance, you may use that instead. ## Artifacts All artifacts will be output to ./$(ARTIFACTS). Images will be tagged with the -registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(TAG)). +registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(IMAGE_TAG)). The registry and username can be overridden by exporting REGISTRY, and USERNAME respectively. @@ -128,7 +131,10 @@ else GO_LDFLAGS += -s endif -all: unit-tests discovery-service image-discovery-service lint +all: unit-tests discovery-service image-discovery-service snapshot-decoder lint + +$(ARTIFACTS): ## Creates artifacts directory. + @mkdir -p $(ARTIFACTS) .PHONY: clean clean: ## Cleans up all artifacts. @@ -140,6 +146,9 @@ target-%: ## Builds the specified target defined in the Dockerfile. The build r local-%: ## Builds the specified target defined in the Dockerfile using the local output type. The build result will be output to the specified local destination. @$(MAKE) target-$* TARGET_ARGS="--output=type=local,dest=$(DEST) $(TARGET_ARGS)" +generate: ## Generate .proto definitions. + @$(MAKE) local-$@ DEST=./ + lint-golangci-lint: ## Runs golangci-lint linter. @$(MAKE) target-$@ @@ -157,9 +166,6 @@ fmt: ## Formats the source code lint-govulncheck: ## Runs govulncheck linter. @$(MAKE) target-$@ -lint-goimports: ## Runs goimports linter. - @$(MAKE) target-$@ - .PHONY: base base: ## Prepare base toolchain @$(MAKE) target-$@ @@ -172,10 +178,6 @@ unit-tests: ## Performs unit tests unit-tests-race: ## Performs unit tests with race detection enabled. @$(MAKE) target-$@ -.PHONY: coverage -coverage: ## Upload coverage data to codecov.io. - bash -c "bash <(curl -s https://codecov.io/bash) -f $(ARTIFACTS)/coverage-unit-tests.txt -X fix" - .PHONY: $(ARTIFACTS)/discovery-service-linux-amd64 $(ARTIFACTS)/discovery-service-linux-amd64: @$(MAKE) local-discovery-service-linux-amd64 DEST=$(ARTIFACTS) @@ -198,11 +200,28 @@ lint-markdown: ## Runs markdownlint. @$(MAKE) target-$@ .PHONY: lint -lint: lint-golangci-lint lint-gofumpt lint-govulncheck lint-goimports lint-markdown ## Run all linters for the project. +lint: lint-golangci-lint lint-gofumpt lint-govulncheck lint-markdown ## Run all linters for the project. .PHONY: image-discovery-service image-discovery-service: ## Builds image for discovery-service. - @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(TAG)" + @$(MAKE) target-$@ TARGET_ARGS="--tag=$(REGISTRY)/$(USERNAME)/discovery-service:$(IMAGE_TAG)" + +.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-amd64 +$(ARTIFACTS)/snapshot-decoder-linux-amd64: + @$(MAKE) local-snapshot-decoder-linux-amd64 DEST=$(ARTIFACTS) + +.PHONY: snapshot-decoder-linux-amd64 +snapshot-decoder-linux-amd64: $(ARTIFACTS)/snapshot-decoder-linux-amd64 ## Builds executable for snapshot-decoder-linux-amd64. + +.PHONY: $(ARTIFACTS)/snapshot-decoder-linux-arm64 +$(ARTIFACTS)/snapshot-decoder-linux-arm64: + @$(MAKE) local-snapshot-decoder-linux-arm64 DEST=$(ARTIFACTS) + +.PHONY: snapshot-decoder-linux-arm64 +snapshot-decoder-linux-arm64: $(ARTIFACTS)/snapshot-decoder-linux-arm64 ## Builds executable for snapshot-decoder-linux-arm64. + +.PHONY: snapshot-decoder +snapshot-decoder: snapshot-decoder-linux-amd64 snapshot-decoder-linux-arm64 ## Builds executables for snapshot-decoder. .PHONY: rekres rekres: @@ -215,8 +234,7 @@ help: ## This help menu. @grep -E '^[a-zA-Z%_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' .PHONY: release-notes -release-notes: - mkdir -p $(ARTIFACTS) +release-notes: $(ARTIFACTS) @ARTIFACTS=$(ARTIFACTS) ./hack/release.sh $@ $(ARTIFACTS)/RELEASE_NOTES.md $(TAG) .PHONY: conformance diff --git a/api/storage/storage.pb.go b/api/storage/storage.pb.go new file mode 100644 index 0000000..1d7ecce --- /dev/null +++ b/api/storage/storage.pb.go @@ -0,0 +1,408 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v4.24.4 +// source: storage/storage.proto + +package storagepb + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// StateSnapshot is the snapshot of the discovery service state. +type StateSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Clusters []*ClusterSnapshot `protobuf:"bytes,1,rep,name=clusters,proto3" json:"clusters,omitempty"` +} + +func (x *StateSnapshot) Reset() { + *x = StateSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StateSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StateSnapshot) ProtoMessage() {} + +func (x *StateSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StateSnapshot.ProtoReflect.Descriptor instead. +func (*StateSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{0} +} + +func (x *StateSnapshot) GetClusters() []*ClusterSnapshot { + if x != nil { + return x.Clusters + } + return nil +} + +// ClusterSnapshot is the snapshot of a cluster with a set of affiliates. +type ClusterSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Affiliates []*AffiliateSnapshot `protobuf:"bytes,2,rep,name=affiliates,proto3" json:"affiliates,omitempty"` +} + +func (x *ClusterSnapshot) Reset() { + *x = ClusterSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClusterSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClusterSnapshot) ProtoMessage() {} + +func (x *ClusterSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClusterSnapshot.ProtoReflect.Descriptor instead. +func (*ClusterSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{1} +} + +func (x *ClusterSnapshot) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ClusterSnapshot) GetAffiliates() []*AffiliateSnapshot { + if x != nil { + return x.Affiliates + } + return nil +} + +// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints. +type AffiliateSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Expiration *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=expiration,proto3" json:"expiration,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + Endpoints []*EndpointSnapshot `protobuf:"bytes,4,rep,name=endpoints,proto3" json:"endpoints,omitempty"` +} + +func (x *AffiliateSnapshot) Reset() { + *x = AffiliateSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AffiliateSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AffiliateSnapshot) ProtoMessage() {} + +func (x *AffiliateSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AffiliateSnapshot.ProtoReflect.Descriptor instead. +func (*AffiliateSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{2} +} + +func (x *AffiliateSnapshot) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *AffiliateSnapshot) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *AffiliateSnapshot) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *AffiliateSnapshot) GetEndpoints() []*EndpointSnapshot { + if x != nil { + return x.Endpoints + } + return nil +} + +// EndpointSnapshot is the snapshot of an endpoint of an affiliate. +type EndpointSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Expiration *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=expiration,proto3" json:"expiration,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *EndpointSnapshot) Reset() { + *x = EndpointSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_storage_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EndpointSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EndpointSnapshot) ProtoMessage() {} + +func (x *EndpointSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_storage_storage_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EndpointSnapshot.ProtoReflect.Descriptor instead. +func (*EndpointSnapshot) Descriptor() ([]byte, []int) { + return file_storage_storage_proto_rawDescGZIP(), []int{3} +} + +func (x *EndpointSnapshot) GetExpiration() *timestamppb.Timestamp { + if x != nil { + return x.Expiration + } + return nil +} + +func (x *EndpointSnapshot) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_storage_storage_proto protoreflect.FileDescriptor + +var file_storage_storage_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, + 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x12, 0x45, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, + 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x22, 0x6e, 0x0a, 0x0f, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x4b, 0x0a, + 0x0a, 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x41, 0x66, 0x66, + 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, 0x0a, + 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, 0x11, 0x41, + 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x3a, 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x48, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x18, 0x04, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, + 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x45, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x52, + 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0x62, 0x0a, 0x10, 0x45, 0x6e, + 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x3a, + 0x0a, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, + 0x65, 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x37, + 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x64, + 0x65, 0x72, 0x6f, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, + 0x6f, 0x72, 0x61, 0x67, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_storage_storage_proto_rawDescOnce sync.Once + file_storage_storage_proto_rawDescData = file_storage_storage_proto_rawDesc +) + +func file_storage_storage_proto_rawDescGZIP() []byte { + file_storage_storage_proto_rawDescOnce.Do(func() { + file_storage_storage_proto_rawDescData = protoimpl.X.CompressGZIP(file_storage_storage_proto_rawDescData) + }) + return file_storage_storage_proto_rawDescData +} + +var file_storage_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_storage_storage_proto_goTypes = []interface{}{ + (*StateSnapshot)(nil), // 0: sidero.discovery.storage.StateSnapshot + (*ClusterSnapshot)(nil), // 1: sidero.discovery.storage.ClusterSnapshot + (*AffiliateSnapshot)(nil), // 2: sidero.discovery.storage.AffiliateSnapshot + (*EndpointSnapshot)(nil), // 3: sidero.discovery.storage.EndpointSnapshot + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp +} +var file_storage_storage_proto_depIdxs = []int32{ + 1, // 0: sidero.discovery.storage.StateSnapshot.clusters:type_name -> sidero.discovery.storage.ClusterSnapshot + 2, // 1: sidero.discovery.storage.ClusterSnapshot.affiliates:type_name -> sidero.discovery.storage.AffiliateSnapshot + 4, // 2: sidero.discovery.storage.AffiliateSnapshot.expiration:type_name -> google.protobuf.Timestamp + 3, // 3: sidero.discovery.storage.AffiliateSnapshot.endpoints:type_name -> sidero.discovery.storage.EndpointSnapshot + 4, // 4: sidero.discovery.storage.EndpointSnapshot.expiration:type_name -> google.protobuf.Timestamp + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_storage_storage_proto_init() } +func file_storage_storage_proto_init() { + if File_storage_storage_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_storage_storage_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StateSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClusterSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AffiliateSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_storage_storage_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EndpointSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_storage_storage_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_storage_storage_proto_goTypes, + DependencyIndexes: file_storage_storage_proto_depIdxs, + MessageInfos: file_storage_storage_proto_msgTypes, + }.Build() + File_storage_storage_proto = out.File + file_storage_storage_proto_rawDesc = nil + file_storage_storage_proto_goTypes = nil + file_storage_storage_proto_depIdxs = nil +} diff --git a/api/storage/storage.proto b/api/storage/storage.proto new file mode 100644 index 0000000..cafcc9b --- /dev/null +++ b/api/storage/storage.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package sidero.discovery.storage; +option go_package = "github.com/siderolabs/discovery-service/api/storagepb"; + +import "google/protobuf/timestamp.proto"; + +// StateSnapshot is the snapshot of the discovery service state. +message StateSnapshot { + repeated ClusterSnapshot clusters = 1; +} + +// ClusterSnapshot is the snapshot of a cluster with a set of affiliates. +message ClusterSnapshot { + string id = 1; + repeated AffiliateSnapshot affiliates = 2; +} + +// AffiliateSnapshot is the snapshot of an affiliate that is part of a cluster with a set of endpoints. +message AffiliateSnapshot { + string id = 1; + google.protobuf.Timestamp expiration = 2; + bytes data = 3; + repeated EndpointSnapshot endpoints = 4; +} + +// EndpointSnapshot is the snapshot of an endpoint of an affiliate. +message EndpointSnapshot { + google.protobuf.Timestamp expiration = 1; + bytes data = 2; +} diff --git a/api/storage/storage_vtproto.pb.go b/api/storage/storage_vtproto.pb.go new file mode 100644 index 0000000..4ae0b8c --- /dev/null +++ b/api/storage/storage_vtproto.pb.go @@ -0,0 +1,1064 @@ +// Code generated by protoc-gen-go-vtproto. DO NOT EDIT. +// protoc-gen-go-vtproto version: v0.6.0 +// source: storage/storage.proto + +package storagepb + +import ( + fmt "fmt" + io "io" + + protohelpers "github.com/planetscale/vtprotobuf/protohelpers" + timestamppb1 "github.com/planetscale/vtprotobuf/types/known/timestamppb" + proto "google.golang.org/protobuf/proto" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +func (m *StateSnapshot) CloneVT() *StateSnapshot { + if m == nil { + return (*StateSnapshot)(nil) + } + r := new(StateSnapshot) + if rhs := m.Clusters; rhs != nil { + tmpContainer := make([]*ClusterSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Clusters = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *StateSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ClusterSnapshot) CloneVT() *ClusterSnapshot { + if m == nil { + return (*ClusterSnapshot)(nil) + } + r := new(ClusterSnapshot) + r.Id = m.Id + if rhs := m.Affiliates; rhs != nil { + tmpContainer := make([]*AffiliateSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Affiliates = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ClusterSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *AffiliateSnapshot) CloneVT() *AffiliateSnapshot { + if m == nil { + return (*AffiliateSnapshot)(nil) + } + r := new(AffiliateSnapshot) + r.Id = m.Id + r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT()) + if rhs := m.Data; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.Data = tmpBytes + } + if rhs := m.Endpoints; rhs != nil { + tmpContainer := make([]*EndpointSnapshot, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Endpoints = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *AffiliateSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *EndpointSnapshot) CloneVT() *EndpointSnapshot { + if m == nil { + return (*EndpointSnapshot)(nil) + } + r := new(EndpointSnapshot) + r.Expiration = (*timestamppb.Timestamp)((*timestamppb1.Timestamp)(m.Expiration).CloneVT()) + if rhs := m.Data; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.Data = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *EndpointSnapshot) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (this *StateSnapshot) EqualVT(that *StateSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if len(this.Clusters) != len(that.Clusters) { + return false + } + for i, vx := range this.Clusters { + vy := that.Clusters[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &ClusterSnapshot{} + } + if q == nil { + q = &ClusterSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *StateSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*StateSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *ClusterSnapshot) EqualVT(that *ClusterSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Id != that.Id { + return false + } + if len(this.Affiliates) != len(that.Affiliates) { + return false + } + for i, vx := range this.Affiliates { + vy := that.Affiliates[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &AffiliateSnapshot{} + } + if q == nil { + q = &AffiliateSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *ClusterSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*ClusterSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *AffiliateSnapshot) EqualVT(that *AffiliateSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Id != that.Id { + return false + } + if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) { + return false + } + if string(this.Data) != string(that.Data) { + return false + } + if len(this.Endpoints) != len(that.Endpoints) { + return false + } + for i, vx := range this.Endpoints { + vy := that.Endpoints[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &EndpointSnapshot{} + } + if q == nil { + q = &EndpointSnapshot{} + } + if !p.EqualVT(q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *AffiliateSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*AffiliateSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *EndpointSnapshot) EqualVT(that *EndpointSnapshot) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if !(*timestamppb1.Timestamp)(this.Expiration).EqualVT((*timestamppb1.Timestamp)(that.Expiration)) { + return false + } + if string(this.Data) != string(that.Data) { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *EndpointSnapshot) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*EndpointSnapshot) + if !ok { + return false + } + return this.EqualVT(that) +} +func (m *StateSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StateSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *StateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Clusters) > 0 { + for iNdEx := len(m.Clusters) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Clusters[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *ClusterSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ClusterSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *ClusterSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Affiliates) > 0 { + for iNdEx := len(m.Affiliates) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Affiliates[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *AffiliateSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AffiliateSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *AffiliateSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Endpoints) > 0 { + for iNdEx := len(m.Endpoints) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.Endpoints[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x22 + } + } + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x1a + } + if m.Expiration != nil { + size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *EndpointSnapshot) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *EndpointSnapshot) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *EndpointSnapshot) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Data) > 0 { + i -= len(m.Data) + copy(dAtA[i:], m.Data) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Data))) + i-- + dAtA[i] = 0x12 + } + if m.Expiration != nil { + size, err := (*timestamppb1.Timestamp)(m.Expiration).MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *StateSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Clusters) > 0 { + for _, e := range m.Clusters { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *ClusterSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if len(m.Affiliates) > 0 { + for _, e := range m.Affiliates { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *AffiliateSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.Expiration != nil { + l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if len(m.Endpoints) > 0 { + for _, e := range m.Endpoints { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *EndpointSnapshot) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Expiration != nil { + l = (*timestamppb1.Timestamp)(m.Expiration).SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + l = len(m.Data) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) + return n +} + +func (m *StateSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StateSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Clusters", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Clusters = append(m.Clusters, &ClusterSnapshot{}) + if err := m.Clusters[len(m.Clusters)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ClusterSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ClusterSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ClusterSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Affiliates", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Affiliates = append(m.Affiliates, &AffiliateSnapshot{}) + if err := m.Affiliates[len(m.Affiliates)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AffiliateSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AffiliateSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AffiliateSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Expiration == nil { + m.Expiration = ×tamppb.Timestamp{} + } + if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Endpoints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Endpoints = append(m.Endpoints, &EndpointSnapshot{}) + if err := m.Endpoints[len(m.Endpoints)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *EndpointSnapshot) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: EndpointSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: EndpointSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Expiration == nil { + m.Expiration = ×tamppb.Timestamp{} + } + if err := (*timestamppb1.Timestamp)(m.Expiration).UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} diff --git a/cmd/discovery-service/main.go b/cmd/discovery-service/main.go index 23d6097..b9dce91 100644 --- a/cmd/discovery-service/main.go +++ b/cmd/discovery-service/main.go @@ -22,6 +22,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "github.com/jonboulle/clockwork" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/siderolabs/discovery-api/api/v1alpha1/server/pb" @@ -37,6 +38,7 @@ import ( "github.com/siderolabs/discovery-service/internal/limiter" _ "github.com/siderolabs/discovery-service/internal/proto" "github.com/siderolabs/discovery-service/internal/state" + "github.com/siderolabs/discovery-service/internal/state/storage" "github.com/siderolabs/discovery-service/pkg/limits" "github.com/siderolabs/discovery-service/pkg/server" ) @@ -49,6 +51,9 @@ var ( devMode = false gcInterval = time.Minute redirectEndpoint = "" + snapshotsEnabled = true + snapshotPath = "/var/discovery-service/state.binpb" + snapshotInterval = 10 * time.Minute ) func init() { @@ -58,6 +63,9 @@ func init() { flag.BoolVar(&devMode, "debug", devMode, "enable debug mode") flag.DurationVar(&gcInterval, "gc-interval", gcInterval, "garbage collection interval") flag.StringVar(&redirectEndpoint, "redirect-endpoint", redirectEndpoint, "redirect all clients to a new endpoint (gRPC endpoint, e.g. 'example.com:443'") + flag.BoolVar(&snapshotsEnabled, "snapshots-enabled", snapshotsEnabled, "enable snapshots") + flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file") + flag.DurationVar(&snapshotInterval, "snapshot-interval", snapshotInterval, "interval to save the snapshot") if debug.Enabled { flag.StringVar(&debugAddr, "debug-addr", debugAddr, "debug (pprof, trace, expvar) listen addr") @@ -189,6 +197,19 @@ func run(ctx context.Context, logger *zap.Logger) error { state := state.NewState(logger) prom.MustRegister(state) + var stateStorage *storage.Storage + + if snapshotsEnabled { + stateStorage = storage.New(snapshotPath, state, logger) + prom.MustRegister(stateStorage) + + if err := stateStorage.Load(); err != nil { + logger.Warn("failed to load state from storage", zap.Error(err)) + } + } else { + logger.Info("snapshots are disabled") + } + srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint) prom.MustRegister(srv) @@ -226,6 +247,12 @@ func run(ctx context.Context, logger *zap.Logger) error { eg, ctx := errgroup.WithContext(ctx) + if snapshotsEnabled { + eg.Go(func() error { + return stateStorage.Start(ctx, clockwork.NewRealClock(), snapshotInterval) + }) + } + eg.Go(func() error { logger.Info("gRPC server starting", zap.Stringer("address", lis.Addr())) diff --git a/cmd/snapshot-decoder/main.go b/cmd/snapshot-decoder/main.go new file mode 100644 index 0000000..38f5686 --- /dev/null +++ b/cmd/snapshot-decoder/main.go @@ -0,0 +1,64 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package main implements a simple tool to decode a snapshot file. +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "log" + "os" + + "google.golang.org/protobuf/encoding/protojson" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +var snapshotPath = "/var/discovery-service/state.binpb" + +func init() { + flag.StringVar(&snapshotPath, "snapshot-path", snapshotPath, "path to the snapshot file") +} + +func main() { + flag.Parse() + + if err := run(); err != nil { + log.Fatalf("error: %v", err) + } +} + +func run() error { + data, err := os.ReadFile(snapshotPath) + if err != nil { + return fmt.Errorf("failed to read snapshot: %w", err) + } + + snapshot := &storagepb.StateSnapshot{} + + if err = snapshot.UnmarshalVT(data); err != nil { + return fmt.Errorf("failed to unmarshal snapshot: %w", err) + } + + opts := protojson.MarshalOptions{ + Indent: " ", + } + + json, err := opts.Marshal(snapshot) + if err != nil { + return fmt.Errorf("failed to marshal snapshot: %w", err) + } + + if _, err = io.Copy(os.Stdout, bytes.NewReader(json)); err != nil { + return fmt.Errorf("failed to write snapshot: %w", err) + } + + fmt.Println() + + return nil +} diff --git a/go.mod b/go.mod index b3c64d2..db562ea 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,12 @@ module github.com/siderolabs/discovery-service -go 1.22.1 +go 1.22.3 require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 + github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293 + github.com/planetscale/vtprotobuf v0.6.0 github.com/prometheus/client_golang v1.19.0 github.com/siderolabs/discovery-api v0.1.4 github.com/siderolabs/discovery-client v0.1.8 @@ -16,7 +18,7 @@ require ( golang.org/x/sync v0.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.62.1 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.1 ) require ( @@ -25,15 +27,14 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/planetscale/vtprotobuf v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fefb714..bf91970 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tg github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= +github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293 h1:l3TVsYI+QxIp0CW7YCizx9WG26Lj7DXnc1pdlBKk3gY= +github.com/jonboulle/clockwork v0.4.1-0.20231224152657-fc59783b0293/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -52,14 +54,14 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M= go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -69,8 +71,8 @@ google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/landing/html/index.html b/internal/landing/html/index.html index a55d952..029fa66 100644 --- a/internal/landing/html/index.html +++ b/internal/landing/html/index.html @@ -35,8 +35,7 @@
- Moreover, the discovery service has no peristence. - Data is stored in memory only with a TTL set by the clients (i.e. Talos). + Data is stored in memory only with a TTL set by the clients (i.e., Talos), and periodic snapshots to disk are enabled. The cluster ID is used as a key to select the affiliates (so that different clusters see different affiliates).
diff --git a/internal/state/affiliate_test.go b/internal/state/affiliate_test.go index ec0ff56..c8f3985 100644 --- a/internal/state/affiliate_test.go +++ b/internal/state/affiliate_test.go @@ -106,7 +106,7 @@ func TestAffiliateTooManyEndpoints(t *testing.T) { affiliate := state.NewAffiliate("id1") - for i := 0; i < limits.AffiliateEndpointsMax; i++ { + for i := range limits.AffiliateEndpointsMax { assert.NoError(t, affiliate.MergeEndpoints([][]byte{[]byte(fmt.Sprintf("endpoint%d", i))}, now)) } diff --git a/internal/state/cluster_test.go b/internal/state/cluster_test.go index a21f7de..742d945 100644 --- a/internal/state/cluster_test.go +++ b/internal/state/cluster_test.go @@ -272,7 +272,7 @@ func TestClusterTooManyAffiliates(t *testing.T) { cluster := state.NewCluster("cluster3") - for i := 0; i < limits.ClusterAffiliatesMax; i++ { + for i := range limits.ClusterAffiliatesMax { assert.NoError(t, cluster.WithAffiliate(fmt.Sprintf("af%d", i), func(*state.Affiliate) error { return nil })) diff --git a/internal/state/snapshot.go b/internal/state/snapshot.go new file mode 100644 index 0000000..a2d938d --- /dev/null +++ b/internal/state/snapshot.go @@ -0,0 +1,130 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package state + +import ( + "slices" + + "github.com/siderolabs/gen/xslices" + "google.golang.org/protobuf/types/known/timestamppb" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +// ExportClusterSnapshots exports all cluster snapshots and calls the provided function for each one. +// +// Implements storage.Snapshotter interface. +func (state *State) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error { + var err error + + // reuse the same snapshotin each iteration + clusterSnapshot := &storagepb.ClusterSnapshot{} + + state.clusters.Range(func(_ string, cluster *Cluster) bool { + snapshotCluster(cluster, clusterSnapshot) + + err = f(clusterSnapshot) + + return err == nil + }) + + return err +} + +// ImportClusterSnapshots imports cluster snapshots by calling the provided function until it returns false. +// +// Implements storage.Snapshotter interface. +func (state *State) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error { + for { + clusterSnapshot, ok, err := f() + if err != nil { + return err + } + + if !ok { + break + } + + cluster := clusterFromSnapshot(clusterSnapshot) + + state.clusters.Store(cluster.id, cluster) + } + + return nil +} + +func snapshotCluster(cluster *Cluster, snapshot *storagepb.ClusterSnapshot) { + cluster.affiliatesMu.Lock() + defer cluster.affiliatesMu.Unlock() + + snapshot.Id = cluster.id + + // reuse the same slice, resize it as needed + snapshot.Affiliates = slices.Grow(snapshot.Affiliates, len(cluster.affiliates)) + snapshot.Affiliates = snapshot.Affiliates[:len(cluster.affiliates)] + + i := 0 + for _, affiliate := range cluster.affiliates { + if snapshot.Affiliates[i] == nil { + snapshot.Affiliates[i] = &storagepb.AffiliateSnapshot{} + } + + snapshot.Affiliates[i].Id = affiliate.id + + if snapshot.Affiliates[i].Expiration == nil { + snapshot.Affiliates[i].Expiration = ×tamppb.Timestamp{} + } + + snapshot.Affiliates[i].Expiration.Seconds = affiliate.expiration.Unix() + snapshot.Affiliates[i].Expiration.Nanos = int32(affiliate.expiration.Nanosecond()) + + snapshot.Affiliates[i].Data = affiliate.data + + // reuse the same slice, resize it as needed + snapshot.Affiliates[i].Endpoints = slices.Grow(snapshot.Affiliates[i].Endpoints, len(affiliate.endpoints)) + snapshot.Affiliates[i].Endpoints = snapshot.Affiliates[i].Endpoints[:len(affiliate.endpoints)] + + for j, endpoint := range affiliate.endpoints { + if snapshot.Affiliates[i].Endpoints[j] == nil { + snapshot.Affiliates[i].Endpoints[j] = &storagepb.EndpointSnapshot{} + } + + snapshot.Affiliates[i].Endpoints[j].Data = endpoint.data + + if snapshot.Affiliates[i].Endpoints[j].Expiration == nil { + snapshot.Affiliates[i].Endpoints[j].Expiration = ×tamppb.Timestamp{} + } + + snapshot.Affiliates[i].Endpoints[j].Expiration.Seconds = endpoint.expiration.Unix() + snapshot.Affiliates[i].Endpoints[j].Expiration.Nanos = int32(endpoint.expiration.Nanosecond()) + } + + i++ + } +} + +func clusterFromSnapshot(snapshot *storagepb.ClusterSnapshot) *Cluster { + return &Cluster{ + id: snapshot.Id, + affiliates: xslices.ToMap(snapshot.Affiliates, affiliateFromSnapshot), + } +} + +func affiliateFromSnapshot(snapshot *storagepb.AffiliateSnapshot) (string, *Affiliate) { + return snapshot.Id, &Affiliate{ + id: snapshot.Id, + expiration: snapshot.Expiration.AsTime(), + data: snapshot.Data, + endpoints: xslices.Map(snapshot.Endpoints, endpointFromSnapshot), + } +} + +func endpointFromSnapshot(snapshot *storagepb.EndpointSnapshot) Endpoint { + return Endpoint{ + data: snapshot.Data, + expiration: snapshot.Expiration.AsTime(), + } +} diff --git a/internal/state/storage/protobuf.go b/internal/state/storage/protobuf.go new file mode 100644 index 0000000..a10ec30 --- /dev/null +++ b/internal/state/storage/protobuf.go @@ -0,0 +1,135 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage + +import ( + "errors" + "fmt" + "io" + "slices" + + "google.golang.org/protobuf/encoding/protowire" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +const ( + clustersFieldNum = 1 + clustersFieldType = protowire.BytesType + + // MaxClusterSize is the maximum allowed size of a cluster snapshot. + MaxClusterSize = 138578179 +) + +// ErrClusterSnapshotTooLarge is returned when a cluster snapshot is above the maximum size of MaxClusterSize. +var ErrClusterSnapshotTooLarge = fmt.Errorf("cluster snapshot is above the maximum size of %qB", MaxClusterSize) + +// encodeClusterSnapshot encodes a ClusterSnapshot into the given buffer, resizing it as needed. +// +// It returns the buffer with the encoded ClusterSnapshot and an error if the encoding fails. +func encodeClusterSnapshot(buffer []byte, snapshot *storagepb.ClusterSnapshot) ([]byte, error) { + buffer = protowire.AppendTag(buffer, clustersFieldNum, clustersFieldType) + size := snapshot.SizeVT() + buffer = protowire.AppendVarint(buffer, uint64(size)) + + startIdx := len(buffer) + clusterSize := size + + buffer = slices.Grow(buffer, clusterSize) + buffer = buffer[:startIdx+clusterSize] + + if _, err := snapshot.MarshalToSizedBufferVT(buffer[startIdx:]); err != nil { + return nil, fmt.Errorf("failed to marshal cluster: %w", err) + } + + return buffer, nil +} + +// decodeClusterSnapshot decodes a ClusterSnapshot from the given buffer. +func decodeClusterSnapshot(buffer []byte) (*storagepb.ClusterSnapshot, error) { + var clusterSnapshot storagepb.ClusterSnapshot + + if err := clusterSnapshot.UnmarshalVT(buffer); err != nil { + return nil, fmt.Errorf("failed to unmarshal cluster snapshot: %w", err) + } + + return &clusterSnapshot, nil +} + +// decodeClusterSnapshotHeader reads and decodes the header of a ClusterSnapshot from the given io.ByteReader. +// +// It returns the size of the header and the size of the snapshot. +func decodeClusterSnapshotHeader(r io.ByteReader) (headerSize, clusterSize int, err error) { + tagNum, tagType, tagEncodedLen, err := consumeTag(r) + if err != nil { + return 0, 0, err + } + + if tagNum != clustersFieldNum { + return 0, 0, fmt.Errorf("unexpected number: %v", tagNum) + } + + if tagType != clustersFieldType { + return 0, 0, fmt.Errorf("unexpected type: %v", tagType) + } + + clusterSizeVal, clusterSizeEncodedLen, err := consumeVarint(r) + if clusterSizeEncodedLen < 0 { + return 0, 0, err + } + + if clusterSizeVal > MaxClusterSize { + return 0, 0, fmt.Errorf("%w: %v", ErrClusterSnapshotTooLarge, clusterSizeVal) + } + + return tagEncodedLen + clusterSizeEncodedLen, int(clusterSizeVal), nil +} + +// consumeTag reads a varint-encoded tag from the given io.ByteReader, reporting its length. +// +// It is an adaptation of protowire.ConsumeTag to work with io.ByteReader. +// +// It returns the tag number, the tag type, the length of the tag, and an error if the tag is invalid. +func consumeTag(r io.ByteReader) (protowire.Number, protowire.Type, int, error) { + v, n, err := consumeVarint(r) + if err != nil { + return 0, 0, 0, err + } + + num, typ := protowire.DecodeTag(v) + if num < protowire.MinValidNumber { + return 0, 0, 0, errors.New("invalid field number") + } + + return num, typ, n, nil +} + +// consumeVarint parses a varint-encoded uint64 from the given io.ByteReader, reporting its length. +// +// It is an adaptation of protowire.ConsumeVarint to work with io.ByteReader. +// +// It returns the parsed value, the length of the varint, and an error if the varint is invalid. +func consumeVarint(r io.ByteReader) (uint64, int, error) { + var v uint64 + + for i := range 10 { + b, err := r.ReadByte() + if err != nil { + return 0, 0, err + } + + y := uint64(b) + v += y << uint(i*7) + + if y < 0x80 { + return v, i + 1, nil + } + + v -= 0x80 << uint(i*7) + } + + return 0, 0, errors.New("variable length integer overflow") +} diff --git a/internal/state/storage/storage.go b/internal/state/storage/storage.go new file mode 100644 index 0000000..bdeed0a --- /dev/null +++ b/internal/state/storage/storage.go @@ -0,0 +1,397 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +// Package storage implements persistent storage for the state of the discovery service. +package storage + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "slices" + "sync" + "time" + + "github.com/jonboulle/clockwork" + prom "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + storagepb "github.com/siderolabs/discovery-service/api/storage" +) + +const ( + labelOperation = "operation" + labelStatus = "status" + + operationSave = "save" + operationLoad = "load" + + statusSuccess = "success" + statusError = "error" +) + +// Storage is a persistent storage for the state of the discovery service. +type Storage struct { + state Snapshotter + logger *zap.Logger + + operationsMetric *prom.CounterVec + lastSnapshotSizeMetric *prom.GaugeVec + lastOperationClustersMetric *prom.GaugeVec + lastOperationAffiliatesMetric *prom.GaugeVec + lastOperationEndpointsMetric *prom.GaugeVec + lastOperationDurationMetric *prom.GaugeVec + + path string +} + +// Describe implements prometheus.Collector interface. +func (storage *Storage) Describe(descs chan<- *prom.Desc) { + prom.DescribeByCollect(storage, descs) +} + +// Collect implements prometheus.Collector interface. +func (storage *Storage) Collect(metrics chan<- prom.Metric) { + storage.operationsMetric.Collect(metrics) + storage.lastSnapshotSizeMetric.Collect(metrics) + storage.lastOperationClustersMetric.Collect(metrics) + storage.lastOperationAffiliatesMetric.Collect(metrics) + storage.lastOperationEndpointsMetric.Collect(metrics) + storage.lastOperationDurationMetric.Collect(metrics) +} + +// Snapshotter is an interface for exporting and importing cluster state. +type Snapshotter interface { + // ExportClusterSnapshots exports cluster snapshots to the given function. + ExportClusterSnapshots(f func(*storagepb.ClusterSnapshot) error) error + + // ImportClusterSnapshots imports cluster snapshots from the given function. + ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error +} + +// New creates a new instance of Storage. +func New(path string, state Snapshotter, logger *zap.Logger) *Storage { + return &Storage{ + state: state, + logger: logger.With(zap.String("component", "storage"), zap.String("path", path)), + path: path, + + operationsMetric: prom.NewCounterVec(prom.CounterOpts{ + Name: "discovery_storage_operations_total", + Help: "The total number of storage operations.", + }, []string{labelOperation, labelStatus}), + lastSnapshotSizeMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_snapshot_size_bytes", + Help: "The size of the last processed snapshot in bytes.", + }, []string{labelOperation}), + lastOperationClustersMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_clusters", + Help: "The number of clusters in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationAffiliatesMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_affiliates", + Help: "The number of affiliates in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationEndpointsMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_endpoints", + Help: "The number of endpoints in the snapshot of the last operation.", + }, []string{labelOperation}), + lastOperationDurationMetric: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "discovery_storage_last_operation_duration_seconds", + Help: "The duration of the last operation in seconds.", + }, []string{labelOperation}), + } +} + +// Start starts the storage loop that periodically saves the state. +func (storage *Storage) Start(ctx context.Context, clock clockwork.Clock, interval time.Duration) error { + storage.logger.Info("start storage loop", zap.Duration("interval", interval)) + + ticker := clock.NewTicker(interval) + + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + storage.logger.Info("received shutdown signal") + + if err := storage.Save(); err != nil { + return fmt.Errorf("failed to save state on shutdown: %w", err) + } + + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + + return ctx.Err() + case <-ticker.Chan(): + if err := storage.Save(); err != nil { + storage.logger.Error("failed to save state", zap.Error(err)) + } + } + } +} + +// Save saves all clusters' states into the persistent storage. +func (storage *Storage) Save() (err error) { + start := time.Now() + + defer func() { + if err != nil { + storage.operationsMetric.WithLabelValues(operationSave, statusError).Inc() + } + }() + + if err = os.MkdirAll(filepath.Dir(storage.path), 0o755); err != nil { + return fmt.Errorf("failed to create directory path: %w", err) + } + + tmpFile, err := getTempFile(storage.path) + if err != nil { + return fmt.Errorf("failed to create temporary file: %w", err) + } + + defer func() { + tmpFile.Close() //nolint:errcheck + os.Remove(tmpFile.Name()) //nolint:errcheck + }() + + stats, err := storage.Export(tmpFile) + if err != nil { + return fmt.Errorf("failed to write snapshot: %w", err) + } + + if err = commitTempFile(tmpFile, storage.path); err != nil { + return fmt.Errorf("failed to commit temporary file: %w", err) + } + + duration := time.Since(start) + + storage.logger.Info("state saved", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates), + zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration)) + + storage.operationsMetric.WithLabelValues(operationSave, statusSuccess).Inc() + storage.lastSnapshotSizeMetric.WithLabelValues(operationSave).Set(float64(stats.Size)) + storage.lastOperationClustersMetric.WithLabelValues(operationSave).Set(float64(stats.NumClusters)) + storage.lastOperationAffiliatesMetric.WithLabelValues(operationSave).Set(float64(stats.NumAffiliates)) + storage.lastOperationEndpointsMetric.WithLabelValues(operationSave).Set(float64(stats.NumEndpoints)) + storage.lastOperationDurationMetric.WithLabelValues(operationSave).Set(duration.Seconds()) + + return nil +} + +// Load loads all clusters' states from the persistent storage. +func (storage *Storage) Load() (err error) { + defer func() { + if err != nil { + storage.operationsMetric.WithLabelValues(operationLoad, statusError).Inc() + } + }() + + start := time.Now() + + // open file for reading + file, err := os.Open(storage.path) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + + defer file.Close() //nolint:errcheck + + stats, err := storage.Import(file) + if err != nil { + return fmt.Errorf("failed to read snapshot: %w", err) + } + + if err = file.Close(); err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + + duration := time.Since(start) + + storage.logger.Info("state loaded", zap.Int("clusters", stats.NumClusters), zap.Int("affiliates", stats.NumAffiliates), + zap.Int("endpoints", stats.NumEndpoints), zap.Duration("duration", duration)) + + storage.operationsMetric.WithLabelValues(operationLoad, statusSuccess).Inc() + storage.lastSnapshotSizeMetric.WithLabelValues(operationLoad).Set(float64(stats.Size)) + storage.lastOperationClustersMetric.WithLabelValues(operationLoad).Set(float64(stats.NumClusters)) + storage.lastOperationAffiliatesMetric.WithLabelValues(operationLoad).Set(float64(stats.NumAffiliates)) + storage.lastOperationEndpointsMetric.WithLabelValues(operationLoad).Set(float64(stats.NumEndpoints)) + storage.lastOperationDurationMetric.WithLabelValues(operationLoad).Set(duration.Seconds()) + + return nil +} + +// Import imports all clusters' states from the given reader. +// +// When importing, we avoid unmarshalling to the storagepb.StateSnapshot type directly, as it causes an allocation of all the cluster snapshots at once. +// Instead, we process clusters in a streaming manner, unmarshaling them one by one and importing them into the state. +func (storage *Storage) Import(reader io.Reader) (SnapshotStats, error) { + size := 0 + numClusters := 0 + numAffiliates := 0 + numEndpoints := 0 + + buffer := make([]byte, 256) + bufferedReader := bufio.NewReader(reader) + + // unmarshal the clusters in a streaming manner and import them into the state + if err := storage.state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) { + headerSize, clusterSize, err := decodeClusterSnapshotHeader(bufferedReader) + if err != nil { + if err == io.EOF { //nolint:errorlint + return nil, false, nil + } + + return nil, false, fmt.Errorf("failed to decode cluster header: %w", err) + } + + if clusterSize > cap(buffer) { + buffer = slices.Grow(buffer, clusterSize-cap(buffer)) + } + + buffer = buffer[:clusterSize] + + if _, err = io.ReadFull(bufferedReader, buffer); err != nil { + return nil, false, fmt.Errorf("failed to read bytes: %w", err) + } + + clusterSnapshot, err := decodeClusterSnapshot(buffer) + if err != nil { + return nil, false, fmt.Errorf("failed to decode cluster: %w", err) + } + + buffer = buffer[:0] + + // update stats + size += headerSize + clusterSize + numClusters++ + numAffiliates += len(clusterSnapshot.Affiliates) + + for _, affiliate := range clusterSnapshot.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + + return clusterSnapshot, true, nil + }); err != nil { + return SnapshotStats{}, fmt.Errorf("failed to import clusters: %w", err) + } + + return SnapshotStats{ + Size: size, + NumClusters: numClusters, + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + }, nil +} + +// Export exports all clusters' states into the given writer. +// +// When exporting, we avoid marshaling to the storagepb.StateSnapshot type directly, as it causes an allocation of all the cluster snapshots at once. +// Instead, we process clusters in a streaming manner, marshaling them one by one and exporting them into the writer. +func (storage *Storage) Export(writer io.Writer) (SnapshotStats, error) { + numClusters := 0 + numAffiliates := 0 + numEndpoints := 0 + size := 0 + + var buffer []byte + + bufferedWriter := bufio.NewWriter(writer) + + // marshal the clusters in a streaming manner and export them into the writer + if err := storage.state.ExportClusterSnapshots(func(snapshot *storagepb.ClusterSnapshot) error { + var err error + + buffer, err = encodeClusterSnapshot(buffer, snapshot) + if err != nil { + return fmt.Errorf("failed to encode cluster: %w", err) + } + + written, err := bufferedWriter.Write(buffer) + if err != nil { + return fmt.Errorf("failed to write cluster: %w", err) + } + + // prepare the buffer for the next iteration - reset it + buffer = buffer[:0] + + // update stats + size += written + numClusters++ + numAffiliates += len(snapshot.Affiliates) + + for _, affiliate := range snapshot.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + + return nil + }); err != nil { + return SnapshotStats{}, fmt.Errorf("failed to snapshot clusters: %w", err) + } + + if err := bufferedWriter.Flush(); err != nil { + return SnapshotStats{}, fmt.Errorf("failed to flush writer: %w", err) + } + + return SnapshotStats{ + Size: size, + NumClusters: numClusters, + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + }, nil +} + +func getTempFile(dst string) (*os.File, error) { + tmpFile, err := os.OpenFile(dst+".tmp", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o666) + if err != nil { + return nil, fmt.Errorf("failed to create file: %w", err) + } + + return tmpFile, nil +} + +// commitTempFile commits the temporary file to the destination and removes it. +func commitTempFile(tmpFile *os.File, dst string) error { + renamed := false + closer := sync.OnceValue(tmpFile.Close) + + defer func() { + closer() //nolint:errcheck + + if !renamed { + os.Remove(tmpFile.Name()) //nolint:errcheck + } + }() + + if err := tmpFile.Sync(); err != nil { + return fmt.Errorf("failed to sync data: %w", err) + } + + if err := closer(); err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + + if err := os.Rename(tmpFile.Name(), dst); err != nil { + return fmt.Errorf("failed to rename file: %w", err) + } + + renamed = true + + return nil +} + +// SnapshotStats contains statistics about a snapshot. +type SnapshotStats struct { + Size int + NumClusters int + NumAffiliates int + NumEndpoints int +} diff --git a/internal/state/storage/storage_bench_test.go b/internal/state/storage/storage_bench_test.go new file mode 100644 index 0000000..445bd68 --- /dev/null +++ b/internal/state/storage/storage_bench_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage_test + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + storagepb "github.com/siderolabs/discovery-service/api/storage" + "github.com/siderolabs/discovery-service/internal/state" + "github.com/siderolabs/discovery-service/internal/state/storage" +) + +func BenchmarkExport(b *testing.B) { + logger := zap.NewNop() + state := buildState(b, buildTestSnapshot(b.N), logger) + storage := storage.New("", state, logger) + + b.ReportAllocs() + b.ResetTimer() + + _, err := storage.Export(io.Discard) + require.NoError(b, err) +} + +func testBenchmarkAllocs(t *testing.T, f func(b *testing.B), threshold int64) { + res := testing.Benchmark(f) + + allocs := res.AllocsPerOp() + if allocs > threshold { + t.Fatalf("Expected AllocsPerOp <= %d, got %d", threshold, allocs) + } +} + +func TestBenchmarkExportAllocs(t *testing.T) { + testBenchmarkAllocs(t, BenchmarkExport, 0) +} + +func buildState(tb testing.TB, data *storagepb.StateSnapshot, logger *zap.Logger) *state.State { + i := 0 + state := state.NewState(logger) + + err := state.ImportClusterSnapshots(func() (*storagepb.ClusterSnapshot, bool, error) { + if i >= len(data.Clusters) { + return nil, false, nil + } + + clusterSnapshot := data.Clusters[i] + i++ + + return clusterSnapshot, true, nil + }) + require.NoError(tb, err) + + return state +} diff --git a/internal/state/storage/storage_test.go b/internal/state/storage/storage_test.go new file mode 100644 index 0000000..78c243a --- /dev/null +++ b/internal/state/storage/storage_test.go @@ -0,0 +1,398 @@ +// Copyright (c) 2024 Sidero Labs, Inc. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +package storage_test + +import ( + "bytes" + "context" + "fmt" + "math" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "google.golang.org/protobuf/types/known/timestamppb" + + storagepb "github.com/siderolabs/discovery-service/api/storage" + "github.com/siderolabs/discovery-service/internal/state/storage" + "github.com/siderolabs/discovery-service/pkg/limits" +) + +func TestExport(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { //nolint:govet + name string + snapshot *storagepb.StateSnapshot + }{ + { + "empty state", + &storagepb.StateSnapshot{}, + }, + { + "small state", + &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{{Id: "a"}, {Id: "b"}}}, + }, + { + "large state", + buildTestSnapshot(100), + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + snapshot := buildTestSnapshot(10) + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + var buffer bytes.Buffer + + exportStats, err := stateStorage.Export(&buffer) + require.NoError(t, err) + + assert.Equal(t, statsForSnapshot(snapshot), exportStats) + + expected, err := snapshot.MarshalVT() + require.NoError(t, err) + + require.Equal(t, expected, buffer.Bytes()) + }) + } +} + +func TestImport(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { //nolint:govet + name string + snapshot *storagepb.StateSnapshot + }{ + { + "empty state", + &storagepb.StateSnapshot{}, + }, + { + "small state", + &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{{Id: "a"}, {Id: "b"}}}, + }, + { + "large state", + buildTestSnapshot(100), + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "test.binpb") + state := &mockSnapshotter{data: tc.snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + data, err := tc.snapshot.MarshalVT() + require.NoError(t, err) + + importStats, err := stateStorage.Import(bytes.NewReader(data)) + require.NoError(t, err) + + require.Equal(t, statsForSnapshot(tc.snapshot), importStats) + + loads := state.getLoads() + + require.Len(t, loads, 1) + require.True(t, loads[0].EqualVT(tc.snapshot)) + }) + } +} + +func TestImportMaxSize(t *testing.T) { + t.Parallel() + + cluster := buildMaxSizeCluster() + stateSnapshot := &storagepb.StateSnapshot{Clusters: []*storagepb.ClusterSnapshot{cluster}} + path := filepath.Join(t.TempDir(), "test.binpb") + state := &mockSnapshotter{data: stateSnapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + clusterData, err := cluster.MarshalVT() + require.NoError(t, err) + + require.Equal(t, len(clusterData), storage.MaxClusterSize) + + data, err := stateSnapshot.MarshalVT() + require.NoError(t, err) + + t.Logf("max cluster marshaled size: %d", len(data)) + + _, err = stateStorage.Import(bytes.NewReader(data)) + require.NoError(t, err) + + // add one more affiliate to trigger an overflow + cluster.Affiliates = append(cluster.Affiliates, &storagepb.AffiliateSnapshot{ + Id: "overflow", + }) + + data, err = stateSnapshot.MarshalVT() + require.NoError(t, err) + + _, err = stateStorage.Import(bytes.NewReader(data)) + require.ErrorIs(t, err, storage.ErrClusterSnapshotTooLarge) +} + +func TestStorage(t *testing.T) { + t.Parallel() + + snapshot := buildTestSnapshot(10) + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + // test save + + require.NoError(t, stateStorage.Save()) + + expectedData, err := snapshot.MarshalVT() + require.NoError(t, err) + + actualData, err := os.ReadFile(path) + require.NoError(t, err) + + require.Equal(t, expectedData, actualData) + + // test load + + require.NoError(t, stateStorage.Load()) + require.Len(t, state.getLoads(), 1) + require.True(t, snapshot.EqualVT(state.getLoads()[0])) + + // modify, save & load again to assert that the file content gets overwritten + + snapshot.Clusters[1].Affiliates[0].Data = []byte("new aff1 data") + + require.NoError(t, stateStorage.Save()) + require.NoError(t, stateStorage.Load()) + require.Len(t, state.getLoads(), 2) + require.True(t, snapshot.EqualVT(state.getLoads()[1])) +} + +func TestSchedule(t *testing.T) { + t.Parallel() + + clock := clockwork.NewFakeClock() + snapshot := buildTestSnapshot(10) + tempDir := t.TempDir() + path := filepath.Join(tempDir, "test.binpb") + state := &mockSnapshotter{data: snapshot} + logger := zaptest.NewLogger(t) + + stateStorage := storage.New(path, state, logger) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + // start the periodic storage and wait for it to block on the timer + + errCh := make(chan error) + + go func() { + errCh <- stateStorage.Start(ctx, clock, 10*time.Minute) + }() + + require.NoError(t, clock.BlockUntilContext(ctx, 1)) + + // advance time to trigger the first snapshot and assert it + + clock.Advance(13 * time.Minute) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 1, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) + + // advance time to trigger the second snapshot and assert it + + clock.Advance(10 * time.Minute) + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 2, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) + + // cancel the context to stop the storage loop and wait for it to exit + cancel() + + require.NoError(t, <-errCh) + + // assert that the state was saved on shutdown + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.Equal(collect, 3, state.getSnapshots()) + }, 2*time.Second, 100*time.Millisecond) +} + +type mockSnapshotter struct { + data *storagepb.StateSnapshot + loads []*storagepb.StateSnapshot + + snapshots int + + lock sync.Mutex +} + +func (m *mockSnapshotter) getSnapshots() int { + m.lock.Lock() + defer m.lock.Unlock() + + return m.snapshots +} + +func (m *mockSnapshotter) getLoads() []*storagepb.StateSnapshot { + m.lock.Lock() + defer m.lock.Unlock() + + return append([]*storagepb.StateSnapshot(nil), m.loads...) +} + +// ExportClusterSnapshots implements storage.Snapshotter interface. +func (m *mockSnapshotter) ExportClusterSnapshots(f func(snapshot *storagepb.ClusterSnapshot) error) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.snapshots++ + + for _, cluster := range m.data.Clusters { + if err := f(cluster); err != nil { + return err + } + } + + return nil +} + +// ImportClusterSnapshots implements storage.Snapshotter interface. +func (m *mockSnapshotter) ImportClusterSnapshots(f func() (*storagepb.ClusterSnapshot, bool, error)) error { + m.lock.Lock() + defer m.lock.Unlock() + + var clusters []*storagepb.ClusterSnapshot + + for { + cluster, ok, err := f() + if err != nil { + return err + } + + if !ok { + break + } + + clusters = append(clusters, cluster) + } + + m.loads = append(m.loads, &storagepb.StateSnapshot{Clusters: clusters}) + + return nil +} + +func statsForSnapshot(snapshot *storagepb.StateSnapshot) storage.SnapshotStats { + numAffiliates := 0 + numEndpoints := 0 + + for _, cluster := range snapshot.Clusters { + numAffiliates += len(cluster.Affiliates) + + for _, affiliate := range cluster.Affiliates { + numEndpoints += len(affiliate.Endpoints) + } + } + + return storage.SnapshotStats{ + NumClusters: len(snapshot.Clusters), + NumAffiliates: numAffiliates, + NumEndpoints: numEndpoints, + Size: snapshot.SizeVT(), + } +} + +func buildTestSnapshot(numClusters int) *storagepb.StateSnapshot { + clusters := make([]*storagepb.ClusterSnapshot, 0, numClusters) + + for i := range numClusters { + affiliates := make([]*storagepb.AffiliateSnapshot, 0, 5) + + for j := range 5 { + affiliates = append(affiliates, &storagepb.AffiliateSnapshot{ + Id: fmt.Sprintf("aff%d", j), + Expiration: timestamppb.New(time.Now().Add(time.Hour)), + Data: []byte(fmt.Sprintf("aff%d data", j)), + }) + } + + if i%2 == 0 { + affiliates[0].Endpoints = []*storagepb.EndpointSnapshot{ + { + Expiration: timestamppb.New(time.Now().Add(time.Hour)), + Data: []byte(fmt.Sprintf("endpoint%d data", i)), + }, + } + } + + clusters = append(clusters, &storagepb.ClusterSnapshot{ + Id: fmt.Sprintf("cluster%d", i), + Affiliates: affiliates, + }) + } + + return &storagepb.StateSnapshot{ + Clusters: clusters, + } +} + +// buildMaxSizeCluster creates a cluster snapshot with the maximum possible marshaled size within the limits of the discovery service. +func buildMaxSizeCluster() *storagepb.ClusterSnapshot { + largestTTL := ×tamppb.Timestamp{ + Seconds: math.MinInt64, + Nanos: math.MinInt32, + } // the timestamp with the maximum possible marshaled size + + affiliates := make([]*storagepb.AffiliateSnapshot, 0, limits.ClusterAffiliatesMax) + + for range limits.ClusterAffiliatesMax { + endpoints := make([]*storagepb.EndpointSnapshot, 0, limits.AffiliateEndpointsMax) + + for range limits.AffiliateEndpointsMax { + endpoints = append(endpoints, &storagepb.EndpointSnapshot{ + Expiration: largestTTL, + Data: bytes.Repeat([]byte("a"), limits.AffiliateDataMax), + }) + } + + affiliates = append(affiliates, &storagepb.AffiliateSnapshot{ + Id: strings.Repeat("a", limits.AffiliateIDMax), + Expiration: largestTTL, + Data: bytes.Repeat([]byte("a"), limits.AffiliateDataMax), + Endpoints: endpoints, + }) + } + + return &storagepb.ClusterSnapshot{ + Id: strings.Repeat("c", limits.ClusterIDMax), + Affiliates: affiliates, + } +} diff --git a/pkg/server/client_test.go b/pkg/server/client_test.go index 6b6892a..697efd0 100644 --- a/pkg/server/client_test.go +++ b/pkg/server/client_test.go @@ -406,8 +406,6 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi eg, ctx := errgroup.WithContext(ctx) for i := range affiliates { - i := i - eg.Go(func() error { return affiliates[i].Run(ctx, logger, notifyCh[i]) }) @@ -446,7 +444,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi expected := make(map[int]struct{}) - for i := 0; i < numAffiliates; i++ { + for i := range numAffiliates { if i != affiliateID { expected[i] = struct{}{} } @@ -493,7 +491,7 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi // eventually all affiliates should see discovered state const NumAttempts = 50 // 50 * 100ms = 5s - for j := 0; j < NumAttempts; j++ { + for j := range NumAttempts { matches := true for i := range affiliates { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 90dc673..89c24f4 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -407,7 +407,7 @@ func TestValidation(t *testing.T) { t.Run("AffiliateUpdateTooMany", func(t *testing.T) { t.Parallel() - for i := 0; i < limits.ClusterAffiliatesMax; i++ { + for i := range limits.ClusterAffiliatesMax { _, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{ ClusterId: "fatcluster", AffiliateId: fmt.Sprintf("af%d", i), @@ -428,7 +428,7 @@ func TestValidation(t *testing.T) { t.Run("AffiliateUpdateTooManyEndpoints", func(t *testing.T) { t.Parallel() - for i := 0; i < limits.AffiliateEndpointsMax; i++ { + for i := range limits.AffiliateEndpointsMax { _, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{ ClusterId: "smallcluster", AffiliateId: "af", @@ -514,7 +514,7 @@ func testHitRateLimit(client pb.ClusterClient, ip string) func(t *testing.T) { ctx = metadata.AppendToOutgoingContext(ctx, "X-Real-IP", ip) - for i := 0; i < limits.IPRateBurstSizeMax; i++ { + for range limits.IPRateBurstSizeMax { _, err := client.Hello(ctx, &pb.HelloRequest{ ClusterId: "fake", ClientVersion: "v0.12.0", diff --git a/pkg/server/version_test.go b/pkg/server/version_test.go index 003d522..3f1554c 100644 --- a/pkg/server/version_test.go +++ b/pkg/server/version_test.go @@ -22,8 +22,6 @@ func TestParseVersion(t *testing.T) { "v0.14.0-alpha.0-7-gf7d9f211": "v0.14.0-alpha.0-dev", "v0.14.0-alpha.0-7-gf7d9f211-dirty": "v0.14.0-alpha.0-dev", } { - v, expected := v, expected - t.Run(v, func(t *testing.T) { t.Parallel()