From a630ce6b987bbc763ea800632a1083415e903173 Mon Sep 17 00:00:00 2001 From: zeroalphat Date: Wed, 27 Sep 2023 08:07:31 +0000 Subject: [PATCH] Add implementation to limit the number of executions that can be performed. Signed-off-by: zeroalphat --- cmd/necoperf-daemon/cmd/daemon.go | 45 ++++++++++++++++--------------- cmd/necoperf-daemon/cmd/root.go | 10 ++++--- go.mod | 7 +++-- internal/daemon/api.go | 32 +++++++++++++++++----- internal/daemon/daemon.go | 26 +++++++++++------- 5 files changed, 77 insertions(+), 43 deletions(-) diff --git a/cmd/necoperf-daemon/cmd/daemon.go b/cmd/necoperf-daemon/cmd/daemon.go index c109706..c4588a7 100644 --- a/cmd/necoperf-daemon/cmd/daemon.go +++ b/cmd/necoperf-daemon/cmd/daemon.go @@ -8,29 +8,30 @@ import ( "github.com/spf13/cobra" ) -var port int -var runtimeEndpoint string -var workDir string +var ( + port int + runtimeEndpoint string + workDir string +) -func init() { - rootCmd.AddCommand(daemonCmd) - flags := daemonCmd.Flags() - flags.IntVar(&port, "port", 6543, "Set server port number") - flags.StringVar(&runtimeEndpoint, "runtime-endpoint", "unix:///run/containerd/containerd.sock", "Set container runtime endpoint") - flags.StringVar(&workDir, "work-dir", "/var/necoperf", "Set working directory") -} +func NewDaemonCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "daemon", + Short: "Starts the daemon", + RunE: func(cmd *cobra.Command, args []string) error { + handler := slog.NewTextHandler(os.Stderr, nil) + logger := slog.New(handler) + daemon, err := daemon.New(logger, port, runtimeEndpoint, workDir) + if err != nil { + return err + } -var daemonCmd = &cobra.Command{ - Use: "daemon", - Short: "Starts the daemon", - RunE: func(cmd *cobra.Command, args []string) error { - handler := slog.NewTextHandler(os.Stderr, nil) - logger := slog.New(handler) - daemon, err := daemon.New(logger, port, runtimeEndpoint, workDir) - if err != nil { - return err - } + return daemon.Start() + }, + } + cmd.Flags().IntVar(&port, "port", 6543, "Set server port number") + cmd.Flags().StringVar(&runtimeEndpoint, "runtime-endpoint", "unix:///run/containerd/containerd.sock", "Set container runtime endpoint") + cmd.Flags().StringVar(&workDir, "work-dir", "/var/necoperf", "Set working directory") - return daemon.Start() - }, + return cmd } diff --git a/cmd/necoperf-daemon/cmd/root.go b/cmd/necoperf-daemon/cmd/root.go index c8cedbf..0f7b007 100644 --- a/cmd/necoperf-daemon/cmd/root.go +++ b/cmd/necoperf-daemon/cmd/root.go @@ -6,16 +6,20 @@ import ( "github.com/spf13/cobra" ) -var ( - rootCmd = &cobra.Command{ +func NewRootCommand() *cobra.Command { + cmd := &cobra.Command{ Use: "necoperf-daemon", RunE: func(cmd *cobra.Command, args []string) error { return cmd.Help() }, } -) + + return cmd +} func Execute() { + rootCmd := NewRootCommand() + rootCmd.AddCommand(NewDaemonCommand()) if err := rootCmd.Execute(); err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 2f8cd00..923c9d5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ go 1.21 toolchain go1.21.1 require ( + github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 github.com/oklog/run v1.1.0 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 + golang.org/x/sync v0.3.0 google.golang.org/grpc v1.58.1 google.golang.org/protobuf v1.31.0 k8s.io/cri-api v0.28.2 @@ -23,6 +25,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -36,6 +39,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect + github.com/imdario/mergo v0.3.6 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -44,8 +48,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo/v2 v2.11.0 // indirect - github.com/onsi/gomega v1.27.10 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect diff --git a/internal/daemon/api.go b/internal/daemon/api.go index cad51f0..770a9d6 100644 --- a/internal/daemon/api.go +++ b/internal/daemon/api.go @@ -6,16 +6,18 @@ import ( "time" "github.com/cybozu-go/necoperf/internal/rpc" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( maxTimeout = 10 * time.Minute // 10 minute + weight = 1 ) func (d *DaemonServer) Profile(req *rpc.PerfProfileRequest, stream rpc.NecoPerf_ProfileServer) error { - ctx := stream.Context() + eg, ctx := errgroup.WithContext(stream.Context()) containerID := req.GetContainerId() if len(containerID) == 0 { err := status.Error(codes.InvalidArgument, "container ID is not set") @@ -41,17 +43,35 @@ func (d *DaemonServer) Profile(req *rpc.PerfProfileRequest, stream rpc.NecoPerf_ err := status.Error(codes.Internal, "invalid PID is returned from CRI API") return err } - profileDataPath, err := d.perfExecuter.ExecRecord(ctx, d.workDir, pid, timeout) + + err = d.semaphore.Acquire(ctx, weight) if err != nil { return err } - defer os.Remove(profileDataPath) - scriptDataPath, err := d.perfExecuter.ExecScript(ctx, profileDataPath, d.workDir) - if err != nil { + var scriptDataPath string + defer os.Remove(scriptDataPath) + + eg.Go(func() error { + defer d.semaphore.Release(weight) + + profileDataPath, err := d.perfExecuter.ExecRecord(ctx, d.workDir, pid, timeout) + defer os.Remove(profileDataPath) + if err != nil { + return err + } + + scriptDataPath, err = d.perfExecuter.ExecScript(ctx, profileDataPath, d.workDir) + if err != nil { + return err + } + + return nil + }) + + if err := eg.Wait(); err != nil { return err } - defer os.Remove(scriptDataPath) f, err := os.Open(scriptDataPath) if err != nil { diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 4940e03..66656b1 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -12,6 +12,7 @@ import ( "github.com/cybozu-go/necoperf/internal/rpc" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/oklog/run" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -21,11 +22,12 @@ import ( ) type DaemonServer struct { - logger *slog.Logger - server *grpc.Server - port int - endpoint string - workDir string + logger *slog.Logger + server *grpc.Server + port int + endpoint string + workDir string + semaphore *semaphore.Weighted rpc.UnimplementedNecoPerfServer container *resource.Container perfExecuter *resource.PerfExecuter @@ -34,6 +36,7 @@ type DaemonServer struct { const ( minTime = 30 * time.Second criTimeout = 30 * time.Second + maxWorkers = 2 ) func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer, error) { @@ -57,12 +60,15 @@ func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer ), ) + semaphore := semaphore.NewWeighted(maxWorkers) + return &DaemonServer{ - logger: logger, - server: serv, - port: port, - endpoint: endpoint, - workDir: workDir, + logger: logger, + server: serv, + port: port, + endpoint: endpoint, + workDir: workDir, + semaphore: semaphore, }, nil }