From 19a157239ebc523c9c6e8e3656c4918a1bc83929 Mon Sep 17 00:00:00 2001 From: Andrewmatilde Date: Wed, 8 Feb 2023 17:01:30 +0800 Subject: [PATCH] Fix issue about disk attack cannot work well in chaos-mesh:physical machine chaos (#236) * add default value of PayloadProcessNum&FillByFAllocate Signed-off-by: andrewmatilde * recover unit-test Signed-off-by: andrewmatilde * recover unit-test Signed-off-by: andrewmatilde * Enable fill | write in dir. Signed-off-by: andrewmatilde * Enable fill | write in dir. Signed-off-by: andrewmatilde * Enable fill | write in dir. Signed-off-by: andrewmatilde * fix lint Signed-off-by: andrewmatilde * fix log Signed-off-by: andrewmatilde * ignore some unhandled errors & fix unexported returned value in exported function Signed-off-by: andrewmatilde * fix deprecated function Signed-off-by: andrewmatilde * complete async disk attack in server side Signed-off-by: andrewmatilde * better input args type in command pool Signed-off-by: andrewmatilde * complete test for command pool Signed-off-by: andrewmatilde * add license Signed-off-by: andrewmatilde * add license Signed-off-by: andrewmatilde * fix comment Signed-off-by: andrewmatilde * add output channel to pools Signed-off-by: andrewmatilde * manually test & fix disk attack Signed-off-by: andrewmatilde * fix ut in disk attack Signed-off-by: andrewmatilde * fix Boilerplate header Signed-off-by: andrewmatilde --------- Signed-off-by: andrewmatilde Co-authored-by: Cwen Yin --- go.mod | 8 +- go.sum | 18 ++- pkg/core/disk.go | 13 ++- pkg/core/disk_test.go | 3 +- pkg/core/experiment.go | 3 + pkg/core/jvm.go | 4 +- pkg/server/chaosd/disk.go | 128 +++++++++------------ pkg/server/chaosd/disk_server.go | 172 ++++++++++++++++++++++++++++ pkg/server/chaosd/disk_test.go | 14 ++- pkg/server/chaosd/recover.go | 2 + pkg/server/chaosd/server.go | 4 + pkg/server/httpserver/auth.go | 8 +- pkg/server/httpserver/experiment.go | 8 +- pkg/server/httpserver/server.go | 64 +++++------ pkg/server/httpserver/system.go | 4 +- pkg/utils/command.go | 13 ++- pkg/utils/command_test.go | 3 +- pkg/utils/pool.go | 117 +++++++++++++++++++ pkg/utils/pool_test.go | 81 +++++++++++++ 19 files changed, 540 insertions(+), 127 deletions(-) create mode 100644 pkg/server/chaosd/disk_server.go create mode 100644 pkg/utils/pool.go create mode 100644 pkg/utils/pool_test.go diff --git a/go.mod b/go.mod index db9a9c28..1df2d751 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/chaos-mesh/chaosd require ( + github.com/Jeffail/tunny v0.1.4 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/chaos-mesh/chaos-mesh v0.9.1-0.20220812140450-4bc7ef589c13 @@ -24,11 +25,13 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/robfig/cron/v3 v3.0.1 + github.com/samber/lo v1.37.0 + github.com/samber/mo v1.8.0 github.com/segmentio/kafka-go v0.4.31 github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.7.2 + github.com/stretchr/testify v1.8.1 github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe github.com/swaggo/gin-swagger v1.5.0 github.com/swaggo/swag v1.8.3 @@ -135,9 +138,10 @@ require ( go.uber.org/dig v1.14.1 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index c6271417..ed93fc4b 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/Jeffail/tunny v0.1.4 h1:chtpdz+nUtaYQeCKlNBg6GycFF/kGVHOr6A3cmzTJXs= +github.com/Jeffail/tunny v0.1.4/go.mod h1:P8xAx4XQl0xsuhjX1DtfaMDCSuavzdb2rwbd0lk+fvo= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -947,6 +949,10 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= +github.com/samber/lo v1.37.0 h1:XjVcB8g6tgUp8rsPsJ2CvhClfImrpL04YpQHXeHPhRw= +github.com/samber/lo v1.37.0/go.mod h1:9vaz2O4o8oOnK23pd2TrXufcbdbJIa3b6cstBWKpopA= +github.com/samber/mo v1.8.0 h1:vYjHTfg14JF9tD2NLhpoUsRi9bjyRoYwa4+do0nvbVw= +github.com/samber/mo v1.8.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= @@ -1005,6 +1011,8 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1013,8 +1021,11 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe h1:K8pHPVoTgxFJt1lXuIzzOX7zZhZFldJQK/CgKx9BFIc= github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w= @@ -1169,6 +1180,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1281,8 +1294,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/core/disk.go b/pkg/core/disk.go index d20e8f4c..6e255ca5 100644 --- a/pkg/core/disk.go +++ b/pkg/core/disk.go @@ -16,7 +16,6 @@ package core import ( "encoding/json" "fmt" - "io/ioutil" "os" "path/filepath" "strconv" @@ -90,6 +89,16 @@ func NewDiskOption() *DiskOption { } } +func NewDiskOptionForServer() *DiskOption { + return &DiskOption{ + CommonAttackConfig: CommonAttackConfig{ + Kind: DiskServerAttack, + }, + PayloadProcessNum: 1, + FillByFallocate: true, + } +} + func (opt *DiskOption) PreProcess() (*DiskAttackConfig, error) { if err := opt.CommonAttackConfig.Validate(); err != nil { return nil, err @@ -191,7 +200,7 @@ func initPath(opt *DiskOption) (string, error) { // check if Path of file is valid when Path is not empty if os.IsNotExist(err) { var b []byte - if err := ioutil.WriteFile(opt.Path, b, 0600); err != nil { + if err := os.WriteFile(opt.Path, b, 0600); err != nil { return "", err } if err := os.Remove(opt.Path); err != nil { diff --git a/pkg/core/disk_test.go b/pkg/core/disk_test.go index a57e42e9..cb710b69 100644 --- a/pkg/core/disk_test.go +++ b/pkg/core/disk_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 Chaos Mesh Authors. +// Copyright 2023 Chaos Mesh Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -10,6 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. + package core import ( diff --git a/pkg/core/experiment.go b/pkg/core/experiment.go index c9008a8a..5d209a0e 100644 --- a/pkg/core/experiment.go +++ b/pkg/core/experiment.go @@ -35,6 +35,7 @@ const ( NetworkAttack = "network" StressAttack = "stress" DiskAttack = "disk" + DiskServerAttack = "disk-server" ClockAttack = "clock" HostAttack = "host" JVMAttack = "jvm" @@ -109,6 +110,8 @@ func GetAttackByKind(kind string) *AttackConfig { attackConfig = &StressCommand{} case DiskAttack: attackConfig = &DiskAttackConfig{} + case DiskServerAttack: + attackConfig = &DiskAttackConfig{} case JVMAttack: attackConfig = &JVMCommand{} case ClockAttack: diff --git a/pkg/core/jvm.go b/pkg/core/jvm.go index c28752bc..38ec4dd4 100644 --- a/pkg/core/jvm.go +++ b/pkg/core/jvm.go @@ -141,8 +141,8 @@ type JVMStressSpec struct { // only when SQL match the Database, Table and SQLType, chaosd will inject fault // for example: // -// SQL is "select * from test.t1", -// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault +// SQL is "select * from test.t1", +// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault type JVMMySQLSpec struct { // the version of mysql-connector-java, only support 5.X.X(set to 5) and 8.X.X(set to 8) now MySQLConnectorVersion string `json:"mysql-connector-version,omitempty"` diff --git a/pkg/server/chaosd/disk.go b/pkg/server/chaosd/disk.go index e08b546c..518d218c 100644 --- a/pkg/server/chaosd/disk.go +++ b/pkg/server/chaosd/disk.go @@ -14,17 +14,15 @@ package chaosd import ( + "context" "fmt" "os" - "os/exec" - "sync" - "time" "github.com/hashicorp/go-multierror" "github.com/pingcap/log" "go.uber.org/zap" - "github.com/chaos-mesh/chaosd/pkg/server/utils" + pkgUtils "github.com/chaos-mesh/chaosd/pkg/utils" "github.com/chaos-mesh/chaosd/pkg/core" ) @@ -33,88 +31,71 @@ type diskAttack struct{} var DiskAttack AttackType = diskAttack{} -func (disk diskAttack) Attack(options core.AttackConfig, env Environment) error { +func handleDiskAttackOutput(output []byte, err error, c chan interface{}) { + if err != nil { + log.Error(string(output), zap.Error(err)) + c <- err + } + log.Info(string(output)) +} + +func (diskAttack) Attack(options core.AttackConfig, env Environment) error { + err := ApplyDiskAttack(options, env) + if err != nil { + return err + } + return nil +} + +func handleOutputChannelError(c chan interface{}) error { + close(c) + var multiErrs error + for i := range c { + if err, ok := i.(error); ok { + multiErrs = multierror.Append(multiErrs, err) + } + } + if multiErrs != nil { + return multiErrs + } + return nil +} + +func ApplyDiskAttack(options core.AttackConfig, env Environment) error { var attackConf *core.DiskAttackConfig var ok bool if attackConf, ok = options.(*core.DiskAttackConfig); !ok { return fmt.Errorf("AttackConfig -> *DiskAttackConfig meet error") } + poolSize := getPoolSize(attackConf) + outputChan := make(chan interface{}, poolSize+1) if attackConf.Action == core.DiskFillAction { - if attackConf.FAllocateOption != nil { - cmd := core.FAllocateCommand.Unmarshal(*attackConf.FAllocateOption) - output, err := cmd.CombinedOutput() - - if err != nil { - log.Error(string(output), zap.Error(err)) - return err - } - log.Info(string(output)) - return nil - } - - for _, DdOption := range *attackConf.DdOptions { - cmd := core.DdCommand.Unmarshal(DdOption) - output, err := cmd.CombinedOutput() - - if err != nil { - log.Error(string(output), zap.Error(err)) - return err - } - log.Info(string(output)) - } - return nil + cmdPool := pkgUtils.NewCommandPools(context.Background(), nil, poolSize) + env.Chaos.CmdPools[env.AttackUid] = cmdPool + fillDisk(attackConf, cmdPool, NewOutputHandler(handleDiskAttackOutput, outputChan)) + cmdPool.Wait() + cmdPool.Close() + return handleOutputChannelError(outputChan) } if attackConf.DdOptions != nil { - duration, _ := options.ScheduleDuration() - var deadline <-chan time.Time - if duration != nil { - deadline = time.After(*duration) - } - - if len(*attackConf.DdOptions) == 0 { - return nil - } - rest := (*attackConf.DdOptions)[len(*attackConf.DdOptions)-1] - *attackConf.DdOptions = (*attackConf.DdOptions)[:len(*attackConf.DdOptions)-1] - - cmd := core.DdCommand.Unmarshal(rest) - err := utils.ExecWithDeadline(deadline, cmd) - if err != nil { - return err + var cmdPool *pkgUtils.CommandPools + deadline := getDeadline(options) + if deadline != nil { + cmdPool = pkgUtils.NewCommandPools(context.Background(), deadline, poolSize) } + cmdPool = pkgUtils.NewCommandPools(context.Background(), nil, poolSize) + env.Chaos.CmdPools[env.AttackUid] = cmdPool - var wg sync.WaitGroup - var mu sync.Mutex - var errs error - wg.Add(len(*attackConf.DdOptions)) - for _, ddOpt := range *attackConf.DdOptions { - cmd := core.DdCommand.Unmarshal(ddOpt) - - go func(cmd *exec.Cmd) { - defer wg.Done() - err := utils.ExecWithDeadline(deadline, cmd) - if err != nil { - log.Error(cmd.String(), zap.Error(err)) - mu.Lock() - defer mu.Unlock() - errs = multierror.Append(errs, err) - return - } - }(cmd) - } - - wg.Wait() - - if errs != nil { - return errs - } + applyPayload(attackConf, cmdPool, NewOutputHandler(handleDiskAttackOutput, outputChan)) + cmdPool.Wait() + cmdPool.Close() + return handleOutputChannelError(outputChan) } return nil - } -func (diskAttack) Recover(exp core.Experiment, _ Environment) error { +func (diskAttack) Recover(exp core.Experiment, env Environment) error { attackConfig, err := exp.GetRequestCommand() if err != nil { return err @@ -127,5 +108,10 @@ func (diskAttack) Recover(exp core.Experiment, _ Environment) error { log.Warn(fmt.Sprintf("recover disk: remove %s failed", config.Path), zap.Error(err)) } } + + if cmdPool, ok := env.Chaos.CmdPools[exp.Uid]; ok { + cmdPool.Close() + } + return nil } diff --git a/pkg/server/chaosd/disk_server.go b/pkg/server/chaosd/disk_server.go new file mode 100644 index 00000000..3c0bd658 --- /dev/null +++ b/pkg/server/chaosd/disk_server.go @@ -0,0 +1,172 @@ +// Copyright 2021 Chaos Mesh Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package chaosd + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" + + pkgUtils "github.com/chaos-mesh/chaosd/pkg/utils" + + "github.com/chaos-mesh/chaosd/pkg/core" +) + +type diskServerAttack struct{} + +var DiskServerAttack AttackType = diskServerAttack{} + +func handleDiskServerOutput(output []byte, err error, _ chan interface{}) { + if err != nil { + log.Error(string(output), zap.Error(err)) + } + log.Info(string(output)) +} + +func (diskServerAttack) Attack(options core.AttackConfig, env Environment) error { + err := ApplyDiskServerAttack(options, env) + if err != nil { + return err + } + return nil +} + +type OutputHandler struct { + StdoutHandler func([]byte, error, chan interface{}) + OutputChan chan interface{} +} + +func NewOutputHandler( + handler func([]byte, error, chan interface{}), + outputChan chan interface{}) *OutputHandler { + return &OutputHandler{ + StdoutHandler: handler, + OutputChan: outputChan, + } +} + +func getPoolSize(attackConf *core.DiskAttackConfig) int { + poolSize := 1 + if attackConf.DdOptions != nil && len(*attackConf.DdOptions) > 0 { + poolSize = len(*attackConf.DdOptions) + } + return poolSize +} + +func fillDisk( + attackConf *core.DiskAttackConfig, + cmdPool *pkgUtils.CommandPools, + outputHandler *OutputHandler) { + if attackConf.FAllocateOption != nil { + name, args := core.FAllocateCommand.GetCmdArgs(*attackConf.FAllocateOption) + runner := pkgUtils.NewCommandRunner(name, args). + WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan) + cmdPool.Start(runner) + return + } + + for _, DdOption := range *attackConf.DdOptions { + name, args := core.DdCommand.GetCmdArgs(DdOption) + runner := pkgUtils.NewCommandRunner(name, args). + WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan) + cmdPool.Start(runner) + } + return +} + +func getDeadline(options core.AttackConfig) *time.Time { + duration, _ := options.ScheduleDuration() + if duration != nil { + deadline := time.Now().Add(*duration) + return &deadline + } + return nil +} + +func applyPayload( + attackConf *core.DiskAttackConfig, + cmdPool *pkgUtils.CommandPools, + outputHandler *OutputHandler) { + if len(*attackConf.DdOptions) == 0 { + return + } + rest := (*attackConf.DdOptions)[len(*attackConf.DdOptions)-1] + *attackConf.DdOptions = (*attackConf.DdOptions)[:len(*attackConf.DdOptions)-1] + name, args := core.DdCommand.GetCmdArgs(rest) + runner := pkgUtils.NewCommandRunner(name, args). + WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan) + cmdPool.Start(runner) + + for _, ddOpt := range *attackConf.DdOptions { + name, args := core.DdCommand.GetCmdArgs(ddOpt) + runner := pkgUtils.NewCommandRunner(name, args). + WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan) + cmdPool.Start(runner) + } +} + +func ApplyDiskServerAttack(options core.AttackConfig, env Environment) error { + var attackConf *core.DiskAttackConfig + var ok bool + if attackConf, ok = options.(*core.DiskAttackConfig); !ok { + return fmt.Errorf("AttackConfig -> *DiskAttackConfig meet error") + } + poolSize := getPoolSize(attackConf) + if attackConf.Action == core.DiskFillAction { + cmdPool := pkgUtils.NewCommandPools(context.Background(), nil, poolSize) + env.Chaos.CmdPools[env.AttackUid] = cmdPool + fillDisk(attackConf, cmdPool, NewOutputHandler(handleDiskServerOutput, nil)) + return nil + } + + if attackConf.DdOptions != nil { + var cmdPool *pkgUtils.CommandPools + deadline := getDeadline(options) + if deadline != nil { + cmdPool = pkgUtils.NewCommandPools(context.Background(), deadline, poolSize) + } + cmdPool = pkgUtils.NewCommandPools(context.Background(), nil, poolSize) + env.Chaos.CmdPools[env.AttackUid] = cmdPool + + applyPayload(attackConf, cmdPool, NewOutputHandler(handleDiskServerOutput, nil)) + } + return nil +} + +func (diskServerAttack) Recover(exp core.Experiment, env Environment) error { + attackConfig, err := exp.GetRequestCommand() + if err != nil { + return err + } + config := *attackConfig.(*core.DiskAttackConfig) + + switch config.Action { + case core.DiskFillAction, core.DiskWritePayloadAction: + err = os.Remove(config.Path) + if err != nil { + log.Warn(fmt.Sprintf("recover disk: remove %s failed", config.Path), zap.Error(err)) + } + } + + if cmdPool, ok := env.Chaos.CmdPools[exp.Uid]; ok { + log.Info(fmt.Sprintf("stop disk attack,read: %s", config.Path)) + cmdPool.Close() + } + + return nil +} diff --git a/pkg/server/chaosd/disk_test.go b/pkg/server/chaosd/disk_test.go index 7aa407d8..c028bc7f 100644 --- a/pkg/server/chaosd/disk_test.go +++ b/pkg/server/chaosd/disk_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 Chaos Mesh Authors. +// Copyright 2023 Chaos Mesh Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -10,6 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. + package chaosd import ( @@ -19,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/chaos-mesh/chaosd/pkg/core" + "github.com/chaos-mesh/chaosd/pkg/utils" ) func Test_diskAttack_Attack(t *testing.T) { @@ -30,9 +32,15 @@ func Test_diskAttack_Attack(t *testing.T) { Path: "./a", PayloadProcessNum: 1, } + env := Environment{ + AttackUid: "a", + Chaos: &Server{ + CmdPools: make(map[string]*utils.CommandPools), + }, + } conf, err := opt.PreProcess() assert.NoError(t, err) - err = DiskAttack.Attack(conf, Environment{}) + err = DiskAttack.Attack(conf, env) assert.NoError(t, err) f, err := os.Open("./a") @@ -47,7 +55,7 @@ func Test_diskAttack_Attack(t *testing.T) { opt.PayloadProcessNum = 4 wConf, err := opt.PreProcess() assert.NoError(t, err) - err = DiskAttack.Attack(wConf, Environment{}) + err = DiskAttack.Attack(wConf, env) assert.NoError(t, err) f, err = os.Open("./a") diff --git a/pkg/server/chaosd/recover.go b/pkg/server/chaosd/recover.go index fbc74d5e..ea532d42 100644 --- a/pkg/server/chaosd/recover.go +++ b/pkg/server/chaosd/recover.go @@ -61,6 +61,8 @@ func (s *Server) RecoverAttack(uid string) error { attackType = StressAttack case core.DiskAttack: attackType = DiskAttack + case core.DiskServerAttack: + attackType = DiskServerAttack case core.JVMAttack: attackType = JVMAttack case core.ClockAttack: diff --git a/pkg/server/chaosd/server.go b/pkg/server/chaosd/server.go index 4b127fef..ea072147 100644 --- a/pkg/server/chaosd/server.go +++ b/pkg/server/chaosd/server.go @@ -19,6 +19,7 @@ import ( "github.com/chaos-mesh/chaosd/pkg/config" "github.com/chaos-mesh/chaosd/pkg/core" "github.com/chaos-mesh/chaosd/pkg/scheduler" + "github.com/chaos-mesh/chaosd/pkg/utils" ) type Server struct { @@ -30,6 +31,8 @@ type Server struct { tcRule core.TCRuleStore conf *config.Config svr *chaosdaemon.DaemonServer + + CmdPools map[string]*utils.CommandPools } func NewServer( @@ -51,5 +54,6 @@ func NewServer( iptablesRule: iptables, tcRule: tc, svr: svr, + CmdPools: make(map[string]*utils.CommandPools), } } diff --git a/pkg/server/httpserver/auth.go b/pkg/server/httpserver/auth.go index 06c28392..a4822b6f 100644 --- a/pkg/server/httpserver/auth.go +++ b/pkg/server/httpserver/auth.go @@ -16,8 +16,8 @@ package httpserver import ( "crypto/tls" "crypto/x509" - "io/ioutil" "net/http" + "os" "time" "github.com/gin-gonic/gin" @@ -37,7 +37,7 @@ var ( errMissingClientCert = utils.ErrAuth.New("Sorry, but you need to provide a client certificate to continue") ) -func (s *httpServer) serverMode() string { +func (s *HttpServer) serverMode() string { if len(s.conf.SSLCertFile) > 0 { if len(s.conf.SSLClientCAFile) > 0 { return MTLSServer @@ -47,7 +47,7 @@ func (s *httpServer) serverMode() string { return HTTPServer } -func (s *httpServer) startHttpsServer() (err error) { +func (s *HttpServer) startHttpsServer() (err error) { mode := s.serverMode() if mode == HTTPServer { return nil @@ -60,7 +60,7 @@ func (s *httpServer) startHttpsServer() (err error) { if mode == MTLSServer { log.Info("starting HTTPS server with Client Auth", zap.String("address", httpsServerAddr)) - caCert, ioErr := ioutil.ReadFile(s.conf.SSLClientCAFile) + caCert, ioErr := os.ReadFile(s.conf.SSLClientCAFile) if ioErr != nil { err = ioErr return diff --git a/pkg/server/httpserver/experiment.go b/pkg/server/httpserver/experiment.go index bf2350f4..dee934ad 100644 --- a/pkg/server/httpserver/experiment.go +++ b/pkg/server/httpserver/experiment.go @@ -22,7 +22,7 @@ import ( "github.com/chaos-mesh/chaosd/pkg/core" ) -func (s *httpServer) listExperiments(c *gin.Context) { +func (s *HttpServer) listExperiments(c *gin.Context) { mode, ok := c.GetQuery("launch_mode") var chaosList []*core.Experiment var err error @@ -32,17 +32,17 @@ func (s *httpServer) listExperiments(c *gin.Context) { chaosList, err = s.exp.List(context.Background()) } if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) + _ = c.AbortWithError(http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, chaosList) } -func (s *httpServer) listExperimentRuns(c *gin.Context) { +func (s *HttpServer) listExperimentRuns(c *gin.Context) { uid := c.Param("uid") runsList, err := s.chaos.ExpRun.ListByExperimentUID(context.Background(), uid) if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) + _ = c.AbortWithError(http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, runsList) diff --git a/pkg/server/httpserver/server.go b/pkg/server/httpserver/server.go index ae3a2c68..1aae3b9b 100644 --- a/pkg/server/httpserver/server.go +++ b/pkg/server/httpserver/server.go @@ -30,7 +30,7 @@ import ( "github.com/chaos-mesh/chaosd/pkg/swaggerserver" ) -type httpServer struct { +type HttpServer struct { conf *config.Config chaos *chaosd.Server exp core.ExperimentStore @@ -40,15 +40,15 @@ func NewServer( conf *config.Config, chaos *chaosd.Server, exp core.ExperimentStore, -) *httpServer { - return &httpServer{ +) *HttpServer { + return &HttpServer{ conf: conf, chaos: chaos, exp: exp, } } -func Register(s *httpServer, scheduler scheduler.Scheduler) { +func Register(s *HttpServer, scheduler scheduler.Scheduler) { if s.conf.Platform != config.LocalPlatform { return } @@ -66,7 +66,7 @@ func Register(s *httpServer, scheduler scheduler.Scheduler) { scheduler.Start() } -func (s *httpServer) startHttpServer() error { +func (s *HttpServer) startHttpServer() error { httpServerAddr := s.conf.Address() log.Info("starting HTTP server", zap.String("address", httpServerAddr)) e := gin.Default() @@ -78,7 +78,7 @@ func (s *httpServer) startHttpServer() error { return e.Run(httpServerAddr) } -func (s *httpServer) handler(engine *gin.Engine) { +func (s *HttpServer) handler(engine *gin.Engine) { api := engine.Group("/api") { api.GET("/swagger/*any", swaggerserver.Handler()) @@ -107,7 +107,7 @@ func (s *httpServer) handler(engine *gin.Engine) { } } -func (s *httpServer) systemHandler(engine *gin.Engine) { +func (s *HttpServer) systemHandler(engine *gin.Engine) { api := engine.Group("/api") system := api.Group("/system") { @@ -125,10 +125,10 @@ func (s *httpServer) systemHandler(engine *gin.Engine) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/process [post] -func (s *httpServer) createProcessAttack(c *gin.Context) { +func (s *HttpServer) createProcessAttack(c *gin.Context) { attack := core.NewProcessCommand() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -157,10 +157,10 @@ func (s *httpServer) createProcessAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/network [post] -func (s *httpServer) createNetworkAttack(c *gin.Context) { +func (s *HttpServer) createNetworkAttack(c *gin.Context) { attack := core.NewNetworkCommand() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -189,10 +189,10 @@ func (s *httpServer) createNetworkAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/stress [post] -func (s *httpServer) createStressAttack(c *gin.Context) { +func (s *HttpServer) createStressAttack(c *gin.Context) { attack := core.NewStressCommand() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -221,10 +221,10 @@ func (s *httpServer) createStressAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/disk [post] -func (s *httpServer) createDiskAttack(c *gin.Context) { - options := core.NewDiskOption() +func (s *HttpServer) createDiskAttack(c *gin.Context) { + options := core.NewDiskOptionForServer() if err := c.ShouldBindJSON(options); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -236,7 +236,7 @@ func (s *httpServer) createDiskAttack(c *gin.Context) { return } - uid, err := s.chaos.ExecuteAttack(chaosd.DiskAttack, attackConfig, core.ServerMode) + uid, err := s.chaos.ExecuteAttack(chaosd.DiskServerAttack, attackConfig, core.ServerMode) if err != nil { handleError(c, err) return @@ -254,10 +254,10 @@ func (s *httpServer) createDiskAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/clock [post] -func (s *httpServer) createClockAttack(c *gin.Context) { +func (s *HttpServer) createClockAttack(c *gin.Context) { options := core.NewClockOption() if err := c.ShouldBindJSON(options); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -286,10 +286,10 @@ func (s *httpServer) createClockAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/http [post] -func (s *httpServer) createHTTPAttack(c *gin.Context) { +func (s *HttpServer) createHTTPAttack(c *gin.Context) { attack := core.NewHTTPAttackOption() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -318,10 +318,10 @@ func (s *httpServer) createHTTPAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/jvm [post] -func (s *httpServer) createJVMAttack(c *gin.Context) { +func (s *HttpServer) createJVMAttack(c *gin.Context) { options := core.NewJVMCommand() if err := c.ShouldBindJSON(options); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -350,10 +350,10 @@ func (s *httpServer) createJVMAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/kafka [post] -func (s *httpServer) createKafkaAttack(c *gin.Context) { +func (s *HttpServer) createKafkaAttack(c *gin.Context) { options := core.NewKafkaCommand() if err := c.ShouldBindJSON(options); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -382,10 +382,10 @@ func (s *httpServer) createKafkaAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/vm [post] -func (s *httpServer) createVMAttack(c *gin.Context) { +func (s *HttpServer) createVMAttack(c *gin.Context) { options := core.NewVMOption() if err := c.ShouldBindJSON(options); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -414,10 +414,10 @@ func (s *httpServer) createVMAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/redis [post] -func (s *httpServer) createRedisAttack(c *gin.Context) { +func (s *HttpServer) createRedisAttack(c *gin.Context) { attack := core.NewRedisCommand() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -446,10 +446,10 @@ func (s *httpServer) createRedisAttack(c *gin.Context) { // @Failure 400 {object} utils.APIError // @Failure 500 {object} utils.APIError // @Router /api/attack/user_defined [post] -func (s *httpServer) createUserDefinedAttack(c *gin.Context) { +func (s *HttpServer) createUserDefinedAttack(c *gin.Context) { attack := core.NewUserDefinedOption() if err := c.ShouldBindJSON(attack); err != nil { - c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) + _ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err)) return } @@ -477,7 +477,7 @@ func (s *httpServer) createUserDefinedAttack(c *gin.Context) { // @Success 200 {object} utils.Response // @Failure 500 {object} utils.APIError // @Router /api/attack/{uid} [delete] -func (s *httpServer) recoverAttack(c *gin.Context) { +func (s *HttpServer) recoverAttack(c *gin.Context) { uid := c.Param("uid") err := s.chaos.RecoverAttack(uid) if err != nil { diff --git a/pkg/server/httpserver/system.go b/pkg/server/httpserver/system.go index 3ec5fadf..58351903 100644 --- a/pkg/server/httpserver/system.go +++ b/pkg/server/httpserver/system.go @@ -26,10 +26,10 @@ type healthInfo struct { Message string `json:"message"` } -func (s *httpServer) healthcheck(c *gin.Context) { +func (s *HttpServer) healthcheck(c *gin.Context) { c.JSON(http.StatusOK, healthInfo{Status: 0}) } -func (s *httpServer) version(c *gin.Context) { +func (s *HttpServer) version(c *gin.Context) { c.JSON(http.StatusOK, version.Get()) } diff --git a/pkg/utils/command.go b/pkg/utils/command.go index 2efc0bb9..eaee2b26 100644 --- a/pkg/utils/command.go +++ b/pkg/utils/command.go @@ -14,6 +14,7 @@ package utils import ( + "context" "fmt" "os/exec" "reflect" @@ -24,6 +25,16 @@ type Command struct { } func (c Command) Unmarshal(val interface{}) *exec.Cmd { + name, args := c.GetCmdArgs(val) + return exec.Command(name, args...) +} + +func (c Command) UnmarshalWithCtx(ctx context.Context, val interface{}) *exec.Cmd { + name, args := c.GetCmdArgs(val) + return exec.CommandContext(ctx, name, args...) +} + +func (c Command) GetCmdArgs(val interface{}) (string, []string) { v := reflect.ValueOf(val) var options []string @@ -39,5 +50,5 @@ func (c Command) Unmarshal(val interface{}) *exec.Cmd { options = append(options, fmt.Sprintf("%s=%v", tag, v.Field(i).String())) } } - return exec.Command(c.Name, options...) // + return c.Name, options } diff --git a/pkg/utils/command_test.go b/pkg/utils/command_test.go index ae9d3883..3d65452b 100644 --- a/pkg/utils/command_test.go +++ b/pkg/utils/command_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 Chaos Mesh Authors. +// Copyright 2023 Chaos Mesh Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -10,6 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. + package utils import ( diff --git a/pkg/utils/pool.go b/pkg/utils/pool.go new file mode 100644 index 00000000..86b54a82 --- /dev/null +++ b/pkg/utils/pool.go @@ -0,0 +1,117 @@ +// Copyright 2023 Chaos Mesh Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + "os/exec" + "sync" + "time" + + "github.com/Jeffail/tunny" + "github.com/samber/lo" + "github.com/samber/mo" +) + +// CommandPools is a group of commands runner +type CommandPools struct { + cancel context.CancelFunc + pools *tunny.Pool + wg sync.WaitGroup +} + +// NewCommandPools returns a new CommandPools +func NewCommandPools(ctx context.Context, deadline *time.Time, size int) *CommandPools { + var ctx2 context.Context + var cancel context.CancelFunc + if deadline != nil { + ctx2, cancel = context.WithDeadline(ctx, *deadline) + } else { + ctx2, cancel = context.WithCancel(ctx) + } + return &CommandPools{ + cancel: cancel, + pools: tunny.NewFunc(size, func(payload interface{}) interface{} { + cmdPayload, ok := payload.(lo.Tuple2[string, []string]) + if !ok { + return mo.Err[[]byte](fmt.Errorf("payload is not CommandPayload")) + } + name, args := cmdPayload.Unpack() + cmd := exec.CommandContext(ctx2, name, args...) + output, err := cmd.CombinedOutput() + if err != nil { + return mo.Err[[]byte](fmt.Errorf("%s: %s", err, string(output))) + } + return mo.Ok[[]byte](output) + }), + } +} + +type CommandRunner struct { + Name string + Args []string + + outputHandler func([]byte, error, chan interface{}) + outputChanel chan interface{} +} + +func NewCommandRunner(name string, args []string) *CommandRunner { + return &CommandRunner{ + Name: name, + Args: args, + outputHandler: func(bytes []byte, err error, c chan interface{}) {}, + outputChanel: nil, + } +} + +func (r *CommandRunner) WithOutputHandler( + handler func([]byte, error, chan interface{}), + outputChanel chan interface{}, +) *CommandRunner { + r.outputHandler = handler + r.outputChanel = outputChanel + return r +} + +func (p *CommandPools) Process(name string, args []string) ([]byte, error) { + result, ok := p.pools.Process(lo.Tuple2[string, []string]{ + A: name, + B: args, + }).(mo.Result[[]byte]) + if !ok { + return nil, fmt.Errorf("payload is not Result[[]byte]") + } + return result.Get() +} + +// Start command async. +func (p *CommandPools) Start(runner *CommandRunner) { + p.wg.Add(1) + go func() { + output, err := p.Process(runner.Name, runner.Args) + runner.outputHandler(output, err, runner.outputChanel) + p.wg.Done() + }() +} + +func (p *CommandPools) Wait() { + p.wg.Wait() +} + +func (p *CommandPools) Close() { + p.cancel() + p.Wait() + p.pools.Close() +} diff --git a/pkg/utils/pool_test.go b/pkg/utils/pool_test.go new file mode 100644 index 00000000..40483c49 --- /dev/null +++ b/pkg/utils/pool_test.go @@ -0,0 +1,81 @@ +// Copyright 2023 Chaos Mesh Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "math" + "testing" + "time" + + "github.com/pingcap/log" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestCommandPools_Cancel(t *testing.T) { + now := time.Now() + cmdPools := NewCommandPools(context.Background(), nil, 1) + var gErr []error + runner := NewCommandRunner("sleep", []string{"10s"}). + WithOutputHandler(func(output []byte, err error, _ chan interface{}) { + if err != nil { + log.Error(string(output), zap.Error(err)) + gErr = append(gErr, err) + } + log.Info(string(output)) + }, nil) + cmdPools.Start(runner) + cmdPools.Close() + assert.Less(t, time.Since(now).Seconds(), 10.0) + assert.Equal(t, 1, len(gErr)) +} + +func TestCommandPools_Deadline(t *testing.T) { + now := time.Now() + deadline := time.Now().Add(time.Millisecond * 50) + cmdPools := NewCommandPools(context.Background(), &deadline, 1) + var gErr []error + runner := NewCommandRunner("sleep", []string{"10s"}). + WithOutputHandler(func(output []byte, err error, _ chan interface{}) { + if err != nil { + log.Error(string(output), zap.Error(err)) + gErr = append(gErr, err) + } + log.Info(string(output)) + }, nil) + cmdPools.Start(runner) + cmdPools.Wait() + assert.Less(t, math.Abs(float64(time.Since(now).Milliseconds()-50)), 10.0) + assert.Equal(t, 1, len(gErr)) + +} + +func TestCommandPools_Normal(t *testing.T) { + now := time.Now() + cmdPools := NewCommandPools(context.Background(), nil, 1) + var gErr []error + runner := NewCommandRunner("sleep", []string{"1s"}). + WithOutputHandler(func(output []byte, err error, _ chan interface{}) { + if err != nil { + log.Error(string(output), zap.Error(err)) + gErr = append(gErr, err) + } + log.Info(string(output)) + }, nil) + cmdPools.Start(runner) + cmdPools.Wait() + assert.Less(t, time.Since(now).Seconds(), 2.0) + assert.Equal(t, 0, len(gErr)) +}