From 766472f99055443c12d6b606e2381064c52d4a9c Mon Sep 17 00:00:00 2001 From: John Long Date: Thu, 7 Dec 2023 11:11:31 -0500 Subject: [PATCH] imports cleanup --- components/.gitignore | 2 - components/broker/engine/broker.go | 86 +++++++++---------- components/broker/engine/grpc_server.go | 36 ++++---- components/broker/engine/http_client.go | 18 ++-- components/broker/engine/nats_client.go | 14 +-- components/broker/engine/store.go | 22 ++--- components/broker/engine/sub_mgr.go | 24 +++--- components/broker/engine/types.go | 12 +-- components/broker/telemetry/client.go | 2 +- components/broker/telemetry/tracer.go | 13 +-- components/httpsrv/main.go | 4 +- components/httpsrv/server/server.go | 8 +- components/httpsrv/server/vars.go | 4 +- components/operator/controller/client.go | 8 +- .../kubefox/components/backend/main.go | 2 +- .../kubefox/components/frontend/main.go | 2 +- grpc/client.go | 34 ++++---- k8s/utils.go | 4 +- kit/env/env.go | 26 ++---- kit/graphql/graphql.go | 6 -- kit/kit.go | 37 ++++---- kit/kontext.go | 38 ++++---- kit/types.go | 24 +++--- logkf/logger.go | 12 +-- matcher/event_matcher.go | 58 ++++++------- matcher/event_matcher_test.go | 20 ++--- 26 files changed, 249 insertions(+), 267 deletions(-) delete mode 100644 components/.gitignore diff --git a/components/.gitignore b/components/.gitignore deleted file mode 100644 index 867dad5..0000000 --- a/components/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -frontend/ -backend/ \ No newline at end of file diff --git a/components/broker/engine/broker.go b/components/broker/engine/broker.go index bb341f7..e819761 100644 --- a/components/broker/engine/broker.go +++ b/components/broker/engine/broker.go @@ -18,7 +18,7 @@ import ( "github.com/xigxog/kubefox/build" "github.com/xigxog/kubefox/components/broker/config" "github.com/xigxog/kubefox/components/broker/telemetry" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/k8s" "github.com/xigxog/kubefox/logkf" "github.com/xigxog/kubefox/matcher" @@ -50,14 +50,14 @@ type Engine interface { } type Broker interface { - AuthorizeComponent(context.Context, *kubefox.Component, string) error + AuthorizeComponent(context.Context, *core.Component, string) error Subscribe(context.Context, *SubscriptionConf) (ReplicaSubscription, error) - RecvEvent(evt *kubefox.Event, receiver Receiver) *BrokerEvent - Component() *kubefox.Component + RecvEvent(evt *core.Event, receiver Receiver) *BrokerEvent + Component() *core.Component } type broker struct { - comp *kubefox.Component + comp *core.Component grpcSrv *GRPCServer @@ -82,7 +82,7 @@ type broker struct { } func New() Engine { - name, id := kubefox.GenerateNameAndId() + name, id := core.GenerateNameAndId() logkf.Global = logkf.Global. With(logkf.KeyBrokerId, id). With(logkf.KeyBrokerName, name) @@ -90,7 +90,7 @@ func New() Engine { ctx, cancel := context.WithCancel(context.Background()) brk := &broker{ - comp: &kubefox.Component{ + comp: &core.Component{ Name: name, Commit: build.Info.Commit, Id: id, @@ -112,7 +112,7 @@ func New() Engine { return brk } -func (brk *broker) Component() *kubefox.Component { +func (brk *broker) Component() *core.Component { return brk.comp } @@ -198,7 +198,7 @@ func (brk *broker) Subscribe(ctx context.Context, conf *SubscriptionConf) (Repli return sub, nil } -func (brk *broker) AuthorizeComponent(ctx context.Context, comp *kubefox.Component, authToken string) error { +func (brk *broker) AuthorizeComponent(ctx context.Context, comp *core.Component, authToken string) error { parsed, err := jwt.ParseString(authToken) if err != nil { return err @@ -236,12 +236,12 @@ func (brk *broker) AuthorizeComponent(ctx context.Context, comp *kubefox.Compone return nil } -func (brk *broker) RecvEvent(evt *kubefox.Event, receiver Receiver) *BrokerEvent { +func (brk *broker) RecvEvent(evt *core.Event, receiver Receiver) *BrokerEvent { brkEvt := &BrokerEvent{ Event: evt, Receiver: receiver, ReceivedAt: time.Now(), - DoneCh: make(chan *kubefox.Err), + DoneCh: make(chan *core.Err), } go func() { @@ -264,17 +264,17 @@ func (brk *broker) startWorker(id int) { if err := brk.routeEvent(log, evt); err != nil { l := log.WithEvent(evt.Event) - kfErr := &kubefox.Err{} + kfErr := &core.Err{} if ok := errors.As(err, &kfErr); !ok { - kfErr = kubefox.ErrUnexpected(err) + kfErr = core.ErrUnexpected(err) } switch kfErr.Code() { - case kubefox.CodeUnexpected: + case core.CodeUnexpected: l.Error(err) - case kubefox.CodeBrokerMismatch: + case core.CodeBrokerMismatch: l.Warn(err) - case kubefox.CodeUnauthorized: + case core.CodeUnauthorized: l.Warn(err) default: l.Debug(err) @@ -351,21 +351,21 @@ func (brk *broker) routeEvent(log *logkf.Logger, evt *BrokerEvent) error { return brk.natsClient.Publish(evt.Target.Subject(), evt.Event) default: - return kubefox.ErrComponentGone() + return core.ErrComponentGone() } } func (brk *broker) checkEvent(evt *BrokerEvent) error { if evt.TTL() <= 0 { - return kubefox.ErrTimeout() + return core.ErrTimeout() } if evt.Source == nil || !evt.Source.IsFull() { - return kubefox.ErrInvalid(fmt.Errorf("event source is invalid")) + return core.ErrInvalid(fmt.Errorf("event source is invalid")) } - if evt.Category == kubefox.Category_RESPONSE && (evt.Target == nil || !evt.Target.IsFull()) { - return kubefox.ErrInvalid(fmt.Errorf("response target is missing required attribute")) + if evt.Category == core.Category_RESPONSE && (evt.Target == nil || !evt.Target.IsFull()) { + return core.ErrInvalid(fmt.Errorf("response target is missing required attribute")) } switch evt.Receiver { @@ -374,17 +374,17 @@ func (brk *broker) checkEvent(evt *BrokerEvent) error { evt.Target.BrokerId != "" && evt.Target.BrokerId != brk.comp.Id { - return kubefox.ErrBrokerMismatch(fmt.Errorf("event target broker id is %s", evt.Target.BrokerId)) + return core.ErrBrokerMismatch(fmt.Errorf("event target broker id is %s", evt.Target.BrokerId)) } case ReceiverGRPCServer: if evt.Target != nil && !evt.Target.IsFull() && !evt.Target.IsNameOnly() { - return kubefox.ErrInvalid(fmt.Errorf("event target is invalid")) + return core.ErrInvalid(fmt.Errorf("event target is invalid")) } // If a valid context is not present reject. if evt.Context == nil || (!evt.Context.IsDeployment() && !evt.Context.IsRelease()) { - return kubefox.ErrInvalid(fmt.Errorf("event context is invalid")) + return core.ErrInvalid(fmt.Errorf("event context is invalid")) } } @@ -408,13 +408,13 @@ func (brk *broker) findTarget(ctx context.Context, evt *BrokerEvent) error { matcher, err = brk.store.DeploymentMatcher(ctx, evt.Context) default: - return kubefox.ErrInvalid(fmt.Errorf("event missing deployment or environment context")) + return core.ErrInvalid(fmt.Errorf("event missing deployment or environment context")) } if err != nil { if k8s.IsNotFound(err) { - return kubefox.ErrNotFound(err) + return core.ErrNotFound(err) } - return kubefox.ErrUnexpected(err) + return core.ErrUnexpected(err) } route, matched := matcher.Match(evt.Event) @@ -422,7 +422,7 @@ func (brk *broker) findTarget(ctx context.Context, evt *BrokerEvent) error { case matched: evt.RouteId = int64(route.Id) if evt.Target == nil { - evt.Target = &kubefox.Component{} + evt.Target = &core.Component{} } evt.Target.Name = route.Component.Name evt.Target.Commit = route.Component.Commit @@ -432,7 +432,7 @@ func (brk *broker) findTarget(ctx context.Context, evt *BrokerEvent) error { evt.RouteId = api.DefaultRouteId default: - return kubefox.ErrRouteNotFound() + return core.ErrRouteNotFound() } return nil @@ -448,7 +448,7 @@ func (brk *broker) attachEnv(ctx context.Context, evt *BrokerEvent) error { switch { case evt.Context.IsRelease(): if env, err := brk.store.ReleaseEnv(evt.Context.Release); err != nil { - return kubefox.ErrNotFound(err) + return core.ErrNotFound(err) } else { evt.EnvVars = env.GetData().Vars evt.Adapters = env.GetData().Adapters @@ -456,7 +456,7 @@ func (brk *broker) attachEnv(ctx context.Context, evt *BrokerEvent) error { case evt.Context.IsDeployment(): if env, err := brk.store.Environment(evt.Context.Environment); err != nil { - return kubefox.ErrNotFound(err) + return core.ErrNotFound(err) } else { evt.EnvVars = env.GetData().Vars evt.Adapters = env.GetData().Adapters @@ -468,7 +468,7 @@ func (brk *broker) attachEnv(ctx context.Context, evt *BrokerEvent) error { func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error { if evt.Target == nil || evt.Target.Name == "" || evt.Context == nil { - return kubefox.ErrComponentMismatch() + return core.ErrComponentMismatch() } var ( @@ -482,12 +482,12 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error appDep, err = brk.store.AppDeployment(evt.Context.Deployment) default: if k8s.IsNotFound(err) { - return kubefox.ErrNotFound(err) + return core.ErrNotFound(err) } - return kubefox.ErrUnexpected() + return core.ErrUnexpected() } if err != nil { - return kubefox.ErrNotFound(err) + return core.ErrNotFound(err) } // Check if target is part of deployment spec. @@ -499,12 +499,12 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error switch { case depComp == nil && adapter == nil: if !brk.store.IsGenesisAdapter(ctx, evt.Target) { - return kubefox.ErrComponentMismatch(fmt.Errorf("target component not part of deployment")) + return core.ErrComponentMismatch(fmt.Errorf("target component not part of deployment")) } case depComp == nil && adapter != nil: if adapter.Type != api.ComponentTypeHTTP { - return kubefox.ErrUnsupportedAdapter(fmt.Errorf("adapter type '%s' is not supported", adapter.Type)) + return core.ErrUnsupportedAdapter(fmt.Errorf("adapter type '%s' is not supported", adapter.Type)) } evt.TargetAdapter = adapter @@ -512,14 +512,14 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error evt.Target.Commit = depComp.Commit reg, found := brk.store.Component(ctx, evt.Target) if !found { - return kubefox.ErrNotFound(fmt.Errorf("target component not found")) + return core.ErrNotFound(fmt.Errorf("target component not found")) } if !reg.DefaultHandler { - return kubefox.ErrRouteNotFound(fmt.Errorf("target component does not have default handler")) + return core.ErrRouteNotFound(fmt.Errorf("target component does not have default handler")) } case evt.Target.Commit != depComp.Commit: - return kubefox.ErrComponentMismatch(fmt.Errorf("target component commit does not match deployment")) + return core.ErrComponentMismatch(fmt.Errorf("target component commit does not match deployment")) } // Check if source is part of deployment spec. @@ -530,16 +530,16 @@ func (brk *broker) checkComponents(ctx context.Context, evt *BrokerEvent) error switch { case depComp == nil && adapter == nil: if !brk.store.IsGenesisAdapter(ctx, evt.Source) { - return kubefox.ErrComponentMismatch(fmt.Errorf("source component not part of deployment")) + return core.ErrComponentMismatch(fmt.Errorf("source component not part of deployment")) } case depComp == nil && adapter != nil: if evt.Source.BrokerId != brk.comp.BrokerId { - return kubefox.ErrBrokerMismatch(fmt.Errorf("source component is adapter but does not match broker")) + return core.ErrBrokerMismatch(fmt.Errorf("source component is adapter but does not match broker")) } case evt.Source.Commit != depComp.Commit: - return kubefox.ErrComponentMismatch(fmt.Errorf("source component commit does not match deployment")) + return core.ErrComponentMismatch(fmt.Errorf("source component commit does not match deployment")) } return nil diff --git a/components/broker/engine/grpc_server.go b/components/broker/engine/grpc_server.go index a5a6cff..0a36324 100644 --- a/components/broker/engine/grpc_server.go +++ b/components/broker/engine/grpc_server.go @@ -10,7 +10,7 @@ import ( "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/components/broker/config" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/grpc" "github.com/xigxog/kubefox/logkf" @@ -43,7 +43,7 @@ func (srv *GRPCServer) Start(ctx context.Context) error { creds, err := credentials.NewServerTLSFromFile(api.PathTLSCert, api.PathTLSKey) if err != nil { - return kubefox.ErrUnexpected(err) + return core.ErrUnexpected(err) } srv.wrapped = gogrpc.NewServer( gogrpc.Creds(creds), @@ -55,7 +55,7 @@ func (srv *GRPCServer) Start(ctx context.Context) error { lis, err := net.Listen("tcp", config.GRPCSrvAddr) if err != nil { - return kubefox.ErrPortUnavailable(err) + return core.ErrPortUnavailable(err) } go func() { @@ -127,18 +127,18 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub var ( err error authToken string - comp *kubefox.Component + comp *core.Component sub ReplicaSubscription sendMutex sync.Mutex ) if authToken, comp, err = parseMD(stream); err != nil { - return nil, kubefox.ErrUnauthorized(err) + return nil, core.ErrUnauthorized(err) } subLog := srv.log.WithComponent(comp) if err := srv.brk.AuthorizeComponent(stream.Context(), comp, authToken); err != nil { - return nil, kubefox.ErrUnauthorized(err) + return nil, core.ErrUnauthorized(err) } sendEvt := func(evt *BrokerEvent) error { @@ -149,7 +149,7 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub subLog.WithEvent(evt.Event).Debug("send event") if err := stream.Send(evt.MatchedEvent()); err != nil { - return kubefox.ErrUnexpected(err) + return core.ErrUnexpected(err) } return nil } @@ -157,15 +157,15 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub // The first event sent should be the component spec. regEvt, err := stream.Recv() if err != nil { - return nil, kubefox.ErrUnauthorized(err) + return nil, core.ErrUnauthorized(err) } if regEvt.EventType() != api.EventTypeRegister { - return nil, kubefox.ErrUnauthorized(fmt.Errorf("expected event of type %s but got %s", + return nil, core.ErrUnauthorized(fmt.Errorf("expected event of type %s but got %s", api.EventTypeRegister, regEvt.Type)) } compSpec := &api.ComponentDefinition{} if err := regEvt.Bind(compSpec); err != nil { - return nil, kubefox.ErrUnauthorized(err) + return nil, core.ErrUnauthorized(err) } sub, err = srv.brk.Subscribe(stream.Context(), &SubscriptionConf{ @@ -179,7 +179,7 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub } regResp := &BrokerEvent{ - Event: kubefox.NewResp(kubefox.EventOpts{ + Event: core.NewResp(core.EventOpts{ Type: api.EventTypeRegister, Parent: regEvt, Source: srv.brk.Component(), @@ -195,7 +195,7 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub // This simply receives events from the gRPC stream and places them on a // channel. This makes checking for the context to be done easier by using a // select in the next code block. - recvCh := make(chan *kubefox.Event) + recvCh := make(chan *core.Event) go func() { for { if !sub.IsActive() { @@ -219,17 +219,17 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub if evt.Source == nil { evt.Source = comp } else if !evt.Source.Equal(comp) { - return sub, kubefox.ErrUnauthorized( + return sub, core.ErrUnauthorized( fmt.Errorf("event from '%s' claiming to be '%s'", comp.Key(), evt.Source.Key())) } evt.Source.BrokerId = srv.brk.Component().Id brkEvt := srv.brk.RecvEvent(evt, ReceiverGRPCServer) if err := <-brkEvt.Done(); err != nil && - evt.Category == kubefox.Category_REQUEST && - err.Code() != kubefox.CodeTimeout { + evt.Category == core.Category_REQUEST && + err.Code() != core.CodeTimeout { - errResp := kubefox.NewErr(err, kubefox.EventOpts{ + errResp := core.NewErr(err, core.EventOpts{ Parent: evt, Source: srv.brk.Component(), Target: evt.Source, @@ -246,7 +246,7 @@ func (srv *GRPCServer) subscribe(stream grpc.Broker_SubscribeServer) (ReplicaSub } } -func parseMD(stream grpc.Broker_SubscribeServer) (authToken string, comp *kubefox.Component, err error) { +func parseMD(stream grpc.Broker_SubscribeServer) (authToken string, comp *core.Component, err error) { md, found := metadata.FromIncomingContext(stream.Context()) if !found { err = fmt.Errorf("gRPC metadata missing") @@ -271,7 +271,7 @@ func parseMD(stream grpc.Broker_SubscribeServer) (authToken string, comp *kubefo return } - comp = &kubefox.Component{ + comp = &core.Component{ Id: compId, Commit: compCommit, Name: compName, diff --git a/components/broker/engine/http_client.go b/components/broker/engine/http_client.go index 694cb88..af99848 100644 --- a/components/broker/engine/http_client.go +++ b/components/broker/engine/http_client.go @@ -14,7 +14,7 @@ import ( "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/api/kubernetes/v1alpha1" "github.com/xigxog/kubefox/components/broker/config" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" ) @@ -88,7 +88,7 @@ func NewHTTPClient(brk Broker) *HTTPClient { func (c *HTTPClient) SendEvent(req *BrokerEvent) error { adapter := req.TargetAdapter if adapter == nil { - return kubefox.ErrInvalid(fmt.Errorf("adapter is missing")) + return core.ErrInvalid(fmt.Errorf("adapter is missing")) } ctx, cancel := context.WithTimeout(context.Background(), req.TTL()) @@ -97,11 +97,11 @@ func (c *HTTPClient) SendEvent(req *BrokerEvent) error { httpReq, err := req.Event.HTTPRequest(ctx) if err != nil { cancel() - return kubefox.ErrInvalid(err) + return core.ErrInvalid(err) } if adapterURL, err := url.Parse(adapter.URL.StringVal); err != nil { // success cancel() - return kubefox.ErrInvalid(fmt.Errorf("error parsing adapter url: %v", err)) + return core.ErrInvalid(fmt.Errorf("error parsing adapter url: %v", err)) } else { adapterURL = adapterURL.JoinPath(httpReq.URL.EscapedPath()) @@ -132,10 +132,10 @@ func (c *HTTPClient) SendEvent(req *BrokerEvent) error { httpReq.Header.Set(k, v.StringVal) } - resp := kubefox.NewResp(kubefox.EventOpts{ + resp := core.NewResp(core.EventOpts{ Parent: req.Event, Target: req.Source, - Source: &kubefox.Component{ + Source: &core.Component{ Name: req.Target.Name, Commit: c.brk.Component().Commit, Id: c.brk.Component().Id, @@ -148,13 +148,13 @@ func (c *HTTPClient) SendEvent(req *BrokerEvent) error { var reqErr error if httpResp, err := c.adapterClient(adapter).Do(httpReq); err != nil { - reqErr = kubefox.ErrUnexpected(fmt.Errorf("http request failed: %v", err)) + reqErr = core.ErrUnexpected(fmt.Errorf("http request failed: %v", err)) } else { reqErr = resp.SetHTTPResponse(httpResp, config.MaxEventSize) } if reqErr != nil { - if !errors.Is(reqErr, &kubefox.Err{}) { - reqErr = kubefox.ErrUnexpected(reqErr) + if !errors.Is(reqErr, &core.Err{}) { + reqErr = core.ErrUnexpected(reqErr) } resp.Type = string(api.EventTypeError) resp.SetJSON(reqErr) diff --git a/components/broker/engine/nats_client.go b/components/broker/engine/nats_client.go index 51e7072..5d2eb99 100644 --- a/components/broker/engine/nats_client.go +++ b/components/broker/engine/nats_client.go @@ -9,7 +9,7 @@ import ( "github.com/nats-io/nats.go" "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/components/broker/config" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" "google.golang.org/protobuf/proto" ) @@ -87,7 +87,7 @@ func (c *NATSClient) Close() { } } -func (c *NATSClient) Request(subject string, evt *kubefox.Event) error { +func (c *NATSClient) Request(subject string, evt *core.Event) error { msg, err := c.Msg(subject, evt) if err != nil { return err @@ -103,7 +103,7 @@ func (c *NATSClient) Request(subject string, evt *kubefox.Event) error { return nil } -func (c *NATSClient) Publish(subject string, evt *kubefox.Event) error { +func (c *NATSClient) Publish(subject string, evt *core.Event) error { msg, err := c.Msg(subject, evt) if err != nil { return err @@ -114,7 +114,7 @@ func (c *NATSClient) Publish(subject string, evt *kubefox.Event) error { return c.nc.PublishMsg(msg) } -func (c *NATSClient) Msg(subject string, evt *kubefox.Event) (*nats.Msg, error) { +func (c *NATSClient) Msg(subject string, evt *core.Event) (*nats.Msg, error) { dataBytes, err := proto.Marshal(evt) if err != nil { return nil, err @@ -129,8 +129,8 @@ func (c *NATSClient) Msg(subject string, evt *kubefox.Event) (*nats.Msg, error) // h.Set("ce_type", evt.Type) // h.Set("ce_time", time.Now().Format(time.RFC3339)) // h.Set("ce_source", fmt.Sprintf("kubefox:component:%s", evt.Source.Key())) - // h.Set("ce_dataschema", kubefox.DataSchemaKubefox) - // h.Set("ce_datacontenttype", kubefox.ContentTypeProtobuf) + // h.Set("ce_dataschema", core.DataSchemaKubefox) + // h.Set("ce_datacontenttype", core.ContentTypeProtobuf) // return &nats.Msg{ @@ -173,7 +173,7 @@ func (c *NATSClient) ConsumeEvents(ctx context.Context, name, subj string) error func (c *NATSClient) handleMsg(msg *nats.Msg) { c.log.Debugf("handling msg from nats") - evt := kubefox.NewEvent() + evt := core.NewEvent() if err := proto.Unmarshal(msg.Data, evt); err != nil { evtId := msg.Header.Get(CloudEventId) c.log.With(logkf.KeyEventId, evtId).Warn("message contains invalid event data: %v", err) diff --git a/components/broker/engine/store.go b/components/broker/engine/store.go index 707dac3..17bb37c 100644 --- a/components/broker/engine/store.go +++ b/components/broker/engine/store.go @@ -11,7 +11,7 @@ import ( "github.com/xigxog/kubefox/api/kubernetes/v1alpha1" "github.com/xigxog/kubefox/cache" "github.com/xigxog/kubefox/components/broker/config" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/k8s" "github.com/xigxog/kubefox/logkf" "github.com/xigxog/kubefox/matcher" @@ -126,11 +126,11 @@ func (str *Store) Close() { str.cancel() } -func (str *Store) Component(ctx context.Context, comp *kubefox.Component) (*api.ComponentDefinition, bool) { +func (str *Store) Component(ctx context.Context, comp *core.Component) (*api.ComponentDefinition, bool) { return str.compCache.Get(comp.GroupKey()) } -func (str *Store) IsGenesisAdapter(ctx context.Context, comp *kubefox.Component) bool { +func (str *Store) IsGenesisAdapter(ctx context.Context, comp *core.Component) bool { r, found := str.Component(ctx, comp) if !found { return false @@ -224,7 +224,7 @@ func (str *Store) ReleaseMatcher(ctx context.Context) (*matcher.EventMatcher, er return str.relMatcher, nil } -func (str *Store) DeploymentMatcher(ctx context.Context, evtCtx *kubefox.EventContext) (*matcher.EventMatcher, error) { +func (str *Store) DeploymentMatcher(ctx context.Context, evtCtx *core.EventContext) (*matcher.EventMatcher, error) { dep, err := str.AppDeployment(evtCtx.Deployment) if err != nil { return nil, err @@ -308,7 +308,7 @@ func (str *Store) buildComponentCache(ctx context.Context) (cache.Cache[*api.Com for _, app := range list.Items { for compName, compSpec := range app.Spec.App.Components { - comp := &kubefox.Component{Name: compName, Commit: compSpec.Commit} + comp := &core.Component{Name: compName, Commit: compSpec.Commit} compCache.Set(comp.GroupKey(), &compSpec.ComponentDefinition) } } @@ -320,7 +320,7 @@ func (str *Store) buildComponentCache(ctx context.Context) (cache.Cache[*api.Com for _, c := range p.Status.Components { if c.Name == api.PlatformComponentHTTPSrv { - comp := &kubefox.Component{Name: c.Name, Commit: c.Commit} + comp := &core.Component{Name: c.Name, Commit: c.Commit} compCache.Set(comp.GroupKey(), &api.ComponentDefinition{ Type: api.ComponentTypeGenesis, }) @@ -368,7 +368,7 @@ func (str *Store) buildReleaseMatcher(ctx context.Context) (*matcher.EventMatche vars = env.GetData().Vars } - evtCtx := &kubefox.EventContext{Release: rel.Name} + evtCtx := &core.EventContext{Release: rel.Name} routes, err := str.buildRoutes(ctx, comps, vars, evtCtx) if err != nil { @@ -388,13 +388,13 @@ func (str *Store) buildRoutes( ctx context.Context, comps map[string]*v1alpha1.Component, vars map[string]*api.Val, - evtCtx *kubefox.EventContext) ([]*kubefox.Route, error) { + evtCtx *core.EventContext) ([]*core.Route, error) { - routes := make([]*kubefox.Route, 0) + routes := make([]*core.Route, 0) for compName, compSpec := range comps { - comp := &kubefox.Component{Name: compName, Commit: compSpec.Commit} + comp := &core.Component{Name: compName, Commit: compSpec.Commit} for _, r := range compSpec.Routes { - route := &kubefox.Route{ + route := &core.Route{ RouteSpec: r, Component: comp, EventContext: evtCtx, diff --git a/components/broker/engine/sub_mgr.go b/components/broker/engine/sub_mgr.go index dece0a5..890b8aa 100644 --- a/components/broker/engine/sub_mgr.go +++ b/components/broker/engine/sub_mgr.go @@ -7,15 +7,15 @@ import ( "sync/atomic" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" ) type SubscriptionMgr interface { Create(ctx context.Context, cfg *SubscriptionConf, recvCh chan *BrokerEvent) (ReplicaSubscription, GroupSubscription, error) - Subscription(comp *kubefox.Component) (Subscription, bool) - ReplicaSubscription(comp *kubefox.Component) (ReplicaSubscription, bool) - GroupSubscription(comp *kubefox.Component) (GroupSubscription, bool) + Subscription(comp *core.Component) (Subscription, bool) + ReplicaSubscription(comp *core.Component) (ReplicaSubscription, bool) + GroupSubscription(comp *core.Component) (GroupSubscription, bool) Subscriptions() []ReplicaSubscription Close() } @@ -31,7 +31,7 @@ type GroupSubscription interface { } type SubscriptionConf struct { - Component *kubefox.Component + Component *core.Component ComponentSpec *api.ComponentDefinition SendFunc SendEvent EnableGroup bool @@ -39,7 +39,7 @@ type SubscriptionConf struct { type ReplicaSubscription interface { Subscription - Component() *kubefox.Component + Component() *core.Component ComponentSpec() *api.ComponentDefinition IsGroupEnabled() bool Cancel(err error) @@ -64,7 +64,7 @@ type subscriptionGroup struct { } type subscription struct { - comp *kubefox.Component + comp *core.Component compSpec *api.ComponentDefinition mgr *subscriptionMgr @@ -145,7 +145,7 @@ func (mgr *subscriptionMgr) Create(ctx context.Context, cfg *SubscriptionConf, r return sub, grpSub, nil } -func (mgr *subscriptionMgr) Subscription(comp *kubefox.Component) (Subscription, bool) { +func (mgr *subscriptionMgr) Subscription(comp *core.Component) (Subscription, bool) { if sub, found := mgr.ReplicaSubscription(comp); found { return sub, true } @@ -153,7 +153,7 @@ func (mgr *subscriptionMgr) Subscription(comp *kubefox.Component) (Subscription, return mgr.GroupSubscription(comp) } -func (mgr *subscriptionMgr) ReplicaSubscription(comp *kubefox.Component) (ReplicaSubscription, bool) { +func (mgr *subscriptionMgr) ReplicaSubscription(comp *core.Component) (ReplicaSubscription, bool) { mgr.mutex.RLock() defer mgr.mutex.RUnlock() @@ -165,7 +165,7 @@ func (mgr *subscriptionMgr) ReplicaSubscription(comp *kubefox.Component) (Replic return sub, true } -func (mgr *subscriptionMgr) GroupSubscription(comp *kubefox.Component) (GroupSubscription, bool) { +func (mgr *subscriptionMgr) GroupSubscription(comp *core.Component) (GroupSubscription, bool) { mgr.mutex.RLock() defer mgr.mutex.RUnlock() @@ -247,7 +247,7 @@ func (sub *subscription) IsActive() bool { return !sub.canceled.Load() } -func (sub *subscription) Component() *kubefox.Component { +func (sub *subscription) Component() *core.Component { return sub.comp } @@ -293,7 +293,7 @@ func (sub *subscription) processSendChan() { } } -func checkComp(comp *kubefox.Component) error { +func checkComp(comp *core.Component) error { if comp.Name == "" { return fmt.Errorf("component is missing name") } diff --git a/components/broker/engine/types.go b/components/broker/engine/types.go index 16f8592..b50e19b 100644 --- a/components/broker/engine/types.go +++ b/components/broker/engine/types.go @@ -6,7 +6,7 @@ import ( "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/api/kubernetes/v1alpha1" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "google.golang.org/protobuf/types/known/structpb" ) @@ -22,7 +22,7 @@ const ( type SendEvent func(*BrokerEvent) error type BrokerEvent struct { - *kubefox.Event + *core.Event EnvVars map[string]*api.Val RouteId int64 @@ -32,7 +32,7 @@ type BrokerEvent struct { Receiver Receiver ReceivedAt time.Time - DoneCh chan *kubefox.Err + DoneCh chan *core.Err tick time.Time mutex sync.Mutex @@ -52,11 +52,11 @@ func (evt *BrokerEvent) TTL() time.Duration { return evt.Event.TTL() } -func (evt *BrokerEvent) Done() chan *kubefox.Err { +func (evt *BrokerEvent) Done() chan *core.Err { return evt.DoneCh } -func (evt *BrokerEvent) MatchedEvent() *kubefox.MatchedEvent { +func (evt *BrokerEvent) MatchedEvent() *core.MatchedEvent { var env map[string]*structpb.Value if evt.EnvVars != nil { env = make(map[string]*structpb.Value, len(evt.EnvVars)) @@ -65,7 +65,7 @@ func (evt *BrokerEvent) MatchedEvent() *kubefox.MatchedEvent { } } - return &kubefox.MatchedEvent{ + return &core.MatchedEvent{ Event: evt.Event, RouteId: evt.RouteId, Env: env, diff --git a/components/broker/telemetry/client.go b/components/broker/telemetry/client.go index 9caaad6..3c6c9f7 100644 --- a/components/broker/telemetry/client.go +++ b/components/broker/telemetry/client.go @@ -50,7 +50,7 @@ func (c *Client) Start(ctx context.Context) error { // tlsCfg, err := cl.tls() // if err != nil { // cl.log.Error(err) - // os.Exit(kubefox.TelemetryErrorCode) + // os.Exit(core.TelemetryErrorCode) // } res := resource.NewWithAttributes( diff --git a/components/broker/telemetry/tracer.go b/components/broker/telemetry/tracer.go index 6c914e4..3a5eb29 100644 --- a/components/broker/telemetry/tracer.go +++ b/components/broker/telemetry/tracer.go @@ -6,7 +6,8 @@ import ( "time" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -17,16 +18,16 @@ var ( ) type Span interface { - End(*kubefox.Event) + End(*core.Event) } type span struct { cancel context.CancelFunc otelSpan trace.Span - req *kubefox.Event + req *core.Event } -func NewSpan(ctx context.Context, timeout time.Duration, req *kubefox.Event) (context.Context, Span) { +func NewSpan(ctx context.Context, timeout time.Duration, req *core.Event) (context.Context, Span) { ctx, cancel := context.WithTimeout(ctx, timeout) typ := api.EventTypeUnknown @@ -59,7 +60,7 @@ func NewSpan(ctx context.Context, timeout time.Duration, req *kubefox.Event) (co } } -func (sp *span) End(resp *kubefox.Event) { +func (sp *span) End(resp *core.Event) { if resp != nil { sp.otelSpan.SetAttributes(traceAttrs(resp)...) resp.SetTraceId(sp.otelSpan.SpanContext().TraceID().String()) @@ -77,7 +78,7 @@ func (sp *span) End(resp *kubefox.Event) { sp.cancel() } -func traceAttrs(req *kubefox.Event) []attribute.KeyValue { +func traceAttrs(req *core.Event) []attribute.KeyValue { attrs := []attribute.KeyValue{} if req != nil && req.Type != "" { diff --git a/components/httpsrv/main.go b/components/httpsrv/main.go index 1ad899b..4cc4f0f 100644 --- a/components/httpsrv/main.go +++ b/components/httpsrv/main.go @@ -9,7 +9,7 @@ import ( "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/build" "github.com/xigxog/kubefox/components/httpsrv/server" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" "github.com/xigxog/kubefox/utils" ) @@ -36,7 +36,7 @@ func main() { os.Exit(1) } - server.Component.Id = kubefox.GenerateId() + server.Component.Id = core.GenerateId() server.Spec = &api.ComponentDefinition{ Type: api.ComponentTypeGenesis, diff --git a/components/httpsrv/server/server.go b/components/httpsrv/server/server.go index 5e32727..261e2eb 100644 --- a/components/httpsrv/server/server.go +++ b/components/httpsrv/server/server.go @@ -10,7 +10,7 @@ import ( "time" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/grpc" "github.com/xigxog/kubefox/logkf" ) @@ -106,12 +106,12 @@ func (srv *Server) Shutdown() { } func (srv *Server) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Request) { - ctx, cancel := context.WithTimeoutCause(httpReq.Context(), EventTimeout, kubefox.ErrTimeout()) + ctx, cancel := context.WithTimeoutCause(httpReq.Context(), EventTimeout, core.ErrTimeout()) defer cancel() resWriter.Header().Set(api.HeaderAdapter, Component.Key()) - req := kubefox.NewReq(kubefox.EventOpts{ + req := core.NewReq(core.EventOpts{ Source: Component, }) req.SetTTL(EventTimeout) @@ -153,7 +153,7 @@ func writeError(resWriter http.ResponseWriter, err error, log *logkf.Logger) { log.Debugf("event failed: %v", err) statusCode := http.StatusInternalServerError - kfErr := &kubefox.Err{} + kfErr := &core.Err{} if ok := errors.As(err, &kfErr); ok { statusCode = kfErr.HTTPCode() } diff --git a/components/httpsrv/server/vars.go b/components/httpsrv/server/vars.go index a999280..300f6e3 100644 --- a/components/httpsrv/server/vars.go +++ b/components/httpsrv/server/vars.go @@ -4,7 +4,7 @@ import ( "time" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" ) var ( @@ -15,6 +15,6 @@ var ( ) var ( - Component = new(kubefox.Component) + Component = new(core.Component) Spec = new(api.ComponentDefinition) ) diff --git a/components/operator/controller/client.go b/components/operator/controller/client.go index 5b0f406..4cae741 100644 --- a/components/operator/controller/client.go +++ b/components/operator/controller/client.go @@ -8,7 +8,7 @@ import ( "github.com/xigxog/kubefox/api/kubernetes/v1alpha1" "github.com/xigxog/kubefox/components/operator/templates" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/k8s" "github.com/xigxog/kubefox/logkf" v1 "k8s.io/api/core/v1" @@ -61,7 +61,7 @@ func (r *Client) GetPlatform(ctx context.Context, namespace string) (*v1alpha1.P return nil, fmt.Errorf("unable to fetch namespace: %w", err) } if ns.Status.Phase == v1.NamespaceTerminating { - return nil, kubefox.ErrNotFound() + return nil, core.ErrNotFound() } p := &v1alpha1.Platform{} @@ -71,11 +71,11 @@ func (r *Client) GetPlatform(ctx context.Context, namespace string) (*v1alpha1.P } switch c := len(l.Items); c { case 0: - return nil, kubefox.ErrNotFound() + return nil, core.ErrNotFound() case 1: p = &l.Items[0] default: - return nil, kubefox.ErrInvalid(fmt.Errorf("too many Platforms")) + return nil, core.ErrInvalid(fmt.Errorf("too many Platforms")) } return p, nil diff --git a/examples/hello-world/kubefox/components/backend/main.go b/examples/hello-world/kubefox/components/backend/main.go index 7e852e7..f1ad9ef 100644 --- a/examples/hello-world/kubefox/components/backend/main.go +++ b/examples/hello-world/kubefox/components/backend/main.go @@ -12,7 +12,7 @@ var ( func main() { k := kit.New() - who = k.EnvVar("who", env.Required()) + who = k.EnvVar("who", env.Required) k.Default(sayWho) k.Start() diff --git a/examples/hello-world/kubefox/components/frontend/main.go b/examples/hello-world/kubefox/components/frontend/main.go index 782c25d..e9ca8e7 100644 --- a/examples/hello-world/kubefox/components/frontend/main.go +++ b/examples/hello-world/kubefox/components/frontend/main.go @@ -16,7 +16,7 @@ func main() { backend = k.Component("backend") - k.EnvVar("subPath", env.Unique()) + k.EnvVar("subPath", env.Unique) k.Route("Path(`/{{.Env.subPath}}/hello`)", sayHello) k.Start() diff --git a/grpc/client.go b/grpc/client.go index 73f0569..4c78070 100644 --- a/grpc/client.go +++ b/grpc/client.go @@ -13,7 +13,7 @@ import ( "time" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + core "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" gogrpc "google.golang.org/grpc" @@ -21,7 +21,7 @@ import ( ) type ClientOpts struct { - Component *kubefox.Component + Component *core.Component BrokerAddr string HealthSrvAddr string } @@ -45,13 +45,13 @@ type Client struct { } type ComponentEvent struct { - *kubefox.MatchedEvent + *core.MatchedEvent ReceivedAt time.Time } type ActiveReq struct { - respCh chan *kubefox.Event + respCh chan *core.Event expiration time.Time } @@ -126,7 +126,7 @@ func (c *Client) run(spec *api.ComponentDefinition, retry int) (int, error) { return retry + 1, fmt.Errorf("subscribing to broker failed: %v", err) } - evt := kubefox.NewReq(kubefox.EventOpts{ + evt := core.NewReq(core.EventOpts{ Type: api.EventTypeRegister, Source: c.Component, }) @@ -157,10 +157,10 @@ func (c *Client) run(spec *api.ComponentDefinition, retry int) (int, error) { } switch evt.Event.Category { - case kubefox.Category_REQUEST: + case core.Category_REQUEST: go c.recvReq(evt) - case kubefox.Category_RESPONSE: + case core.Category_RESPONSE: go c.recvResp(evt.Event) default: @@ -177,7 +177,7 @@ func (c *Client) Req() chan *ComponentEvent { return c.recvCh } -func (c *Client) SendReq(ctx context.Context, req *kubefox.Event, start time.Time) (*kubefox.Event, error) { +func (c *Client) SendReq(ctx context.Context, req *core.Event, start time.Time) (*core.Event, error) { respCh, err := c.SendReqChan(req, start) if err != nil { return nil, err @@ -188,17 +188,17 @@ func (c *Client) SendReq(ctx context.Context, req *kubefox.Event, start time.Tim return resp, resp.Err() case <-ctx.Done(): - return nil, kubefox.ErrTimeout(err) + return nil, core.ErrTimeout(err) case <-c.brk.Context().Done(): - return nil, kubefox.ErrBrokerUnavailable(err) + return nil, core.ErrBrokerUnavailable(err) } } -func (c *Client) SendReqChan(req *kubefox.Event, start time.Time) (chan *kubefox.Event, error) { +func (c *Client) SendReqChan(req *core.Event, start time.Time) (chan *core.Event, error) { c.log.WithEvent(req).Debug("send request") - respCh := make(chan *kubefox.Event) + respCh := make(chan *core.Event) c.reqMapMutex.Lock() c.reqMap[req.Id] = &ActiveReq{ respCh: respCh, @@ -213,17 +213,17 @@ func (c *Client) SendReqChan(req *kubefox.Event, start time.Time) (chan *kubefox return respCh, nil } -func (c *Client) SendResp(resp *kubefox.Event, start time.Time) error { +func (c *Client) SendResp(resp *core.Event, start time.Time) error { c.log.WithEvent(resp).Debug("send response") return c.send(resp, start) } -func (c *Client) recvReq(req *kubefox.MatchedEvent) { +func (c *Client) recvReq(req *core.MatchedEvent) { c.log.WithEvent(req.Event).Debug("receive request") c.recvCh <- &ComponentEvent{MatchedEvent: req, ReceivedAt: time.Now()} } -func (c *Client) recvResp(resp *kubefox.Event) { +func (c *Client) recvResp(resp *core.Event) { log := c.log.WithEvent(resp) log.Debug("receive response") @@ -240,14 +240,14 @@ func (c *Client) recvResp(resp *kubefox.Event) { respCh.respCh <- resp } -func (c *Client) send(evt *kubefox.Event, start time.Time) error { +func (c *Client) send(evt *core.Event, start time.Time) error { // Need to protect the stream from being called by multiple threads. c.sendMutex.Lock() defer c.sendMutex.Unlock() evt.ReduceTTL(start) if evt.TTL() < 0 { - return kubefox.ErrTimeout() + return core.ErrTimeout() } return c.brk.Send(evt) diff --git a/k8s/utils.go b/k8s/utils.go index c999a68..f438af6 100644 --- a/k8s/utils.go +++ b/k8s/utils.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/utils" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -93,7 +93,7 @@ func IsNotFound(err error) bool { if err == nil { return false } - return apierrors.IsNotFound(err) || errors.Is(err, kubefox.ErrNotFound()) + return apierrors.IsNotFound(err) || errors.Is(err, core.ErrNotFound()) } func IsAlreadyExists(err error) bool { diff --git a/kit/env/env.go b/kit/env/env.go index 98e289a..b0a7e50 100644 --- a/kit/env/env.go +++ b/kit/env/env.go @@ -21,39 +21,29 @@ func (v *Var) Type() api.EnvVarType { return v.typ } -func Array() VarOption { - return func(evs *api.EnvVarSchema) { +var ( + Array = func(evs *api.EnvVarSchema) { evs.Type = api.EnvVarTypeArray } -} -func Bool() VarOption { - return func(evs *api.EnvVarSchema) { + Bool = func(evs *api.EnvVarSchema) { evs.Type = api.EnvVarTypeBoolean } -} -func Number() VarOption { - return func(evs *api.EnvVarSchema) { + Number = func(evs *api.EnvVarSchema) { evs.Type = api.EnvVarTypeNumber } -} -func String() VarOption { - return func(evs *api.EnvVarSchema) { + String = func(evs *api.EnvVarSchema) { evs.Type = api.EnvVarTypeString } -} -func Required() VarOption { - return func(evs *api.EnvVarSchema) { + Required = func(evs *api.EnvVarSchema) { evs.Required = true } -} -func Unique() VarOption { - return func(evs *api.EnvVarSchema) { + Unique = func(evs *api.EnvVarSchema) { evs.Required = true evs.Unique = true } -} +) diff --git a/kit/graphql/graphql.go b/kit/graphql/graphql.go index 3be78d6..58d7326 100644 --- a/kit/graphql/graphql.go +++ b/kit/graphql/graphql.go @@ -14,12 +14,6 @@ type Client struct { } func New(ktx kit.Kontext, dependency kit.Dependency) *Client { - // c := graphql.NewClient("http://hasura-graphql-engine.default:8080/v1/graphql", &http.Client{ - // Transport: ktx.Transport(dependency), - // }).WithRequestModifier(func(r *http.Request) { - // r.Header.Set("x-hasura-admin-secret", "hasura") - // }) - return &Client{ ktx: ktx, wrapped: graphql.NewClient("", &http.Client{ diff --git a/kit/kit.go b/kit/kit.go index 1ad715d..ccca0c4 100644 --- a/kit/kit.go +++ b/kit/kit.go @@ -12,7 +12,7 @@ import ( "github.com/xigxog/kubefox/api" "github.com/xigxog/kubefox/build" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/grpc" "github.com/xigxog/kubefox/kit/env" "github.com/xigxog/kubefox/logkf" @@ -51,11 +51,10 @@ func New() Kit { }, } - comp := &kubefox.Component{Id: kubefox.GenerateId()} + comp := &core.Component{Id: core.GenerateId()} var help bool var brokerAddr, healthAddr, logFormat, logLevel string - //-tls-skip-verify flag.StringVar(&comp.Name, "name", "", "Component name. (required)") flag.StringVar(&comp.Commit, "commit", "", "Commit the Component was built from. (required)") flag.StringVar(&brokerAddr, "broker-addr", "127.0.0.1:6060", "Address of the Broker gRPC server.") @@ -70,7 +69,7 @@ func New() Kit { if help { fmt.Fprintf(flag.CommandLine.Output(), ` -Flags can be set using names below or the environment variable listed. +Flags can be set using names below. Flags: `) @@ -94,7 +93,6 @@ Flags: logkf.Global = logkf. BuildLoggerOrDie(logFormat, logLevel). WithComponent(comp) - defer logkf.Global.Sync() svc.log = logkf.Global svc.log.DebugInterface("build info:", build.Info) @@ -105,7 +103,7 @@ Flags: HealthSrvAddr: healthAddr, }) - svc.log.Info("🦊 kit created") + svc.log.Info("kit created 🦊") return svc } @@ -181,20 +179,20 @@ func (svc *kit) dependency(name string, typ api.ComponentType) Dependency { } func (svc *kit) Start() { + defer logkf.Global.Sync() + + if err := svc.start(); err != nil { + os.Exit(1) + } +} + +func (svc *kit) start() (err error) { if svc.export { c, _ := json.MarshalIndent(svc.spec, "", " ") fmt.Println(string(c)) os.Exit(0) } - var err error - defer func() { - if err != nil { - logkf.Global.Sync() - os.Exit(1) - } - }() - svc.log.DebugInterface("component spec:", svc.spec) if err = svc.brk.StartHealthSrv(); err != nil { @@ -216,15 +214,16 @@ func (svc *kit) Start() { select { case req := <-svc.brk.Req(): svc.recvReq(req) - case err = <-svc.brk.Err(): + case err = <-svc.brk.Err(): // Sets start() err svc.log.Errorf("broker error: %v", err) return } } }() } - wg.Wait() + + return } func (svc *kit) recvReq(req *grpc.ComponentEvent) { @@ -248,7 +247,7 @@ func (svc *kit) recvReq(req *grpc.ComponentEvent) { switch { case req.RouteId == api.DefaultRouteId: if svc.defHandler == nil { - err = kubefox.ErrNotFound(fmt.Errorf("default handler not found")) + err = core.ErrNotFound(fmt.Errorf("default handler not found")) } else { err = svc.defHandler(ktx) } @@ -257,13 +256,13 @@ func (svc *kit) recvReq(req *grpc.ComponentEvent) { err = svc.routes[req.RouteId].handler(ktx) default: - err = kubefox.ErrNotFound(fmt.Errorf("invalid route id %d", req.RouteId)) + err = core.ErrNotFound(fmt.Errorf("invalid route id %d", req.RouteId)) } if err != nil { log.Error(err) - errEvt := kubefox.NewErr(err, kubefox.EventOpts{}) + errEvt := core.NewErr(err, core.EventOpts{}) if err := ktx.ForwardResp(errEvt).Send(); err != nil { log.Error(err) } diff --git a/kit/kontext.go b/kit/kontext.go index cfd5e6d..a987792 100644 --- a/kit/kontext.go +++ b/kit/kontext.go @@ -9,14 +9,14 @@ import ( "time" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/logkf" "google.golang.org/protobuf/types/known/structpb" ) type kontext struct { - *kubefox.Event + *core.Event kit *kit env map[string]*structpb.Value @@ -27,13 +27,13 @@ type kontext struct { } type respKontext struct { - *kubefox.Event + *core.Event ktx *kontext } type reqKontext struct { - *kubefox.Event + *core.Event ktx *kontext } @@ -71,9 +71,9 @@ func (k *kontext) EnvDefV(v EnvVar, def *api.Val) *api.Val { } } -func (k *kontext) ForwardResp(resp kubefox.EventReader) Resp { +func (k *kontext) ForwardResp(resp core.EventReader) Resp { return &respKontext{ - Event: kubefox.CloneToResp(resp.(*kubefox.Event), kubefox.EventOpts{ + Event: core.CloneToResp(resp.(*core.Event), core.EventOpts{ Parent: k.Event, Source: k.kit.brk.Component, Target: k.Event.Source, @@ -84,7 +84,7 @@ func (k *kontext) ForwardResp(resp kubefox.EventReader) Resp { func (k *kontext) Resp() Resp { return &respKontext{ - Event: kubefox.NewResp(kubefox.EventOpts{ + Event: core.NewResp(core.EventOpts{ Parent: k.Event, Source: k.kit.brk.Component, Target: k.Event.Source, @@ -95,11 +95,11 @@ func (k *kontext) Resp() Resp { func (k *kontext) Req(c Dependency) Req { return &reqKontext{ - Event: kubefox.NewReq(kubefox.EventOpts{ + Event: core.NewReq(core.EventOpts{ Type: c.EventType(), Parent: k.Event, Source: k.kit.brk.Component, - Target: &kubefox.Component{Name: c.Name()}, + Target: &core.Component{Name: c.Name()}, }), ktx: k, } @@ -107,11 +107,11 @@ func (k *kontext) Req(c Dependency) Req { func (k *kontext) Forward(c Dependency) Req { return &reqKontext{ - Event: kubefox.CloneToReq(k.Event, kubefox.EventOpts{ + Event: core.CloneToReq(k.Event, core.EventOpts{ Type: c.EventType(), Parent: k.Event, Source: k.kit.brk.Component, - Target: &kubefox.Component{Name: c.Name()}, + Target: &core.Component{Name: c.Name()}, }), ktx: k, } @@ -126,11 +126,11 @@ func (k *kontext) HTTP(c Dependency) *http.Client { func (k *kontext) Transport(c Dependency) http.RoundTripper { return &EventRoundTripper{ req: &reqKontext{ - Event: kubefox.NewReq(kubefox.EventOpts{ + Event: core.NewReq(core.EventOpts{ Type: c.EventType(), Parent: k.Event, Source: k.kit.brk.Component, - Target: &kubefox.Component{Name: c.Name()}, + Target: &core.Component{Name: c.Name()}, }), ktx: k, }, @@ -189,17 +189,17 @@ func (resp *respKontext) Send() error { return resp.ktx.kit.brk.SendResp(resp.Event, resp.ktx.start) } -func (req *reqKontext) SendStr(val string) (kubefox.EventReader, error) { +func (req *reqKontext) SendStr(val string) (core.EventReader, error) { c := fmt.Sprintf("%s; %s", api.ContentTypePlain, api.CharSetUTF8) return req.SendBytes(c, []byte(val)) } -func (req *reqKontext) SendHTML(val string) (kubefox.EventReader, error) { +func (req *reqKontext) SendHTML(val string) (core.EventReader, error) { c := fmt.Sprintf("%s; %s", api.ContentTypeHTML, api.CharSetUTF8) return req.SendBytes(c, []byte(val)) } -func (req *reqKontext) SendJSON(val any) (kubefox.EventReader, error) { +func (req *reqKontext) SendJSON(val any) (core.EventReader, error) { if err := req.SetJSON(val); err != nil { return nil, err } @@ -207,7 +207,7 @@ func (req *reqKontext) SendJSON(val any) (kubefox.EventReader, error) { return req.Send() } -func (req *reqKontext) SendReader(contentType string, reader io.Reader) (kubefox.EventReader, error) { +func (req *reqKontext) SendReader(contentType string, reader io.Reader) (core.EventReader, error) { bytes, err := io.ReadAll(reader) if err != nil { return nil, err @@ -216,13 +216,13 @@ func (req *reqKontext) SendReader(contentType string, reader io.Reader) (kubefox return req.SendBytes(contentType, bytes) } -func (req *reqKontext) SendBytes(contentType string, b []byte) (kubefox.EventReader, error) { +func (req *reqKontext) SendBytes(contentType string, b []byte) (core.EventReader, error) { req.Event.ContentType = contentType req.Event.Content = b return req.Send() } -func (req *reqKontext) Send() (kubefox.EventReader, error) { +func (req *reqKontext) Send() (core.EventReader, error) { return req.ktx.kit.brk.SendReq(req.ktx.ctx, req.Event, req.ktx.start) } diff --git a/kit/types.go b/kit/types.go index 2b5bc87..3dc3e2f 100644 --- a/kit/types.go +++ b/kit/types.go @@ -6,7 +6,7 @@ import ( "net/http" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "github.com/xigxog/kubefox/kit/env" "github.com/xigxog/kubefox/logkf" ) @@ -31,7 +31,7 @@ type Kit interface { } type Kontext interface { - kubefox.EventReader + core.EventReader Env(v EnvVar) string EnvV(v EnvVar) *api.Val @@ -39,7 +39,7 @@ type Kontext interface { EnvDefV(v EnvVar, def *api.Val) *api.Val Resp() Resp - ForwardResp(resp kubefox.EventReader) Resp + ForwardResp(resp core.EventReader) Resp Req(c Dependency) Req Forward(c Dependency) Req @@ -51,18 +51,18 @@ type Kontext interface { } type Req interface { - kubefox.EventWriter - - SendStr(s string) (kubefox.EventReader, error) - SendHTML(h string) (kubefox.EventReader, error) - SendJSON(v any) (kubefox.EventReader, error) - SendBytes(contentType string, b []byte) (kubefox.EventReader, error) - SendReader(contentType string, reader io.Reader) (kubefox.EventReader, error) - Send() (kubefox.EventReader, error) + core.EventWriter + + SendStr(s string) (core.EventReader, error) + SendHTML(h string) (core.EventReader, error) + SendJSON(v any) (core.EventReader, error) + SendBytes(contentType string, b []byte) (core.EventReader, error) + SendReader(contentType string, reader io.Reader) (core.EventReader, error) + Send() (core.EventReader, error) } type Resp interface { - kubefox.EventWriter + core.EventWriter SendStr(s string) error SendHTML(h string) error diff --git a/logkf/logger.go b/logkf/logger.go index 8cc0b72..5d7aac6 100644 --- a/logkf/logger.go +++ b/logkf/logger.go @@ -6,7 +6,7 @@ import ( "fmt" "os" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -104,7 +104,7 @@ func BuildLogger(format, level string) (*Logger, error) { // ensures log messages are shown to be from caller instead of this logger z = z.WithOptions(zap.AddCallerSkip(skip)) - kubefox.RecordStackTraces = (level == "debug") + core.RecordStackTraces = (level == "debug") return &Logger{wrapped: z.Sugar()}, nil } @@ -164,7 +164,7 @@ func (log *Logger) WithPlatformComponent(val string) *Logger { return log.With(KeyPlatformComponent, val) } -func (log *Logger) WithComponent(comp *kubefox.Component) *Logger { +func (log *Logger) WithComponent(comp *core.Component) *Logger { if comp == nil { return log } @@ -176,7 +176,7 @@ func (log *Logger) WithComponent(comp *kubefox.Component) *Logger { ) } -func (log *Logger) WithSource(src *kubefox.Component) *Logger { +func (log *Logger) WithSource(src *core.Component) *Logger { if src == nil { return log } @@ -188,7 +188,7 @@ func (log *Logger) WithSource(src *kubefox.Component) *Logger { ) } -func (log *Logger) WithTarget(tgt *kubefox.Component) *Logger { +func (log *Logger) WithTarget(tgt *core.Component) *Logger { if tgt == nil { return log } @@ -200,7 +200,7 @@ func (log *Logger) WithTarget(tgt *kubefox.Component) *Logger { ) } -func (log *Logger) WithEvent(evt *kubefox.Event) *Logger { +func (log *Logger) WithEvent(evt *core.Event) *Logger { if evt == nil { return log } diff --git a/matcher/event_matcher.go b/matcher/event_matcher.go index 7fc2f07..c72c91b 100644 --- a/matcher/event_matcher.go +++ b/matcher/event_matcher.go @@ -10,7 +10,7 @@ import ( "github.com/vulcand/predicate" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" ) type param struct { @@ -19,14 +19,14 @@ type param struct { } type EventMatcher struct { - routes []*kubefox.Route + routes []*core.Route parser predicate.Parser mutex sync.RWMutex } func New() *EventMatcher { m := &EventMatcher{ - routes: make([]*kubefox.Route, 0), + routes: make([]*core.Route, 0), } // TODO add various event criteria for route predicates: @@ -61,7 +61,7 @@ func (m *EventMatcher) IsEmpty() bool { return len(m.routes) == 0 } -func (m *EventMatcher) AddRoutes(routes []*kubefox.Route) error { +func (m *EventMatcher) AddRoutes(routes []*core.Route) error { for _, r := range routes { if r.ResolvedRule == "" { return fmt.Errorf("route %d has unresolved rule", r.Id) @@ -75,7 +75,7 @@ func (m *EventMatcher) AddRoutes(routes []*kubefox.Route) error { r.ParseErr = fmt.Errorf("invalid route '%s': parsing '%s' failed; %w", r.Rule, r.ResolvedRule, err) continue } - r.Predicate = parsed.(kubefox.EventPredicate) + r.Predicate = parsed.(core.EventPredicate) m.mutex.Lock() m.routes = append(m.routes, r) @@ -90,7 +90,7 @@ func (m *EventMatcher) AddRoutes(routes []*kubefox.Route) error { return nil } -func (m *EventMatcher) Match(evt *kubefox.Event) (*kubefox.Route, bool) { +func (m *EventMatcher) Match(evt *core.Event) (*core.Route, bool) { m.mutex.RLock() defer m.mutex.RUnlock() @@ -113,13 +113,13 @@ func (m *EventMatcher) Match(evt *kubefox.Event) (*kubefox.Route, bool) { return nil, false } -func (m *EventMatcher) all() kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func (m *EventMatcher) all() core.EventPredicate { + return func(e *core.Event) bool { return true } } -func (m *EventMatcher) header(key, val string) (kubefox.EventPredicate, error) { +func (m *EventMatcher) header(key, val string) (core.EventPredicate, error) { if key == "" { return nil, fmt.Errorf("header key must be provided") } @@ -130,24 +130,24 @@ func (m *EventMatcher) header(key, val string) (kubefox.EventPredicate, error) { return nil, fmt.Errorf("invalid regex of header predicate %s: %w", val, err) } - return func(e *kubefox.Event) bool { + return func(e *core.Event) bool { return matchMap(key, val, regex, e.ValueMap(api.ValKeyHeader)) }, nil } -func (m *EventMatcher) host(s string) (kubefox.EventPredicate, error) { +func (m *EventMatcher) host(s string) (core.EventPredicate, error) { parts, params, err := split(s, '.') if err != nil { return nil, err } - return func(e *kubefox.Event) bool { + return func(e *core.Event) bool { return matchParts(api.ValKeyHost, ".", parts, params, e, false) }, nil } -func (m *EventMatcher) method(s ...string) kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func (m *EventMatcher) method(s ...string) core.EventPredicate { + return func(e *core.Event) bool { m := e.Value(api.ValKeyMethod) for _, v := range s { if strings.EqualFold(m, v) { @@ -158,29 +158,29 @@ func (m *EventMatcher) method(s ...string) kubefox.EventPredicate { } } -func (m *EventMatcher) path(s string) (kubefox.EventPredicate, error) { +func (m *EventMatcher) path(s string) (core.EventPredicate, error) { parts, params, err := split(s, '/') if err != nil { return nil, err } - return func(e *kubefox.Event) bool { + return func(e *core.Event) bool { return matchParts(api.ValKeyPath, "/", parts, params, e, false) }, nil } -func (m *EventMatcher) pathPrefix(s string) (kubefox.EventPredicate, error) { +func (m *EventMatcher) pathPrefix(s string) (core.EventPredicate, error) { parts, params, err := split(s, '/') if err != nil { return nil, err } - return func(e *kubefox.Event) bool { + return func(e *core.Event) bool { return matchParts(api.ValKeyPath, "/", parts, params, e, true) }, nil } -func (m *EventMatcher) query(key, val string) (kubefox.EventPredicate, error) { +func (m *EventMatcher) query(key, val string) (core.EventPredicate, error) { if key == "" { return nil, fmt.Errorf("query param key must be provided") } @@ -190,34 +190,34 @@ func (m *EventMatcher) query(key, val string) (kubefox.EventPredicate, error) { return nil, fmt.Errorf("invalid regex of query predicate %s: %w", val, err) } - return func(e *kubefox.Event) bool { + return func(e *core.Event) bool { return matchMap(key, val, regex, e.ValueMap(api.ValKeyQuery)) }, nil } -func (m *EventMatcher) eventType(s string) kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func (m *EventMatcher) eventType(s string) core.EventPredicate { + return func(e *core.Event) bool { return e.GetType() == s } } // Logical operator AND that combines predicates -func and(a, b kubefox.EventPredicate) kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func and(a, b core.EventPredicate) core.EventPredicate { + return func(e *core.Event) bool { return a(e) && b(e) } } // Logical operator OR that combines predicates -func or(a, b kubefox.EventPredicate) kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func or(a, b core.EventPredicate) core.EventPredicate { + return func(e *core.Event) bool { return a(e) || b(e) } } // Logical operator NOT that negates predicates -func not(a kubefox.EventPredicate) kubefox.EventPredicate { - return func(e *kubefox.Event) bool { +func not(a core.EventPredicate) core.EventPredicate { + return func(e *core.Event) bool { return !a(e) } } @@ -252,7 +252,7 @@ func matchMap(key, val string, regex *regexp.Regexp, m map[string][]string) bool return false } -func matchParts(val string, sep string, parts []string, params map[int]*param, e *kubefox.Event, prefix bool) bool { +func matchParts(val string, sep string, parts []string, params map[int]*param, e *core.Event, prefix bool) bool { evtParts := strings.Split(strings.Trim(e.Value(val), sep), sep) if len(parts) > len(evtParts) { diff --git a/matcher/event_matcher_test.go b/matcher/event_matcher_test.go index 0ea33f7..4389f87 100644 --- a/matcher/event_matcher_test.go +++ b/matcher/event_matcher_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/xigxog/kubefox/api" - kubefox "github.com/xigxog/kubefox/core" + "github.com/xigxog/kubefox/core" ) func TestPath(t *testing.T) { @@ -16,33 +16,33 @@ func TestPath(t *testing.T) { "c": api.ValArrayString([]string{"a", "b"}), } - r1 := &kubefox.Route{ + r1 := &core.Route{ RouteSpec: api.RouteSpec{ Id: 1, Rule: "PathPrefix(`/customize/{{.Env.b}}`)", }, - Component: &kubefox.Component{}, - EventContext: &kubefox.EventContext{}, + Component: &core.Component{}, + EventContext: &core.EventContext{}, } if err := r1.Resolve(v, nil); err != nil { t.Log(err) t.FailNow() } - r2 := &kubefox.Route{ + r2 := &core.Route{ RouteSpec: api.RouteSpec{ Id: 2, Rule: "Method(`PUT`,`GET`,`POST`) && (Query(`q1`, `{q[1-2]}`) && Header(`header-one`,`{[a-z0-9]+}`)) && Host(`{{.Env.a}}.0.0.{i}`) && Path(`/customize/{{.Env.b}}/{j:[a-z]+}`)", }, - Component: &kubefox.Component{}, - EventContext: &kubefox.EventContext{}, + Component: &core.Component{}, + EventContext: &core.EventContext{}, } if err := r2.Resolve(v, nil); err != nil { t.Log(err) t.FailNow() } - if err := p.AddRoutes([]*kubefox.Route{r1, r2}); err != nil { + if err := p.AddRoutes([]*core.Route{r1, r2}); err != nil { t.Log(err) t.FailNow() } @@ -62,8 +62,8 @@ func TestPath(t *testing.T) { } -func evt(evtType api.EventType) *kubefox.Event { - evt := kubefox.NewEvent() +func evt(evtType api.EventType) *core.Event { + evt := core.NewEvent() evt.Type = string(evtType) evt.SetValue(api.ValKeyURL, "http://127.0.0.1/customize/1/a?q1=q1&q2=q2") evt.SetValue(api.ValKeyHost, "127.0.0.1")