Skip to content

Commit

Permalink
Merge pull request #66 from replicase/fix/introduce-ack-tracker
Browse files Browse the repository at this point in the history
fix: introduce ack tracker to prevent loss batch message
  • Loading branch information
KennyChenFight authored Sep 2, 2024
2 parents acdc215 + 10fc2b9 commit 51636dd
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 49 deletions.
60 changes: 52 additions & 8 deletions pkg/source/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package source
import (
"context"
"encoding/hex"
"fmt"
"os"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/bits-and-blooms/bitset"
"github.com/replicase/pgcapture/pkg/cursor"
"github.com/replicase/pgcapture/pkg/pb"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

var ReceiverQueueSize = 5000
const (
ReceiverQueueSize = 5000
AckTrackerSize = 1000
)

type PulsarReaderSource struct {
BaseSource
Expand Down Expand Up @@ -160,9 +165,10 @@ type PulsarConsumerSource struct {
PulsarReplicateState bool
PulsarMaxReconnect *uint

client pulsar.Client
consumer pulsar.Consumer
log *logrus.Entry
client pulsar.Client
consumer pulsar.Consumer
log *logrus.Entry
ackTrackers map[string]*ackTracker
}

func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Change, err error) {
Expand All @@ -189,6 +195,8 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
return nil, err
}

p.ackTrackers = make(map[string]*ackTracker, AckTrackerSize)

p.log = logrus.WithFields(logrus.Fields{
"From": "PulsarConsumerSource",
"Topic": p.PulsarTopic,
Expand Down Expand Up @@ -222,9 +230,11 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
first = true
}

if m.GetChange() == nil {
p.consumer.Ack(msg)
return
if msg.ID().BatchSize() > 1 {
key := p.ackTrackerKey(msg.ID())
if _, ok := p.ackTrackers[key]; !ok {
p.ackTrackers[key] = newAckTracker(uint(msg.ID().BatchSize()))
}
}

change = Change{Checkpoint: checkpoint, Message: m}
Expand All @@ -239,7 +249,13 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang

func (p *PulsarConsumerSource) Commit(cp cursor.Checkpoint) {
if mid, err := pulsar.DeserializeMessageID(cp.Data); err == nil {
p.consumer.AckID(mid)
tracker, ok := p.ackTrackers[p.ackTrackerKey(mid)]
if ok && tracker.ack(int(mid.BatchIdx())) {
_ = p.consumer.AckID(mid)
delete(p.ackTrackers, p.ackTrackerKey(mid))
} else if !ok {
_ = p.consumer.AckID(mid)
}
}
}

Expand All @@ -248,3 +264,31 @@ func (p *PulsarConsumerSource) Requeue(cp cursor.Checkpoint, reason string) {
p.consumer.NackID(mid)
}
}

func (p *PulsarConsumerSource) ackTrackerKey(id pulsar.MessageID) string {
return fmt.Sprintf("%d:%d", id.LedgerID(), id.EntryID())
}

type ackTracker struct {
size uint
batchIDs *bitset.BitSet
}

func newAckTracker(size uint) *ackTracker {
batchIDs := bitset.New(size)
for i := uint(0); i < size; i++ {
batchIDs.Set(i)
}
return &ackTracker{
size: size,
batchIDs: batchIDs,
}
}

func (t *ackTracker) ack(batchID int) bool {
if batchID < 0 {
return true
}
t.batchIDs.Clear(uint(batchID))
return t.batchIDs.None()
}
101 changes: 60 additions & 41 deletions pkg/source/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ func TestPulsarConsumerSource(t *testing.T) {
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
Name: topic,
Topic: topic,
Name: topic,
BatchingMaxPublishDelay: 3 * time.Second,
BatchingMaxMessages: 3,
})
if err != nil {
t.Fatal(err)
Expand All @@ -155,75 +157,92 @@ func TestPulsarConsumerSource(t *testing.T) {
t.Fatal(err)
}

// begin, commit message should be ignored
cp := cursor.Checkpoint{LSN: 1}
bs, _ := proto.Marshal(&pb.Message{Type: &pb.Message_Begin{Begin: &pb.Begin{}}})
if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Key: cp.ToKey(),
Payload: bs,
}); err != nil {
t.Fatal(err)
}
cp = cursor.Checkpoint{LSN: 1}
bs, _ = proto.Marshal(&pb.Message{Type: &pb.Message_Commit{Commit: &pb.Commit{}}})
if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Key: cp.ToKey(),
Payload: bs,
}); err != nil {
t.Fatal(err)
}

