diff --git a/fleetspeak/src/server/batchedservice/batchedservice.go b/fleetspeak/src/server/batchedservice/batchedservice.go new file mode 100644 index 00000000..844193c0 --- /dev/null +++ b/fleetspeak/src/server/batchedservice/batchedservice.go @@ -0,0 +1,43 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package batchedservice defines the interface that Fleetspeak expects from its +// server-side service implementations that support batched message processing. +package batchedservice + +import ( + "context" + + fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak" + spb "github.com/google/fleetspeak/fleetspeak/src/server/proto/fleetspeak_server" +) + +// A BatchedService is similar to Service but processes multiple messages +// instead of one at the time. It also does not retry messages that failed to +// be processed. +type BatchedService interface { + // ProcessMessages is invoked with a batch of messages from the endpoint to be + // processed by the service. + // + // Unlike with the Service interface, messages that fail to be processed + // are not retried and are simply discarded. + ProcessMessages(context.Context, []*fspb.Message) error + + // Stop initiates and waits for an orderly shut down. + Stop() error +} + +// A Factory is a function which creates a server batched service for the provided +// configuration. +type Factory func(*spb.BatchedServiceConfig) (BatchedService, error) diff --git a/fleetspeak/src/server/comms.go b/fleetspeak/src/server/comms.go index 753f489b..c3eae92f 100644 --- a/fleetspeak/src/server/comms.go +++ b/fleetspeak/src/server/comms.go @@ -403,6 +403,36 @@ func (c commsContext) handleMessagesFromClient(ctx context.Context, info *comms. return nil } + // TODO(b/371158380): Refactor validation and splitting by service to a single + // pass. + msgsByService := make(map[string][]*fspb.Message, len(msgs)) + for _, msg := range msgs { + msgsByService[msg.Destination.ServiceName] = append(msgsByService[msg.Destination.ServiceName], msg) + } + + // TODO(hanuszczak): Is it better to potentially over-allocate with capacity + // of `len(msgs)` or start with 0? + unbatchedMsgs := make([]*fspb.Message, 0) + + for service, msgs := range msgsByService { + if len(msgs) == 0 { + continue + } + if service == "" { + log.ErrorContextf(ctx, "dropping %v messages with no service set", len(msgs)) + continue + } + + if c.s.serviceConfig.IsBatchedService(service) { + c.s.serviceConfig.ProcessBatchedMessages(service, msgs) + } else { + unbatchedMsgs = append(unbatchedMsgs, msgs...) + } + } + + // TODO(hanuszczak): Is it better to assign `msgs` to `unbatchedMsgs` here or + // to change the occurrences below (that makes the diff worse?). + msgs = unbatchedMsgs sort.Slice(msgs, func(a, b int) bool { return bytes.Compare(msgs[a].MessageId, msgs[b].MessageId) == -1 }) diff --git a/fleetspeak/src/server/internal/services/manager.go b/fleetspeak/src/server/internal/services/manager.go index 6785b84c..56173e74 100644 --- a/fleetspeak/src/server/internal/services/manager.go +++ b/fleetspeak/src/server/internal/services/manager.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/google/fleetspeak/fleetspeak/src/common" + "github.com/google/fleetspeak/fleetspeak/src/server/batchedservice" "github.com/google/fleetspeak/fleetspeak/src/server/db" "github.com/google/fleetspeak/fleetspeak/src/server/internal/cache" "github.com/google/fleetspeak/fleetspeak/src/server/internal/ftime" @@ -42,21 +43,25 @@ const MaxServiceFailureReasonLength = 900 // A Manager starts, remembers, and shuts down services. type Manager struct { - services map[string]*liveService - dataStore db.Store - serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services. - stats stats.Collector - cc *cache.Clients + services map[string]*liveService + batchedServices map[string]batchedservice.BatchedService + dataStore db.Store + serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services. + batchedServiceRegistry map[string]batchedservice.Factory + stats stats.Collector + cc *cache.Clients } // NewManager creates a new manager using the provided components. Initially it only contains the 'system' service. -func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, stats stats.Collector, clientCache *cache.Clients) *Manager { +func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, batchedServiceRegistry map[string]batchedservice.Factory, stats stats.Collector, clientCache *cache.Clients) *Manager { m := Manager{ - services: make(map[string]*liveService), - dataStore: dataStore, - serviceRegistry: serviceRegistry, - stats: stats, - cc: clientCache, + services: make(map[string]*liveService), + batchedServices: make(map[string]batchedservice.BatchedService), + dataStore: dataStore, + serviceRegistry: serviceRegistry, + batchedServiceRegistry: batchedServiceRegistry, + stats: stats, + cc: clientCache, } ssd := liveService{ @@ -137,6 +142,39 @@ func (c *Manager) Install(cfg *spb.ServiceConfig) error { return nil } +// InstallBatched adds a new batched service based on the given configuration, +// replacing any existing service registered under the same name. +func (c *Manager) InstallBatched(cfg *spb.BatchedServiceConfig) error { + if cfg.Name == "" { + return fmt.Errorf("batched service without name") + } + if cfg.Factory == "" { + return fmt.Errorf("batched service without factory") + } + + factory := c.batchedServiceRegistry[cfg.Factory] + if factory == nil { + return fmt.Errorf("no such batched service factory: %v", cfg.Factory) + } + + service, err := factory(cfg) + if err != nil { + return err + } + + c.batchedServices[cfg.Name] = service + log.Infof("installed batched service: %v", cfg.Name) + + return nil +} + +// IsBatchedService returns true if the service under given name is registered +// as a batched service. +func (c *Manager) IsBatchedService(name string) bool { + _, ok := c.batchedServices[name] + return ok +} + // Stop closes and removes all services in the configuration. func (c *Manager) Stop() { for _, d := range c.services { @@ -196,6 +234,23 @@ func (c *Manager) ProcessMessages(msgs []*fspb.Message) { } } +// ProcessBatchedMessages processes a batch of messages using the specified +// service. +func (c *Manager) ProcessBatchedMessages(serviceName string, msgs []*fspb.Message) { + ctx, fin := context.WithTimeout(context.Background(), 30*time.Second) + defer fin() + + service := c.batchedServices[serviceName] + if service == nil { + log.ErrorContextf(ctx, "no such batched service: %v", serviceName) + return + } + + if err := service.ProcessMessages(ctx, msgs); err != nil { + log.ErrorContextf(ctx, "process batched messages: %v", err) + } +} + // processMessage attempts to processes m, returning a fspb.MessageResult. It // also updates stats, calling exactly one of MessageDropped, MessageFailed, // MessageProcessed. diff --git a/fleetspeak/src/server/proto/fleetspeak_server/server.pb.go b/fleetspeak/src/server/proto/fleetspeak_server/server.pb.go index edd5ca72..cea995ee 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/server.pb.go +++ b/fleetspeak/src/server/proto/fleetspeak_server/server.pb.go @@ -31,6 +31,8 @@ type ServerConfig struct { // The collection of services that this server should include. Services []*ServiceConfig `protobuf:"bytes,1,rep,name=services,proto3" json:"services,omitempty"` + // The collection of batched services that this server should include. + BatchedServices []*BatchedServiceConfig `protobuf:"bytes,3,rep,name=batched_services,json=batchedServices,proto3" json:"batched_services,omitempty"` // The approximate time to wait between checking for new broadcasts. If unset, // a default of 1 minute is used. BroadcastPollTime *durationpb.Duration `protobuf:"bytes,2,opt,name=broadcast_poll_time,json=broadcastPollTime,proto3" json:"broadcast_poll_time,omitempty"` @@ -75,6 +77,13 @@ func (x *ServerConfig) GetServices() []*ServiceConfig { return nil } +func (x *ServerConfig) GetBatchedServices() []*BatchedServiceConfig { + if x != nil { + return x.BatchedServices + } + return nil +} + func (x *ServerConfig) GetBroadcastPollTime() *durationpb.Duration { if x != nil { return x.BroadcastPollTime @@ -95,22 +104,28 @@ var file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_rawDesc = [] 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, - 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x01, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xeb, 0x01, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3c, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x49, 0x0a, 0x13, - 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x6c, 0x6c, 0x5f, 0x74, - 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x11, 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x50, - 0x6f, 0x6c, 0x6c, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x4c, 0x5a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x66, 0x6c, 0x65, - 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, - 0x61, 0x6b, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x67, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x52, 0x0a, 0x10, + 0x62, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, + 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, + 0x65, 0x61, 0x6b, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, + 0x65, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x0f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, + 0x12, 0x49, 0x0a, 0x13, 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x6f, + 0x6c, 0x6c, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x11, 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, + 0x61, 0x73, 0x74, 0x50, 0x6f, 0x6c, 0x6c, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x4c, 0x5a, 0x4a, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, + 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, + 0x61, 0x6b, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -127,18 +142,20 @@ func file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_rawDescGZIP var file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_goTypes = []interface{}{ - (*ServerConfig)(nil), // 0: fleetspeak.server.ServerConfig - (*ServiceConfig)(nil), // 1: fleetspeak.server.ServiceConfig - (*durationpb.Duration)(nil), // 2: google.protobuf.Duration + (*ServerConfig)(nil), // 0: fleetspeak.server.ServerConfig + (*ServiceConfig)(nil), // 1: fleetspeak.server.ServiceConfig + (*BatchedServiceConfig)(nil), // 2: fleetspeak.server.BatchedServiceConfig + (*durationpb.Duration)(nil), // 3: google.protobuf.Duration } var file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_depIdxs = []int32{ 1, // 0: fleetspeak.server.ServerConfig.services:type_name -> fleetspeak.server.ServiceConfig - 2, // 1: fleetspeak.server.ServerConfig.broadcast_poll_time:type_name -> google.protobuf.Duration - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 1: fleetspeak.server.ServerConfig.batched_services:type_name -> fleetspeak.server.BatchedServiceConfig + 3, // 2: fleetspeak.server.ServerConfig.broadcast_poll_time:type_name -> google.protobuf.Duration + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_fleetspeak_src_server_proto_fleetspeak_server_server_proto_init() } diff --git a/fleetspeak/src/server/proto/fleetspeak_server/server.proto b/fleetspeak/src/server/proto/fleetspeak_server/server.proto index f6c21e47..5fdf0ff3 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/server.proto +++ b/fleetspeak/src/server/proto/fleetspeak_server/server.proto @@ -14,6 +14,9 @@ message ServerConfig { // The collection of services that this server should include. repeated ServiceConfig services = 1; + // The collection of batched services that this server should include. + repeated BatchedServiceConfig batched_services = 3; + // The approximate time to wait between checking for new broadcasts. If unset, // a default of 1 minute is used. google.protobuf.Duration broadcast_poll_time = 2; diff --git a/fleetspeak/src/server/proto/fleetspeak_server/services.pb.go b/fleetspeak/src/server/proto/fleetspeak_server/services.pb.go index 38fd0dc3..550a9d7b 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/services.pb.go +++ b/fleetspeak/src/server/proto/fleetspeak_server/services.pb.go @@ -102,6 +102,83 @@ func (x *ServiceConfig) GetConfig() *anypb.Any { return nil } +// TODO(b/371158380): Does it make sense to have this a separate message? +// +// Most of the fields are the same as in `ServiceConfig` (except for the +// `max_parallelism` field) but experience shows that these can diverge more in +// the future and then we will have problems. +type BatchedServiceConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Name that the service will be registered under. + // + // This is used by endpoints to uniquely identify the service to send messages + // to using the `service_name` field of `fleetspeak.Address` message. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // Name of the factory which will be used to instantiate the service. + // + // List of available factories (and their names) is specific to a particular + // deployment and is defined during system startup. + Factory string `protobuf:"bytes,2,opt,name=factory,proto3" json:"factory,omitempty"` + // Configuration specific to the chosen service factory. + Config *anypb.Any `protobuf:"bytes,3,opt,name=config,proto3" json:"config,omitempty"` +} + +func (x *BatchedServiceConfig) Reset() { + *x = BatchedServiceConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchedServiceConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchedServiceConfig) ProtoMessage() {} + +func (x *BatchedServiceConfig) ProtoReflect() protoreflect.Message { + mi := &file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchedServiceConfig.ProtoReflect.Descriptor instead. +func (*BatchedServiceConfig) Descriptor() ([]byte, []int) { + return file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchedServiceConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *BatchedServiceConfig) GetFactory() string { + if x != nil { + return x.Factory + } + return "" +} + +func (x *BatchedServiceConfig) GetConfig() *anypb.Any { + if x != nil { + return x.Config + } + return nil +} + var File_fleetspeak_src_server_proto_fleetspeak_server_services_proto protoreflect.FileDescriptor var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDesc = []byte{ @@ -121,12 +198,19 @@ var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDesc = 0x65, 0x6c, 0x69, 0x73, 0x6d, 0x12, 0x2c, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x06, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x42, 0x4c, 0x5a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, - 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x73, - 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x66, 0x69, 0x67, 0x22, 0x72, 0x0a, 0x14, 0x42, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x66, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x66, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x2c, 0x0a, 0x06, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, + 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x4c, 0x5a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x66, 0x6c, 0x65, + 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, + 0x61, 0x6b, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -141,18 +225,20 @@ func file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDescGZ return file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDescData } -var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_goTypes = []interface{}{ - (*ServiceConfig)(nil), // 0: fleetspeak.server.ServiceConfig - (*anypb.Any)(nil), // 1: google.protobuf.Any + (*ServiceConfig)(nil), // 0: fleetspeak.server.ServiceConfig + (*BatchedServiceConfig)(nil), // 1: fleetspeak.server.BatchedServiceConfig + (*anypb.Any)(nil), // 2: google.protobuf.Any } var file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_depIdxs = []int32{ - 1, // 0: fleetspeak.server.ServiceConfig.config:type_name -> google.protobuf.Any - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 2, // 0: fleetspeak.server.ServiceConfig.config:type_name -> google.protobuf.Any + 2, // 1: fleetspeak.server.BatchedServiceConfig.config:type_name -> google.protobuf.Any + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_init() } @@ -173,6 +259,18 @@ func file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_init() { return nil } } + file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchedServiceConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -180,7 +278,7 @@ func file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_fleetspeak_src_server_proto_fleetspeak_server_services_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/fleetspeak/src/server/proto/fleetspeak_server/services.proto b/fleetspeak/src/server/proto/fleetspeak_server/services.proto index 19d92abe..50b656ba 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/services.proto +++ b/fleetspeak/src/server/proto/fleetspeak_server/services.proto @@ -25,3 +25,25 @@ message ServiceConfig { // the service. The allowed type depends upon the factory. google.protobuf.Any config = 4; } + +// TODO(b/371158380): Does it make sense to have this a separate message? +// +// Most of the fields are the same as in `ServiceConfig` (except for the +// `max_parallelism` field) but experience shows that these can diverge more in +// the future and then we will have problems. +message BatchedServiceConfig { + // Name that the service will be registered under. + // + // This is used by endpoints to uniquely identify the service to send messages + // to using the `service_name` field of `fleetspeak.Address` message. + string name = 1; + + // Name of the factory which will be used to instantiate the service. + // + // List of available factories (and their names) is specific to a particular + // deployment and is defined during system startup. + string factory = 2; + + // Configuration specific to the chosen service factory. + google.protobuf.Any config = 3; +} diff --git a/fleetspeak/src/server/server.go b/fleetspeak/src/server/server.go index 84840014..41a13453 100644 --- a/fleetspeak/src/server/server.go +++ b/fleetspeak/src/server/server.go @@ -28,6 +28,7 @@ import ( "github.com/google/fleetspeak/fleetspeak/src/common" "github.com/google/fleetspeak/fleetspeak/src/server/authorizer" + "github.com/google/fleetspeak/fleetspeak/src/server/batchedservice" "github.com/google/fleetspeak/fleetspeak/src/server/comms" "github.com/google/fleetspeak/fleetspeak/src/server/db" "github.com/google/fleetspeak/fleetspeak/src/server/internal/broadcasts" @@ -44,11 +45,14 @@ import ( // Components gathers the external components required to instantiate a Fleetspeak Server. type Components struct { - Datastore db.Store // Required, used to store all server state. - ServiceFactories map[string]service.Factory // Required, used to configure services according to the ServerConfig. - Communicators []comms.Communicator // Required to communicate with clients. - Stats stats.Collector // If set, will be notified about interesting events. - Authorizer authorizer.Authorizer // If set, will control and validate contacts from clients. + Datastore db.Store // Required, used to store all server state. + + ServiceFactories map[string]service.Factory // Required, used to configure services according to the ServerConfig. + BatchedServiceFactories map[string]batchedservice.Factory // Required, used to create batched services according to the BatchedServerConfig. + + Communicators []comms.Communicator // Required to communicate with clients. + Stats stats.Collector // If set, will be notified about interesting events. + Authorizer authorizer.Authorizer // If set, will control and validate contacts from clients. // If set, these will be used by Fleetspeak servers to pass simple // notifications between themselves. Currently only important when using @@ -116,7 +120,7 @@ func MakeServer(c *spb.ServerConfig, sc Components) (*Server, error) { healthCheck: sc.HealthCheck, } - s.serviceConfig = services.NewManager(sc.Datastore, sc.ServiceFactories, sc.Stats, s.clientCache) + s.serviceConfig = services.NewManager(sc.Datastore, sc.ServiceFactories, sc.BatchedServiceFactories, sc.Stats, s.clientCache) cn, err := s.listener.Start() if err != nil { @@ -129,6 +133,11 @@ func MakeServer(c *spb.ServerConfig, sc Components) (*Server, error) { return nil, err } } + for _, cfg := range c.BatchedServices { + if err := s.serviceConfig.InstallBatched(cfg); err != nil { + return nil, err + } + } for _, cm := range s.comms { if err := cm.Setup(commsContext{&s}); err != nil { return nil, err diff --git a/fleetspeak/src/server/servertests/comms_test.go b/fleetspeak/src/server/servertests/comms_test.go index 9a0f5206..e1eeac7f 100644 --- a/fleetspeak/src/server/servertests/comms_test.go +++ b/fleetspeak/src/server/servertests/comms_test.go @@ -35,7 +35,9 @@ import ( "github.com/google/fleetspeak/fleetspeak/src/server/sertesting" "github.com/google/fleetspeak/fleetspeak/src/server/service" "github.com/google/fleetspeak/fleetspeak/src/server/testserver" + "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" tspb "google.golang.org/protobuf/types/known/timestamppb" fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak" @@ -576,3 +578,125 @@ func TestServiceError(t *testing.T) { t.Errorf("Unexpected failure reason: got [%v], want [%v]", messageResult.FailedReason, expectedFailedReason) } } + +type fakeBatchedService struct { + batches [][]*fspb.Message +} + +func (s *fakeBatchedService) ProcessMessages(ctx context.Context, msgs []*fspb.Message) error { + s.batches = append(s.batches, msgs) + return nil +} + +func (s *fakeBatchedService) Stop() error { + return nil +} + +func TestBatchedService(t *testing.T) { + ctx := context.Background() + + service := &fakeBatchedService{} + server := testserver.MakeWithBatchedService(t, "TestServerService", service) + defer server.S.Stop() + + clientKey, err := server.AddClient() + if err != nil { + t.Fatalf("add client: %v", err) + } + clientID, err := common.MakeClientID(clientKey) + if err != nil { + t.Fatalf("make client id: %v", err) + } + + _, err = server.SimulateContactFromClient(ctx, clientKey, []*fspb.Message{ + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("AA"), + MessageType: "TestMessageType", + }, + }) + if err != nil { + t.Fatalf("simulate contact ('AA'): %v", err) + } + + _, err = server.SimulateContactFromClient(ctx, clientKey, []*fspb.Message{ + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("BA"), + MessageType: "TestMessageType", + }, + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("BB"), + MessageType: "TestMessageType", + }, + }) + if err != nil { + t.Fatalf("simulate contact ('BA', 'BB'): %v", err) + } + + wantBatches := [][]*fspb.Message{ + { + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("AA"), + MessageType: "TestMessageType", + }, + }, + { + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("BA"), + MessageType: "TestMessageType", + }, + { + Source: &fspb.Address{ + ClientId: clientID.Bytes(), + ServiceName: "TestEndpointService", + }, + Destination: &fspb.Address{ + ServiceName: "TestServerService", + }, + SourceMessageId: []byte("BB"), + MessageType: "TestMessageType", + }, + }, + } + + if diff := cmp.Diff(wantBatches, service.batches, + protocmp.Transform(), + protocmp.IgnoreFields(&fspb.Message{}, "message_id"), + ); diff != "" { + t.Fatalf("unexpected batches from simulated contact (-want +got): %s", diff) + } +} diff --git a/fleetspeak/src/server/testserver/testserver.go b/fleetspeak/src/server/testserver/testserver.go index 42183f31..5b01427c 100644 --- a/fleetspeak/src/server/testserver/testserver.go +++ b/fleetspeak/src/server/testserver/testserver.go @@ -33,6 +33,7 @@ import ( "github.com/google/fleetspeak/fleetspeak/src/common" "github.com/google/fleetspeak/fleetspeak/src/comtesting" "github.com/google/fleetspeak/fleetspeak/src/server" + "github.com/google/fleetspeak/fleetspeak/src/server/batchedservice" "github.com/google/fleetspeak/fleetspeak/src/server/comms" "github.com/google/fleetspeak/fleetspeak/src/server/service" "github.com/google/fleetspeak/fleetspeak/src/server/sqlite" @@ -140,6 +141,46 @@ func MakeWithService(t *testing.T, testName, caseName string, serviceInstance se return testServer } +// MakeWithBatchedService creates with the given batched service backed by a +// SQLite datastore. +func MakeWithBatchedService(t *testing.T, svcName string, svc batchedservice.BatchedService) Server { + t.Helper() + + tempdir := t.TempDir() + + ds, err := sqlite.MakeDatastore(path.Join(tempdir, "test.sqlite")) + if err != nil { + t.Fatalf("create datastore: %v", err) + } + + result := Server{} + + server, err := server.MakeServer( + &spb.ServerConfig{ + BatchedServices: []*spb.BatchedServiceConfig{{ + Name: svcName, + Factory: svcName, + }}, + }, + server.Components{ + Datastore: ds, + BatchedServiceFactories: map[string]batchedservice.Factory{ + svcName: func(conf *spb.BatchedServiceConfig) (batchedservice.BatchedService, error) { + return svc, nil + }, + }, + Communicators: []comms.Communicator{FakeCommunicator{&result}}, + }, + ) + if err != nil { + t.Fatalf("create server: %v", err) + } + + result.S = server + result.DS = ds + return result +} + // AddClient adds a new client with a random id to a server. func (s Server) AddClient() (crypto.PublicKey, error) { k, err := rsa.GenerateKey(rand.Reader, 2048)