Skip to content

Commit

Permalink
removed event kv
Browse files Browse the repository at this point in the history
  • Loading branch information
xadhatter committed Oct 19, 2023
1 parent 7f45856 commit 2d5a10a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 63 deletions.
1 change: 0 additions & 1 deletion components/broker/engine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ func (brk *broker) startArchiver() {
if err != nil {
// This event will be lost in time, never to be seen again :(
log.Warnf("unable to publish event to JetStream, event not archived: %v", err)
continue
}

case <-brk.ctx.Done():
Expand Down
87 changes: 27 additions & 60 deletions components/broker/engine/jetstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ type RecvMsg func(*nats.Msg)
type JetStreamClient struct {
jetstream.JetStream

nc *nats.Conn
eventsKV jetstream.KeyValue
compKV jetstream.KeyValue
nc *nats.Conn
compKV jetstream.KeyValue

brk Broker

Expand Down Expand Up @@ -67,10 +66,6 @@ func (c *JetStreamClient) Connect(ctx context.Context) error {
c.nc, err = nats.Connect(
fmt.Sprintf("nats://%s", config.NATSAddr),
nats.Name("broker-"+c.brk.Id()),
// nats.RetryOnFailedConnect(true),
// nats.MaxReconnects(3),
// nats.NoReconnect(),??
nats.PingInterval(time.Second*30),
nats.RootCAs(kubefox.PathCACert),
nats.ClientCert(kubefox.PathTLSCert, kubefox.PathTLSKey),
)
Expand All @@ -88,9 +83,6 @@ func (c *JetStreamClient) Connect(ctx context.Context) error {
if err := c.setupCompsKV(ctx); err != nil {
return err
}
if err := c.setupEventsKV(ctx); err != nil {
return err
}

c.log.Info("jetstream client connected")
return nil
Expand All @@ -116,7 +108,7 @@ func (c *JetStreamClient) setupStream(ctx context.Context) error {
MaxMsgSize: maxMsgSize,
Discard: jetstream.DiscardOld,
MaxAge: EventStreamTTL,
NoAck: true,
Duplicates: time.Millisecond * 100, // minimum value
})
if err != nil {
return c.log.ErrorN("unable to create events stream: %v", err)
Expand All @@ -139,34 +131,14 @@ func (c *JetStreamClient) setupCompsKV(ctx context.Context) (err error) {
return nil
}

func (c *JetStreamClient) setupEventsKV(ctx context.Context) (err error) {
c.eventsKV, err = c.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: eventStream,
Description: "Durable disk backed key/value store for Event ids. Values are retained for 3 days.",
Storage: jetstream.FileStorage,
TTL: EventStreamTTL,
})
if err != nil {
return c.log.ErrorN("unable to create archive key/value store: %w", err)
}

return nil
}

func (c *JetStreamClient) Close() {
c.mutex.Lock()
defer c.mutex.Unlock()

c.log.Info("jetstream client closing")

if c.nc != nil {
c.nc.Close()
}

}

func (c *JetStreamClient) EventsKV() jetstream.KeyValue {
return c.eventsKV
}

func (c *JetStreamClient) ComponentsKV() jetstream.KeyValue {
Expand All @@ -180,13 +152,13 @@ func (c *JetStreamClient) Publish(subject string, evt *kubefox.Event) error {
}

h := make(nats.Header)
h.Set(nats.MsgIdHdr, evt.Id)
// Note, use of `Nats-Msg-Id` would enable de-dupe and increase mem usage.
h.Set(kubefox.CloudEventId, evt.Id)

// Headers create sizeable overhead for storage. Disabling for now.
// Headers create sizeable overhead for storage. Disabling most for now.
//
// h.Set("ce_specversion", "1.0")
// h.Set("ce_type", evt.Type)
// h.Set("ce_id", evt.Id)
// h.Set("ce_time", time.Now().Format(time.RFC3339))
// h.Set("ce_source", fmt.Sprintf("kubefox:component:%s", evt.Source.Key()))
// h.Set("ce_dataschema", kubefox.DataSchemaKubefox)
Expand All @@ -207,10 +179,10 @@ func (c *JetStreamClient) PullEvents(sub ReplicaSubscription) error {
log := c.log.WithComponent(sub.Component())

consumer, err := c.JetStream.CreateOrUpdateConsumer(sub.Context(), eventStream, jetstream.ConsumerConfig{
Name: sub.Component().Key(),
FilterSubject: sub.Component().Subject(),
DeliverPolicy: jetstream.DeliverNewPolicy,
AckPolicy: jetstream.AckNonePolicy,
Name: sub.Component().Key(),
FilterSubject: sub.Component().Subject(),
DeliverPolicy: jetstream.DeliverNewPolicy,
InactiveThreshold: config.EventTTL * 5,
})
if err != nil {
return log.ErrorN("unable to create JetStream consumer for component: %v", err)
Expand All @@ -220,10 +192,9 @@ func (c *JetStreamClient) PullEvents(sub ReplicaSubscription) error {
if sub.IsGroupEnabled() {
grpConsumer, err = c.JetStream.CreateOrUpdateConsumer(sub.Context(), eventStream, jetstream.ConsumerConfig{
Name: sub.Component().GroupKey(),
Durable: sub.Component().GroupKey(),
FilterSubject: sub.Component().GroupSubject(),
AckPolicy: jetstream.AckNonePolicy,
InactiveThreshold: EventStreamTTL,
DeliverPolicy: jetstream.DeliverNewPolicy,
InactiveThreshold: config.EventTTL * 5,
})
if err != nil {
return log.ErrorN("unable to create JetStream consumer for group: %v", err)
Expand All @@ -233,7 +204,7 @@ func (c *JetStreamClient) PullEvents(sub ReplicaSubscription) error {
recvMsg := func(msg jetstream.Msg) {
evt := kubefox.NewEvent()
if err := proto.Unmarshal(msg.Data(), evt); err != nil {
evtId := msg.Headers().Get(jetstream.MsgIDHeader)
evtId := msg.Headers().Get(kubefox.CloudEventId)
log.With(logkf.KeyEventId, evtId).Warn("message contains invalid event data: %v", err)
return
}
Expand All @@ -250,42 +221,38 @@ func (c *JetStreamClient) PullEvents(sub ReplicaSubscription) error {
if err := c.brk.RecvEvent(rEvt); err != nil {
c.log.WithEvent(evt).Debug(err)
if evt.Target.Id == "" && errors.Is(err, ErrSubCanceled) {
// Any component replica can process, republish event.
if err := c.nc.PublishMsg(&nats.Msg{
Subject: msg.Subject(),
Header: msg.Headers(),
Data: msg.Data(),
}); err != nil {
c.log.WithEvent(evt).Debug(err)
}
// Any component replica can process, redeliver event.
log.Debug("nacking event from component group subject")
msg.Nak()
return
}
}
msg.Ack()
}

var (
conCtx jetstream.ConsumeContext
grpConCtx jetstream.ConsumeContext
consumerCtx jetstream.ConsumeContext
grpConsumerCtx jetstream.ConsumeContext
)
if conCtx, err = consumer.Consume(recvMsg); err != nil {
if consumerCtx, err = consumer.Consume(recvMsg); err != nil {
return err
}
if grpConsumer != nil {
if grpConCtx, err = grpConsumer.Consume(recvMsg); err != nil {
conCtx.Stop()
if grpConsumerCtx, err = grpConsumer.Consume(recvMsg); err != nil {
consumerCtx.Stop()
return err
}
}
go func() {
<-sub.Context().Done()
log.Debug("subscription closed, stopping consumers")
if grpConCtx != nil {
grpConCtx.Stop()
if grpConsumerCtx != nil {
grpConsumerCtx.Stop()
}
conCtx.Stop()

consumerCtx.Stop()
}()
log.Debug("consumers started")

log.Debug("consumers started")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion components/operator/controller/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
NATSImage = "ghcr.io/xigxog/nats:2.10.3"
NATSImage = "ghcr.io/xigxog/nats:2.9.23"
VaultImage = "ghcr.io/xigxog/vault:1.14.4-v0.2.1-alpha"
)

Expand Down
2 changes: 1 addition & 1 deletion components/operator/templates/nats/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ max_payload: 5Mi
# NATS JetStream
jetstream {
max_mem: 0Gi
max_file: 8Gi
max_file: {{ .Values.volumeSize | default "8Gi" }}
store_dir: "/data"
}

Expand Down
4 changes: 4 additions & 0 deletions libs/core/kubefox/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ var (
const (
DefaultRouteId = -1
)

const (
CloudEventId = "ce_id"
)

0 comments on commit 2d5a10a

Please sign in to comment.