From 70ccfffd3940399e5109df9be5383127a33ce6cf Mon Sep 17 00:00:00 2001 From: Mikhail Wall Date: Mon, 9 Dec 2024 22:02:30 +0100 Subject: [PATCH] fix: fixed go race --- cl/relayer/relayer_test.go | 138 +++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 61 deletions(-) diff --git a/cl/relayer/relayer_test.go b/cl/relayer/relayer_test.go index a3cd310dd..1d304741d 100644 --- a/cl/relayer/relayer_test.go +++ b/cl/relayer/relayer_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "net" + "strings" + "sync/atomic" "testing" "time" @@ -136,15 +138,78 @@ func TestSubscribe(t *testing.T) { pb.RegisterRelayerServer(r.server, r) lis := bufconn.Listen(1024 * 1024) + + serverDone := make(chan struct{}) + var serverErr atomic.Value + + mock.ExpectXGroupCreateMkStream(blockStreamName, "member_group:testClient", "0").SetVal("OK") + + mock.ExpectXReadGroup(&redis.XReadGroupArgs{ + Group: "member_group:testClient", + Consumer: "member_consumer:testClient", + Streams: []string{blockStreamName, "0"}, + Count: 1, + Block: time.Second, + }).SetErr(redis.Nil) + + mock.ExpectXReadGroup(&redis.XReadGroupArgs{ + Group: "member_group:testClient", + Consumer: "member_consumer:testClient", + Streams: []string{blockStreamName, ">"}, + Count: 1, + Block: time.Second, + }).SetVal([]redis.XStream{ + { + Stream: blockStreamName, + Messages: []redis.XMessage{ + { + ID: "123-1", + Values: map[string]interface{}{ + "payload_id": "payload_123", + "execution_payload": "some_encoded_payload", + "sender_instance_id": "instance_abc", + }, + }, + }, + }, + }) + + ackCalled := make(chan struct{}) + + customMatch := func(expected, actual []interface{}) error { + if len(actual) >= 1 { + cmdName, ok := actual[0].(string) + if ok && strings.ToUpper(cmdName) == "XACK" { + select { + case <-ackCalled: + default: + close(ackCalled) + } + } + } + return nil + } + + mock.CustomMatch(customMatch).ExpectXAck(blockStreamName, "member_group:testClient", "123-1").SetVal(int64(1)) + go func() { - if err := r.server.Serve(lis); err != nil { - t.Errorf("Server exited with error: %v", err) + err := r.server.Serve(lis) + if err != nil && err != grpc.ErrServerStopped { + serverErr.Store(err) } + close(serverDone) }() - defer r.server.GracefulStop() + defer func() { + r.server.GracefulStop() + <-serverDone + if err, ok := serverErr.Load().(error); ok { + t.Errorf("Server error: %v", err) + } + }() - ctx, cancel := context.WithCancel(context.Background()) + // Create a gRPC client + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() conn, err := grpc.NewClient( @@ -152,7 +217,8 @@ func TestSubscribe(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return lis.Dial() - })) + }), + ) if err != nil { t.Fatalf("failed to dial bufconn: %v", err) } @@ -160,15 +226,12 @@ func TestSubscribe(t *testing.T) { client := pb.NewRelayerClient(conn) - // Expect a consumer group to be created - mock.ExpectXGroupCreateMkStream(blockStreamName, "member_group:testClient", "0").SetVal("OK") - + // Call Subscribe stream, err := client.Subscribe(ctx) if err != nil { t.Fatalf("failed to call Subscribe: %v", err) } - // Send initial subscribe request err = stream.Send(&pb.ClientMessage{ Message: &pb.ClientMessage_SubscribeRequest{ SubscribeRequest: &pb.SubscribeRequest{ @@ -180,36 +243,6 @@ func TestSubscribe(t *testing.T) { t.Fatalf("failed to send subscribe request: %v", err) } - mock.ExpectXReadGroup(&redis.XReadGroupArgs{ - Group: "member_group:testClient", - Consumer: "member_consumer:testClient", - Streams: []string{blockStreamName, "0"}, - Count: 1, - Block: time.Second, - }).SetErr(redis.Nil) - - mock.ExpectXReadGroup(&redis.XReadGroupArgs{ - Group: "member_group:testClient", - Consumer: "member_consumer:testClient", - Streams: []string{blockStreamName, ">"}, - Count: 1, - Block: time.Second, - }).SetVal([]redis.XStream{ - { - Stream: blockStreamName, - Messages: []redis.XMessage{ - { - ID: "123-1", - Values: map[string]interface{}{ - "payload_id": "payload_123", - "execution_payload": "some_encoded_payload", - "sender_instance_id": "instance_abc", - }, - }, - }, - }, - }) - recvMsg, err := stream.Recv() if err != nil { t.Fatalf("failed to receive message from server: %v", err) @@ -218,8 +251,6 @@ func TestSubscribe(t *testing.T) { t.Errorf("expected payload_123, got %s", recvMsg.GetPayloadId()) } - mock.ExpectXAck(blockStreamName, "member_group:testClient", "123-1").SetVal(1) - err = stream.Send(&pb.ClientMessage{ Message: &pb.ClientMessage_AckPayload{ AckPayload: &pb.AckPayloadRequest{ @@ -233,28 +264,13 @@ func TestSubscribe(t *testing.T) { t.Fatalf("failed to send ack: %v", err) } - mock.ExpectXReadGroup(&redis.XReadGroupArgs{ - Group: "member_group:testClient", - Consumer: "member_consumer:testClient", - Streams: []string{blockStreamName, "0"}, - Count: 1, - Block: time.Second, - }).SetErr(redis.Nil) - - mock.ExpectXReadGroup(&redis.XReadGroupArgs{ - Group: "member_group:testClient", - Consumer: "member_consumer:testClient", - Streams: []string{blockStreamName, ">"}, - Count: 1, - Block: time.Second, - }).SetErr(redis.Nil) - - // Give the server some time to process these reads - time.Sleep(100 * time.Millisecond) + select { + case <-ackCalled: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for XAck to be called") + } if err := mock.ExpectationsWereMet(); err != nil { t.Errorf("unmet redis expectations: %v", err) } - - cancel() }