Skip to content

Commit

Permalink
Fix: filtered records holding up pipeline with destination batching (#…
Browse files Browse the repository at this point in the history
…1987)

* Fix: filtered records holding up pipeline with destination batching

* fix test

* linter fix

* change logs level

* update destination_acker_test.go

* update destination_test.go

* linter

* Update pkg/lifecycle/stream/destination_test.go

Co-authored-by: Maha Hajja <[email protected]>

* comment

---------

Co-authored-by: Maha Hajja <[email protected]>
Co-authored-by: Maha Hajja <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 5a0923e commit 43fe0a1
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/conduit/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRuntime(t *testing.T) {
go func() {
errC <- r.Run(ctx)
}()
err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), 100*time.Second)
err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), time.Second)
is.NoErr(recvErr)
is.True(got)
if !cerrors.Is(err, context.Canceled) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/lifecycle/stream/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) {
if err != nil || msg == nil {
return err
}
if msg.filtered {
n.logger.Debug(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

n.logger.Trace(msg.Ctx).Msg("writing record to destination connector")

Expand Down
29 changes: 20 additions & 9 deletions pkg/lifecycle/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type DestinationAckerNode struct {
// queue is used to store messages
queue deque.Deque[*Message]

// m guards access to queue
m sync.Mutex
queueMutex sync.Mutex

// mctx guards access to the contextCtxCancel function
mctx sync.Mutex
Expand Down Expand Up @@ -96,9 +95,9 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {
return err
}

n.m.Lock()
n.queueMutex.Lock()
n.queue.PushBack(msg)
n.m.Unlock()
n.queueMutex.Unlock()
select {
case signalChan <- struct{}{}:
// triggered the start of listening to acks in worker goroutine
Expand All @@ -116,9 +115,9 @@ func (n *DestinationAckerNode) worker(
) {
handleError := func(msg *Message, err error) {
// push message back to the front of the queue and return error
n.m.Lock()
n.queueMutex.Lock()
n.queue.PushFront(msg)
n.m.Unlock()
n.queueMutex.Unlock()

errChan <- err
}
Expand All @@ -131,13 +130,25 @@ func (n *DestinationAckerNode) worker(
// let's start fetching acks for messages in the queue
for {
// check if there are more messages waiting in the queue
n.m.Lock()
n.queueMutex.Lock()
if n.queue.Len() == 0 {
n.m.Unlock()
n.queueMutex.Unlock()
break
}
msg := n.queue.PopFront()
n.m.Unlock()
n.queueMutex.Unlock()

if msg.filtered {
n.logger.Trace(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("acking filtered message")
err := n.handleAck(msg, nil)
if err != nil {
errChan <- err
return
}
continue
}

if len(acks) == 0 {
// Ack can return multiple acks, store them and check the position
Expand Down
54 changes: 54 additions & 0 deletions pkg/lifecycle/stream/destination_acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,60 @@ func TestDestinationAckerNode_Cache(t *testing.T) {
ackHandlerWg.Wait() // all ack handler should be called by now
}

// TestDestinationAckerNode_AckFilteredRecords verifies that a message marked
// as filtered is acked by the DestinationAckerNode and that the node stops
// cleanly once its incoming channel is closed.
//
// All assertions run on the test goroutine: the helper goroutines only hand
// their results back through channels, because failing a test (FailNow) from
// any other goroutine is not allowed by the testing package.
func TestDestinationAckerNode_AckFilteredRecords(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()
	ctrl := gomock.NewController(t)
	// No expectations are set on the destination mock: a filtered record is
	// acked by the node itself, without fetching an ack from the destination.
	dest := mock.NewDestination(ctrl)

	node := &DestinationAckerNode{
		Name:        "destination-acker-node",
		Destination: dest,
	}

	in := make(chan *Message)
	node.Sub(in)

	// Run the node in the background and report its result to the test
	// goroutine. The channel is buffered so the goroutine can always exit.
	runErr := make(chan error, 1)
	go func() {
		runErr <- node.Run(ctx)
	}()

	msg := &Message{
		filtered: true,
		Record:   opencdc.Record{Position: opencdc.Position("test-position")},
	}
	// The ack handler only forwards the message it was called with, the
	// comparison happens below on the test goroutine.
	acked := make(chan *Message, 1)
	msg.RegisterAckHandler(func(got *Message) error {
		acked <- got
		return nil
	})
	in <- msg // send the filtered message to the node's incoming channel

	select {
	case <-time.After(time.Second):
		is.Fail() // expected ack handler to be called
	case got := <-acked:
		is.Equal(msg, got)
	}

	// closing the incoming channel signals the node to stop running
	close(in)

	select {
	case <-time.After(time.Second):
		is.Fail() // expected node to stop running
	case err := <-runErr:
		is.NoErr(err)
	}
}

func TestDestinationAckerNode_ForwardAck(t *testing.T) {
is := is.New(t)
ctx := context.Background()
Expand Down
83 changes: 70 additions & 13 deletions pkg/lifecycle/stream/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package stream
import (
"context"
"io"
"sync"
"testing"
"time"

"github.com/conduitio/conduit-commons/cchan"
"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/metrics/noop"
Expand All @@ -40,39 +42,39 @@ func TestDestinationNode_ForceStop(t *testing.T) {
}{{
name: "Destination.Open blocks",
mockDestination: func(onStuck chan struct{}) *mock.Destination {
src := mock.NewDestination(ctrl)
src.EXPECT().ID().Return("destination-connector").AnyTimes()
src.EXPECT().Errors().Return(make(chan error))
src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
dest := mock.NewDestination(ctrl)
dest.EXPECT().ID().Return("destination-connector").AnyTimes()
dest.EXPECT().Errors().Return(make(chan error))
dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
close(onStuck)
<-ctx.Done() // block until context is done
return ctx.Err()
})
return src
return dest
},
wantMsg: false,
wantErr: context.Canceled,
}, {
name: "Destination.Write blocks",
mockDestination: func(onStuck chan struct{}) *mock.Destination {
var connectorCtx context.Context
src := mock.NewDestination(ctrl)
src.EXPECT().ID().Return("destination-connector").AnyTimes()
src.EXPECT().Errors().Return(make(chan error))
src.EXPECT().Teardown(gomock.Any()).Return(nil)
src.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
dest := mock.NewDestination(ctrl)
dest.EXPECT().ID().Return("destination-connector").AnyTimes()
dest.EXPECT().Errors().Return(make(chan error))
dest.EXPECT().Teardown(gomock.Any()).Return(nil)
dest.EXPECT().Open(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
// the connector opens the stream in Open and keeps it open
// until the context is closed
connectorCtx = ctx
return nil
})
src.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error {
dest.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r []opencdc.Record) error {
close(onStuck)
<-connectorCtx.Done() // block until connector stream is closed
return io.EOF // io.EOF is returned when the stream is closed
})
src.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
return src
dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
return dest
},
wantMsg: true,
wantErr: io.EOF,
Expand Down Expand Up @@ -133,3 +135,58 @@ func TestDestinationNode_ForceStop(t *testing.T) {
})
}
}

// TestDestinationNode_HandleFilteredMessage verifies that a message marked as
// filtered passes through the DestinationNode untouched: it is forwarded to
// the node's outgoing channel and is never written to the destination.
//
// All assertions run on the test goroutine: the helper goroutines only hand
// their results back through channels, because failing a test (FailNow) from
// any other goroutine is not allowed by the testing package.
func TestDestinationNode_HandleFilteredMessage(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()
	ctrl := gomock.NewController(t)
	dest := mock.NewDestination(ctrl)
	// A filtered message is passing through this node without being sent
	// to the destination, hence no Destination.Write() call here. Only the
	// connector lifecycle calls are expected.
	dest.EXPECT().Errors().Return(make(chan error))
	dest.EXPECT().Open(gomock.Any()).Return(nil)
	dest.EXPECT().Stop(gomock.Any(), gomock.Any()).Return(nil)
	dest.EXPECT().Teardown(gomock.Any()).Return(nil)

	node := &DestinationNode{
		Name:        "destination-node",
		Destination: dest,
	}

	in := make(chan *Message)
	node.Sub(in)
	out := node.Pub()

	var nodeStopped sync.WaitGroup
	nodeStopped.Add(1)

	// Run the node in the background. Its result is buffered so it can be
	// asserted on the test goroutine after the node has stopped.
	runErr := make(chan error, 1)
	go func() {
		defer nodeStopped.Done()
		runErr <- node.Run(ctx)
	}()

	msg := &Message{
		filtered: true,
		Record:   opencdc.Record{Position: opencdc.Position("test-position")},
	}

	// Send the message in the background, since the node forwards it to the
	// outgoing channel, which is only read further down in this test.
	sendErr := make(chan error, 1)
	go func() {
		// closing the incoming channel signals the node to stop running,
		// this is done even if the send failed so the node doesn't linger
		defer close(in)
		sendErr <- cchan.ChanIn[*Message](in).SendTimeout(ctx, msg, 100*time.Millisecond)
	}()

	gotMsg, ok, err := cchan.ChanOut[*Message](out).RecvTimeout(ctx, 100*time.Millisecond)
	is.NoErr(err) // timed out waiting for the filtered message on the outgoing channel
	is.True(ok)   // expected a message, but the outgoing channel was closed
	is.Equal(msg, gotMsg)

	is.NoErr(<-sendErr) // expected message to be sent to the destination node's Sub channel

	err = (*csync.WaitGroup)(&nodeStopped).WaitTimeout(ctx, 100*time.Millisecond)
	is.NoErr(err)      // timed out waiting for node to be done running
	is.NoErr(<-runErr) // expected node to stop without an error
}
9 changes: 8 additions & 1 deletion pkg/lifecycle/stream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ type Message struct {
acked chan struct{}
nacked chan struct{}

// filtered indicates whether the message holds a filtered record.
// Such a message needs to be acknowledged.
// This is done by letting the message pass through all the nodes
// (without being processed by them), until the destination acker
// node acknowledges it.
filtered bool

// handler is executed when Ack or Nack is called.
handler StatusChangeHandler

// hasNackHandler is true if at least one nack handler was registered.
hasNackHandler bool

// ackNackReturnValue is cached the first time Ack or Nack is executed.
ackNackReturnValue error

// initOnce is guarding the initialization logic of a message.
initOnce sync.Once
// ackNackOnce is guarding the acking/nacking logic of a message.
Expand Down
10 changes: 10 additions & 0 deletions pkg/lifecycle/stream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (n *MetricsNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

msg.RegisterAckHandler(func(msg *Message) error {
n.Histogram.Observe(msg.Record)
return nil
Expand Down
19 changes: 16 additions & 3 deletions pkg/lifecycle/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}

if msg.filtered {
n.logger.Trace(ctx).Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return msg.Nack(err, n.ID())
}
continue
}

executeTime := time.Now()
recsIn := []opencdc.Record{msg.Record}
recsOut := n.Processor.Process(msg.Ctx, recsIn)
Expand All @@ -104,10 +114,13 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
return err
}
case sdk.FilterRecord:
// NB: Ack skipped messages since they've been correctly handled
err := msg.Ack()
msg.filtered = true
n.logger.Trace(ctx).
Str(log.MessageIDField, msg.ID()).
Msg("message marked as filtered, sending directly to next node")
err = n.base.Send(ctx, n.logger, msg)
if err != nil {
return cerrors.Errorf("failed to ack skipped message: %w", err)
return msg.Nack(err, n.ID())
}
case sdk.ErrorRecord:
err = msg.Nack(v.Error, n.ID())
Expand Down
Loading

0 comments on commit 43fe0a1

Please sign in to comment.