Skip to content

Commit

Permalink
[KIP-848 EA] Admin API for listing consumer groups now has (#1267)
Browse files Browse the repository at this point in the history
an optional filter to return only groups of given types


Co-authored-by: mahajanadhitya <[email protected]>
  • Loading branch information
mahajanadhitya and mahajanadhitya authored Oct 10, 2024
1 parent be6f062 commit 654a47b
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 37 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

## v2.6.0

This is a feature release.
This is a feature release:

* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#1311)
* [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#1267).
* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#1311)

confluent-kafka-go is based on librdkafka v2.6.0, see the
[librdkafka v2.6.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
Expand Down
69 changes: 53 additions & 16 deletions examples/admin_list_consumer_groups/admin_list_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,62 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

if len(os.Args) < 2 {
func usage(message string, fs *flag.FlagSet) {
if message != "" {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> [<state1> <state2> ...]\n", os.Args[0])
os.Exit(1)
message)
}
fs.Usage()
os.Exit(1)
}

bootstrapServers := os.Args[1]
var states []kafka.ConsumerGroupState
if len(os.Args) > 2 {
statesStr := os.Args[2:]
for _, stateStr := range statesStr {
state, err := kafka.ConsumerGroupStateFromString(stateStr)
if err != nil {
fmt.Fprintf(os.Stderr,
"Given state %s is not a valid state\n", stateStr)
os.Exit(1)
func parseListConsumerGroupsArgs() (
bootstrapServers string,
states []kafka.ConsumerGroupState,
types []kafka.ConsumerGroupType,
) {
var statesString, typesString string
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
fs.StringVar(&bootstrapServers, "b", "localhost:9092", "Bootstrap servers")
fs.StringVar(&statesString, "states", "", "States to match")
fs.StringVar(&typesString, "types", "", "Types to match")
fs.Parse(os.Args[1:])

if statesString != "" {
for _, stateString := range strings.Split(statesString, ",") {
state, _ := kafka.ConsumerGroupStateFromString(stateString)
if state == kafka.ConsumerGroupStateUnknown {
usage(fmt.Sprintf("Given state %s is not a valid state\n",
stateString), fs)
}
states = append(states, state)
}
}
if typesString != "" {
for _, typeString := range strings.Split(typesString, ",") {
groupType := kafka.ConsumerGroupTypeFromString(typeString)
if groupType == kafka.ConsumerGroupTypeUnknown {
usage(fmt.Sprintf("Given type %s is not a valid type\n",
typeString), fs)
}
types = append(types, groupType)
}
}
return
}

func main() {

bootstrapServers, states, types := parseListConsumerGroupsArgs()

// Create a new AdminClient.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
Expand All @@ -60,8 +87,17 @@ func main() {
// Call ListConsumerGroups.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
var options []kafka.ListConsumerGroupsAdminOption
if len(states) > 0 {
options = append(options, kafka.SetAdminMatchConsumerGroupStates(states))
}
if len(types) > 0 {
options = append(options, kafka.SetAdminMatchConsumerGroupTypes(types))
}

listGroupRes, err := a.ListConsumerGroups(
ctx, kafka.SetAdminMatchConsumerGroupStates(states))
ctx,
options...)

if err != nil {
fmt.Printf("Failed to list groups with client-level error %s\n", err)
Expand All @@ -74,6 +110,7 @@ func main() {
for _, group := range groups {
fmt.Printf("GroupId: %s\n", group.GroupID)
fmt.Printf("State: %s\n", group.State)
fmt.Printf("Type: %s\n", group.Type)
fmt.Printf("IsSimpleConsumerGroup: %v\n", group.IsSimpleConsumerGroup)
fmt.Println()
}
Expand Down
37 changes: 33 additions & 4 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,41 @@ func (t ConsumerGroupState) String() string {
}

// ConsumerGroupStateFromString translates a consumer group state name/string to
// a ConsumerGroupStateFromString value.
// a ConsumerGroupState value.
func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error) {
cStr := C.CString(stateString)
defer C.free(unsafe.Pointer(cStr))
state := ConsumerGroupState(C.rd_kafka_consumer_group_state_code(cStr))
return state, nil
}

// ConsumerGroupType represents a consumer group type
type ConsumerGroupType int

const (
// ConsumerGroupTypeUnknown - Unknown ConsumerGroupType
ConsumerGroupTypeUnknown ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN
// ConsumerGroupTypeConsumer - Consumer ConsumerGroupType
ConsumerGroupTypeConsumer ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER
// ConsumerGroupTypeClassic - Classic ConsumerGroupType
ConsumerGroupTypeClassic ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC
)

// String returns the human-readable representation of a ConsumerGroupType
func (t ConsumerGroupType) String() string {
return C.GoString(C.rd_kafka_consumer_group_type_name(
C.rd_kafka_consumer_group_type_t(t)))
}

// ConsumerGroupTypeFromString translates a consumer group type name/string to
// a ConsumerGroupType value.
func ConsumerGroupTypeFromString(typeString string) ConsumerGroupType {
cStr := C.CString(typeString)
defer C.free(unsafe.Pointer(cStr))
groupType := ConsumerGroupType(C.rd_kafka_consumer_group_type_code(cStr))
return groupType
}

// ConsumerGroupListing represents the result of ListConsumerGroups for a single
// group.
type ConsumerGroupListing struct {
Expand All @@ -255,6 +282,8 @@ type ConsumerGroupListing struct {
IsSimpleConsumerGroup bool
// Group state.
State ConsumerGroupState
// Group type.
Type ConsumerGroupType
}

// ListConsumerGroupsResult represents ListConsumerGroups results and errors.
Expand Down Expand Up @@ -1531,16 +1560,16 @@ func (a *AdminClient) cToConsumerGroupListings(
C.ConsumerGroupListing_by_idx(cGroups, cGroupCount, C.size_t(idx))
state := ConsumerGroupState(
C.rd_kafka_ConsumerGroupListing_state(cGroup))

groupType := ConsumerGroupType(C.rd_kafka_ConsumerGroupListing_type(cGroup))
result[idx] = ConsumerGroupListing{
GroupID: C.GoString(
C.rd_kafka_ConsumerGroupListing_group_id(cGroup)),
State: state,
IsSimpleConsumerGroup: cint2bool(
C.rd_kafka_ConsumerGroupListing_is_simple_consumer_group(cGroup)),
State: state,
Type: groupType,
}
}

return result
}

Expand Down
40 changes: 39 additions & 1 deletion kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,49 @@ func testAdminAPIsListConsumerGroups(
t.Fatalf("Expected ConsumerGroupStateFromString to work for Stable state")
}

unknownGroupType := ConsumerGroupTypeFromString("Unknown")
if unknownGroupType != ConsumerGroupTypeUnknown {
t.Fatalf("%s: Expected ConsumerGroupTypeFromString to work for Unknown type", what)
}

classicGroupType := ConsumerGroupTypeFromString("Classic")
if classicGroupType != ConsumerGroupTypeClassic {
t.Fatalf("%s: Expected ConsumerGroupTypeFromString to work for Classic type", what)
}

// Test with unknown group type
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()

listres, err := a.ListConsumerGroups(
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupStates([]ConsumerGroupState{state}))
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{unknownGroupType}))

if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("%s: Expected ListConsumerGroups to fail when unknown group type option is set, but got result: %v, err: %v",
what, listres, err)
}

// Test with duplicate group type
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()

listres, err = a.ListConsumerGroups(
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{classicGroupType, classicGroupType}))

if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("%s: Expected ListConsumerGroups to fail when duplicate group type options are set, but got result: %v, err: %v",
what, listres, err)
}

// Successful call
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
listres, err = a.ListConsumerGroups(
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupStates([]ConsumerGroupState{state}),
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{classicGroupType}))
if err == nil {
t.Fatalf("Expected ListConsumerGroups to fail, but got result: %v, err: %v",
listres, err)
Expand Down
76 changes: 63 additions & 13 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,68 @@ func (ao AdminOptionMatchConsumerGroupStates) apply(cOptions *C.rd_kafka_AdminOp
return nil
}

// SetAdminMatchConsumerGroupStates sets the state(s) that must be
// listed.
//
// Default: nil (lists groups in all states).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) {
ao.isSet = true
ao.val = val
return ao
}

// AdminOptionMatchConsumerGroupTypes decides the type(s) that must be
// listed.
//
// Default: nil (lists groups of all types).
//
// Valid for ListConsumerGroups.
type AdminOptionMatchConsumerGroupTypes struct {
isSet bool
val []ConsumerGroupType
}

func (ao AdminOptionMatchConsumerGroupTypes) supportsListConsumerGroups() {
}

func (ao AdminOptionMatchConsumerGroupTypes) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet || ao.val == nil {
return nil
}

// Convert types from Go slice to C pointer.
cTypes := make([]C.rd_kafka_consumer_group_type_t, len(ao.val))
cTypesCount := C.size_t(len(ao.val))

for idx, groupType := range ao.val {
cTypes[idx] = C.rd_kafka_consumer_group_type_t(groupType)
}

cTypesPtr := ((*C.rd_kafka_consumer_group_type_t)(&cTypes[0]))
cError := C.rd_kafka_AdminOptions_set_match_consumer_group_types(
cOptions, cTypesPtr, cTypesCount)
if cError != nil {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newErrorFromCErrorDestroy(cError)
}

return nil
}

// SetAdminMatchConsumerGroupTypes set the type(s) that must be
// listed.
//
// Default: nil (lists groups of all types).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupTypes(val []ConsumerGroupType) (ao AdminOptionMatchConsumerGroupTypes) {
ao.isSet = true
ao.val = val
return ao
}

// AdminOptionIncludeAuthorizedOperations decides if the broker should return
// authorized operations.
//
Expand Down Expand Up @@ -415,18 +477,6 @@ func SetAdminOptionIncludeAuthorizedOperations(val bool) (ao AdminOptionIncludeA
return ao
}

// SetAdminMatchConsumerGroupStates decides groups in which state(s) should be
// listed.
//
// Default: nil (lists groups in all states).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) {
ao.isSet = true
ao.val = val
return ao
}

// CreateTopicsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
Expand Down Expand Up @@ -493,7 +543,7 @@ type DeleteACLsAdminOption interface {

// ListConsumerGroupsAdminOption - see setter.
//
// See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates.
// See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates, SetAdminMatchConsumerGroupTypes.
type ListConsumerGroupsAdminOption interface {
supportsListConsumerGroups()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
Expand Down
Loading

0 comments on commit 654a47b

Please sign in to comment.