From 2d42e3e8e9eee456d411be0fcbd2b2fa2c918e1f Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 27 Dec 2024 15:49:51 +0400 Subject: [PATCH] feat: implement correct watch restart for controller runtime Controller runtime does `WatchKind` on all controller input resources, but initial bookmark value is only delivered if there is any update received (on each of the watches). If resource cache is used, `BootstrapContents` will deliver the bookmark, but if the cache is not used, and there are no updates, no bookmark is ever sent, so watch can't be restarted. Fix this by adding yet another option to send 'Noop' events which carry only the bookmark allowing watch retry code to handle it. Signed-off-by: Andrey Smirnov --- .kres.yaml | 6 +- Dockerfile | 12 +- Makefile | 4 +- api/key_storage/key_storage.pb.go | 24 +-- api/v1alpha1/meta.pb.go | 29 ++- api/v1alpha1/resource.pb.go | 76 ++++--- api/v1alpha1/state.pb.go | 280 +++++++++++++------------- api/v1alpha1/state.pb.gw.go | 252 +++++++++-------------- api/v1alpha1/state_vtproto.pb.go | 37 ++++ pkg/controller/runtime/runtime.go | 7 +- pkg/resource/rtestutils/assertions.go | 4 +- pkg/resource/rtestutils/destroy.go | 2 +- pkg/state/conformance/state.go | 45 +++-- pkg/state/impl/inmem/collection.go | 22 +- pkg/state/options.go | 8 + pkg/state/protobuf/client/client.go | 5 + pkg/state/protobuf/protobuf_test.go | 14 +- pkg/state/protobuf/runtime_test.go | 83 ++++++++ pkg/state/protobuf/server/server.go | 6 + pkg/state/state.go | 5 +- pkg/state/wrap.go | 2 +- 21 files changed, 518 insertions(+), 405 deletions(-) create mode 100644 pkg/state/protobuf/runtime_test.go diff --git a/.kres.yaml b/.kres.yaml index 7fd9ba4c..3a34e2a5 100644 --- a/.kres.yaml +++ b/.kres.yaml @@ -5,15 +5,15 @@ spec: - --experimental_allow_proto3_optional vtProtobufEnabled: true specs: - - source: https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/resource.proto + - source: https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/resource.proto subdirectory: v1alpha1/ genGateway: true external: false - - source: https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/state.proto + - source: https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/state.proto subdirectory: v1alpha1/ genGateway: true external: false - - source: https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/meta.proto + - source: https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/meta.proto subdirectory: v1alpha1/ genGateway: true external: false diff --git a/Dockerfile b/Dockerfile index 7fc8246b..6c70b85e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ -# syntax = docker/dockerfile-upstream:1.12.0-labs +# syntax = docker/dockerfile-upstream:1.12.1-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-12-17T14:42:24Z by kres b9507d6. +# Generated on 2024-12-27T11:10:43Z by kres fcff05e. ARG TOOLCHAIN @@ -11,7 +11,7 @@ FROM ghcr.io/siderolabs/ca-certificates:v1.9.0 AS image-ca-certificates FROM ghcr.io/siderolabs/fhs:v1.9.0 AS image-fhs # runs markdownlint -FROM docker.io/oven/bun:1.1.38-alpine AS lint-markdown +FROM docker.io/oven/bun:1.1.40-alpine AS lint-markdown WORKDIR /src RUN bun i markdownlint-cli@0.43.0 sentences-per-line@0.3.0 COPY .markdownlint.json . @@ -20,9 +20,9 @@ RUN bunx markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ig # collects proto specs FROM scratch AS proto-specs -ADD https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/resource.proto /api/v1alpha1/ -ADD https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/state.proto /api/v1alpha1/ -ADD https://raw.githubusercontent.com/cosi-project/specification/3a4d5d04bea01b9549af4e8c0d593ace09a7ebd3/proto/v1alpha1/meta.proto /api/v1alpha1/ +ADD https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/resource.proto /api/v1alpha1/ +ADD https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/state.proto /api/v1alpha1/ +ADD https://raw.githubusercontent.com/cosi-project/specification/a25fac056c642b32468b030387ab94c17bc3ba1d/proto/v1alpha1/meta.proto /api/v1alpha1/ ADD api/key_storage/key_storage.proto /api/key_storage/ # base toolchain image diff --git a/Makefile b/Makefile index 9ab71123..4a44d48a 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-12-09T13:31:00Z by kres 8183c20. +# Generated on 2024-12-27T11:10:43Z by kres fcff05e. # common variables @@ -17,7 +17,7 @@ WITH_RACE ?= false REGISTRY ?= ghcr.io USERNAME ?= cosi-project REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME) -PROTOBUF_GO_VERSION ?= 1.35.2 +PROTOBUF_GO_VERSION ?= 1.36.0 GRPC_GO_VERSION ?= 1.5.1 GRPC_GATEWAY_VERSION ?= 2.24.0 VTPROTOBUF_VERSION ?= 0.6.0 diff --git a/api/key_storage/key_storage.pb.go b/api/key_storage/key_storage.pb.go index 882f21d0..081cf346 100644 --- a/api/key_storage/key_storage.pb.go +++ b/api/key_storage/key_storage.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.36.0 // protoc v4.24.4 // source: key_storage/key_storage.proto @@ -117,13 +117,12 @@ func (Algorithm) EnumDescriptor() ([]byte, []int) { // Storage is a main storage for keys in memory and in db. type Storage struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - StorageVersion StorageVersion `protobuf:"varint,1,opt,name=storage_version,json=storageVersion,proto3,enum=cosi.internal.key_storage.StorageVersion" json:"storage_version,omitempty"` - KeySlots map[string]*KeySlot `protobuf:"bytes,2,rep,name=key_slots,json=keySlots,proto3" json:"key_slots,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - KeysHmacHash []byte `protobuf:"bytes,3,opt,name=keys_hmac_hash,json=keysHmacHash,proto3" json:"keys_hmac_hash,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + StorageVersion StorageVersion `protobuf:"varint,1,opt,name=storage_version,json=storageVersion,proto3,enum=cosi.internal.key_storage.StorageVersion" json:"storage_version,omitempty"` + KeySlots map[string]*KeySlot `protobuf:"bytes,2,rep,name=key_slots,json=keySlots,proto3" json:"key_slots,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + KeysHmacHash []byte `protobuf:"bytes,3,opt,name=keys_hmac_hash,json=keysHmacHash,proto3" json:"keys_hmac_hash,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Storage) Reset() { @@ -179,12 +178,11 @@ func (x *Storage) GetKeysHmacHash() []byte { // KeySlot is a single key slot in KeyStorage. type KeySlot struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Algorithm Algorithm `protobuf:"varint,1,opt,name=algorithm,proto3,enum=cosi.internal.key_storage.Algorithm" json:"algorithm,omitempty"` + EncryptedKey []byte `protobuf:"bytes,2,opt,name=encrypted_key,json=encryptedKey,proto3" json:"encrypted_key,omitempty"` unknownFields protoimpl.UnknownFields - - Algorithm Algorithm `protobuf:"varint,1,opt,name=algorithm,proto3,enum=cosi.internal.key_storage.Algorithm" json:"algorithm,omitempty"` - EncryptedKey []byte `protobuf:"bytes,2,opt,name=encrypted_key,json=encryptedKey,proto3" json:"encrypted_key,omitempty"` + sizeCache protoimpl.SizeCache } func (x *KeySlot) Reset() { diff --git a/api/v1alpha1/meta.pb.go b/api/v1alpha1/meta.pb.go index 1c43e3f8..ba9b5e59 100644 --- a/api/v1alpha1/meta.pb.go +++ b/api/v1alpha1/meta.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.36.0 // protoc v4.24.4 // source: v1alpha1/meta.proto @@ -71,12 +71,11 @@ func (ResourceDefinitionSpec_Sensitivity) EnumDescriptor() ([]byte, []int) { // NamespaceSpec is the protobuf serialization of the Namespace resource. type NamespaceSpec struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Description of the namespace. - Description string `protobuf:"bytes,1,opt,name=description,proto3" json:"description,omitempty"` + Description string `protobuf:"bytes,1,opt,name=description,proto3" json:"description,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *NamespaceSpec) Reset() { @@ -118,10 +117,7 @@ func (x *NamespaceSpec) GetDescription() string { // ResourceDefinitionSpec is the protobuf serialization of the ResourceDefinition resource. type ResourceDefinitionSpec struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Canonical type name. ResourceType string `protobuf:"bytes,1,opt,name=resource_type,json=resourceType,proto3" json:"resource_type,omitempty"` // Displayed human-readable type name. @@ -136,7 +132,9 @@ type ResourceDefinitionSpec struct { PrintColumns []*ResourceDefinitionSpec_PrintColumn `protobuf:"bytes,6,rep,name=print_columns,json=printColumns,proto3" json:"print_columns,omitempty"` // Sensitivity indicates how secret resource of this type is. // The empty value represents a non-sensitive resource. - Sensitivity ResourceDefinitionSpec_Sensitivity `protobuf:"varint,7,opt,name=sensitivity,proto3,enum=cosi.resource.meta.ResourceDefinitionSpec_Sensitivity" json:"sensitivity,omitempty"` + Sensitivity ResourceDefinitionSpec_Sensitivity `protobuf:"varint,7,opt,name=sensitivity,proto3,enum=cosi.resource.meta.ResourceDefinitionSpec_Sensitivity" json:"sensitivity,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ResourceDefinitionSpec) Reset() { @@ -219,12 +217,11 @@ func (x *ResourceDefinitionSpec) GetSensitivity() ResourceDefinitionSpec_Sensiti } type ResourceDefinitionSpec_PrintColumn struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + JsonPath string `protobuf:"bytes,2,opt,name=json_path,json=jsonPath,proto3" json:"json_path,omitempty"` unknownFields protoimpl.UnknownFields - - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - JsonPath string `protobuf:"bytes,2,opt,name=json_path,json=jsonPath,proto3" json:"json_path,omitempty"` + sizeCache protoimpl.SizeCache } func (x *ResourceDefinitionSpec_PrintColumn) Reset() { diff --git a/api/v1alpha1/resource.pb.go b/api/v1alpha1/resource.pb.go index e15f698e..b5cf6281 100644 --- a/api/v1alpha1/resource.pb.go +++ b/api/v1alpha1/resource.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.36.0 // protoc v4.24.4 // source: v1alpha1/resource.proto @@ -107,21 +107,20 @@ func (LabelTerm_Operation) EnumDescriptor() ([]byte, []int) { // (finalizers) are attached controllers blocking teardown of the resource. // (labels) and (annotations) are free-form key-value pairs; labels allow queries. type Metadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` + Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"` + Owner string `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"` + Phase string `protobuf:"bytes,6,opt,name=phase,proto3" json:"phase,omitempty"` + Created *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=created,proto3" json:"created,omitempty"` + Updated *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=updated,proto3" json:"updated,omitempty"` + Finalizers []string `protobuf:"bytes,9,rep,name=finalizers,proto3" json:"finalizers,omitempty"` + Annotations map[string]string `protobuf:"bytes,11,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Labels map[string]string `protobuf:"bytes,10,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` unknownFields protoimpl.UnknownFields - - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` - Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` - Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"` - Owner string `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"` - Phase string `protobuf:"bytes,6,opt,name=phase,proto3" json:"phase,omitempty"` - Created *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=created,proto3" json:"created,omitempty"` - Updated *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=updated,proto3" json:"updated,omitempty"` - Finalizers []string `protobuf:"bytes,9,rep,name=finalizers,proto3" json:"finalizers,omitempty"` - Annotations map[string]string `protobuf:"bytes,11,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Labels map[string]string `protobuf:"bytes,10,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + sizeCache protoimpl.SizeCache } func (x *Metadata) Reset() { @@ -233,14 +232,13 @@ func (x *Metadata) GetLabels() map[string]string { // Spec defines content of the resource. type Spec struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Protobuf-serialized representation of the resource. ProtoSpec []byte `protobuf:"bytes,1,opt,name=proto_spec,json=protoSpec,proto3" json:"proto_spec,omitempty"` // YAML representation of the spec (optional). - YamlSpec string `protobuf:"bytes,2,opt,name=yaml_spec,json=yamlSpec,proto3" json:"yaml_spec,omitempty"` + YamlSpec string `protobuf:"bytes,2,opt,name=yaml_spec,json=yamlSpec,proto3" json:"yaml_spec,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Spec) Reset() { @@ -289,12 +287,11 @@ func (x *Spec) GetYamlSpec() string { // Resource is a combination of metadata and spec. type Resource struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + Spec *Spec `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` unknownFields protoimpl.UnknownFields - - Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` - Spec *Spec `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` + sizeCache protoimpl.SizeCache } func (x *Resource) Reset() { @@ -343,15 +340,14 @@ func (x *Resource) GetSpec() *Spec { // LabelTerm is an expression on a label. type LabelTerm struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Op LabelTerm_Operation `protobuf:"varint,2,opt,name=op,proto3,enum=cosi.resource.LabelTerm_Operation" json:"op,omitempty"` - Value []string `protobuf:"bytes,3,rep,name=value,proto3" json:"value,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Op LabelTerm_Operation `protobuf:"varint,2,opt,name=op,proto3,enum=cosi.resource.LabelTerm_Operation" json:"op,omitempty"` + Value []string `protobuf:"bytes,3,rep,name=value,proto3" json:"value,omitempty"` // Inverts the condition. - Invert bool `protobuf:"varint,5,opt,name=invert,proto3" json:"invert,omitempty"` + Invert bool `protobuf:"varint,5,opt,name=invert,proto3" json:"invert,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *LabelTerm) Reset() { @@ -416,11 +412,10 @@ func (x *LabelTerm) GetInvert() bool { // // Terms are combined with AND. type LabelQuery struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Terms []*LabelTerm `protobuf:"bytes,1,rep,name=terms,proto3" json:"terms,omitempty"` unknownFields protoimpl.UnknownFields - - Terms []*LabelTerm `protobuf:"bytes,1,rep,name=terms,proto3" json:"terms,omitempty"` + sizeCache protoimpl.SizeCache } func (x *LabelQuery) Reset() { @@ -462,11 +457,10 @@ func (x *LabelQuery) GetTerms() []*LabelTerm { // IDQuery is a query on resource metadata ID. type IDQuery struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Regexp string `protobuf:"bytes,1,opt,name=regexp,proto3" json:"regexp,omitempty"` unknownFields protoimpl.UnknownFields - - Regexp string `protobuf:"bytes,1,opt,name=regexp,proto3" json:"regexp,omitempty"` + sizeCache protoimpl.SizeCache } func (x *IDQuery) Reset() { diff --git a/api/v1alpha1/state.pb.go b/api/v1alpha1/state.pb.go index 9803d916..39b8c9cd 100644 --- a/api/v1alpha1/state.pb.go +++ b/api/v1alpha1/state.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.36.0 // protoc v4.24.4 // source: v1alpha1/state.proto @@ -29,6 +29,7 @@ const ( EventType_DESTROYED EventType = 2 EventType_BOOTSTRAPPED EventType = 3 EventType_ERRORED EventType = 4 + EventType_NOOP EventType = 5 ) // Enum value maps for EventType. @@ -39,6 +40,7 @@ var ( 2: "DESTROYED", 3: "BOOTSTRAPPED", 4: "ERRORED", + 5: "NOOP", } EventType_value = map[string]int32{ "CREATED": 0, @@ -46,6 +48,7 @@ var ( "DESTROYED": 2, "BOOTSTRAPPED": 3, "ERRORED": 4, + "NOOP": 5, } ) @@ -78,15 +81,14 @@ func (EventType) EnumDescriptor() ([]byte, []int) { // Event is emitted when resource changes. type Event struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + Old *Resource `protobuf:"bytes,3,opt,name=old,proto3" json:"old,omitempty"` + Error *string `protobuf:"bytes,4,opt,name=error,proto3,oneof" json:"error,omitempty"` + EventType EventType `protobuf:"varint,2,opt,name=event_type,json=eventType,proto3,enum=cosi.resource.EventType" json:"event_type,omitempty"` + Bookmark []byte `protobuf:"bytes,5,opt,name=bookmark,proto3,oneof" json:"bookmark,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` - Old *Resource `protobuf:"bytes,3,opt,name=old,proto3" json:"old,omitempty"` - Error *string `protobuf:"bytes,4,opt,name=error,proto3,oneof" json:"error,omitempty"` - EventType EventType `protobuf:"varint,2,opt,name=event_type,json=eventType,proto3,enum=cosi.resource.EventType" json:"event_type,omitempty"` - Bookmark []byte `protobuf:"bytes,5,opt,name=bookmark,proto3,oneof" json:"bookmark,omitempty"` + sizeCache protoimpl.SizeCache } func (x *Event) Reset() { @@ -155,14 +157,13 @@ func (x *Event) GetBookmark() []byte { } type GetRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` + Options *GetOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields - - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` - Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` - Options *GetOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` + sizeCache protoimpl.SizeCache } func (x *GetRequest) Reset() { @@ -224,9 +225,9 @@ func (x *GetRequest) GetOptions() *GetOptions { } type GetOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GetOptions) Reset() { @@ -260,11 +261,10 @@ func (*GetOptions) Descriptor() ([]byte, []int) { } type GetResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + sizeCache protoimpl.SizeCache } func (x *GetResponse) Reset() { @@ -305,13 +305,12 @@ func (x *GetResponse) GetResource() *Resource { } type ListRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Options *ListOptions `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields - - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` - Options *ListOptions `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + sizeCache protoimpl.SizeCache } func (x *ListRequest) Reset() { @@ -366,12 +365,11 @@ func (x *ListRequest) GetOptions() *ListOptions { } type ListOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + LabelQuery []*LabelQuery `protobuf:"bytes,1,rep,name=label_query,json=labelQuery,proto3" json:"label_query,omitempty"` + IdQuery *IDQuery `protobuf:"bytes,2,opt,name=id_query,json=idQuery,proto3" json:"id_query,omitempty"` unknownFields protoimpl.UnknownFields - - LabelQuery []*LabelQuery `protobuf:"bytes,1,rep,name=label_query,json=labelQuery,proto3" json:"label_query,omitempty"` - IdQuery *IDQuery `protobuf:"bytes,2,opt,name=id_query,json=idQuery,proto3" json:"id_query,omitempty"` + sizeCache protoimpl.SizeCache } func (x *ListOptions) Reset() { @@ -419,11 +417,10 @@ func (x *ListOptions) GetIdQuery() *IDQuery { } type ListResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + sizeCache protoimpl.SizeCache } func (x *ListResponse) Reset() { @@ -464,12 +461,11 @@ func (x *ListResponse) GetResource() *Resource { } type CreateRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + Options *CreateOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` - Options *CreateOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` + sizeCache protoimpl.SizeCache } func (x *CreateRequest) Reset() { @@ -517,11 +513,10 @@ func (x *CreateRequest) GetOptions() *CreateOptions { } type CreateOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` unknownFields protoimpl.UnknownFields - - Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` + sizeCache protoimpl.SizeCache } func (x *CreateOptions) Reset() { @@ -562,11 +557,10 @@ func (x *CreateOptions) GetOwner() string { } type CreateResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + sizeCache protoimpl.SizeCache } func (x *CreateResponse) Reset() { @@ -607,12 +601,11 @@ func (x *CreateResponse) GetResource() *Resource { } type UpdateRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + NewResource *Resource `protobuf:"bytes,2,opt,name=new_resource,json=newResource,proto3" json:"new_resource,omitempty"` + Options *UpdateOptions `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields - - NewResource *Resource `protobuf:"bytes,2,opt,name=new_resource,json=newResource,proto3" json:"new_resource,omitempty"` - Options *UpdateOptions `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + sizeCache protoimpl.SizeCache } func (x *UpdateRequest) Reset() { @@ -660,12 +653,11 @@ func (x *UpdateRequest) GetOptions() *UpdateOptions { } type UpdateOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` + ExpectedPhase *string `protobuf:"bytes,2,opt,name=expected_phase,json=expectedPhase,proto3,oneof" json:"expected_phase,omitempty"` unknownFields protoimpl.UnknownFields - - Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` - ExpectedPhase *string `protobuf:"bytes,2,opt,name=expected_phase,json=expectedPhase,proto3,oneof" json:"expected_phase,omitempty"` + sizeCache protoimpl.SizeCache } func (x *UpdateOptions) Reset() { @@ -713,11 +705,10 @@ func (x *UpdateOptions) GetExpectedPhase() string { } type UpdateResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` unknownFields protoimpl.UnknownFields - - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + sizeCache protoimpl.SizeCache } func (x *UpdateResponse) Reset() { @@ -758,14 +749,13 @@ func (x *UpdateResponse) GetResource() *Resource { } type DestroyRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` + Options *DestroyOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields - - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` - Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` - Options *DestroyOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` + sizeCache protoimpl.SizeCache } func (x *DestroyRequest) Reset() { @@ -827,11 +817,10 @@ func (x *DestroyRequest) GetOptions() *DestroyOptions { } type DestroyOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` unknownFields protoimpl.UnknownFields - - Owner string `protobuf:"bytes,1,opt,name=owner,proto3" json:"owner,omitempty"` + sizeCache protoimpl.SizeCache } func (x *DestroyOptions) Reset() { @@ -872,9 +861,9 @@ func (x *DestroyOptions) GetOwner() string { } type DestroyResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *DestroyResponse) Reset() { @@ -908,18 +897,17 @@ func (*DestroyResponse) Descriptor() ([]byte, []int) { } type WatchRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` - Id *string `protobuf:"bytes,3,opt,name=id,proto3,oneof" json:"id,omitempty"` - Options *WatchOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Id *string `protobuf:"bytes,3,opt,name=id,proto3,oneof" json:"id,omitempty"` + Options *WatchOptions `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"` // Supported API versions: // 0 (not set): event types Created,Updated,Deleted // 1: additional event types Bootstrapped,Errored - ApiVersion int32 `protobuf:"varint,5,opt,name=api_version,json=apiVersion,proto3" json:"api_version,omitempty"` + ApiVersion int32 `protobuf:"varint,5,opt,name=api_version,json=apiVersion,proto3" json:"api_version,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *WatchRequest) Reset() { @@ -988,16 +976,16 @@ func (x *WatchRequest) GetApiVersion() int32 { } type WatchOptions struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - BootstrapContents bool `protobuf:"varint,1,opt,name=bootstrap_contents,json=bootstrapContents,proto3" json:"bootstrap_contents,omitempty"` - TailEvents int32 `protobuf:"varint,2,opt,name=tail_events,json=tailEvents,proto3" json:"tail_events,omitempty"` - LabelQuery []*LabelQuery `protobuf:"bytes,3,rep,name=label_query,json=labelQuery,proto3" json:"label_query,omitempty"` - IdQuery *IDQuery `protobuf:"bytes,4,opt,name=id_query,json=idQuery,proto3" json:"id_query,omitempty"` - Aggregated bool `protobuf:"varint,5,opt,name=aggregated,proto3" json:"aggregated,omitempty"` - StartFromBookmark []byte `protobuf:"bytes,6,opt,name=start_from_bookmark,json=startFromBookmark,proto3,oneof" json:"start_from_bookmark,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + BootstrapContents bool `protobuf:"varint,1,opt,name=bootstrap_contents,json=bootstrapContents,proto3" json:"bootstrap_contents,omitempty"` + TailEvents int32 `protobuf:"varint,2,opt,name=tail_events,json=tailEvents,proto3" json:"tail_events,omitempty"` + LabelQuery []*LabelQuery `protobuf:"bytes,3,rep,name=label_query,json=labelQuery,proto3" json:"label_query,omitempty"` + IdQuery *IDQuery `protobuf:"bytes,4,opt,name=id_query,json=idQuery,proto3" json:"id_query,omitempty"` + Aggregated bool `protobuf:"varint,5,opt,name=aggregated,proto3" json:"aggregated,omitempty"` + StartFromBookmark []byte `protobuf:"bytes,6,opt,name=start_from_bookmark,json=startFromBookmark,proto3,oneof" json:"start_from_bookmark,omitempty"` + BootstrapBookmark bool `protobuf:"varint,7,opt,name=bootstrap_bookmark,json=bootstrapBookmark,proto3" json:"bootstrap_bookmark,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *WatchOptions) Reset() { @@ -1072,12 +1060,18 @@ func (x *WatchOptions) GetStartFromBookmark() []byte { return nil } +func (x *WatchOptions) GetBootstrapBookmark() bool { + if x != nil { + return x.BootstrapBookmark + } + return false +} + type WatchResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Event []*Event `protobuf:"bytes,1,rep,name=event,proto3" json:"event,omitempty"` unknownFields protoimpl.UnknownFields - - Event []*Event `protobuf:"bytes,1,rep,name=event,proto3" json:"event,omitempty"` + sizeCache protoimpl.SizeCache } func (x *WatchResponse) Reset() { @@ -1231,7 +1225,7 @@ var file_v1alpha1_state_proto_rawDesc = []byte{ 0x61, 0x74, 0x63, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x61, 0x70, 0x69, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x05, 0x0a, 0x03, 0x5f, 0x69, 0x64, 0x22, 0xba, 0x02, 0x0a, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x05, 0x0a, 0x03, 0x5f, 0x69, 0x64, 0x22, 0xe9, 0x02, 0x0a, 0x0c, 0x57, 0x61, 0x74, 0x63, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x62, 0x6f, 0x6f, 0x74, 0x73, @@ -1250,47 +1244,51 @@ var file_v1alpha1_state_proto_rawDesc = []byte{ 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x62, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x11, 0x73, 0x74, 0x61, 0x72, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x42, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, 0x88, 0x01, - 0x01, 0x42, 0x16, 0x0a, 0x14, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x66, 0x72, 0x6f, 0x6d, - 0x5f, 0x62, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0x3b, 0x0a, 0x0d, 0x57, 0x61, 0x74, - 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x05, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6f, 0x73, 0x69, - 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2a, 0x53, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x00, - 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0d, 0x0a, - 0x09, 0x44, 0x45, 0x53, 0x54, 0x52, 0x4f, 0x59, 0x45, 0x44, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, - 0x42, 0x4f, 0x4f, 0x54, 0x53, 0x54, 0x52, 0x41, 0x50, 0x50, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0b, - 0x0a, 0x07, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x04, 0x32, 0xa6, 0x03, 0x0a, 0x05, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x3c, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x19, 0x2e, 0x63, - 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1a, 0x2e, 0x63, 0x6f, - 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x45, 0x0a, 0x06, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x01, 0x12, 0x2d, 0x0a, 0x12, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x5f, 0x62, + 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x62, + 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x42, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, + 0x42, 0x16, 0x0a, 0x14, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, + 0x62, 0x6f, 0x6f, 0x6b, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0x3b, 0x0a, 0x0d, 0x57, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x05, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2a, 0x5d, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x0b, 0x0a, 0x07, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, + 0x44, 0x45, 0x53, 0x54, 0x52, 0x4f, 0x59, 0x45, 0x44, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x42, + 0x4f, 0x4f, 0x54, 0x53, 0x54, 0x52, 0x41, 0x50, 0x50, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0b, 0x0a, + 0x07, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x45, 0x44, 0x10, 0x04, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, + 0x4f, 0x50, 0x10, 0x05, 0x32, 0xa6, 0x03, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x3c, + 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x19, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1a, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, + 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1a, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1b, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, + 0x45, 0x0a, 0x06, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x73, 0x69, + 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x06, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, - 0x06, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x07, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x12, - 0x1d, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, - 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x44, - 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, - 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x1b, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x30, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x73, 0x69, 0x2d, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, - 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61, 0x6c, - 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, + 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, + 0x07, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x12, 0x1d, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x44, 0x65, 0x73, 0x74, 0x72, 0x6f, 0x79, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, + 0x12, 0x1b, 0x2e, 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, + 0x63, 0x6f, 0x73, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x57, 0x61, + 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x2e, 0x5a, + 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x73, 0x69, + 0x2d, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/api/v1alpha1/state.pb.gw.go b/api/v1alpha1/state.pb.gw.go index 0c17093e..624a7982 100644 --- a/api/v1alpha1/state.pb.gw.go +++ b/api/v1alpha1/state.pb.gw.go @@ -10,6 +10,7 @@ package v1alpha1 import ( "context" + "errors" "io" "net/http" @@ -24,47 +25,48 @@ import ( ) // Suppress "imported and not used" errors -var _ codes.Code -var _ io.Reader -var _ status.Status -var _ = runtime.String -var _ = utilities.NewDoubleArray -var _ = metadata.Join +var ( + _ codes.Code + _ io.Reader + _ status.Status + _ = errors.New + _ = runtime.String + _ = utilities.NewDoubleArray + _ = metadata.Join +) func request_State_Get_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq GetRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq GetRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.Get(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err - } func local_request_State_Get_0(ctx context.Context, marshaler runtime.Marshaler, server StateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq GetRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq GetRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := server.Get(ctx, &protoReq) return msg, metadata, err - } func request_State_List_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (State_ListClient, runtime.ServerMetadata, error) { - var protoReq ListRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq ListRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - stream, err := client.List(ctx, &protoReq) if err != nil { return nil, metadata, err @@ -75,95 +77,88 @@ func request_State_List_0(ctx context.Context, marshaler runtime.Marshaler, clie } metadata.HeaderMD = header return stream, metadata, nil - } func request_State_Create_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq CreateRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq CreateRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.Create(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err - } func local_request_State_Create_0(ctx context.Context, marshaler runtime.Marshaler, server StateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq CreateRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq CreateRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := server.Create(ctx, &protoReq) return msg, metadata, err - } func request_State_Update_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq UpdateRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq UpdateRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.Update(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err - } func local_request_State_Update_0(ctx context.Context, marshaler runtime.Marshaler, server StateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq UpdateRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq UpdateRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := server.Update(ctx, &protoReq) return msg, metadata, err - } func request_State_Destroy_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq DestroyRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq DestroyRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := client.Destroy(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) return msg, metadata, err - } func local_request_State_Destroy_0(ctx context.Context, marshaler runtime.Marshaler, server StateServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq DestroyRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq DestroyRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - msg, err := server.Destroy(ctx, &protoReq) return msg, metadata, err - } func request_State_Watch_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (State_WatchClient, runtime.ServerMetadata, error) { - var protoReq WatchRequest - var metadata runtime.ServerMetadata - - if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + var ( + protoReq WatchRequest + metadata runtime.ServerMetadata + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } - stream, err := client.Watch(ctx, &protoReq) if err != nil { return nil, metadata, err @@ -174,7 +169,6 @@ func request_State_Watch_0(ctx context.Context, marshaler runtime.Marshaler, cli } metadata.HeaderMD = header return stream, metadata, nil - } // RegisterStateHandlerServer registers the http handlers for service State to "mux". @@ -183,16 +177,13 @@ func request_State_Watch_0(ctx context.Context, marshaler runtime.Marshaler, cli // Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterStateHandlerFromEndpoint instead. // GRPC interceptors will not work for this type of registration. To use interceptors, you must use the "runtime.WithMiddlewares" option in the "runtime.NewServeMux" call. func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, server StateServer) error { - - mux.Handle("POST", pattern_State_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() var stream runtime.ServerTransportStream ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Get", runtime.WithHTTPPathPattern("/cosi.resource.State/Get")) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Get", runtime.WithHTTPPathPattern("/cosi.resource.State/Get")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -204,27 +195,22 @@ func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - mux.Handle("POST", pattern_State_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return }) - - mux.Handle("POST", pattern_State_Create_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Create_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() var stream runtime.ServerTransportStream ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Create", runtime.WithHTTPPathPattern("/cosi.resource.State/Create")) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Create", runtime.WithHTTPPathPattern("/cosi.resource.State/Create")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -236,20 +222,15 @@ func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Create_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Update_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Update_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() var stream runtime.ServerTransportStream ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Update", runtime.WithHTTPPathPattern("/cosi.resource.State/Update")) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Update", runtime.WithHTTPPathPattern("/cosi.resource.State/Update")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -261,20 +242,15 @@ func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Update_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Destroy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Destroy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() var stream runtime.ServerTransportStream ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Destroy", runtime.WithHTTPPathPattern("/cosi.resource.State/Destroy")) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/cosi.resource.State/Destroy", runtime.WithHTTPPathPattern("/cosi.resource.State/Destroy")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -286,12 +262,10 @@ func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Destroy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - mux.Handle("POST", pattern_State_Watch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Watch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -322,7 +296,6 @@ func RegisterStateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux } }() }() - return RegisterStateHandler(ctx, mux, conn) } @@ -338,14 +311,11 @@ func RegisterStateHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc // doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in // "StateClient" to call the correct interceptors. This client ignores the HTTP middlewares. func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, client StateClient) error { - - mux.Handle("POST", pattern_State_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Get_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Get", runtime.WithHTTPPathPattern("/cosi.resource.State/Get")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Get", runtime.WithHTTPPathPattern("/cosi.resource.State/Get")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -356,18 +326,13 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Get_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/List", runtime.WithHTTPPathPattern("/cosi.resource.State/List")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/List", runtime.WithHTTPPathPattern("/cosi.resource.State/List")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -378,18 +343,13 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_List_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Create_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Create_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Create", runtime.WithHTTPPathPattern("/cosi.resource.State/Create")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Create", runtime.WithHTTPPathPattern("/cosi.resource.State/Create")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -400,18 +360,13 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Create_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Update_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Update_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Update", runtime.WithHTTPPathPattern("/cosi.resource.State/Update")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Update", runtime.WithHTTPPathPattern("/cosi.resource.State/Update")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -422,18 +377,13 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Update_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Destroy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Destroy_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Destroy", runtime.WithHTTPPathPattern("/cosi.resource.State/Destroy")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Destroy", runtime.WithHTTPPathPattern("/cosi.resource.State/Destroy")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -444,18 +394,13 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Destroy_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) - }) - - mux.Handle("POST", pattern_State_Watch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + mux.Handle(http.MethodPost, pattern_State_Watch_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) - var err error - var annotatedContext context.Context - annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Watch", runtime.WithHTTPPathPattern("/cosi.resource.State/Watch")) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/cosi.resource.State/Watch", runtime.WithHTTPPathPattern("/cosi.resource.State/Watch")) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return @@ -466,38 +411,25 @@ func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } - forward_State_Watch_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) - }) - return nil } var ( - pattern_State_Get_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Get"}, "")) - - pattern_State_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "List"}, "")) - - pattern_State_Create_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Create"}, "")) - - pattern_State_Update_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Update"}, "")) - + pattern_State_Get_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Get"}, "")) + pattern_State_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "List"}, "")) + pattern_State_Create_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Create"}, "")) + pattern_State_Update_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Update"}, "")) pattern_State_Destroy_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Destroy"}, "")) - - pattern_State_Watch_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Watch"}, "")) + pattern_State_Watch_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"cosi.resource.State", "Watch"}, "")) ) var ( - forward_State_Get_0 = runtime.ForwardResponseMessage - - forward_State_List_0 = runtime.ForwardResponseStream - - forward_State_Create_0 = runtime.ForwardResponseMessage - - forward_State_Update_0 = runtime.ForwardResponseMessage - + forward_State_Get_0 = runtime.ForwardResponseMessage + forward_State_List_0 = runtime.ForwardResponseStream + forward_State_Create_0 = runtime.ForwardResponseMessage + forward_State_Update_0 = runtime.ForwardResponseMessage forward_State_Destroy_0 = runtime.ForwardResponseMessage - - forward_State_Watch_0 = runtime.ForwardResponseStream + forward_State_Watch_0 = runtime.ForwardResponseStream ) diff --git a/api/v1alpha1/state_vtproto.pb.go b/api/v1alpha1/state_vtproto.pb.go index 3c233f7d..718edf63 100644 --- a/api/v1alpha1/state_vtproto.pb.go +++ b/api/v1alpha1/state_vtproto.pb.go @@ -355,6 +355,7 @@ func (m *WatchOptions) CloneVT() *WatchOptions { r.TailEvents = m.TailEvents r.IdQuery = m.IdQuery.CloneVT() r.Aggregated = m.Aggregated + r.BootstrapBookmark = m.BootstrapBookmark if rhs := m.LabelQuery; rhs != nil { tmpContainer := make([]*LabelQuery, len(rhs)) for k, v := range rhs { @@ -830,6 +831,9 @@ func (this *WatchOptions) EqualVT(that *WatchOptions) bool { if p, q := this.StartFromBookmark, that.StartFromBookmark; (p == nil && q != nil) || (p != nil && q == nil) || string(p) != string(q) { return false } + if this.BootstrapBookmark != that.BootstrapBookmark { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1755,6 +1759,16 @@ func (m *WatchOptions) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.BootstrapBookmark { + i-- + if m.BootstrapBookmark { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x38 + } if m.StartFromBookmark != nil { i -= len(m.StartFromBookmark) copy(dAtA[i:], m.StartFromBookmark) @@ -2196,6 +2210,9 @@ func (m *WatchOptions) SizeVT() (n int) { l = len(m.StartFromBookmark) n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.BootstrapBookmark { + n += 2 + } n += len(m.unknownFields) return n } @@ -4436,6 +4453,26 @@ func (m *WatchOptions) UnmarshalVT(dAtA []byte) error { m.StartFromBookmark = []byte{} } iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BootstrapBookmark", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.BootstrapBookmark = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/controller/runtime/runtime.go b/pkg/controller/runtime/runtime.go index 9acbb0f9..02b45e17 100644 --- a/pkg/controller/runtime/runtime.go +++ b/pkg/controller/runtime/runtime.go @@ -233,7 +233,7 @@ func (runtime *Runtime) setupWatches() error { for key, cached := range runtime.watched { kind := resource.NewMetadata(key.Namespace, key.Type, "", resource.Version{}) - if err := runtime.state.WatchKindAggregated(runtime.runCtx, kind, runtime.watchCh, state.WithBootstrapContents(cached)); err != nil { + if err := runtime.state.WatchKindAggregated(runtime.runCtx, kind, runtime.watchCh, state.WithBootstrapContents(cached), state.WithBootstrapBookmark(!cached)); err != nil { return err } } @@ -314,6 +314,11 @@ eventLoop: return false } + // ignore Noop events + if e.Type == state.Noop { + continue + } + // if the resource is cached, we activated a watch with BootstrapContents option, so we need some special handling: // - before Bootstrapped event is received, we ignore events from the point of controller notification, but call Append on the cache // - on Bootstrapped event, we notify the cache that it can start serving reads diff --git a/pkg/resource/rtestutils/assertions.go b/pkg/resource/rtestutils/assertions.go index cc22a7e4..d5ef7106 100644 --- a/pkg/resource/rtestutils/assertions.go +++ b/pkg/resource/rtestutils/assertions.go @@ -230,7 +230,7 @@ func AssertNoResource[R ResourceWithRD]( return case state.Errored: require.NoError(ev.Error) - case state.Created, state.Updated, state.Bootstrapped: + case state.Created, state.Updated, state.Bootstrapped, state.Noop: } } } @@ -275,7 +275,7 @@ func AssertLength[R ResourceWithRD](ctx context.Context, t *testing.T, st state. bootstrapped = true case state.Errored: require.NoError(event.Error) - case state.Updated: + case state.Updated, state.Noop: } if bootstrapped && length == expectedLength { diff --git a/pkg/resource/rtestutils/destroy.go b/pkg/resource/rtestutils/destroy.go index afb27c92..cb85236e 100644 --- a/pkg/resource/rtestutils/destroy.go +++ b/pkg/resource/rtestutils/destroy.go @@ -72,7 +72,7 @@ func Destroy[R ResourceWithRD](ctx context.Context, t *testing.T, st state.State t.Logf("cleaned up %s ID %q", rds.Type, r.Metadata().ID()) } - case state.Bootstrapped: + case state.Bootstrapped, state.Noop: // ignore case state.Errored: require.NoError(t, event.Error()) diff --git a/pkg/state/conformance/state.go b/pkg/state/conformance/state.go index 25ff4c5f..64d8130e 100644 --- a/pkg/state/conformance/state.go +++ b/pkg/state/conformance/state.go @@ -958,17 +958,18 @@ func (suite *StateSuite) TestWatchWithBookmarks() { // TestWatchKindWithBookmarks verifies WatchKind with bookmarks. func (suite *StateSuite) TestWatchKindWithBookmarks() { - suite.testWatchKindWithBookmarks(false) -} - -// TestWatchKindAggregatedWithBookmarks verifies WatchKind aggregated with bookmarks. -func (suite *StateSuite) TestWatchKindAggregatedWithBookmarks() { - suite.testWatchKindWithBookmarks(true) + for _, aggregated := range []bool{false, true} { + for _, bootstrapContents := range []bool{false, true} { + suite.Run(fmt.Sprintf("aggregated=%v/bootstrapContents=%v", aggregated, bootstrapContents), func() { + suite.testWatchKindWithBookmarks(aggregated, bootstrapContents) + }) + } + } } -func (suite *StateSuite) testWatchKindWithBookmarks(useAggregated bool) { +func (suite *StateSuite) testWatchKindWithBookmarks(useAggregated, useBootstrapContents bool) { ns := suite.getNamespace() - res := NewPathResource(ns, fmt.Sprintf("res/watch-kind-with-bookmarks/%v", useAggregated)) + res := NewPathResource(ns, fmt.Sprintf("res/watch-kind-with-bookmarks/%v/%v", useAggregated, useBootstrapContents)) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -980,7 +981,14 @@ func (suite *StateSuite) testWatchKindWithBookmarks(useAggregated bool) { initial, err := suite.State.List(ctx, res.Metadata()) suite.Require().NoError(err) - suite.Require().NoError(watchAggregateAdapter(ctx, useAggregated, suite.State, res.Metadata().Copy(), ch, state.WithBootstrapContents(true))) + if !useBootstrapContents { + initial.Items = []resource.Resource{nil} + } + + suite.Require().NoError(watchAggregateAdapter(ctx, useAggregated, suite.State, res.Metadata().Copy(), ch, + state.WithBootstrapContents(useBootstrapContents), + state.WithBootstrapBookmark(!useBootstrapContents), + )) suite.Require().NoError(suite.State.Update(ctx, res)) suite.Require().NoError(suite.State.Update(ctx, res)) @@ -990,27 +998,36 @@ func (suite *StateSuite) testWatchKindWithBookmarks(useAggregated bool) { suite.Require().NoError(suite.State.Destroy(ctx, res.Metadata())) - const numEvents = 6 + numEvents := 6 + + if !useBootstrapContents { + // no initial "Created" event + numEvents = 5 + } expectedEvents := make([]reducedEventWithBookmark, 0, numEvents) sawBootstrapped := false + sawNoop := false for i := range numEvents + len(initial.Items) - 1 { select { case ev := <-ch: suite.T().Logf("received event %d: %v", i, ev) - if ev.Type == state.Bootstrapped { + switch ev.Type { //nolint:exhaustive + case state.Bootstrapped: sawBootstrapped = true + case state.Noop: + sawNoop = true } // filter unrelated content state - if !sawBootstrapped && ev.Resource.Metadata().ID() != res.Metadata().ID() { + if !sawBootstrapped && !sawNoop && ev.Resource.Metadata().ID() != res.Metadata().ID() { continue } - if sawBootstrapped { + if sawBootstrapped || sawNoop { // initial event might not have a bookmark suite.Assert().NotNil(ev.Bookmark, "event %d, %v", i, ev) } @@ -1030,6 +1047,8 @@ func (suite *StateSuite) testWatchKindWithBookmarks(useAggregated bool) { if ev.b == nil { // no bookmark, skip + suite.T().Logf("skipping event %d, no bookmark: %v", i, ev) + continue } diff --git a/pkg/state/impl/inmem/collection.go b/pkg/state/impl/inmem/collection.go index 080c80fb..1588e475 100644 --- a/pkg/state/impl/inmem/collection.go +++ b/pkg/state/impl/inmem/collection.go @@ -563,6 +563,26 @@ func (collection *ResourceCollection) WatchAll(ctx context.Context, singleCh cha bootstrapList = nil } + // send initial bookmark + if options.BootstrapBookmark { + event := state.Event{ + Type: state.Noop, + Resource: resource.NewTombstone(resource.NewMetadata(collection.ns, collection.typ, "", resource.VersionUndefined)), + Bookmark: encodeBookmark(pos - 1), + } + + switch { + case singleCh != nil: + if !channel.SendWithContext(ctx, singleCh, event) { + return + } + case aggCh != nil: + if !channel.SendWithContext(ctx, aggCh, []state.Event{event}) { + return + } + } + } + for { collection.mu.Lock() @@ -647,7 +667,7 @@ func (collection *ResourceCollection) WatchAll(ctx context.Context, singleCh cha // skip the event return false } - case state.Errored, state.Bootstrapped: + case state.Errored, state.Bootstrapped, state.Noop: panic("should never be reached") } diff --git a/pkg/state/options.go b/pkg/state/options.go index 0d6d81d3..480bbeaf 100644 --- a/pkg/state/options.go +++ b/pkg/state/options.go @@ -192,6 +192,7 @@ type WatchKindOptions struct { StartFromBookmark Bookmark UnmarshalOptions UnmarshalOptions BootstrapContents bool + BootstrapBookmark bool TailEvents int } @@ -205,6 +206,13 @@ func WithBootstrapContents(enable bool) WatchKindOption { } } +// WithBootstrapBookmark enables loading initial bookmark as 'noop' event for WatchKind API. +func WithBootstrapBookmark(enable bool) WatchKindOption { + return func(opts *WatchKindOptions) { + opts.BootstrapBookmark = enable + } +} + // WithKindTailEvents returns N most recent events as part of the response. func WithKindTailEvents(n int) WatchKindOption { return func(opts *WatchKindOptions) { diff --git a/pkg/state/protobuf/client/client.go b/pkg/state/protobuf/client/client.go index d8297cd2..9a521c2a 100644 --- a/pkg/state/protobuf/client/client.go +++ b/pkg/state/protobuf/client/client.go @@ -377,6 +377,7 @@ func (adapter *Adapter) WatchKind(ctx context.Context, resourceKind resource.Kin Type: resourceKind.Type(), Options: &v1alpha1.WatchOptions{ BootstrapContents: opts.BootstrapContents, + BootstrapBookmark: opts.BootstrapBookmark, StartFromBookmark: opts.StartFromBookmark, TailEvents: int32(opts.TailEvents), LabelQuery: labelQueries, @@ -430,6 +431,7 @@ func (adapter *Adapter) WatchKindAggregated(ctx context.Context, resourceKind re Type: resourceKind.Type(), Options: &v1alpha1.WatchOptions{ BootstrapContents: opts.BootstrapContents, + BootstrapBookmark: opts.BootstrapBookmark, StartFromBookmark: opts.StartFromBookmark, TailEvents: int32(opts.TailEvents), LabelQuery: labelQueries, @@ -531,6 +533,7 @@ func (adapter *Adapter) watchAdapter( } watchRequest.Options.BootstrapContents = false + watchRequest.Options.BootstrapBookmark = false watchRequest.Options.StartFromBookmark = lastBookmark watchRequest.Options.TailEvents = 0 @@ -586,6 +589,8 @@ func (adapter *Adapter) watchAdapter( event.Type = state.Bootstrapped case v1alpha1.EventType_ERRORED: event.Type = state.Errored + case v1alpha1.EventType_NOOP: + event.Type = state.Noop } if msgEvent.Resource != nil { diff --git a/pkg/state/protobuf/protobuf_test.go b/pkg/state/protobuf/protobuf_test.go index d21d6a87..f59fe772 100644 --- a/pkg/state/protobuf/protobuf_test.go +++ b/pkg/state/protobuf/protobuf_test.go @@ -133,7 +133,15 @@ func TestProtobufWatchAbort(t *testing.T) { } } -func TestProtobufWatchRestart(t *testing.T) { +func TestProtobufWatchRestartBoostrapped(t *testing.T) { + testProtobufWatchRestart(t, state.WithBootstrapContents(true), state.Bootstrapped) +} + +func TestProtobufWatchRestartInitialBookmark(t *testing.T) { + testProtobufWatchRestart(t, state.WithBootstrapBookmark(true), state.Noop) +} + +func testProtobufWatchRestart(t *testing.T, option state.WatchKindOption, initialEvent state.EventType) { grpcConn, grpcServer, restartServer, coreState := ProtobufSetup(t) stateClient := v1alpha1.NewStateClient(grpcConn) @@ -147,7 +155,7 @@ func TestProtobufWatchRestart(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) - require.NoError(t, st.WatchKindAggregated(ctx, conformance.NewPathResource("test", "/foo").Metadata(), ch, state.WithBootstrapContents(true))) + require.NoError(t, st.WatchKindAggregated(ctx, conformance.NewPathResource("test", "/foo").Metadata(), ch, option)) select { case <-ctx.Done(): @@ -155,7 +163,7 @@ func TestProtobufWatchRestart(t *testing.T) { case ev := <-ch: require.Len(t, ev, 1) - assert.Equal(t, state.Bootstrapped, ev[0].Type) + assert.Equal(t, initialEvent, ev[0].Type) } // abort the server, watch should enter retry loop diff --git a/pkg/state/protobuf/runtime_test.go b/pkg/state/protobuf/runtime_test.go new file mode 100644 index 00000000..8fa014ff --- /dev/null +++ b/pkg/state/protobuf/runtime_test.go @@ -0,0 +1,83 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package protobuf_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "github.com/cosi-project/runtime/api/v1alpha1" + "github.com/cosi-project/runtime/pkg/controller/conformance" + "github.com/cosi-project/runtime/pkg/controller/runtime" + "github.com/cosi-project/runtime/pkg/future" + "github.com/cosi-project/runtime/pkg/resource/protobuf" + "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state/protobuf/client" +) + +func TestProtobufWatchRuntimeRestart(t *testing.T) { + require.NoError(t, protobuf.RegisterResource(conformance.IntResourceType, &conformance.IntResource{})) + require.NoError(t, protobuf.RegisterResource(conformance.StrResourceType, &conformance.StrResource{})) + + grpcConn, grpcServer, restartServer, _ := ProtobufSetup(t) + + stateClient := v1alpha1.NewStateClient(grpcConn) + + logger := zaptest.NewLogger(t) + + st := state.WrapCore(client.NewAdapter(stateClient, + client.WithRetryLogger(logger), + )) + + rt, err := runtime.NewRuntime(st, logger) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + ctx, errCh := future.GoContext(ctx, rt.Run) + + require.NoError(t, rt.RegisterController(&conformance.IntToStrController{ + SourceNamespace: "one", + TargetNamespace: "default", + })) + require.NoError(t, rt.RegisterController(&conformance.IntDoublerController{ + SourceNamespace: "another", + TargetNamespace: "default", + })) + + require.NoError(t, st.Create(ctx, conformance.NewIntResource("one", "1", 1))) + + // wait for controller to start up + _, err = st.WatchFor(ctx, conformance.NewStrResource("default", "1", "1").Metadata(), state.WithEventTypes(state.Created)) + require.NoError(t, err) + + // abort the server, watch should enter retry loop + grpcServer.Stop() + + select { + case err = <-errCh: + require.Fail(t, "runtime finished unexpectedly", "error: %v", err) + case <-time.After(100 * time.Millisecond): + } + + _ = restartServer() + + // now another resource + require.NoError(t, st.Create(ctx, conformance.NewIntResource("another", "2", 2))) + + // wait for controller to start up + _, err = st.WatchFor(ctx, conformance.NewIntResource("default", "2", 4).Metadata(), state.WithEventTypes(state.Created)) + require.NoError(t, err) + + cancel() + + err = <-errCh + require.NoError(t, err) +} diff --git a/pkg/state/protobuf/server/server.go b/pkg/state/protobuf/server/server.go index 4a3a8c6f..a23cea1d 100644 --- a/pkg/state/protobuf/server/server.go +++ b/pkg/state/protobuf/server/server.go @@ -278,6 +278,10 @@ func (server *State) Watch(req *v1alpha1.WatchRequest, srv v1alpha1.State_WatchS opts = append(opts, state.WithKindStartFromBookmark(req.Options.StartFromBookmark)) } + if req.Options.BootstrapBookmark { + opts = append(opts, state.WithBootstrapBookmark(true)) + } + for _, query := range req.GetOptions().GetLabelQuery() { var labelOpts []resource.LabelQueryOption @@ -447,6 +451,8 @@ func mapEvent(apiVersion int32, event state.Event) (*v1alpha1.Event, error) { eventType = v1alpha1.EventType_BOOTSTRAPPED case state.Errored: eventType = v1alpha1.EventType_ERRORED + case state.Noop: + eventType = v1alpha1.EventType_NOOP } return &v1alpha1.Event{ diff --git a/pkg/state/state.go b/pkg/state/state.go index e7c83ca9..b0c2f4af 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -26,6 +26,8 @@ const ( Bootstrapped // Error happened in the watch. Errored + // Noop event. + Noop ) func _() { @@ -35,9 +37,10 @@ func _() { _ = x[Destroyed-2] _ = x[Bootstrapped-3] _ = x[Errored-4] + _ = x[Noop-5] } -var eventTypeString = [...]string{"Created", "Updated", "Destroyed", "Bootstrapped", "Errored"} +var eventTypeString = [...]string{"Created", "Updated", "Destroyed", "Bootstrapped", "Errored", "Noop"} func (eventType EventType) String() string { return eventTypeString[eventType] diff --git a/pkg/state/wrap.go b/pkg/state/wrap.go index 77e676f4..a63ef2b0 100644 --- a/pkg/state/wrap.go +++ b/pkg/state/wrap.go @@ -194,7 +194,7 @@ func (state coreWrapper) ContextWithTeardown(ctx context.Context, resourcePointe } case Destroyed: return - case Bootstrapped: + case Bootstrapped, Noop: // ignored, should not happen case Errored: // watch failed, cancel the context