Skip to content

Commit

Permalink
fix: fixed go race
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikelle committed Dec 9, 2024
1 parent 7cc5d6c commit 70ccfff
Showing 1 changed file with 77 additions and 61 deletions.
138 changes: 77 additions & 61 deletions cl/relayer/relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"net"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -136,39 +138,100 @@ func TestSubscribe(t *testing.T) {
pb.RegisterRelayerServer(r.server, r)

lis := bufconn.Listen(1024 * 1024)

serverDone := make(chan struct{})
var serverErr atomic.Value

mock.ExpectXGroupCreateMkStream(blockStreamName, "member_group:testClient", "0").SetVal("OK")

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, "0"},
Count: 1,
Block: time.Second,
}).SetErr(redis.Nil)

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, ">"},
Count: 1,
Block: time.Second,
}).SetVal([]redis.XStream{
{
Stream: blockStreamName,
Messages: []redis.XMessage{
{
ID: "123-1",
Values: map[string]interface{}{
"payload_id": "payload_123",
"execution_payload": "some_encoded_payload",
"sender_instance_id": "instance_abc",
},
},
},
},
})

ackCalled := make(chan struct{})

customMatch := func(expected, actual []interface{}) error {
if len(actual) >= 1 {
cmdName, ok := actual[0].(string)
if ok && strings.ToUpper(cmdName) == "XACK" {
select {
case <-ackCalled:
default:
close(ackCalled)
}
}
}
return nil
}

mock.CustomMatch(customMatch).ExpectXAck(blockStreamName, "member_group:testClient", "123-1").SetVal(int64(1))

go func() {
if err := r.server.Serve(lis); err != nil {
t.Errorf("Server exited with error: %v", err)
err := r.server.Serve(lis)
if err != nil && err != grpc.ErrServerStopped {
serverErr.Store(err)
}
close(serverDone)
}()

defer r.server.GracefulStop()
defer func() {
r.server.GracefulStop()
<-serverDone
if err, ok := serverErr.Load().(error); ok {
t.Errorf("Server error: %v", err)
}
}()

ctx, cancel := context.WithCancel(context.Background())
// Create a gRPC client
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

conn, err := grpc.NewClient(
"passthrough:///",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return lis.Dial()
}))
}),
)
if err != nil {
t.Fatalf("failed to dial bufconn: %v", err)
}
defer conn.Close()

client := pb.NewRelayerClient(conn)

// Expect a consumer group to be created
mock.ExpectXGroupCreateMkStream(blockStreamName, "member_group:testClient", "0").SetVal("OK")

// Call Subscribe
stream, err := client.Subscribe(ctx)
if err != nil {
t.Fatalf("failed to call Subscribe: %v", err)
}

// Send initial subscribe request
err = stream.Send(&pb.ClientMessage{
Message: &pb.ClientMessage_SubscribeRequest{
SubscribeRequest: &pb.SubscribeRequest{
Expand All @@ -180,36 +243,6 @@ func TestSubscribe(t *testing.T) {
t.Fatalf("failed to send subscribe request: %v", err)
}

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, "0"},
Count: 1,
Block: time.Second,
}).SetErr(redis.Nil)

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, ">"},
Count: 1,
Block: time.Second,
}).SetVal([]redis.XStream{
{
Stream: blockStreamName,
Messages: []redis.XMessage{
{
ID: "123-1",
Values: map[string]interface{}{
"payload_id": "payload_123",
"execution_payload": "some_encoded_payload",
"sender_instance_id": "instance_abc",
},
},
},
},
})

recvMsg, err := stream.Recv()
if err != nil {
t.Fatalf("failed to receive message from server: %v", err)
Expand All @@ -218,8 +251,6 @@ func TestSubscribe(t *testing.T) {
t.Errorf("expected payload_123, got %s", recvMsg.GetPayloadId())
}

mock.ExpectXAck(blockStreamName, "member_group:testClient", "123-1").SetVal(1)

err = stream.Send(&pb.ClientMessage{
Message: &pb.ClientMessage_AckPayload{
AckPayload: &pb.AckPayloadRequest{
Expand All @@ -233,28 +264,13 @@ func TestSubscribe(t *testing.T) {
t.Fatalf("failed to send ack: %v", err)
}

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, "0"},
Count: 1,
Block: time.Second,
}).SetErr(redis.Nil)

mock.ExpectXReadGroup(&redis.XReadGroupArgs{
Group: "member_group:testClient",
Consumer: "member_consumer:testClient",
Streams: []string{blockStreamName, ">"},
Count: 1,
Block: time.Second,
}).SetErr(redis.Nil)

// Give the server some time to process these reads
time.Sleep(100 * time.Millisecond)
select {
case <-ackCalled:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for XAck to be called")
}

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet redis expectations: %v", err)
}

cancel()
}

0 comments on commit 70ccfff

Please sign in to comment.