Skip to content

Commit

Permalink
[fix] 1,修复读写并发问题 2,修复search查询出错不返回err 3,解除健康检查对etcd的依赖 4,golang-lint问…
Browse files Browse the repository at this point in the history
…题修复 (#322)

* add value filter in ListKV API (#302)

Co-authored-by: tornado-ssy <[email protected]>
(cherry picked from commit 93bbb89)

* fix the concurrent bug of KvIdCache

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* fix the bug of do not report the error which occured in action of get kvdocs from etcd

* resolve conflicts in master

* [fix] fix golangci-lint (#318)

Co-authored-by: songshiyuan 00649746 <[email protected]>
(cherry picked from commit 577408a)

* [fix] cancel the depency between healthcheck and etcd (#319)

Co-authored-by: songshiyuan 00649746 <[email protected]>
(cherry picked from commit fcacc0d)

---------

Co-authored-by: little-cui <[email protected]>
Co-authored-by: songshiyuan 00649746 <[email protected]>
  • Loading branch information
3 people authored Apr 1, 2024
1 parent 222aff0 commit 502dfdc
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
version: v1.55.2
args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m --skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
QueryParamRev = "revision"
QueryParamMatch = "match"
QueryParamKey = "key"
QueryParamValue = "value"
QueryParamLabel = "label"
QueryParamStatus = "status"
QueryParamOffset = "offset"
Expand All @@ -39,6 +40,7 @@ const (
QueryParamURLPath = "urlPath"
QueryParamUserAgent = "userAgent"
QueryParamOverride = "override"
QueryParamMode = "mode"
)

// http headers
Expand Down
1 change: 1 addition & 0 deletions pkg/model/db_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type ListKVRequest struct {
Project string `json:"project,omitempty" yaml:"project,omitempty" validate:"min=1,max=256,commonName"`
Domain string `json:"domain,omitempty" yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant
Key string `json:"key" yaml:"key" validate:"max=128,getKey"`
Value string `json:"value" yaml:"value" validate:"max=128"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty" validate:"max=8,dive,keys,labelK,endkeys,labelV"` //redundant
Offset int64 `validate:"min=0"`
Limit int64 `validate:"min=0,max=100"`
Expand Down
57 changes: 35 additions & 22 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"sync"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)

func Init() {
Expand All @@ -35,8 +36,6 @@ const (
backOffMinInterval = 5 * time.Second
)

type IDSet map[string]struct{}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
Expand Down Expand Up @@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
z := &sync.Map{}
z.Store(kvDoc.ID, struct{}{})
kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
m[kvDoc.ID] = struct{}{}
m.Store(kvDoc.ID, struct{}{})
}
}

Expand All @@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
delete(m, kvDoc.ID)
m.Delete(kvDoc.ID)
}
}

func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
kvIds, ok := val.(IDSet)
kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}

func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}

Expand Down Expand Up @@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool,
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
kvIds.Range(func(kvID, value any) bool {
if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
continue
} else {
kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
kvIdsLeft = append(kvIdsLeft, kvID)
return true
})
tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
if err != nil {
return nil, true, err
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
Expand All @@ -260,14 +264,15 @@ func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool,
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
return nil
return nil, nil
}

openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
Expand All @@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv from etcd, err %v", err))
getKvErr = err
return
}

doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
getKvErr = err
return
}

Expand All @@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
}(kvID, i)
}
wg.Wait()
return docs
if getKvErr != nil {
return nil, getKvErr
}
return docs, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
Expand All @@ -304,6 +314,9 @@ func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
if req.Opts.Value != "" && !strings.Contains(doc.Value, req.Opts.Value) {
return false
}
return true
}

