diff --git a/go.mod b/go.mod index 2f8cd00..923c9d5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ go 1.21 toolchain go1.21.1 require ( + github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 github.com/oklog/run v1.1.0 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 + golang.org/x/sync v0.3.0 google.golang.org/grpc v1.58.1 google.golang.org/protobuf v1.31.0 k8s.io/cri-api v0.28.2 @@ -23,6 +25,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -36,6 +39,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect + github.com/imdario/mergo v0.3.6 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -44,8 +48,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo/v2 v2.11.0 // indirect - github.com/onsi/gomega v1.27.10 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect diff --git a/internal/daemon/api.go b/internal/daemon/api.go index cad51f0..f8bee1b 100644 --- a/internal/daemon/api.go +++ b/internal/daemon/api.go @@ -6,16 +6,18 @@ import ( "time" "github.com/cybozu-go/necoperf/internal/rpc" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( - maxTimeout = 10 * time.Minute // 10 minute + maxTimeout = 10 * time.Minute // 10 minute + maxConcurrent = 2 ) func (d *DaemonServer) Profile(req *rpc.PerfProfileRequest, stream rpc.NecoPerf_ProfileServer) error { - ctx := stream.Context() + eg, ctx := errgroup.WithContext(stream.Context()) containerID := req.GetContainerId() if len(containerID) == 0 { err := status.Error(codes.InvalidArgument, "container ID is not set") @@ -41,17 +43,35 @@ func (d *DaemonServer) Profile(req *rpc.PerfProfileRequest, stream rpc.NecoPerf_ err := status.Error(codes.Internal, "invalid PID is returned from CRI API") return err } - profileDataPath, err := d.perfExecuter.ExecRecord(ctx, d.workDir, pid, timeout) + + var scriptDataPath string + defer os.Remove(scriptDataPath) + + err = d.semaphore.Acquire(ctx, maxConcurrent) if err != nil { return err } - defer os.Remove(profileDataPath) - scriptDataPath, err := d.perfExecuter.ExecScript(ctx, profileDataPath, d.workDir) - if err != nil { + eg.Go(func() error { + defer d.semaphore.Release(1) + + profileDataPath, err := d.perfExecuter.ExecRecord(ctx, d.workDir, pid, timeout) + defer os.Remove(profileDataPath) + if err != nil { + return err + } + + scriptDataPath, err = d.perfExecuter.ExecScript(ctx, profileDataPath, d.workDir) + if err != nil { + return err + } + + return nil + }) + + if err := eg.Wait(); err != nil { return err } - defer os.Remove(scriptDataPath) f, err := os.Open(scriptDataPath) if err != nil { diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 4940e03..7c2e326 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -12,6 +12,7 @@ import ( "github.com/cybozu-go/necoperf/internal/rpc" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/oklog/run" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -21,11 +22,12 @@ import ( ) type DaemonServer struct { - logger *slog.Logger - server *grpc.Server - port int - endpoint string - workDir string + logger *slog.Logger + server *grpc.Server + port int + endpoint string + workDir string + semaphore *semaphore.Weighted rpc.UnimplementedNecoPerfServer container *resource.Container perfExecuter *resource.PerfExecuter @@ -34,6 +36,7 @@ type DaemonServer struct { const ( minTime = 30 * time.Second criTimeout = 30 * time.Second + maxWorkers = 3 ) func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer, error) { @@ -57,12 +60,15 @@ func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer ), ) + w := semaphore.NewWeighted(maxWorkers) + return &DaemonServer{ - logger: logger, - server: serv, - port: port, - endpoint: endpoint, - workDir: workDir, + logger: logger, + server: serv, + port: port, + endpoint: endpoint, + workDir: workDir, + semaphore: w, }, nil }