Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update struct point #286

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,10 @@ func checkExpression(expr *repository.Expression) error {
}

// implement apis for Expression.
func (m *apiManager) AppendExpression(ctx context.Context, exprs []repository.Expression) error {
func (m *apiManager) AppendExpression(ctx context.Context, exprs []*repository.Expression) error {
// validate expressions.
for index, expr := range exprs {
if err := checkExpression(&exprs[index]); nil != err {
if err := checkExpression(exprs[index]); nil != err {
log.L().Error("append expression, invalidate expression", logf.Path(expr.Path),
logf.Eid(expr.EntityID), logf.Owner(expr.Owner), logf.Expr(expr.Expression))
return errors.Wrap(err, "invalid expression")
Expand All @@ -491,7 +491,7 @@ func (m *apiManager) AppendExpression(ctx context.Context, exprs []repository.Ex
}

// implement apis for Expression.
func (m *apiManager) appendExpression(ctx context.Context, exprs []repository.Expression) error {
func (m *apiManager) appendExpression(ctx context.Context, exprs []*repository.Expression) error {
// validate expressions.
for _, expr := range exprs {
if err := expression.Validate(expr); nil != err {
Expand All @@ -515,7 +515,7 @@ func (m *apiManager) appendExpression(ctx context.Context, exprs []repository.Ex
return nil
}

func (m *apiManager) RemoveExpression(ctx context.Context, exprs []repository.Expression) error {
func (m *apiManager) RemoveExpression(ctx context.Context, exprs []*repository.Expression) error {
// delete expressions.
for index := range exprs {
log.L().Debug("remove expression",
Expand All @@ -532,15 +532,15 @@ func (m *apiManager) RemoveExpression(ctx context.Context, exprs []repository.Ex
return nil
}

func (m *apiManager) GetExpression(ctx context.Context, expr repository.Expression) (*repository.Expression, error) {
func (m *apiManager) GetExpression(ctx context.Context, expr *repository.Expression) (*repository.Expression, error) {
// get expression.
expr, err := m.entityRepo.GetExpression(ctx, expr)
if nil != err {
log.L().Error("get expression", logf.Error(err), logf.Path(expr.Path),
logf.Eid(expr.EntityID), logf.Owner(expr.Owner), logf.Expr(expr.Expression))
return nil, errors.Wrap(err, "get expression")
}
return &expr, nil
return expr, nil
}

func (m *apiManager) ListExpression(ctx context.Context, en *Base) ([]*repository.Expression, error) {
Expand Down Expand Up @@ -577,11 +577,11 @@ func (m *apiManager) GetSubscription(ctx context.Context, subscription *reposito

//////////////

func convExprs(mp mapper.Mapper) []repository.Expression {
func convExprs(mp mapper.Mapper) []*repository.Expression {
segs := strings.SplitN(mp.TQL, "select", 2)
arr := strings.Split(segs[1], ",")

exprs := []repository.Expression{}
exprs := []*repository.Expression{}
for index := range arr {
segs = strings.Split(arr[index], " as ")

Expand All @@ -591,7 +591,7 @@ func convExprs(mp mapper.Mapper) []repository.Expression {
}

exprs = append(exprs,
*repository.NewExpression(
repository.NewExpression(
mp.Owner, mp.EntityID, mp.Name, path, segs[0], mp.Description))
}
return exprs
Expand Down
6 changes: 3 additions & 3 deletions pkg/manager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type APIManager interface {
AppendMapperZ(context.Context, *mapper.Mapper) error

// Expression.
AppendExpression(context.Context, []repository.Expression) error
RemoveExpression(context.Context, []repository.Expression) error
GetExpression(context.Context, repository.Expression) (*repository.Expression, error)
AppendExpression(context.Context, []*repository.Expression) error
RemoveExpression(context.Context, []*repository.Expression) error
GetExpression(context.Context, *repository.Expression) (*repository.Expression, error)
ListExpression(context.Context, *Base) ([]*repository.Expression, error)

// Subscription.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapper/expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type IExpression interface {
Sources() map[string][]string
}

func Validate(expr repository.Expression) error {
func Validate(expr *repository.Expression) error {
// check path.

// check expression.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapper/expression/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Test_Validate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := Validate(tt.args.expr); (err != nil) != tt.wantErr {
if err := Validate(&tt.args.expr); (err != nil) != tt.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (m *mapper) Tentacles() map[string][]Tentacler {
for index, item := range tentacleConf.PropertyKeys {
watchKey := WatchKey{
EntityID: entityID,
PropertyKey: item}
PropertyKey: item,
}
eItems[index] = watchKey
mItems = append(mItems, watchKey)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/mapper/tentacle.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func (t *tentacle) Mapper() IMapper {
func (t *tentacle) Copy() Tentacler {
items := make([]WatchKey, len(t.items))
for index, item := range t.items {
items[index] = item
items[index] = WatchKey{
item.EntityID,
item.PropertyKey,
}
}

ten := &tentacle{
Expand Down
18 changes: 9 additions & 9 deletions pkg/repository/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,30 @@ func (e *Expression) Prefix() string {
return ListExpressionPrefix(e.Owner, e.EntityID)
}

func (r *repo) PutExpression(ctx context.Context, expr Expression) error {
err := r.dao.PutResource(ctx, &expr)
func (r *repo) PutExpression(ctx context.Context, expr *Expression) error {
err := r.dao.PutResource(ctx, expr)
return errors.Wrap(err, "put expression repository")
}

func (r *repo) GetExpression(ctx context.Context, expr Expression) (Expression, error) {
_, err := r.dao.GetResource(ctx, &expr)
func (r *repo) GetExpression(ctx context.Context, expr *Expression) (*Expression, error) {
_, err := r.dao.GetResource(ctx, expr)
return expr, errors.Wrap(err, "get expression repository")
}

func (r *repo) DelExpression(ctx context.Context, expr Expression) error {
err := r.dao.DelResource(ctx, &expr)
func (r *repo) DelExpression(ctx context.Context, expr *Expression) error {
err := r.dao.DelResource(ctx, expr)
return errors.Wrap(err, "del expression repository")
}

func (r *repo) DelExprByEnity(ctx context.Context, expr Expression) error {
func (r *repo) DelExprByEnity(ctx context.Context, expr *Expression) error {
// construct prefix key.
prefix := expr.Prefix()
err := r.dao.DelResources(ctx, prefix)
return errors.Wrap(err, "del expressions repository")
}

func (r *repo) HasExpression(ctx context.Context, expr Expression) (bool, error) {
has, err := r.dao.HasResource(ctx, &expr)
func (r *repo) HasExpression(ctx context.Context, expr *Expression) (bool, error) {
has, err := r.dao.HasResource(ctx, expr)
return has, errors.Wrap(err, "exists expression repository")
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/repository/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func TestPutExpression(t *testing.T) {

for _, exprInfo := range tests {
t.Run(exprInfo.name, func(t *testing.T) {
err := repoIns.PutExpression(context.Background(), *exprInfo.expr)
err := repoIns.PutExpression(context.Background(), exprInfo.expr)
assert.Nil(t, err)
})
}
}

func TestGetExpression(t *testing.T) {
_, err := repoIns.GetExpression(context.Background(), Expression{EntityID: "device123", Owner: "admin", Path: "temp"})
_, err := repoIns.GetExpression(context.Background(), &Expression{EntityID: "device123", Owner: "admin", Path: "temp"})
t.Log(err)
// assert.ErrorIs(t, err, xerrors.ErrResourceNotFound)
// assert.Equal(t, "admin", expr.Owner)
Expand All @@ -62,12 +62,12 @@ func TestListExpression(t *testing.T) {
}

func TestDelExpression(t *testing.T) {
err := repoIns.DelExpression(context.Background(), Expression{Owner: "admin", EntityID: "device123", Path: "temp"})
err := repoIns.DelExpression(context.Background(), &Expression{Owner: "admin", EntityID: "device123", Path: "temp"})
assert.Nil(t, err)
}

func TestDeleteExpressions(t *testing.T) {
err := repoIns.DelExprByEnity(context.Background(), Expression{Owner: "admin", EntityID: "device123"})
err := repoIns.DelExprByEnity(context.Background(), &Expression{Owner: "admin", EntityID: "device123"})
assert.Nil(t, err)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/repository/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ func (s *Schema) Decode(key, bytes []byte) error {
return errors.Wrap(err, "decode Schema")
}

func (r *repo) PutSchema(ctx context.Context, expr Schema) error {
err := r.dao.PutResource(ctx, &expr)
func (r *repo) PutSchema(ctx context.Context, expr *Schema) error {
err := r.dao.PutResource(ctx, expr)
return errors.Wrap(err, "put expression repository")
}

func (r *repo) GetSchema(ctx context.Context, expr Schema) (Schema, error) {
_, err := r.dao.GetResource(ctx, &expr)
func (r *repo) GetSchema(ctx context.Context, expr *Schema) (*Schema, error) {
_, err := r.dao.GetResource(ctx, expr)
return expr, errors.Wrap(err, "get expression repository")
}

func (r *repo) DelSchema(ctx context.Context, expr Schema) error {
err := r.dao.DelResource(ctx, &expr)
func (r *repo) DelSchema(ctx context.Context, expr *Schema) error {
err := r.dao.DelResource(ctx, expr)
return errors.Wrap(err, "del expression repository")
}

func (r *repo) HasSchema(ctx context.Context, expr Schema) (bool, error) {
has, err := r.dao.HasResource(ctx, &expr)
func (r *repo) HasSchema(ctx context.Context, expr *Schema) (bool, error) {
has, err := r.dao.HasResource(ctx, expr)
return has, errors.Wrap(err, "exists expression repository")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Test_repo_PutSchema(t *testing.T) {
for _, tt := range tests {
ctx := context.Background()
t.Run(tt.name, func(t *testing.T) {
if err := rr.PutSchema(ctx, tt.schema); (err != nil) != tt.wantErr {
if err := rr.PutSchema(ctx, &tt.schema); (err != nil) != tt.wantErr {
t.Errorf("PutSchema() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
16 changes: 11 additions & 5 deletions pkg/repository/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ type IRepository interface {
GetEntity(ctx context.Context, eid string) ([]byte, error)
DelEntity(ctx context.Context, eid string) error
HasEntity(ctx context.Context, eid string) (bool, error)
PutExpression(ctx context.Context, expr Expression) error
GetExpression(ctx context.Context, expr Expression) (Expression, error)
DelExpression(ctx context.Context, expr Expression) error
DelExprByEnity(ctx context.Context, expr Expression) error
HasExpression(ctx context.Context, expr Expression) (bool, error)
PutExpression(ctx context.Context, expr *Expression) error
GetExpression(ctx context.Context, expr *Expression) (*Expression, error)
DelExpression(ctx context.Context, expr *Expression) error
DelExprByEnity(ctx context.Context, expr *Expression) error
HasExpression(ctx context.Context, expr *Expression) (bool, error)
ListExpression(ctx context.Context, rev int64, req *ListExprReq) ([]*Expression, error)
RangeExpression(ctx context.Context, rev int64, handler RangeExpressionFunc)
WatchExpression(ctx context.Context, rev int64, handler WatchExpressionFunc)
Expand All @@ -24,4 +24,10 @@ type IRepository interface {
HasSubscription(ctx context.Context, expr *Subscription) (bool, error)
RangeSubscription(ctx context.Context, rev int64, handler RangeSubscriptionFunc)
WatchSubscription(ctx context.Context, rev int64, handler WatchSubscriptionFunc)
PutSchema(ctx context.Context, expr *Schema) error
GetSchema(ctx context.Context, expr *Schema) (*Schema, error)
DelSchema(ctx context.Context, expr *Schema) error
HasSchema(ctx context.Context, expr *Schema) (bool, error)
RangeSchema(ctx context.Context, rev int64, handler RangeSchemaFunc)
WatchSchema(ctx context.Context, rev int64, handler WatchSchemaFunc)
}
44 changes: 42 additions & 2 deletions pkg/runtime/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type NodeConf struct {
type Node struct {
runtimes map[string]*Runtime
queues map[string]*xkafka.Pubsub
schemas *SchemaStore
dispatch dispatch.Dispatcher
resourceManager types.ResourceManager
revision int64
Expand All @@ -49,6 +50,7 @@ func NewNode(ctx context.Context, resourceManager types.ResourceManager, dispatc
resourceManager: resourceManager,
runtimes: make(map[string]*Runtime),
queues: make(map[string]*xkafka.Pubsub),
schemas: NewSchemaStore(),
}
}

Expand Down Expand Up @@ -197,13 +199,31 @@ func (n *Node) listMetadata() {
}
}
})

repo.RangeSchema(ctx, n.revision, func(schemas []*repository.Schema) {
// 所有node都需要存储 schema.
for _, sm := range schemas {
log.L().Debug("sync range Schema", logf.String("schemaID", sm.ID), logf.Any("schema", sm), logf.Owner(sm.Owner))
schemaID := sm.ID
n.schemas.Set(schemaID, sm)
}
})

log.L().Debug("runtime.Environment initialized", logf.Elapsedms(elapsedTime.ElapsedMilli()))
}

// watchResource watch resources.
func (n *Node) watchMetadata() {
repo := n.resourceManager.Repo()
go repo.WatchExpression(context.Background(), n.revision,
go n.watchExpression(repo)

go n.watchSubscription(repo)

go n.watchSchema(repo)
}

func (n *Node) watchExpression(repo repository.IRepository) {
repo.WatchExpression(context.Background(), n.revision,
func(et dao.EnventType, expr repository.Expression) {
switch et {
case dao.DELETE:
Expand Down Expand Up @@ -240,8 +260,10 @@ func (n *Node) watchMetadata() {
log.L().Error("watch metadata changed, invalid event type")
}
})
}

go repo.WatchSubscription(context.Background(), n.revision,
func (n *Node) watchSubscription(repo repository.IRepository) {
repo.WatchSubscription(context.Background(), n.revision,
func(et dao.EnventType, sub *repository.Subscription) {
switch et {
case dao.DELETE:
Expand Down Expand Up @@ -272,6 +294,24 @@ func (n *Node) watchMetadata() {
})
}

func (n *Node) watchSchema(repo repository.IRepository) {
repo.WatchSchema(context.Background(), n.revision,
func(et dao.EnventType, sm repository.Schema) {
switch et {
case dao.DELETE:
log.L().Debug("sync DELETE Schema", logf.String("schemaID", sm.ID), logf.Owner(sm.Owner))
schemaID := sm.ID
n.schemas.Del(schemaID)
case dao.PUT:
log.L().Debug("sync PUT Schema", logf.String("schemaID", sm.ID), logf.Any("schema", sm), logf.Owner(sm.Owner))
schemaID := sm.ID
n.schemas.Set(schemaID, &sm)
default:
log.L().Error("watch metadata changed, invalid event type")
}
})
}

func parseExpression(expr repository.Expression, version int) (map[string]*ExpressionInfo, error) {
exprIns, err := expression.NewExpr(expr.Expression, nil)
if nil != err {
Expand Down
Loading