Skip to content

Commit

Permalink
imports cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
xadhatter committed Dec 7, 2023
1 parent 9ceee25 commit 766472f
Show file tree
Hide file tree
Showing 26 changed files with 249 additions and 267 deletions.
2 changes: 0 additions & 2 deletions components/.gitignore

This file was deleted.

86 changes: 43 additions & 43 deletions components/broker/engine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/xigxog/kubefox/build"
"github.com/xigxog/kubefox/components/broker/config"
"github.com/xigxog/kubefox/components/broker/telemetry"
kubefox "github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/k8s"
"github.com/xigxog/kubefox/logkf"
"github.com/xigxog/kubefox/matcher"
Expand Down Expand Up @@ -50,14 +50,14 @@ type Engine interface {
}

type Broker interface {
AuthorizeComponent(context.Context, *kubefox.Component, string) error
AuthorizeComponent(context.Context, *core.Component, string) error
Subscribe(context.Context, *SubscriptionConf) (ReplicaSubscription, error)
RecvEvent(evt *kubefox.Event, receiver Receiver) *BrokerEvent
Component() *kubefox.Component
RecvEvent(evt *core.Event, receiver Receiver) *BrokerEvent
Component() *core.Component
}

type broker struct {
comp *kubefox.Component
comp *core.Component

grpcSrv *GRPCServer

Expand All @@ -82,15 +82,15 @@ type broker struct {
}

func New() Engine {
name, id := kubefox.GenerateNameAndId()
name, id := core.GenerateNameAndId()
logkf.Global = logkf.Global.
With(logkf.KeyBrokerId, id).
With(logkf.KeyBrokerName, name)
ctrl.SetLogger(zapr.NewLogger(logkf.Global.Unwrap().Desugar()))

ctx, cancel := context.WithCancel(context.Background())
brk := &broker{
comp: &kubefox.Component{
comp: &core.Component{
Name: name,
Commit: build.Info.Commit,
Id: id,
Expand All @@ -112,7 +112,7 @@ func New() Engine {
return brk
}

func (brk *broker) Component() *kubefox.Component {
func (brk *broker) Component() *core.Component {
return brk.comp
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (brk *broker) Subscribe(ctx context.Context, conf *SubscriptionConf) (Repli
return sub, nil
}

func (brk *broker) AuthorizeComponent(ctx context.Context, comp *kubefox.Component, authToken string) error {
func (brk *broker) AuthorizeComponent(ctx context.Context, comp *core.Component, authToken string) error {
parsed, err := jwt.ParseString(authToken)
if err != nil {
return err
Expand Down Expand Up @@ -236,12 +236,12 @@ func (brk *broker) AuthorizeComponent(ctx context.Context, comp *kubefox.Compone
return nil
}

func (brk *broker) RecvEvent(evt *kubefox.Event, receiver Receiver) *BrokerEvent {
func (brk *broker) RecvEvent(evt *core.Event, receiver Receiver) *BrokerEvent {
brkEvt := &BrokerEvent{
Event: evt,
Receiver: receiver,
ReceivedAt: time.Now(),
DoneCh: make(chan *kubefox.Err),
DoneCh: make(chan *core.Err),
}

go func() {
Expand All @@ -264,17 +264,17 @@ func (brk *broker) startWorker(id int) {
if err := brk.routeEvent(log, evt); err != nil {
l := log.WithEvent(evt.Event)

kfErr := &kubefox.Err{}
kfErr := &core.Err{}
if ok := errors.As(err, &kfErr); !ok {
kfErr = kubefox.ErrUnexpected(err)
kfErr = core.ErrUnexpected(err)
}

switch kfErr.Code() {
case kubefox.CodeUnexpected:
case core.CodeUnexpected:
l.Error(err)
case kubefox.CodeBrokerMismatch:
case core.CodeBrokerMismatch:
l.Warn(err)
case kubefox.CodeUnauthorized:
case core.CodeUnauthorized:
l.Warn(err)
default:
l.Debug(err)
Expand Down Expand Up @@ -351,21 +351,21 @@ func (brk *broker) routeEvent(log *logkf.Logger, evt *BrokerEvent) error {
return brk.natsClient.Publish(evt.Target.Subject(), evt.Event)

default:
return kubefox.ErrComponentGone()
return core.ErrComponentGone()
}
}

func (brk *broker) checkEvent(evt *BrokerEvent) error {
if evt.TTL() <= 0 {
return kubefox.ErrTimeout()
return core.ErrTimeout()
}

if evt.Source == nil || !evt.Source.IsFull() {
return kubefox.ErrInvalid(fmt.Errorf("event source is invalid"))
return core.ErrInvalid(fmt.Errorf("event source is invalid"))
}

if evt.Category == kubefox.Category_RESPONSE && (evt.Target == nil || !evt.Target.IsFull()) {
return kubefox.ErrInvalid(fmt.Errorf("response target is missing required attribute"))
if evt.Category == core.Category_RESPONSE && (evt.Target == nil || !evt.Target.IsFull()) {
return core.ErrInvalid(fmt.Errorf("response target is missing required attribute"))
}

switch evt.Receiver {
Expand All @@ -374,17 +374,17 @@ func (brk *broker) checkEvent(evt *BrokerEvent) error {
evt.Target.BrokerId != "" &&
evt.Target.BrokerId != brk.comp.Id {

return kubefox.ErrBrokerMismatch(fmt.Errorf("event target broker id is %s", evt.Target.BrokerId))
return core.ErrBrokerMismatch(fmt.Errorf("event target broker id is %s", evt.Target.BrokerId))
}

case ReceiverGRPCServer:
if evt.Target != nil && !evt.Target.IsFull() && !evt.Target.IsNameOnly() {
return kubefox.ErrInvalid(fmt.Errorf("event target is invalid"))
return core.ErrInvalid(fmt.Errorf("event target is invalid"))
}

// If a valid context is not present reject.
if evt.Context == nil || (!evt.Context.IsDeployment() && !evt.Context.IsRelease()) {
return kubefox.ErrInvalid(fmt.Errorf("event context is invalid"))
return core.ErrInvalid(fmt.Errorf("event context is invalid"))
}
}

Expand All @@ -408,21 +408,21 @@ func (brk *broker) findTarget(ctx context.Context, evt *BrokerEvent) error {
matcher, err = brk.store.DeploymentMatcher(ctx, evt.Context)

default:
return kubefox.ErrInvalid(fmt.Errorf("event missing deployment or environment context"))
return core.ErrInvalid(fmt.Errorf("event missing deployment or environment context"))
}
if err != nil {
if k8s.IsNotFound(err) {
return kubefox.ErrNotFound(err)
return core.ErrNotFound(err)
}
return kubefox.ErrUnexpected(err)
return core.ErrUnexpected(err)
}

route, matched := matcher.Match(evt.Event)
switch {
case matched:
evt.RouteId = int64(route.Id)
if evt.Target == nil {
evt.Target = &kubefox.Component{}
evt.Target = &core.Component{}
}
evt.Target.Name = route.Component.Name
evt.Target.Commit = route.Component.Commit
Expand All @@ -432,7 +432,7 @@ func (brk *broker) findTarget(ctx context.Context, evt *BrokerEvent) error {
evt.RouteId = api.DefaultRouteId

default:
return kubefox.ErrRouteNotFound()
return core.ErrRouteNotFound()
}

return nil
Expand All @@ -448,15 +448,15 @@ func (brk *broker) attachEnv(ctx context.Context, evt *BrokerEvent) error {
switch {
case evt.Context.IsRelease():
if env, err := brk.store.ReleaseEnv(evt.Context.Release); err != nil {
return kubefox.ErrNotFound(err)
return core.ErrNotFound(err)
} else {
evt.EnvVars = env.GetData().Vars
evt.Adapters = env.GetData().Adapters
}

case evt.Context.IsDeployment():
if env, err := brk.store.Environment(evt.Context.Environment); err != nil {
return kubefox.ErrNotFound(err)
return core.ErrNotFound(err)
} else {
evt.EnvVars = env.GetData().Vars
evt.Adapters = env.GetData().Adapters
Expand All @@ -468,7 +468,7 @@ func (brk *broker) attachEnv(ctx context.Context, evt *BrokerEvent) error {

func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error {
if evt.Target == nil || evt.Target.Name == "" || evt.Context == nil {
return kubefox.ErrComponentMismatch()
return core.ErrComponentMismatch()
}

var (
Expand All @@ -482,12 +482,12 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error
appDep, err = brk.store.AppDeployment(evt.Context.Deployment)
default:
if k8s.IsNotFound(err) {
return kubefox.ErrNotFound(err)
return core.ErrNotFound(err)
}
return kubefox.ErrUnexpected()
return core.ErrUnexpected()
}
if err != nil {
return kubefox.ErrNotFound(err)
return core.ErrNotFound(err)
}

// Check if target is part of deployment spec.
Expand All @@ -499,27 +499,27 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error
switch {
case depComp == nil && adapter == nil:
if !brk.store.IsGenesisAdapter(ctx, evt.Target) {
return kubefox.ErrComponentMismatch(fmt.Errorf("target component not part of deployment"))
return core.ErrComponentMismatch(fmt.Errorf("target component not part of deployment"))
}

case depComp == nil && adapter != nil:
if adapter.Type != api.ComponentTypeHTTP {
return kubefox.ErrUnsupportedAdapter(fmt.Errorf("adapter type '%s' is not supported", adapter.Type))
return core.ErrUnsupportedAdapter(fmt.Errorf("adapter type '%s' is not supported", adapter.Type))
}
evt.TargetAdapter = adapter

case evt.Target.Commit == "" && evt.RouteId == api.DefaultRouteId:
evt.Target.Commit = depComp.Commit
reg, found := brk.store.Component(ctx, evt.Target)
if !found {
return kubefox.ErrNotFound(fmt.Errorf("target component not found"))
return core.ErrNotFound(fmt.Errorf("target component not found"))
}
if !reg.DefaultHandler {
return kubefox.ErrRouteNotFound(fmt.Errorf("target component does not have default handler"))
return core.ErrRouteNotFound(fmt.Errorf("target component does not have default handler"))
}

case evt.Target.Commit != depComp.Commit:
return kubefox.ErrComponentMismatch(fmt.Errorf("target component commit does not match deployment"))
return core.ErrComponentMismatch(fmt.Errorf("target component commit does not match deployment"))
}

// Check if source is part of deployment spec.
Expand All @@ -530,16 +530,16 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error
switch {
case depComp == nil && adapter == nil:
if !brk.store.IsGenesisAdapter(ctx, evt.Source) {
return kubefox.ErrComponentMismatch(fmt.Errorf("source component not part of deployment"))
return core.ErrComponentMismatch(fmt.Errorf("source component not part of deployment"))
}

case depComp == nil && adapter != nil:
if evt.Source.BrokerId != brk.comp.BrokerId {
return kubefox.ErrBrokerMismatch(fmt.Errorf("source component is adapter but does not match broker"))
return core.ErrBrokerMismatch(fmt.Errorf("source component is adapter but does not match broker"))
}

case evt.Source.Commit != depComp.Commit:
return kubefox.ErrComponentMismatch(fmt.Errorf("source component commit does not match deployment"))
return core.ErrComponentMismatch(fmt.Errorf("source component commit does not match deployment"))
}

return nil
Expand Down
Loading

0 comments on commit 766472f

Please sign in to comment.