Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #1357

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ func main() {
"IsSimpleConsumerGroup: %v\n"+
"PartitionAssignor: %s\n"+
"State: %s\n"+
"Type: %s\n"+
"Coordinator: %+v\n"+
"Members: %+v\n",
g.GroupID, g.Error, g.IsSimpleConsumerGroup, g.PartitionAssignor,
g.State, g.Coordinator, g.Members)
g.State, g.Type, g.Coordinator, g.Members)
if includeAuthorizedOperations {
fmt.Printf("Allowed operations: %s\n", g.AuthorizedOperations)
}
Expand Down
18 changes: 17 additions & 1 deletion kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ type MemberDescription struct {
Host string
// Member assignment.
Assignment MemberAssignment
// Member Target Assignment.
TargetAssignment MemberAssignment
}

// ConsumerGroupDescription represents the result of DescribeConsumerGroups for
Expand All @@ -350,6 +352,8 @@ type ConsumerGroupDescription struct {
PartitionAssignor string
// Consumer group state.
State ConsumerGroupState
// Consumer group type.
Type ConsumerGroupType
// Consumer group coordinator (has ID == -1 if not known).
Coordinator Node
// Members list.
Expand Down Expand Up @@ -1319,6 +1323,8 @@ func (a *AdminClient) cToConsumerGroupDescriptions(
C.rd_kafka_ConsumerGroupDescription_partition_assignor(cGroup))
state := ConsumerGroupState(
C.rd_kafka_ConsumerGroupDescription_state(cGroup))
groupType := ConsumerGroupType(
C.rd_kafka_ConsumerGroupDescription_type(cGroup))

cNode := C.rd_kafka_ConsumerGroupDescription_coordinator(cGroup)
coordinator := a.cToNode(cNode)
Expand All @@ -1338,6 +1344,14 @@ func (a *AdminClient) cToConsumerGroupDescriptions(
if cToppars != nil {
memberAssignment.TopicPartitions = newTopicPartitionsFromCparts(cToppars)
}
cMemberTargetAssignment :=
C.rd_kafka_MemberDescription_target_assignment(cMember)
cTargetToppars :=
C.rd_kafka_MemberAssignment_partitions(cMemberTargetAssignment)
memberTargetAssignment := MemberAssignment{}
if cTargetToppars != nil {
memberTargetAssignment.TopicPartitions = newTopicPartitionsFromCparts(cTargetToppars)
}
members[midx] = MemberDescription{
ClientID: C.GoString(
C.rd_kafka_MemberDescription_client_id(cMember)),
Expand All @@ -1347,7 +1361,8 @@ func (a *AdminClient) cToConsumerGroupDescriptions(
C.rd_kafka_MemberDescription_consumer_id(cMember)),
Host: C.GoString(
C.rd_kafka_MemberDescription_host(cMember)),
Assignment: memberAssignment,
Assignment: memberAssignment,
TargetAssignment: memberTargetAssignment,
}
}

Expand All @@ -1362,6 +1377,7 @@ func (a *AdminClient) cToConsumerGroupDescriptions(
Error: err,
IsSimpleConsumerGroup: isSimple,
PartitionAssignor: paritionAssignor,
Type: groupType,
State: state,
Coordinator: coordinator,
Members: members,
Expand Down
162 changes: 162 additions & 0 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,168 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
return
}
}
func (its *IntegrationTestSuite) TestAdminClient_DescribeConsumerGroupsProtocolCompatibility() {
t := its.T()

if testConsumerGroupProtocolClassic() {
t.Skipf("KIP-480: DescribeConsumerGroups protocol compatibility test skipped for Classic protocol")
}

rand.Seed(time.Now().Unix())
groupNew1 := fmt.Sprintf("%s-new-1-%d", testconf.GroupID, rand.Int())
groupNew2 := fmt.Sprintf("%s-new-2-%d", testconf.GroupID, rand.Int())
groupClassic1 := fmt.Sprintf("%s-classic-1-%d", testconf.GroupID, rand.Int())
groupClassic2 := fmt.Sprintf("%s-classic-2-%d", testconf.GroupID, rand.Int())
topic := fmt.Sprintf("%s-%d", testconf.TopicName, rand.Int())

ac := createAdminClient(t)
defer ac.Close()

// Create topic
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err := ac.CreateTopics(ctx, []TopicSpecification{
{
Topic: topic,
NumPartitions: 2,
},
})
if err != nil {
t.Fatalf("Topic creation failed: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = ac.DeleteTopics(ctx, []string{topic})
if err != nil {
t.Errorf("Topic deletion failed: %v", err)
}
}()

// Create Consumer for groupNew1
consumerNew1, err := createConsumer(t, topic, groupNew1, "consumer", "client-new-1")
if err != nil {
t.Fatalf("Failed to create consumer for groupNew1: %v", err)
}
defer consumerNew1.Close()

// Create Consumer for groupNew2
consumerNew2, err := createConsumer(t, topic, groupNew2, "consumer", "client-new-2")
if err != nil {
t.Fatalf("Failed to create consumer for groupNew2: %v", err)
}
defer consumerNew2.Close()

// Create Consumer for groupClassic1
consumerClassic1, err := createConsumer(t, topic, groupClassic1, "classic", "client-classic-1")
if err != nil {
t.Fatalf("Failed to create consumer for groupClassic1: %v", err)
}
defer consumerClassic1.Close()

// Create Consumer for groupClassic2
consumerClassic2, err := createConsumer(t, topic, groupClassic2, "classic", "client-classic-2")
if err != nil {
t.Fatalf("Failed to create consumer for groupClassic2: %v", err)
}
defer consumerClassic2.Close()

// Prepare mappings
groupIDToClientIDToPartitions := map[string]map[string][]TopicPartition{
groupNew1: {
"client-new-1": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}},
},
groupNew2: {
"client-new-2": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}},
},
groupClassic1: {
"client-classic-1": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}},
},
groupClassic2: {
"client-classic-2": {{Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}},
},
}
groupIDToType := map[string]ConsumerGroupType{
groupNew1: ConsumerGroupTypeConsumer,
groupNew2: ConsumerGroupTypeConsumer,
groupClassic1: ConsumerGroupTypeClassic,
groupClassic2: ConsumerGroupTypeClassic,
}

