Skip to content

Commit

Permalink
moved active evt into event proper
Browse files Browse the repository at this point in the history
  • Loading branch information
xadhatter committed Oct 18, 2023
1 parent e7ed6b1 commit 0dfa80b
Show file tree
Hide file tree
Showing 14 changed files with 638 additions and 671 deletions.
192 changes: 81 additions & 111 deletions components/broker/engine/broker.go

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions components/broker/engine/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error {
return log.ErrorN("component registration failed: %v", err)
}

sendEvt := func(mEvt *kubefox.MatchedEvent) error {
sendEvt := func(evt *LiveEvent) error {
// Protect the stream from being called by multiple threads.
sendMutex.Lock()
defer sendMutex.Unlock()

srv.log.WithEvent(mEvt.Event).Debug("send event")
srv.log.WithEvent(evt.Event).Debug("send event")

if err := stream.Send(mEvt); err != nil {
if err := stream.Send(evt.MatchedEvent); err != nil {
return fmt.Errorf("%w: %v", ErrComponentGone, err)
}
return nil
Expand Down Expand Up @@ -197,9 +197,10 @@ func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error {

log = srv.log.WithEvent(evt)
log.Debug("receive event")
err = srv.brk.RecvEvent(&ReceivedEvent{
ActiveEvent: kubefox.StartEvent(evt),
Receiver: ReceiverGRPCServer,
err = srv.brk.RecvEvent(&LiveEvent{
Event: evt,
Receiver: ReceiverGRPCServer,
ReceivedAt: time.Now(),
})
if err != nil {
log.Debug(err)
Expand Down
12 changes: 7 additions & 5 deletions components/broker/engine/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"time"

"github.com/google/uuid"
"github.com/xigxog/kubefox/libs/core/kubefox"
Expand Down Expand Up @@ -37,7 +38,7 @@ func NewHTTPClient(brk Broker) *HTTPClient {
}
}

func (c *HTTPClient) SendEvent(req ReceivedEvent) error {
func (c *HTTPClient) SendEvent(req *LiveEvent) error {
ctx, cancel := context.WithTimeout(context.Background(), req.TTL())
defer cancel()

Expand All @@ -51,7 +52,7 @@ func (c *HTTPClient) SendEvent(req ReceivedEvent) error {
return err
}

resp := kubefox.StartResp(kubefox.EventOpts{
resp := kubefox.NewResp(kubefox.EventOpts{
Parent: req.Event,
Source: c.comp,
Target: req.Source,
Expand All @@ -60,10 +61,11 @@ func (c *HTTPClient) SendEvent(req ReceivedEvent) error {
return err
}

rEvt := &ReceivedEvent{
ActiveEvent: resp,
Subscription: req.Subscription,
rEvt := &LiveEvent{
Event: resp,
Receiver: ReceiverHTTPClient,
ReceivedAt: time.Now(),
Subscription: req.Subscription,
}
if err := c.brk.RecvEvent(rEvt); err != nil {
return err
Expand Down
15 changes: 8 additions & 7 deletions components/broker/engine/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (srv *HTTPServer) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Re

resWriter.Header().Set(kubefox.HeaderAdapter, srv.comp.Key())

req := kubefox.StartReq(kubefox.EventOpts{
req := kubefox.NewReq(kubefox.EventOpts{
Source: srv.comp,
})
req.Ttl = config.EventTTL.Microseconds()
Expand All @@ -146,7 +146,7 @@ func (srv *HTTPServer) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Re
return
}

log := srv.log.WithEvent(req.Event)
log := srv.log.WithEvent(req)
log.Debug("received request")

srv.mutex.Lock()
Expand All @@ -160,10 +160,11 @@ func (srv *HTTPServer) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Re
srv.mutex.Unlock()
}()

rEvt := &ReceivedEvent{
ActiveEvent: req,
Receiver: ReceiverHTTPServer,
ErrCh: make(chan error),
rEvt := &LiveEvent{
Event: req,
Receiver: ReceiverHTTPServer,
ReceivedAt: time.Now(),
ErrCh: make(chan error),
}
if err := srv.brk.RecvEvent(rEvt); err != nil {
writeError(resWriter, context.Cause(ctx), log)
Expand Down Expand Up @@ -219,7 +220,7 @@ func (srv *HTTPServer) Subscription() Subscription {
return srv.sub
}

func (srv *HTTPServer) sendEvent(mEvt *kubefox.MatchedEvent) error {
func (srv *HTTPServer) sendEvent(mEvt *LiveEvent) error {
resp := mEvt.Event
srv.mutex.Lock()
respCh, found := srv.reqMap[resp.ParentId]
Expand Down
Loading

0 comments on commit 0dfa80b

Please sign in to comment.