Postpone removing records until assignment is made. #1053

Open
MrKustra94 wants to merge 4 commits into series/3.x from MrKustra94/postpone_removing_nonrequested_records
Conversation

MrKustra94

Closes #1051

@MrKustra94 MrKustra94 force-pushed the MrKustra94/postpone_removing_nonrequested_records branch from d61250d to 147acd7 Compare August 18, 2022 08:45
@@ -483,6 +489,7 @@ private[kafka] object KafkaConsumerActor {
  type StreamId = Int

  final case class State[F[_], K, V](
+   assignments: Set[TopicPartition],
Author

@MrKustra94 MrKustra94 Aug 18, 2022

I did it this way because I ran into a deadlock when I tried to do it with withConsumer(_.assignments().toSet) in the onAssignedPartitions invocations.

@MrKustra94 MrKustra94 force-pushed the MrKustra94/postpone_removing_nonrequested_records branch from 147acd7 to bf68fd7 Compare October 9, 2022 22:13
@bplommer bplommer requested review from LMnet and vlovgr December 6, 2022 10:09
Comment on lines 154 to 166
for {
  action <- ref.modify { state =>
    removeRevokedRecords.run(
      state
        .withRebalancing(false)
        .withAssignments(assigned)
    )
  }
  updatedState <- ref.get
  _ <- action >>
    log(AssignedPartitions(assigned, updatedState)) >>
    updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
} yield ()
Member

Instead of a separate ref.get, can we return the new state from ref.modify?

Suggested change
for {
  action <- ref.modify { state =>
    removeRevokedRecords.run(
      state
        .withRebalancing(false)
        .withAssignments(assigned)
    )
  }
  updatedState <- ref.get
  _ <- action >>
    log(AssignedPartitions(assigned, updatedState)) >>
    updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
} yield ()
ref
  .modify { state =>
    removeRevokedRecords
      .run(
        state
          .withRebalancing(false)
          .withAssignments(assigned)
      )
      .map(stateAndAction => stateAndAction._1 -> stateAndAction)
  }
  .flatMap {
    case (updatedState, action) =>
      action >> log(AssignedPartitions(assigned, updatedState)) >>
        updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned))
  }
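For context, this works because cats-effect's Ref#modify takes a function A => (A, B) and returns the B, so the updated state can be threaded out atomically without a separate Ref#get. A minimal standalone sketch of that pattern (the incrementAndReport name and the Int state are made up purely for illustration):

```scala
import cats.effect.{IO, Ref}

// Illustration of returning the new state from Ref#modify instead of a
// separate Ref#get: modify takes A => (A, B) and returns the B, so we can
// echo the updated value (and anything derived from it) back to the caller.
def incrementAndReport(ref: Ref[IO, Int]): IO[(Int, String)] =
  ref.modify { current =>
    val updated = current + 1
    (updated, (updated, s"state is now $updated"))
  }
```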

Author

Yeah, I didn't notice it could be done this way.
Thanks a lot! 👍

private[this] def assigned(assigned: SortedSet[TopicPartition]): F[Unit] = {
  def withState[A] = StateT.apply[Id, State[F, K, V], A](_)

  val removeRevokedRecords = withState { st =>

Based on the documentation, the assigned parameter only includes newly added partitions, so there is no need to revoke anything. It appears to be an unnecessary sanity check.

Author

First, the RebalanceListener calls a method to revoke partitions. Previously, records of the revoked partitions were removed in this step. The problem lay in the combination of greedy (eager) rebalance protocols and paused partitions. Strangely, when paused partitions are cleared, fetching new records does not start from the latest committed offset; it starts from the locally saved position. This caused the following problem: records of paused partitions were revoked here, and when fetching new records started again, it started from the offset of the latest removed record + 1. Fortunately, this issue could be mitigated with the cooperative protocol, whose RebalanceListener callbacks receive only the actually revoked and only the newly assigned partitions, as opposed to the greedy protocol, where all partitions are revoked and all partitions are assigned, discarding the pre-rebalance local state.
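For reference, the cooperative protocol mentioned here is opted into via partition.assignment.strategy on the consumer. A hypothetical fs2-kafka configuration sketch (the bootstrap servers and group id are placeholders, not values from this PR):

```scala
import cats.effect.IO
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

// Hypothetical settings, for illustration only: switches the consumer to the
// cooperative rebalance protocol, so rebalance callbacks only receive the
// partitions that actually changed hands.
val settings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withGroupId("TestGroup")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withProperty(
      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
      classOf[CooperativeStickyAssignor].getName
    )
```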

Author

The only thing I still have to recall is why I rely only on the assigned parameter. It was a long time ago and I would need to dive deep into it again.

Member

@LMnet LMnet Aug 31, 2023

That's a very valuable piece of information. Could you please add it as a comment somewhere? And is it possible to prove this behavior with a test?

Author

@MrKustra94 MrKustra94 Aug 31, 2023

Okay, I took another look.
Łukasz is right about assigned partitions: the assigned callback is invoked only with the newly assigned partitions (coming from https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L417):

        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
        ownedPartitions.addAll(subscriptions.assignedPartitions());

        // should at least encode the short version
        if (assignmentBuffer.remaining() < 2)
            throw new IllegalStateException("There are insufficient bytes available to read assignment from the sync-group response (" +
                "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
                "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
                "or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
        
        //Some code...
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR);
        assignedPartitions.addAll(assignment.partitions());

        //Some code...
        final AtomicReference<Exception> firstException = new AtomicReference<>(null);
        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
        addedPartitions.addAll(assignedPartitions);
        addedPartitions.removeAll(ownedPartitions);

        if (protocol == RebalanceProtocol.COOPERATIVE) {
            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
            revokedPartitions.addAll(ownedPartitions);
            revokedPartitions.removeAll(assignedPartitions);

            log.info("Updating assignment with\n" +
                    "\tAssigned partitions:                       {}\n" +
                    "\tCurrent owned partitions:                  {}\n" +
                    "\tAdded partitions (assigned - owned):       {}\n" +
                    "\tRevoked partitions (owned - assigned):     {}\n",
                assignedPartitions,
                ownedPartitions,
                addedPartitions,
                revokedPartitions
            );
//..more code

but more magic happens when the revoke callback is invoked (https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L777):

switch (protocol) {
    case EAGER:
        // revoke all partitions
        revokedPartitions.addAll(subscriptions.assignedPartitions());
        exception = invokePartitionsRevoked(revokedPartitions);

        subscriptions.assignFromSubscribed(Collections.emptySet());

        break;

    case COOPERATIVE:
        // only revoke those partitions that are not in the subscription any more.
        Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
        revokedPartitions.addAll(ownedPartitions.stream()
            .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
            .collect(Collectors.toSet()));

        if (!revokedPartitions.isEmpty()) {
            exception = invokePartitionsRevoked(revokedPartitions);

            ownedPartitions.removeAll(revokedPartitions);
            subscriptions.assignFromSubscribed(ownedPartitions);
        }

        break;
}

> And is it possible to prove this behavior with a test?

It is really hard to reproduce. It mainly happens when a small amount of resources is assigned compared to the workload that must be consumed.
The behaviour for paused partitions is as follows:

// pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
// pod-1 has been shut down. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 // Offset is not moved back to 1000032, but it should be, since Range (EAGER) revokes and re-assigns all partitions
// Commit happens, setting offset to 1000038.
Published record: topic-59:1000040 // Missed records: 1000038, 1000039, which exist in the topic but were skipped by the consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042


val withRecords = records intersect revokedFetches
val withoutRecords = revokedFetches diff records

(for {
  completeWithRecords <- completeWithRecords(withRecords)
  completeWithoutRecords <- completeWithoutRecords(withoutRecords)
  removeRevokedRecords <- removeRevokedRecords(revokedNonFetches)

Why don't you clean up the record container from records that belong to revoked partitions and have no downstream sinks?

Author

Described in the comment above.
The whole fix is based on postponing the removal of records: we wait until the assignment is made to be sure which records we can safely remove. This is a small step towards handling greedy rebalance protocols.
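Loosely, the postponement can be illustrated with the following simplified sketch; PendingState, its String payloads and the callback names are made up purely for illustration and are not the actor's actual state or API:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical, simplified model of the postponement idea: instead of dropping
// records for revoked partitions inside the revoke callback, we keep them until
// the next assign callback tells us which partitions we actually still own.
final case class PendingState(
  records: Map[TopicPartition, List[String]], // buffered records per partition (payloads simplified to String)
  assignments: Set[TopicPartition]
) {
  // Revoke callback: nothing is removed yet, only the assignment set shrinks.
  def onRevoked(revoked: Set[TopicPartition]): PendingState =
    copy(assignments = assignments -- revoked)

  // Assign callback: only now do we drop records for partitions that are no
  // longer part of the assignment; records for re-assigned partitions survive.
  def onAssigned(assigned: Set[TopicPartition]): PendingState = {
    val newAssignments = assignments ++ assigned
    copy(
      records = records.filter { case (tp, _) => newAssignments.contains(tp) },
      assignments = newAssignments
    )
  }
}
```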

@@ -276,7 +286,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
  def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] =
    withConsumer
      .blocking { consumer =>
-       val assigned = consumer.assignment.toSet
+       val assigned = state.assignments

Could you please explain how your changes address the issue? The current codebase doesn't seem to have any flaw in managing assigned partitions.

Author

Described in the comment above 👍