// Test 1: Describe new protocol groups
describeConsumerGroupsAndVerify(
t, ac, []string{groupNew1, groupNew2}, "consumer", groupIDToClientIDToPartitions, groupIDToType,
)

// Test 2: Describe classic protocol groups
describeConsumerGroupsAndVerify(
t, ac, []string{groupClassic1, groupClassic2}, "classic", groupIDToClientIDToPartitions, groupIDToType,
)

// Test 3: Describe mixed protocol groups
describeConsumerGroupsAndVerify(
t, ac, []string{groupNew1, groupNew2, groupClassic1, groupClassic2}, "mixed", groupIDToClientIDToPartitions, groupIDToType,
)
}

func createConsumer(t *testing.T, topic, groupID, protocol, clientID string) (*Consumer, error) {
config := &ConfigMap{
"bootstrap.servers": testconf.Brokers,
"group.id": groupID,
"auto.offset.reset": "earliest",
"enable.auto.offset.store": false,
"partition.assignment.strategy": "range",
"group.protocol": protocol,
"client.id": clientID,
}
config.updateFromTestconf()

consumer, err := NewConsumer(config)
if err != nil {
return nil, fmt.Errorf("failed to create consumer for group %s: %v", groupID, err)
}

consumer.Subscribe(topic, nil)
consumer.Poll(10 * 1000) // Trigger rebalance
return consumer, nil
}

func describeConsumerGroupsAndVerify(
t *testing.T, ac *AdminClient, groupIDs []string, scenario string, clientIDToPartitions map[string]map[string][]TopicPartition, groupIDToType map[string]ConsumerGroupType,
) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

groupDescResult, err := ac.DescribeConsumerGroups(ctx, groupIDs, SetAdminRequestTimeout(30*time.Second))
if err != nil {
t.Fatalf("Error describing consumer groups in scenario %s: %v", scenario, err)
}

groupDescs := groupDescResult.ConsumerGroupDescriptions
if len(groupDescs) != len(groupIDs) {
t.Fatalf("Expected %d group descriptions, got %d in scenario %s", len(groupIDs), len(groupDescs), scenario)
}

for _, groupDesc := range groupDescs {
clientID := groupDesc.Members[0].ClientID
partitions := clientIDToPartitions[clientID]
expectedGroupType := groupIDToType[groupDesc.GroupID]

if len(groupDesc.Members) != 1 {
t.Errorf("Expected exactly one member for group %s in scenario %s", groupDesc.GroupID, scenario)
}

// Verify partition assignment and group type
if len(partitions) > 0 && !checkGroupDesc(&groupDesc, ConsumerGroupStateStable, groupDesc.GroupID, "range", clientIDToPartitions[groupDesc.GroupID]) {
t.Errorf("Group %s description does not match expected result in scenario %s: %v", groupDesc.GroupID, scenario, groupDesc)
}

// Verify group type
if expectedGroupType != groupDesc.Type {
t.Errorf("Expected group %s to have type %s, got %s", groupDesc.GroupID, expectedGroupType.String(), groupDesc.Type)
}
}
}

// TestAdminClient_DescribeConsumerGroupsAuthorizedOperations validates the
// working of the DescribeConsumerGroups API of the admin client for fetching
Expand Down