lsn := 0
for ; lsn < 3; lsn++ {
cp := cursor.Checkpoint{LSN: uint64(lsn)}
bs, _ := proto.Marshal(&pb.Message{Type: &pb.Message_Change{Change: &pb.Change{Table: strconv.Itoa(lsn)}}})
if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
messages := []*pb.Message{
{
Type: &pb.Message_Begin{Begin: &pb.Begin{FinalLsn: 0}},
},
{
Type: &pb.Message_Change{Change: &pb.Change{Table: "1"}},
},
{
Type: &pb.Message_Change{Change: &pb.Change{Table: "2"}},
},
{
Type: &pb.Message_Commit{Commit: &pb.Commit{CommitLsn: 3}},
},
}

for i, m := range messages {
cp := cursor.Checkpoint{LSN: uint64(i)}
bs, _ := proto.Marshal(m)
producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
Key: cp.ToKey(),
Payload: bs,
}); err != nil {
t.Fatal(err)
}
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
if err != nil {
t.Fatal(err)
}
})
}
producer.Flush()

for i := 0; i < lsn; i++ {
for i := 0; i < len(messages); i++ {
change := <-changes
if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
t.Fatalf("unexpected %v", change.Message.String())
msgID, err := pulsar.DeserializeMessageID(change.Checkpoint.Data)
if err != nil {
t.Fatal(err)
}
if msgID.BatchSize() > 1 && msgID.BatchSize() != int32(len(messages)-1) {
t.Fatalf("unexpected pulsar message id batch size %v %v", msgID.BatchSize(), len(messages))
}

if i == 0 {
if b := change.Message.GetBegin(); b == nil || b.FinalLsn != uint64(i) {
t.Fatalf("unexpected begin message %v", change.Message.String())
}
} else if i == len(messages)-1 {
if c := change.Message.GetCommit(); c == nil || c.CommitLsn != uint64(len(messages)-1) {
t.Fatalf("unexpected commit message %v", change.Message.String())
}
} else {
if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
t.Fatalf("unexpected change message %v", change.Message.String())
}
}
}
// stop without ack
src.Stop()

// restart to receive same messages, and commit '0' and '2', but abort '1'
// restart to receive same messages, and only abort the last message of the batch
src = newPulsarConsumerSource()
changes, err = src.Capture(cursor.Checkpoint{})
if err != nil {
t.Fatal(err)
}
for i := 0; i < lsn; i++ {

for i := 0; i < len(messages); i++ {
change := <-changes
if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
t.Fatalf("unexpected %v", change.Message.String())
}
if i == 1 {
if i == 2 {
src.Requeue(change.Checkpoint, "")
} else {
src.Commit(change.Checkpoint)
}
}
src.Stop()

// the '1' message should be redelivered
// restart to receive same batch messages
src = newPulsarConsumerSource()
changes, err = src.Capture(cursor.Checkpoint{})
if err != nil {
t.Fatal(err)
}
change := <-changes
if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(1) {
t.Fatalf("unexpected %v", change.Message.String())

// should only redeliver same batch messages
for i := 0; i < len(messages)-1; i++ {
change := <-changes
src.Commit(change.Checkpoint)
}

select {
case <-changes:
t.Fatal("unexpected message")
Expand Down

0 comments on commit 51636dd

Please sign in to comment.