diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2f3dc4e1..59c16434 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -477,10 +477,10 @@ func checkExpression(expr *repository.Expression) error { } // implement apis for Expression. -func (m *apiManager) AppendExpression(ctx context.Context, exprs []repository.Expression) error { +func (m *apiManager) AppendExpression(ctx context.Context, exprs []*repository.Expression) error { // validate expressions. for index, expr := range exprs { - if err := checkExpression(&exprs[index]); nil != err { + if err := checkExpression(exprs[index]); nil != err { log.L().Error("append expression, invalidate expression", logf.Path(expr.Path), logf.Eid(expr.EntityID), logf.Owner(expr.Owner), logf.Expr(expr.Expression)) return errors.Wrap(err, "invalid expression") @@ -491,7 +491,7 @@ func (m *apiManager) AppendExpression(ctx context.Context, exprs []repository.Ex } // implement apis for Expression. -func (m *apiManager) appendExpression(ctx context.Context, exprs []repository.Expression) error { +func (m *apiManager) appendExpression(ctx context.Context, exprs []*repository.Expression) error { // validate expressions. for _, expr := range exprs { if err := expression.Validate(expr); nil != err { @@ -515,7 +515,7 @@ func (m *apiManager) appendExpression(ctx context.Context, exprs []repository.Ex return nil } -func (m *apiManager) RemoveExpression(ctx context.Context, exprs []repository.Expression) error { +func (m *apiManager) RemoveExpression(ctx context.Context, exprs []*repository.Expression) error { // delete expressions. for index := range exprs { log.L().Debug("remove expression", @@ -532,7 +532,7 @@ func (m *apiManager) RemoveExpression(ctx context.Context, exprs []repository.Ex return nil } -func (m *apiManager) GetExpression(ctx context.Context, expr repository.Expression) (*repository.Expression, error) { +func (m *apiManager) GetExpression(ctx context.Context, expr *repository.Expression) (*repository.Expression, error) { // get expression. expr, err := m.entityRepo.GetExpression(ctx, expr) if nil != err { @@ -540,7 +540,7 @@ func (m *apiManager) GetExpression(ctx context.Context, expr repository.Expressi logf.Eid(expr.EntityID), logf.Owner(expr.Owner), logf.Expr(expr.Expression)) return nil, errors.Wrap(err, "get expression") } - return &expr, nil + return expr, nil } func (m *apiManager) ListExpression(ctx context.Context, en *Base) ([]*repository.Expression, error) { @@ -577,11 +577,11 @@ func (m *apiManager) GetSubscription(ctx context.Context, subscription *reposito ////////////// -func convExprs(mp mapper.Mapper) []repository.Expression { +func convExprs(mp mapper.Mapper) []*repository.Expression { segs := strings.SplitN(mp.TQL, "select", 2) arr := strings.Split(segs[1], ",") - exprs := []repository.Expression{} + exprs := []*repository.Expression{} for index := range arr { segs = strings.Split(arr[index], " as ") @@ -591,7 +591,7 @@ func convExprs(mp mapper.Mapper) []repository.Expression { } exprs = append(exprs, - *repository.NewExpression( + repository.NewExpression( mp.Owner, mp.EntityID, mp.Name, path, segs[0], mp.Description)) } return exprs diff --git a/pkg/manager/types.go b/pkg/manager/types.go index 0e4943f9..472265fa 100644 --- a/pkg/manager/types.go +++ b/pkg/manager/types.go @@ -52,9 +52,9 @@ type APIManager interface { AppendMapperZ(context.Context, *mapper.Mapper) error // Expression. - AppendExpression(context.Context, []repository.Expression) error - RemoveExpression(context.Context, []repository.Expression) error - GetExpression(context.Context, repository.Expression) (*repository.Expression, error) + AppendExpression(context.Context, []*repository.Expression) error + RemoveExpression(context.Context, []*repository.Expression) error + GetExpression(context.Context, *repository.Expression) (*repository.Expression, error) ListExpression(context.Context, *Base) ([]*repository.Expression, error) // Subscription. diff --git a/pkg/mapper/expression/expression.go b/pkg/mapper/expression/expression.go index 78462dac..90044ff1 100644 --- a/pkg/mapper/expression/expression.go +++ b/pkg/mapper/expression/expression.go @@ -13,7 +13,7 @@ type IExpression interface { Sources() map[string][]string } -func Validate(expr repository.Expression) error { +func Validate(expr *repository.Expression) error { // check path. // check expression. diff --git a/pkg/mapper/expression/expression_test.go b/pkg/mapper/expression/expression_test.go index 3b6ff6f4..491b9086 100644 --- a/pkg/mapper/expression/expression_test.go +++ b/pkg/mapper/expression/expression_test.go @@ -47,7 +47,7 @@ func Test_Validate(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := Validate(tt.args.expr); (err != nil) != tt.wantErr { + if err := Validate(&tt.args.expr); (err != nil) != tt.wantErr { t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index d76c7e19..aeefa818 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -93,7 +93,8 @@ func (m *mapper) Tentacles() map[string][]Tentacler { for index, item := range tentacleConf.PropertyKeys { watchKey := WatchKey{ EntityID: entityID, - PropertyKey: item} + PropertyKey: item, + } eItems[index] = watchKey mItems = append(mItems, watchKey) } diff --git a/pkg/mapper/tentacle.go b/pkg/mapper/tentacle.go index 8f34d3f0..df7bc2ed 100644 --- a/pkg/mapper/tentacle.go +++ b/pkg/mapper/tentacle.go @@ -72,7 +72,10 @@ func (t *tentacle) Mapper() IMapper { func (t *tentacle) Copy() Tentacler { items := make([]WatchKey, len(t.items)) for index, item := range t.items { - items[index] = item + items[index] = WatchKey{ + item.EntityID, + item.PropertyKey, + } } ten := &tentacle{ diff --git a/pkg/repository/expression.go b/pkg/repository/expression.go index 2c0f1c41..b979085b 100644 --- a/pkg/repository/expression.go +++ b/pkg/repository/expression.go @@ -109,30 +109,30 @@ func (e *Expression) Prefix() string { return ListExpressionPrefix(e.Owner, e.EntityID) } -func (r *repo) PutExpression(ctx context.Context, expr Expression) error { - err := r.dao.PutResource(ctx, &expr) +func (r *repo) PutExpression(ctx context.Context, expr *Expression) error { + err := r.dao.PutResource(ctx, expr) return errors.Wrap(err, "put expression repository") } -func (r *repo) GetExpression(ctx context.Context, expr Expression) (Expression, error) { - _, err := r.dao.GetResource(ctx, &expr) +func (r *repo) GetExpression(ctx context.Context, expr *Expression) (*Expression, error) { + _, err := r.dao.GetResource(ctx, expr) return expr, errors.Wrap(err, "get expression repository") } -func (r *repo) DelExpression(ctx context.Context, expr Expression) error { - err := r.dao.DelResource(ctx, &expr) +func (r *repo) DelExpression(ctx context.Context, expr *Expression) error { + err := r.dao.DelResource(ctx, expr) return errors.Wrap(err, "del expression repository") } -func (r *repo) DelExprByEnity(ctx context.Context, expr Expression) error { +func (r *repo) DelExprByEnity(ctx context.Context, expr *Expression) error { // construct prefix key. prefix := expr.Prefix() err := r.dao.DelResources(ctx, prefix) return errors.Wrap(err, "del expressions repository") } -func (r *repo) HasExpression(ctx context.Context, expr Expression) (bool, error) { - has, err := r.dao.HasResource(ctx, &expr) +func (r *repo) HasExpression(ctx context.Context, expr *Expression) (bool, error) { + has, err := r.dao.HasResource(ctx, expr) return has, errors.Wrap(err, "exists expression repository") } diff --git a/pkg/repository/expression_test.go b/pkg/repository/expression_test.go index 55f41875..c0eb12c7 100644 --- a/pkg/repository/expression_test.go +++ b/pkg/repository/expression_test.go @@ -39,14 +39,14 @@ func TestPutExpression(t *testing.T) { for _, exprInfo := range tests { t.Run(exprInfo.name, func(t *testing.T) { - err := repoIns.PutExpression(context.Background(), *exprInfo.expr) + err := repoIns.PutExpression(context.Background(), exprInfo.expr) assert.Nil(t, err) }) } } func TestGetExpression(t *testing.T) { - _, err := repoIns.GetExpression(context.Background(), Expression{EntityID: "device123", Owner: "admin", Path: "temp"}) + _, err := repoIns.GetExpression(context.Background(), &Expression{EntityID: "device123", Owner: "admin", Path: "temp"}) t.Log(err) // assert.ErrorIs(t, err, xerrors.ErrResourceNotFound) // assert.Equal(t, "admin", expr.Owner) @@ -62,12 +62,12 @@ func TestListExpression(t *testing.T) { } func TestDelExpression(t *testing.T) { - err := repoIns.DelExpression(context.Background(), Expression{Owner: "admin", EntityID: "device123", Path: "temp"}) + err := repoIns.DelExpression(context.Background(), &Expression{Owner: "admin", EntityID: "device123", Path: "temp"}) assert.Nil(t, err) } func TestDeleteExpressions(t *testing.T) { - err := repoIns.DelExprByEnity(context.Background(), Expression{Owner: "admin", EntityID: "device123"}) + err := repoIns.DelExprByEnity(context.Background(), &Expression{Owner: "admin", EntityID: "device123"}) assert.Nil(t, err) } diff --git a/pkg/repository/schema.go b/pkg/repository/schema.go index 114abce0..d6f454e0 100644 --- a/pkg/repository/schema.go +++ b/pkg/repository/schema.go @@ -73,23 +73,23 @@ func (s *Schema) Decode(key, bytes []byte) error { return errors.Wrap(err, "decode Schema") } -func (r *repo) PutSchema(ctx context.Context, expr Schema) error { - err := r.dao.PutResource(ctx, &expr) +func (r *repo) PutSchema(ctx context.Context, expr *Schema) error { + err := r.dao.PutResource(ctx, expr) return errors.Wrap(err, "put expression repository") } -func (r *repo) GetSchema(ctx context.Context, expr Schema) (Schema, error) { - _, err := r.dao.GetResource(ctx, &expr) +func (r *repo) GetSchema(ctx context.Context, expr *Schema) (*Schema, error) { + _, err := r.dao.GetResource(ctx, expr) return expr, errors.Wrap(err, "get expression repository") } -func (r *repo) DelSchema(ctx context.Context, expr Schema) error { - err := r.dao.DelResource(ctx, &expr) +func (r *repo) DelSchema(ctx context.Context, expr *Schema) error { + err := r.dao.DelResource(ctx, expr) return errors.Wrap(err, "del expression repository") } -func (r *repo) HasSchema(ctx context.Context, expr Schema) (bool, error) { - has, err := r.dao.HasResource(ctx, &expr) +func (r *repo) HasSchema(ctx context.Context, expr *Schema) (bool, error) { + has, err := r.dao.HasResource(ctx, expr) return has, errors.Wrap(err, "exists expression repository") } diff --git a/pkg/repository/schema_test.go b/pkg/repository/schema_test.go index b0172bc3..a8c5f867 100644 --- a/pkg/repository/schema_test.go +++ b/pkg/repository/schema_test.go @@ -57,7 +57,7 @@ func Test_repo_PutSchema(t *testing.T) { for _, tt := range tests { ctx := context.Background() t.Run(tt.name, func(t *testing.T) { - if err := rr.PutSchema(ctx, tt.schema); (err != nil) != tt.wantErr { + if err := rr.PutSchema(ctx, &tt.schema); (err != nil) != tt.wantErr { t.Errorf("PutSchema() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/repository/types.go b/pkg/repository/types.go index b69004a2..e9da0efb 100644 --- a/pkg/repository/types.go +++ b/pkg/repository/types.go @@ -10,11 +10,11 @@ type IRepository interface { GetEntity(ctx context.Context, eid string) ([]byte, error) DelEntity(ctx context.Context, eid string) error HasEntity(ctx context.Context, eid string) (bool, error) - PutExpression(ctx context.Context, expr Expression) error - GetExpression(ctx context.Context, expr Expression) (Expression, error) - DelExpression(ctx context.Context, expr Expression) error - DelExprByEnity(ctx context.Context, expr Expression) error - HasExpression(ctx context.Context, expr Expression) (bool, error) + PutExpression(ctx context.Context, expr *Expression) error + GetExpression(ctx context.Context, expr *Expression) (*Expression, error) + DelExpression(ctx context.Context, expr *Expression) error + DelExprByEnity(ctx context.Context, expr *Expression) error + HasExpression(ctx context.Context, expr *Expression) (bool, error) ListExpression(ctx context.Context, rev int64, req *ListExprReq) ([]*Expression, error) RangeExpression(ctx context.Context, rev int64, handler RangeExpressionFunc) WatchExpression(ctx context.Context, rev int64, handler WatchExpressionFunc) @@ -24,4 +24,10 @@ type IRepository interface { HasSubscription(ctx context.Context, expr *Subscription) (bool, error) RangeSubscription(ctx context.Context, rev int64, handler RangeSubscriptionFunc) WatchSubscription(ctx context.Context, rev int64, handler WatchSubscriptionFunc) + PutSchema(ctx context.Context, expr *Schema) error + GetSchema(ctx context.Context, expr *Schema) (*Schema, error) + DelSchema(ctx context.Context, expr *Schema) error + HasSchema(ctx context.Context, expr *Schema) (bool, error) + RangeSchema(ctx context.Context, rev int64, handler RangeSchemaFunc) + WatchSchema(ctx context.Context, rev int64, handler WatchSchemaFunc) } diff --git a/pkg/runtime/node.go b/pkg/runtime/node.go index 3d8068b5..119a3665 100644 --- a/pkg/runtime/node.go +++ b/pkg/runtime/node.go @@ -31,6 +31,7 @@ type NodeConf struct { type Node struct { runtimes map[string]*Runtime queues map[string]*xkafka.Pubsub + schemas *SchemaStore dispatch dispatch.Dispatcher resourceManager types.ResourceManager revision int64 @@ -49,6 +50,7 @@ func NewNode(ctx context.Context, resourceManager types.ResourceManager, dispatc resourceManager: resourceManager, runtimes: make(map[string]*Runtime), queues: make(map[string]*xkafka.Pubsub), + schemas: NewSchemaStore(), } } @@ -197,13 +199,31 @@ func (n *Node) listMetadata() { } } }) + + repo.RangeSchema(ctx, n.revision, func(schemas []*repository.Schema) { + // 所有node都需要存储 schema. + for _, sm := range schemas { + log.L().Debug("sync range Schema", logf.String("schemaID", sm.ID), logf.Any("schema", sm), logf.Owner(sm.Owner)) + schemaID := sm.ID + n.schemas.Set(schemaID, sm) + } + }) + log.L().Debug("runtime.Environment initialized", logf.Elapsedms(elapsedTime.ElapsedMilli())) } // watchResource watch resources. func (n *Node) watchMetadata() { repo := n.resourceManager.Repo() - go repo.WatchExpression(context.Background(), n.revision, + go n.watchExpression(repo) + + go n.watchSubscription(repo) + + go n.watchSchema(repo) +} + +func (n *Node) watchExpression(repo repository.IRepository) { + repo.WatchExpression(context.Background(), n.revision, func(et dao.EnventType, expr repository.Expression) { switch et { case dao.DELETE: @@ -240,8 +260,10 @@ func (n *Node) watchMetadata() { log.L().Error("watch metadata changed, invalid event type") } }) +} - go repo.WatchSubscription(context.Background(), n.revision, +func (n *Node) watchSubscription(repo repository.IRepository) { + repo.WatchSubscription(context.Background(), n.revision, func(et dao.EnventType, sub *repository.Subscription) { switch et { case dao.DELETE: @@ -272,6 +294,24 @@ func (n *Node) watchMetadata() { }) } +func (n *Node) watchSchema(repo repository.IRepository) { + repo.WatchSchema(context.Background(), n.revision, + func(et dao.EnventType, sm repository.Schema) { + switch et { + case dao.DELETE: + log.L().Debug("sync DELETE Schema", logf.String("schemaID", sm.ID), logf.Owner(sm.Owner)) + schemaID := sm.ID + n.schemas.Del(schemaID) + case dao.PUT: + log.L().Debug("sync PUT Schema", logf.String("schemaID", sm.ID), logf.Any("schema", sm), logf.Owner(sm.Owner)) + schemaID := sm.ID + n.schemas.Set(schemaID, &sm) + default: + log.L().Error("watch metadata changed, invalid event type") + } + }) +} + func parseExpression(expr repository.Expression, version int) (map[string]*ExpressionInfo, error) { exprIns, err := expression.NewExpr(expr.Expression, nil) if nil != err { diff --git a/pkg/runtime/schema.go b/pkg/runtime/schema.go new file mode 100644 index 00000000..8b165b49 --- /dev/null +++ b/pkg/runtime/schema.go @@ -0,0 +1,61 @@ +/* +Copyright 2021 The tKeel Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "sync" + + "github.com/tkeel-io/core/pkg/repository" +) + +type SchemaStore struct { + *sync.RWMutex + data map[string]*repository.Schema +} + +func NewSchemaStore() *SchemaStore { + return &SchemaStore{ + RWMutex: &sync.RWMutex{}, + data: map[string]*repository.Schema{}, + } +} + +func (s SchemaStore) Get(schemaID string) *repository.Schema { + defer s.RUnlock() + s.RLock() + sm, ok := s.data[schemaID] + if !ok { + return nil + } + return sm +} + +func (s SchemaStore) Set(schemaID string, sm *repository.Schema) bool { + defer s.Unlock() + s.Lock() + _, ok := s.data[schemaID] + s.data[schemaID] = sm + return ok +} + +func (s SchemaStore) Del(schemaID string) bool { + defer s.Unlock() + s.Lock() + _, ok := s.data[schemaID] + delete(s.data, schemaID) + return ok +} diff --git a/pkg/service/expression.go b/pkg/service/expression.go index 0d742476..eb524b9f 100644 --- a/pkg/service/expression.go +++ b/pkg/service/expression.go @@ -29,9 +29,9 @@ func (s *EntityService) AppendExpression(ctx context.Context, req *pb.AppendExpr logf.Eid(req.EntityId), logf.Value(req.Expressions)) // append expressions. - expressions := make([]repository.Expression, len(req.Expressions.Expressions)) + expressions := make([]*repository.Expression, len(req.Expressions.Expressions)) for index, expr := range req.Expressions.Expressions { - expressions[index] = *repository.NewExpression( + expressions[index] = repository.NewExpression( req.Owner, req.EntityId, expr.Name, propKey(expr.Path), expr.Expression, expr.Description) } @@ -68,10 +68,10 @@ func (s *EntityService) RemoveExpression(ctx context.Context, req *pb.RemoveExpr paths = strings.Split(pathText, ",") } - exprs := []repository.Expression{} + exprs := []*repository.Expression{} for index := range paths { exprs = append(exprs, - repository.Expression{ + &repository.Expression{ Path: propKey(paths[index]), Owner: en.Owner, EntityID: en.ID, @@ -105,7 +105,7 @@ func (s *EntityService) GetExpression(ctx context.Context, in *pb.GetExpressionR var expr *repository.Expression if expr, err = s.apiManager.GetExpression(ctx, - repository.Expression{ + &repository.Expression{ Path: propKey(in.Path), Owner: en.Owner, EntityID: en.ID, diff --git a/pkg/service/mock/manager_mock.go b/pkg/service/mock/manager_mock.go index 9e8ee707..3d141ad9 100644 --- a/pkg/service/mock/manager_mock.go +++ b/pkg/service/mock/manager_mock.go @@ -10,8 +10,7 @@ import ( "github.com/tkeel-io/core/pkg/repository" ) -type APIManagerMock struct { -} +type APIManagerMock struct{} func NewAPIManagerMock() apim.APIManager { return &APIManagerMock{} @@ -74,10 +73,14 @@ func (m *APIManagerMock) CheckSubscription(ctx context.Context, en *apim.Base) ( return nil } -func (m *APIManagerMock) AppendExpression(context.Context, []repository.Expression) error { return nil } -func (m *APIManagerMock) RemoveExpression(context.Context, []repository.Expression) error { return nil } +func (m *APIManagerMock) AppendExpression(context.Context, []*repository.Expression) error { + return nil +} +func (m *APIManagerMock) RemoveExpression(context.Context, []*repository.Expression) error { + return nil +} -func (m *APIManagerMock) GetExpression(context.Context, repository.Expression) (*repository.Expression, error) { +func (m *APIManagerMock) GetExpression(context.Context, *repository.Expression) (*repository.Expression, error) { return nil, nil }