Skip to content

Commit

Permalink
Revert "[fix] fix inconsistency bug between cache layer and etcd. (#287
Browse files Browse the repository at this point in the history
…)" (#298) (#299)

This reverts commit e005d80.

Co-authored-by: tornado-ssy <[email protected]>
Co-authored-by: songshiyuan 00649746 <[email protected]>
  • Loading branch information
3 people authored Sep 18, 2023
1 parent 4d91771 commit 222aff0
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 346 deletions.
8 changes: 1 addition & 7 deletions examples/dev/kie-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,4 @@ db:
# rsaPublicKeyFile: ./examples/dev/public.key
sync:
# turn on the synchronization switch related operations will be written to the task in the db
enabled: false
#cache:
# labels:
# - environment
# - service
# - app
# - version
enabled: false
11 changes: 3 additions & 8 deletions server/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package config

// Config is yaml file struct
type Config struct {
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
Cache Cache `yaml:"cache"`
DB DB `yaml:"db"`
RBAC RBAC `yaml:"rbac"`
Sync Sync `yaml:"sync"`
//config from cli
ConfigFile string
NodeName string
Expand Down Expand Up @@ -60,7 +59,3 @@ type RBAC struct {
type Sync struct {
Enabled bool `yaml:"enabled"`
}

type Cache struct {
Labels []string `yaml:"labels"`
}
152 changes: 65 additions & 87 deletions server/datasource/etcd/kv/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
Expand All @@ -38,41 +37,33 @@ const (

type IDSet map[string]struct{}

type LabelsSet map[string]struct{}

type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
}

func NewKvCache() *Cache {
kvDocCache := goCache.New(cacheExpirationTime, cacheCleanupInterval)
labelsSet := LabelsSet{}
for _, label := range config.Configurations.Cache.Labels {
labelsSet[label] = struct{}{}
}
return &Cache{
timeOut: etcdWatchTimeout,
client: etcdadpt.Instance(),
revision: 0,
kvDocCache: kvDocCache,
labelsSet: labelsSet,
}
}

func Enabled() bool {
return kvCache != nil
}

type Cache struct {
timeOut time.Duration
client etcdadpt.Client
revision int64
kvIDCache sync.Map
kvDocCache *goCache.Cache
labelsSet LabelsSet
type CacheSearchReq struct {
Domain string
Project string
Opts *datasource.FindOptions
Regex *regexp.Regexp
}

func (kc *Cache) Refresh(ctx context.Context) {
Expand Down Expand Up @@ -139,7 +130,7 @@ func (kc *Cache) list(ctx context.Context) (*etcdadpt.Response, error) {
return rsp, nil
}

func (kc *Cache) watchCallBack(_ string, rsp *etcdadpt.Response) error {
func (kc *Cache) watchCallBack(message string, rsp *etcdadpt.Response) error {
if rsp == nil || len(rsp.Kvs) == 0 {
return fmt.Errorf("unknown event")
}
Expand All @@ -163,9 +154,6 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
openlog.Error(fmt.Sprintf("failed to unmarshal kv, err %v", err))
continue
}
if !kc.isInLabelsSet(kvDoc.Labels) {
continue
}
kc.StoreKvDoc(kvDoc.ID, kvDoc)
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
Expand Down Expand Up @@ -232,6 +220,46 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
kc.kvDocCache.Delete(kvID)
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
if !req.Opts.ExactLabels {
return nil, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLeft []string) []*model.KVDoc {
if len(kvIdsLeft) == 0 {
return nil
Expand Down Expand Up @@ -266,6 +294,19 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, kvIdsLe
return docs
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}

func (kc *Cache) GetKvDoc(kv *mvccpb.KeyValue) (*model.KVDoc, error) {
kvDoc := &model.KVDoc{}
err := json.Unmarshal(kv.Value, kvDoc)
Expand All @@ -285,66 +326,3 @@ func (kc *Cache) GetCacheKey(domain, project string, labels map[string]string) s
}, "/")
return inputKey
}

func (kc *Cache) isInLabelsSet(Labels map[string]string) bool {
for label := range Labels {
if _, ok := kc.labelsSet[label]; !ok {
return false
}
}
return true
}

func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, bool, error) {
result := &model.KVResponse{
Data: []*model.KVDoc{},
}
if !req.Opts.ExactLabels || !kvCache.isInLabelsSet(req.Opts.Labels) {
return result, false, nil
}

openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project %v, opts %+v", req.Domain, req.Project, *req.Opts))
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, req.Opts.Labels)

kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
kvCache.StoreKvIDSet(cacheKey, IDSet{})
return result, true, nil
}

var docs []*model.KVDoc

var kvIdsLeft []string
for kvID := range kvIds {
if doc, ok := kvCache.LoadKvDoc(kvID); ok {
docs = append(docs, doc)
continue
}
kvIdsLeft = append(kvIdsLeft, kvID)
}

tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)

for _, doc := range docs {
if isMatch(req, doc) {
datasource.ClearPart(doc)
result.Data = append(result.Data, doc)
}
}
result.Total = len(result.Data)
return result, true, nil
}

func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if doc == nil {
return false
}
if req.Opts.Status != "" && doc.Status != req.Opts.Status {
return false
}
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
return true
}
Loading

0 comments on commit 222aff0

Please sign in to comment.