Expand Down
3 changes: 3 additions & 0 deletions server/datasource/etcd/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,5 +646,8 @@ func filterMatch(doc *model.KVDoc, opts datasource.FindOptions, regex *regexp.Re
if opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat {
return false
}
if opts.Value != "" && !strings.Contains(doc.Value, opts.Value) {
return false
}
return true
}
3 changes: 3 additions & 0 deletions server/datasource/mongo/kv/kv_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ func findKV(ctx context.Context, domain string, project string, opts datasource.
filter["key"] = bson.M{"$regex": "^" + value + "$", "$options": "$i"}
}
}
if opts.Value != "" {
filter["value"] = bson.M{"$regex": opts.Value}
}
if len(opts.Labels) != 0 {
for k, v := range opts.Labels {
filter["labels."+k] = v
Expand Down
8 changes: 8 additions & 0 deletions server/datasource/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type FindOptions struct {
Depth int
ID string
Key string
Value string
Labels map[string]string
LabelFormat string
ClearLabel bool
Expand Down Expand Up @@ -115,6 +116,13 @@ func WithKey(key string) FindOption {
}
}

// WithValue find by value
func WithValue(value string) FindOption {
return func(o *FindOptions) {
o.Value = value
}
}

// WithStatus enabled/disabled
func WithStatus(status string) FindOption {
return func(o *FindOptions) {
Expand Down
10 changes: 8 additions & 2 deletions server/resource/v1/admin_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"strconv"
"time"

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/datasource"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/cari/config"
"github.com/go-chassis/go-chassis/v2/pkg/runtime"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"

"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/datasource"
)

type AdminResource struct {
Expand Down Expand Up @@ -57,6 +59,10 @@ func (r *AdminResource) URLPatterns() []restful.Route {

// HealthCheck provider version info and time info
func (r *AdminResource) HealthCheck(context *restful.Context) {
healthCheckMode := context.ReadQueryParameter(common.QueryParamMode)
if healthCheckMode == "liveness" {
return
}
domain := ReadDomain(context.Ctx)
resp := &model.DocHealthCheck{}
latest, err := datasource.GetBroker().GetRevisionDao().GetRevision(context.Ctx, domain)
Expand Down
18 changes: 16 additions & 2 deletions server/resource/v1/admin_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (

_ "github.com/apache/servicecomb-kie/test"

"github.com/apache/servicecomb-kie/pkg/model"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/go-chassis/go-chassis/v2/server/restful/restfultest"
"github.com/stretchr/testify/assert"

"github.com/apache/servicecomb-kie/pkg/model"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
)

func Test_HeathCheck(t *testing.T) {
Expand All @@ -48,3 +49,16 @@ func Test_HeathCheck(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, data)
}

func Test_HeakthCheckLiveMode(t *testing.T) {
path := fmt.Sprintf("/v1/health?mode=liveness")
r, _ := http.NewRequest("GET", path, nil)

revision := &v1.AdminResource{}
c, err := restfultest.New(revision, nil)
assert.NoError(t, err)
resp := httptest.NewRecorder()
c.ServeHTTP(resp, r)
respcode := resp.Code
assert.NotEmpty(t, respcode)
}
1 change: 1 addition & 0 deletions server/resource/v1/kv_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (r *KVResource) List(rctx *restful.Context) {
Project: rctx.ReadPathParameter(common.PathParameterProject),
Domain: ReadDomain(rctx.Ctx),
Key: rctx.ReadQueryParameter(common.QueryParamKey),
Value: rctx.ReadQueryParameter(common.QueryParamValue),
Status: rctx.ReadQueryParameter(common.QueryParamStatus),
Match: getMatchPattern(rctx),
}
Expand Down
32 changes: 32 additions & 0 deletions server/resource/v1/kv_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,38 @@ func TestKVResource_List(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, len(result.Data))
})
t.Run("list kv by value, should return 1 kv", func(t *testing.T) {
r, _ := http.NewRequest("GET", "/v1/kv_test/kie/kv?value=aaa", nil)
r.Header.Set("Content-Type", "application/json")
kvr := &v1.KVResource{}
c, err := restfultest.New(kvr, nil)
assert.NoError(t, err)
resp := httptest.NewRecorder()
c.ServeHTTP(resp, r)
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.Code, string(body))
result := &model.KVResponse{}
err = json.Unmarshal(body, result)
assert.NoError(t, err)
assert.Equal(t, 1, len(result.Data))
})
t.Run("list kv by value, should return 1 kv", func(t *testing.T) {
r, _ := http.NewRequest("GET", "/v1/kv_test/kie/kv?value=AAA", nil)
r.Header.Set("Content-Type", "application/json")
kvr := &v1.KVResource{}
c, err := restfultest.New(kvr, nil)
assert.NoError(t, err)
resp := httptest.NewRecorder()
c.ServeHTTP(resp, r)
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.Code, string(body))
result := &model.KVResponse{}
err = json.Unmarshal(body, result)
assert.NoError(t, err)
assert.Equal(t, 0, len(result.Data))
})
var rev string
t.Run("list kv by service label, exact match,should return 2 kv", func(t *testing.T) {
r, _ := http.NewRequest("GET", "/v1/kv_test/kie/kv?label=service:utService&match=exact", nil)
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package server

import (
chassis "github.com/go-chassis/go-chassis/v2"
"github.com/go-chassis/go-chassis/v2/core/common"
"github.com/go-chassis/openlog"

"github.com/apache/servicecomb-kie/pkg/validator"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/db"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/rbac"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/go-chassis/go-chassis/v2"
"github.com/go-chassis/go-chassis/v2/core/common"
"github.com/go-chassis/openlog"
)

func Run() {
Expand Down
5 changes: 5 additions & 0 deletions server/service/kv/kv_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kv
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -45,6 +46,7 @@ var listSema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
func ListKV(ctx context.Context, request *model.ListKVRequest) (int64, *model.KVResponse, *errsvc.Error) {
opts := []datasource.FindOption{
datasource.WithKey(request.Key),
datasource.WithValue(request.Value),
datasource.WithLabels(request.Labels),
datasource.WithOffset(request.Offset),
datasource.WithLimit(request.Limit),
Expand Down Expand Up @@ -126,6 +128,9 @@ func Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error)
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv, datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
if errors.Is(err, datasource.ErrKVAlreadyExists) {
err = config.NewError(config.ErrRecordAlreadyExists, datasource.ErrKVAlreadyExists.Error())
}
return nil, util.SvcErr(err)
}
err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)
Expand Down

0 comments on commit 502dfdc

Please sign in to comment.