kgo: add LeaveGroupContext
This allows more control over timeouts when leaving a group or closing
the client, and also gives more insight into group leave errors.

Closes #556.
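
For illustration, a minimal usage sketch of the new call (the broker address, group, topic, and five-second timeout are hypothetical, and the consume loop is elided):

package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // hypothetical broker
		kgo.ConsumerGroup("example-group"), // hypothetical group
		kgo.ConsumeTopics("example-topic"), // hypothetical topic
	)
	if err != nil {
		log.Fatal(err)
	}

	// ... consume with cl.PollFetches ...

	// Bound how long shutdown may wait on the LeaveGroup round trip, and
	// surface any leave error, which plain LeaveGroup and Close cannot.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cl.LeaveGroupContext(ctx); err != nil {
		log.Printf("unclean group leave: %v", err)
	}
	cl.Close() // the group has already been left, so this returns quickly
}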
twmb committed Sep 21, 2023
1 parent 5f5c721 commit 4275edd
Showing 2 changed files with 111 additions and 50 deletions.
pkg/kgo/client.go (15 changes: 12 additions & 3 deletions)
@@ -929,7 +929,10 @@ func (cl *Client) CloseAllowingRebalance() {
 	cl.Close()
 }
 
-// Close leaves any group and closes all connections and goroutines.
+// Close leaves any group and closes all connections and goroutines. This
+// function waits for the group to be left. If you want to force leave a group
+// immediately and ensure a speedy shutdown you can use LeaveGroupContext first
+// (and then Close will be immediate).
 //
 // If you are group consuming and have overridden the default
 // OnPartitionsRevoked, you must manually commit offsets before closing the
@@ -942,6 +945,10 @@ func (cl *Client) CloseAllowingRebalance() {
 // notification of revoked partitions. If you want to automatically allow
 // rebalancing, use CloseAllowingRebalance.
 func (cl *Client) Close() {
+	cl.close(cl.ctx)
+}
+
+func (cl *Client) close(ctx context.Context) (rerr error) {
 	defer cl.cfg.hooks.each(func(h Hook) {
 		if h, ok := h.(HookClientClosed); ok {
 			h.OnClientClosed(cl)
@@ -951,7 +958,7 @@ func (cl *Client) Close() {
 	c := &cl.consumer
 	c.kill.Store(true)
 	if c.g != nil {
-		cl.LeaveGroup()
+		rerr = cl.LeaveGroupContext(ctx)
 	} else if c.d != nil {
 		c.mu.Lock()                                           // lock for assign
 		c.assignPartitions(nil, assignInvalidateAll, nil, "") // we do not use a log message when not in a group
@@ -963,7 +970,7 @@ func (cl *Client) Close() {
 	// loopFetch from starting. Assigning also waits for the prior session
 	// to be complete, meaning loopFetch cannot be running.
 
-	sessCloseCtx, sessCloseCancel := context.WithTimeout(cl.ctx, time.Second)
+	sessCloseCtx, sessCloseCancel := context.WithTimeout(ctx, time.Second)
 	var wg sync.WaitGroup
 	cl.allSinksAndSources(func(sns sinkAndSource) {
 		if sns.source.session.id != 0 {
@@ -1015,6 +1022,8 @@ func (cl *Client) Close() {
 			closing.Close()
 		}
 	}
+
+	return rerr
 }
 
 // Request issues a request to Kafka, waiting for and returning the response.
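
The updated Close documentation above points at a speedy-shutdown pattern; a small sketch under the nil-context behavior documented for LeaveGroupContext below (package and function names here are illustrative):

package kgoexample

import "github.com/twmb/franz-go/pkg/kgo"

// forceShutdown leaves the group without waiting: per the docs, a nil context
// means "leave immediately, do not wait, report no error", so the following
// Close does not block on the group leave.
func forceShutdown(cl *kgo.Client) {
	_ = cl.LeaveGroupContext(nil) // documented to always return nil for a nil context
	cl.Close()
}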
pkg/kgo/consumer_group.go (146 changes: 99 additions & 47 deletions)
@@ -150,36 +150,82 @@ type groupConsumer struct {
 	// We set this once to manage the group lifecycle once.
 	managing bool
 
-	dying bool // set when closing, read in findNewAssignments
+	dying    bool // set when closing, read in findNewAssignments
+	left     chan struct{}
+	leaveErr error // set before left is closed
 }
 
-// LeaveGroup leaves a group if in one. Calling the client's Close function
-// also leaves a group, so this is only necessary to call if you plan to leave
-// the group and continue using the client. Note that if a rebalance is in
-// progress, this function waits for the rebalance to complete before the group
-// can be left. This is necessary to allow you to safely issue one final offset
-// commit in OnPartitionsRevoked. If you have overridden the default revoke,
-// you must manually commit offsets before leaving the group.
+// LeaveGroup leaves a group. Close automatically leaves the group, so this is
+// only necessary to call if you plan to leave the group but continue to use
+// the client. If a rebalance is in progress, this function waits for the
+// rebalance to complete before the group can be left. This is necessary to
+// allow you to safely issue one final offset commit in OnPartitionsRevoked. If
+// you have overridden the default revoke, you must manually commit offsets
+// before leaving the group.
+//
+// If you have configured the group with an InstanceID, this does not leave the
+// group. With instance IDs, it is expected that clients will restart and
+// re-use the same instance ID. To leave a group using an instance ID, you must
+// manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
+// scripts or kcl).
+//
+// It is recommended to use LeaveGroupContext to see if the leave was
+// successful.
+func (cl *Client) LeaveGroup() {
+	cl.LeaveGroupContext(cl.ctx)
+}
+
+// LeaveGroupContext leaves a group. Close automatically leaves the group, so
+// this is only necessary to call if you plan to leave the group but continue
+// to use the client. If a rebalance is in progress, this function waits for
+// the rebalance to complete before the group can be left. This is necessary
+// to allow you to safely issue one final offset commit in OnPartitionsRevoked.
+// If you have overridden the default revoke, you must manually commit offsets
+// before leaving the group.
 //
+// The context can be used to avoid waiting for the client to leave the group.
+// Not waiting may result in your client being stuck in the group and the
+// partitions this client was consuming being stuck until the session timeout.
+// This function returns any leave group error or context cancel error. If the
+// context is nil, this immediately leaves the group and does not wait and does
+// not return an error.
+//
 // If you have configured the group with an InstanceID, this does not leave the
 // group. With instance IDs, it is expected that clients will restart and
 // re-use the same instance ID. To leave a group using an instance ID, you must
 // manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka
 // scripts or kcl).
-func (cl *Client) LeaveGroup() {
+func (cl *Client) LeaveGroupContext(ctx context.Context) error {
 	c := &cl.consumer
 	if c.g == nil {
-		return
+		return nil
 	}
+	var immediate bool
+	if ctx == nil {
+		var cancel func()
+		ctx, cancel = context.WithCancel(context.Background())
+		cancel()
+		immediate = true
+	}
 
-	c.waitAndAddRebalance()
-	c.mu.Lock() // lock for assign
-	c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
-	wait := c.g.leave()
-	c.mu.Unlock()
-	c.unaddRebalance()
+	go func() {
+		c.waitAndAddRebalance()
+		c.mu.Lock() // lock for assign
+		c.assignPartitions(nil, assignInvalidateAll, nil, "invalidating all assignments in LeaveGroup")
+		c.g.leave(ctx)
+		c.mu.Unlock()
+		c.unaddRebalance()
+	}()
 
-	wait() // wait after we unlock
+	select {
+	case <-ctx.Done():
+		if immediate {
+			return nil
+		}
+		return ctx.Err()
+	case <-c.g.left:
+		return c.g.leaveErr
+	}
 }
 
 // GroupMetadata returns the current group member ID and generation, or an
Expand Down Expand Up @@ -214,6 +260,8 @@ func (c *consumer) initGroup() {
rejoinCh: make(chan string, 1),
heartbeatForceCh: make(chan func(error)),
using: make(map[string]int),

left: make(chan struct{}),
}
c.g = g
if !g.cfg.setCommitCallback {
@@ -411,7 +459,7 @@ func (g *groupConsumer) manage() {
 	}
 }
 
-func (g *groupConsumer) leave() (wait func()) {
+func (g *groupConsumer) leave(ctx context.Context) {
 	// If g.using is nonzero before this check, then a manage goroutine has
 	// started. If not, it will never start because we set dying.
 	g.mu.Lock()
@@ -421,43 +469,47 @@ func (g *groupConsumer) leave() (wait func()) {
 	g.cancel()
 	g.mu.Unlock()
 
-	done := make(chan struct{})
 	go func() {
-		defer close(done)
-
-		if wasDead {
-			// If we already called leave(), then we just wait for
-			// the prior leave to finish and we avoid re-issuing a
-			// LeaveGroup request.
-			return
-		}
-
 		if wasManaging {
 			// We want to wait for the manage goroutine to be done
 			// so that we call the user's on{Assign,RevokeLost}.
 			<-g.manageDone
 		}
 
-		if g.cfg.instanceID == nil {
-			g.cfg.logger.Log(LogLevelInfo, "leaving group",
-				"group", g.cfg.group,
-				"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
-			)
-			// If we error when leaving, there is not much
-			// we can do. We may as well just return.
-			req := kmsg.NewPtrLeaveGroupRequest()
-			req.Group = g.cfg.group
-			req.MemberID = g.memberID
-			member := kmsg.NewLeaveGroupRequestMember()
-			member.MemberID = g.memberID
-			member.Reason = kmsg.StringPtr("client leaving group per normal operation")
-			req.Members = append(req.Members, member)
-			req.RequestWith(g.cl.ctx, g.cl)
-		}
+		if wasDead {
+			// If we already called leave(), then we just wait for
+			// the prior leave to finish and we avoid re-issuing a
+			// LeaveGroup request.
+			return
+		}
+
+		defer close(g.left)
+
+		if g.cfg.instanceID != nil {
+			return
+		}
+
+		g.cfg.logger.Log(LogLevelInfo, "leaving group",
+			"group", g.cfg.group,
+			"member_id", g.memberID, // lock not needed now since nothing can change it (manageDone)
+		)
+		// If we error when leaving, there is not much
+		// we can do. We may as well just return.
+		req := kmsg.NewPtrLeaveGroupRequest()
+		req.Group = g.cfg.group
+		req.MemberID = g.memberID
+		member := kmsg.NewLeaveGroupRequestMember()
+		member.MemberID = g.memberID
+		member.Reason = kmsg.StringPtr("client leaving group per normal operation")
+		req.Members = append(req.Members, member)
+
+		resp, err := req.RequestWith(ctx, g.cl)
+		if err != nil {
+			g.leaveErr = err
+			return
+		}
+		g.leaveErr = kerr.ErrorForCode(resp.ErrorCode)
 	}()
-
-	return func() { <-done }
 }
 
 // returns the difference of g.nowAssigned and g.lastAssigned.
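
The leaveErr plumbing above means LeaveGroupContext can now surface three distinct outcomes: success, a context error, or a LeaveGroup response error decoded via kerr.ErrorForCode. A sketch of distinguishing them (the handling shown is illustrative, not from this commit):

package kgoexample

import (
	"context"
	"errors"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func leaveAndInspect(ctx context.Context, cl *kgo.Client) {
	switch err := cl.LeaveGroupContext(ctx); {
	case err == nil:
		// Left cleanly, or the client was not in a group.
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		// We gave up waiting; the broker evicts the member only after
		// the session timeout.
	case errors.Is(err, kerr.UnknownMemberID):
		// The broker no longer knew this member, e.g. it was already
		// fenced or evicted; effectively the group is left.
	default:
		// Request-level failure while issuing the LeaveGroup request.
	}
}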
