Skip to content

Commit

Permalink
Introduce batched services.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 688122044
  • Loading branch information
panhania authored and copybara-github committed Nov 13, 2024
1 parent c7412e8 commit d44bffa
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 54 deletions.
43 changes: 43 additions & 0 deletions fleetspeak/src/server/batchedservice/batchedservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package batchedservice defines the interface that Fleetspeak expects from its
// server-side service implementations that support batched message processing.
package batchedservice

import (
"context"

fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
spb "github.com/google/fleetspeak/fleetspeak/src/server/proto/fleetspeak_server"
)

// A BatchedService is similar to Service but processes multiple messages
// instead of one at the time. It also does not retry messages that failed to
// be processed.
type BatchedService interface {
// ProcessMessages is invoked with a batch of messages from the endpoint to be
// processed by the service.
//
// Unlike with the Service interface, messages that fail to be processed
// are not retried and are simply discarded.
ProcessMessages(context.Context, []*fspb.Message) error

// Stop initiates and waits for an orderly shut down.
Stop() error
}

// A Factory is a function which creates a server batched service for the provided
// configuration.
type Factory func(*spb.BatchedServiceConfig) (BatchedService, error)
30 changes: 30 additions & 0 deletions fleetspeak/src/server/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,36 @@ func (c commsContext) handleMessagesFromClient(ctx context.Context, info *comms.
return nil
}

// TODO(b/371158380): Refactor validation and splitting by service to a single
// pass.
msgsByService := make(map[string][]*fspb.Message, len(msgs))
for _, msg := range msgs {
msgsByService[msg.Destination.ServiceName] = append(msgsByService[msg.Destination.ServiceName], msg)
}

// TODO(hanuszczak): Is it better to potentially over-allocate with capacity
// of `len(msgs)` or start with 0?
unbatchedMsgs := make([]*fspb.Message, 0)

for service, msgs := range msgsByService {
if len(msgs) == 0 {
continue
}
if service == "" {
log.ErrorContextf(ctx, "dropping %v messages with no service set", len(msgs))
continue
}

if c.s.serviceConfig.IsBatchedService(service) {
c.s.serviceConfig.ProcessBatchedMessages(service, msgs)
} else {
unbatchedMsgs = append(unbatchedMsgs, msgs...)
}
}

// TODO(hanuszczak): Is it better to assign `msgs` to `unbatchedMsgs` here or
// to change the occurrences below (that makes the diff worse?).
msgs = unbatchedMsgs
sort.Slice(msgs, func(a, b int) bool {
return bytes.Compare(msgs[a].MessageId, msgs[b].MessageId) == -1
})
Expand Down
77 changes: 66 additions & 11 deletions fleetspeak/src/server/internal/services/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/server/batchedservice"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/cache"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/ftime"
Expand All @@ -42,21 +43,25 @@ const MaxServiceFailureReasonLength = 900

// A Manager starts, remembers, and shuts down services.
type Manager struct {
services map[string]*liveService
dataStore db.Store
serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services.
stats stats.Collector
cc *cache.Clients
services map[string]*liveService
batchedServices map[string]batchedservice.BatchedService
dataStore db.Store
serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services.
batchedServiceRegistry map[string]batchedservice.Factory
stats stats.Collector
cc *cache.Clients
}

// NewManager creates a new manager using the provided components. Initially it only contains the 'system' service.
func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, stats stats.Collector, clientCache *cache.Clients) *Manager {
func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, batchedServiceRegistry map[string]batchedservice.Factory, stats stats.Collector, clientCache *cache.Clients) *Manager {
m := Manager{
services: make(map[string]*liveService),
dataStore: dataStore,
serviceRegistry: serviceRegistry,
stats: stats,
cc: clientCache,
services: make(map[string]*liveService),
batchedServices: make(map[string]batchedservice.BatchedService),
dataStore: dataStore,
serviceRegistry: serviceRegistry,
batchedServiceRegistry: batchedServiceRegistry,
stats: stats,
cc: clientCache,
}

ssd := liveService{
Expand Down Expand Up @@ -137,6 +142,39 @@ func (c *Manager) Install(cfg *spb.ServiceConfig) error {
return nil
}

// InstallBatched adds a new batched service based on the given configuration,
// replacing any existing service registered under the same name.
func (c *Manager) InstallBatched(cfg *spb.BatchedServiceConfig) error {
if cfg.Name == "" {
return fmt.Errorf("batched service without name")
}
if cfg.Factory == "" {
return fmt.Errorf("batched service without factory")
}

factory := c.batchedServiceRegistry[cfg.Factory]
if factory == nil {
return fmt.Errorf("no such batched service factory: %v", cfg.Factory)
}

service, err := factory(cfg)
if err != nil {
return err
}

c.batchedServices[cfg.Name] = service
log.Infof("installed batched service: %v", cfg.Name)

return nil
}

// IsBatchedService returns true if the service under given name is registered
// as a batched service.
func (c *Manager) IsBatchedService(name string) bool {
_, ok := c.batchedServices[name]
return ok
}

// Stop closes and removes all services in the configuration.
func (c *Manager) Stop() {
for _, d := range c.services {
Expand Down Expand Up @@ -196,6 +234,23 @@ func (c *Manager) ProcessMessages(msgs []*fspb.Message) {
}
}

// ProcessBatchedMessages processes a batch of messages using the specified
// service.
func (c *Manager) ProcessBatchedMessages(serviceName string, msgs []*fspb.Message) {
ctx, fin := context.WithTimeout(context.Background(), 30*time.Second)
defer fin()

service := c.batchedServices[serviceName]
if service == nil {
log.ErrorContextf(ctx, "no such batched service: %v", serviceName)
return
}

if err := service.ProcessMessages(ctx, msgs); err != nil {
log.ErrorContextf(ctx, "process batched messages: %v", err)
}
}

// processMessage attempts to processes m, returning a fspb.MessageResult. It
// also updates stats, calling exactly one of MessageDropped, MessageFailed,
// MessageProcessed.
Expand Down
59 changes: 38 additions & 21 deletions fleetspeak/src/server/proto/fleetspeak_server/server.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions fleetspeak/src/server/proto/fleetspeak_server/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ message ServerConfig {
// The collection of services that this server should include.
repeated ServiceConfig services = 1;

// The collection of batched services that this server should include.
repeated BatchedServiceConfig batched_services = 3;

// The approximate time to wait between checking for new broadcasts. If unset,
// a default of 1 minute is used.
google.protobuf.Duration broadcast_poll_time = 2;
Expand Down
Loading

0 comments on commit d44bffa

Please sign in to comment.