From f80993a02318f6c2f081c299d15b48a316e1ef29 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 9 Dec 2024 14:23:03 +0530 Subject: [PATCH] Changes for supporting DescribeConsumerGroups with new protocol --- .../admin_describe_consumer_groups.go | 3 +- kafka/adminapi.go | 18 +- kafka/integration_test.go | 162 ++++++++++++++++++ 3 files changed, 181 insertions(+), 2 deletions(-) diff --git a/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go index a16af9315..f2371e875 100644 --- a/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go +++ b/examples/admin_describe_consumer_groups/admin_describe_consumer_groups.go @@ -76,10 +76,11 @@ func main() { "IsSimpleConsumerGroup: %v\n"+ "PartitionAssignor: %s\n"+ "State: %s\n"+ + "Type: %s\n"+ "Coordinator: %+v\n"+ "Members: %+v\n", g.GroupID, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor, - g.State, g.Coordinator, g.Members) + g.State, g.Type, g.Coordinator, g.Members) if includeAuthorizedOperations { fmt.Printf("Allowed operations: %s\n", g.AuthorizedOperations) } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 7490fed9c..2b4c1dbbb 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -335,6 +335,8 @@ type MemberDescription struct { Host string // Member assignment. Assignment MemberAssignment + // Member Target Assignment. + TargetAssignment MemberAssignment } // ConsumerGroupDescription represents the result of DescribeConsumerGroups for @@ -350,6 +352,8 @@ type ConsumerGroupDescription struct { PartitionAssignor string // Consumer group state. State ConsumerGroupState + // Consumer group type. + Type ConsumerGroupType // Consumer group coordinator (has ID == -1 if not known). Coordinator Node // Members list. @@ -1319,6 +1323,8 @@ func (a *AdminClient) cToConsumerGroupDescriptions( C.rd_kafka_ConsumerGroupDescription_partition_assignor(cGroup)) state := ConsumerGroupState( C.rd_kafka_ConsumerGroupDescription_state(cGroup)) + groupType := ConsumerGroupType( + C.rd_kafka_ConsumerGroupDescription_type(cGroup)) cNode := C.rd_kafka_ConsumerGroupDescription_coordinator(cGroup) coordinator := a.cToNode(cNode) @@ -1338,6 +1344,14 @@ func (a *AdminClient) cToConsumerGroupDescriptions( if cToppars != nil { memberAssignment.TopicPartitions = newTopicPartitionsFromCparts(cToppars) } + cMemberTargetAssignment := + C.rd_kafka_MemberDescription_target_assignment(cMember) + cTargetToppars := + C.rd_kafka_MemberAssignment_partitions(cMemberTargetAssignment) + memberTargetAssignment := MemberAssignment{} + if cTargetToppars != nil { + memberTargetAssignment.TopicPartitions = newTopicPartitionsFromCparts(cTargetToppars) + } members[midx] = MemberDescription{ ClientID: C.GoString( C.rd_kafka_MemberDescription_client_id(cMember)), @@ -1347,7 +1361,8 @@ func (a *AdminClient) cToConsumerGroupDescriptions( C.rd_kafka_MemberDescription_consumer_id(cMember)), Host: C.GoString( C.rd_kafka_MemberDescription_host(cMember)), - Assignment: memberAssignment, + Assignment: memberAssignment, + TargetAssignment: memberTargetAssignment, } } @@ -1362,6 +1377,7 @@ func (a *AdminClient) cToConsumerGroupDescriptions( Error: err, IsSimpleConsumerGroup: isSimple, PartitionAssignor: paritionAssignor, + Type: groupType, State: state, Coordinator: coordinator, Members: members, diff --git a/kafka/integration_test.go b/kafka/integration_test.go index d0fa2eb0d..b8f4ff93f 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -1220,6 +1220,168 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() return } } +func (its *IntegrationTestSuite) TestAdminClient_DescribeConsumerGroupsProtocolCompatibility() { + t := its.T() + + if testConsumerGroupProtocolClassic() { + t.Skipf("KIP-480: DescribeConsumerGroups protocol compatibility test skipped for Classic protocol") + } + + rand.Seed(time.Now().Unix()) + groupNew1 := fmt.Sprintf("%s-new-1-%d", testconf.GroupID, rand.Int()) + groupNew2 := fmt.Sprintf("%s-new-2-%d", testconf.GroupID, rand.Int()) + groupClassic1 := fmt.Sprintf("%s-classic-1-%d", testconf.GroupID, rand.Int()) + groupClassic2 := fmt.Sprintf("%s-classic-2-%d", testconf.GroupID, rand.Int()) + topic := fmt.Sprintf("%s-%d", testconf.TopicName, rand.Int()) + + ac := createAdminClient(t) + defer ac.Close() + + // Create topic + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err := ac.CreateTopics(ctx, []TopicSpecification{ + { + Topic: topic, + NumPartitions: 2, + }, + }) + if err != nil { + t.Fatalf("Topic creation failed: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = ac.DeleteTopics(ctx, []string{topic}) + if err != nil { + t.Errorf("Topic deletion failed: %v", err) + } + }() + + // Create Consumer for groupNew1 + consumerNew1, err := createConsumer(t, topic, groupNew1, "consumer", "client-new-1") + if err != nil { + t.Fatalf("Failed to create consumer for groupNew1: %v", err) + } + defer consumerNew1.Close() + + // Create Consumer for groupNew2 + consumerNew2, err := createConsumer(t, topic, groupNew2, "consumer", "client-new-2") + if err != nil { + t.Fatalf("Failed to create consumer for groupNew2: %v", err) + } + defer consumerNew2.Close() + + // Create Consumer for groupClassic1 + consumerClassic1, err := createConsumer(t, topic, groupClassic1, "classic", "client-classic-1") + if err != nil { + t.Fatalf("Failed to create consumer for groupClassic1: %v", err) + } + defer consumerClassic1.Close() + + // Create Consumer for groupClassic2 + consumerClassic2, err := createConsumer(t, topic, groupClassic2, "classic", "client-classic-2") + if err != nil { + t.Fatalf("Failed to create consumer for groupClassic2: %v", err) + } + defer consumerClassic2.Close() + + // Prepare mappings + groupIDToClientIDToPartitions := map[string]map[string][]TopicPartition{ + groupNew1: { + "client-new-1": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}}, + }, + groupNew2: { + "client-new-2": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}}, + }, + groupClassic1: { + "client-classic-1": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}}, + }, + groupClassic2: { + "client-classic-2": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}}, + }, + } + groupIDToType := map[string]ConsumerGroupType{ + groupNew1: ConsumerGroupTypeConsumer, + groupNew2: ConsumerGroupTypeConsumer, + groupClassic1: ConsumerGroupTypeClassic, + groupClassic2: ConsumerGroupTypeClassic, + } + + // Test 1: Describe new protocol groups + describeConsumerGroupsAndVerify( + t, ac, []string{groupNew1, groupNew2}, "consumer", groupIDToClientIDToPartitions, groupIDToType, + ) + + // Test 2: Describe classic protocol groups + describeConsumerGroupsAndVerify( + t, ac, []string{groupClassic1, groupClassic2}, "classic", groupIDToClientIDToPartitions, groupIDToType, + ) + + // Test 3: Describe mixed protocol groups + describeConsumerGroupsAndVerify( + t, ac, []string{groupNew1, groupNew2, groupClassic1, groupClassic2}, "mixed", groupIDToClientIDToPartitions, groupIDToType, + ) +} + +func createConsumer(t *testing.T, topic, groupID, protocol, clientID string) (*Consumer, error) { + config := &ConfigMap{ + "bootstrap.servers": testconf.Brokers, + "group.id": groupID, + "auto.offset.reset": "earliest", + "enable.auto.offset.store": false, + "partition.assignment.strategy": "range", + "group.protocol": protocol, + "client.id": clientID, + } + config.updateFromTestconf() + + consumer, err := NewConsumer(config) + if err != nil { + return nil, fmt.Errorf("failed to create consumer for group %s: %v", groupID, err) + } + + consumer.Subscribe(topic, nil) + consumer.Poll(10 * 1000) // Trigger rebalance + return consumer, nil +} + +func describeConsumerGroupsAndVerify( + t *testing.T, ac *AdminClient, groupIDs []string, scenario string, clientIDToPartitions map[string]map[string][]TopicPartition, groupIDToType map[string]ConsumerGroupType, +) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + groupDescResult, err := ac.DescribeConsumerGroups(ctx, groupIDs, SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Fatalf("Error describing consumer groups in scenario %s: %v", scenario, err) + } + + groupDescs := groupDescResult.ConsumerGroupDescriptions + if len(groupDescs) != len(groupIDs) { + t.Fatalf("Expected %d group descriptions, got %d in scenario %s", len(groupIDs), len(groupDescs), scenario) + } + + for _, groupDesc := range groupDescs { + clientID := groupDesc.Members[0].ClientID + partitions := clientIDToPartitions[clientID] + expectedGroupType := groupIDToType[groupDesc.GroupID] + + if len(groupDesc.Members) != 1 { + t.Errorf("Expected exactly one member for group %s in scenario %s", groupDesc.GroupID, scenario) + } + + // Verify partition assignment and group type + if len(partitions) > 0 && !checkGroupDesc(&groupDesc, ConsumerGroupStateStable, groupDesc.GroupID, "range", clientIDToPartitions[groupDesc.GroupID]) { + t.Errorf("Group %s description does not match expected result in scenario %s: %v", groupDesc.GroupID, scenario, groupDesc) + } + + // Verify group type + if expectedGroupType != groupDesc.Type { + t.Errorf("Expected group %s to have type %s, got %s", groupDesc.GroupID, expectedGroupType.String(), groupDesc.Type) + } + } +} // TestAdminClient_DescribeConsumerGroupsAuthorizedOperations validates the // working of the DescribeConsumerGroups API of the admin client for fetching