-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Postpone removing records until assignment is made. #1053
base: series/3.x
Are you sure you want to change the base?
Changes from 1 commit
bf68fd7
053f30b
4c4563a
5c12be1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -135,13 +135,36 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res.start.void | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private[this] def assigned(assigned: SortedSet[TopicPartition]): F[Unit] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ref | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.updateAndGet(_.withRebalancing(false)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.flatMap { state => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log(AssignedPartitions(assigned, state)) >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
state.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private[this] def assigned(assigned: SortedSet[TopicPartition]): F[Unit] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def withState[A] = StateT.apply[Id, State[F, K, V], A](_) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val removeRevokedRecords = withState { st => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val assignments = st.assignments | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val revokedRecords = st.records.filterKeysStrict(!assignments(_)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (revokedRecords.nonEmpty) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val newState = st.withoutRecords(revokedRecords.keySet) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val action = logging.log(RemovedRevokedRecords(revokedRecords, newState)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(newState, action) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else (st, F.unit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
action <- ref.modify { state => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
removeRevokedRecords.run( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
state | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.withRebalancing(false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.withAssignments(assigned) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatedState <- ref.get | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_ <- action >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log(AssignedPartitions(assigned, updatedState)) >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updatedState.onRebalances.foldLeft(F.unit)(_ >> _.onAssigned(assigned)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} yield () | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of a separate
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I didn't notice it could be done this way. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private[this] def revoked(revoked: SortedSet[TopicPartition]): F[Unit] = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def withState[A] = StateT.apply[Id, State[F, K, V], A](_) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -175,44 +198,29 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else (st, F.unit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def removeRevokedRecords(revokedNonFetches: SortedSet[TopicPartition]) = withState { st => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (revokedNonFetches.nonEmpty) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val revokedRecords = st.records.filterKeysStrict(revokedNonFetches) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (revokedRecords.nonEmpty) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val newState = st.withoutRecords(revokedRecords.keySet) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val action = logging.log(RemovedRevokedRecords(revokedRecords, newState)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(newState, action) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else (st, F.unit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else (st, F.unit) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ref | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.modify { state => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val withRebalancing = state.withRebalancing(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val updatedState = state | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.withRebalancing(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.withoutAssignments(revoked) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val fetches = withRebalancing.fetches.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val records = withRebalancing.records.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val fetches = updatedState.fetches.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val records = updatedState.records.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val revokedFetches = revoked intersect fetches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val revokedNonFetches = revoked diff revokedFetches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val withRecords = records intersect revokedFetches | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val withoutRecords = revokedFetches diff records | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithRecords <- completeWithRecords(withRecords) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithoutRecords <- completeWithoutRecords(withoutRecords) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
removeRevokedRecords <- removeRevokedRecords(revokedNonFetches) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you clean up the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Described in above comment. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} yield RevokedResult( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logRevoked = logging.log(RevokedPartitions(revoked, withRebalancing)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logRevoked = logging.log(RevokedPartitions(revoked, updatedState)), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithRecords = completeWithRecords, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithoutRecords = completeWithoutRecords, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
removeRevokedRecords = removeRevokedRecords, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
onRebalances = withRebalancing.onRebalances | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)).run(withRebalancing) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
onRebalances = updatedState.onRebalances | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
)).run(updatedState) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.flatMap { res => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val onRevoked = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -221,7 +229,6 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res.logRevoked >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res.completeWithRecords >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res.completeWithoutRecords >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res.removeRevokedRecords >> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
onRevoked | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -276,7 +283,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
withConsumer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.blocking { consumer => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val assigned = consumer.assignment.toSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val assigned = state.assignments | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please explain how your changes address the issue? The current codebase seems it doesn't have any flaw with managing assigned partitions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Described above, in above comment 👍 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val requested = state.fetches.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
val available = state.records.keySetStrict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -426,7 +433,6 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logRevoked: F[Unit], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithRecords: F[Unit], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
completeWithoutRecords: F[Unit], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
removeRevokedRecords: F[Unit], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
onRebalances: Chain[OnRebalance[F]] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -483,6 +489,7 @@ private[kafka] object KafkaConsumerActor { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type StreamId = Int | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final case class State[F[_], K, V]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assignments: Set[TopicPartition], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did it this way because I experienced deadlock when I tried to do so with |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fetches: Map[TopicPartition, Map[StreamId, FetchRequest[F, K, V]]], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
records: Map[TopicPartition, NonEmptyVector[CommittableConsumerRecord[F, K, V]]], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pendingCommits: Chain[Request.Commit[F]], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -491,6 +498,23 @@ private[kafka] object KafkaConsumerActor { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
subscribed: Boolean, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streaming: Boolean | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Add new assignments to state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param assignments assignments to add | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @return updated state with assignments added | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def withAssignments(assignments: Set[TopicPartition]): State[F, K, V] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
copy(assignments = this.assignments ++ assignments) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** Remove assignments to state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @param assignments assignments to remove | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* @return updated state with assignments removed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def withoutAssignments(assignments: Set[TopicPartition]): State[F, K, V] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
copy(assignments = this.assignments diff assignments) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def withOnRebalance(onRebalance: OnRebalance[F]): State[F, K, V] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
copy(onRebalances = onRebalances append onRebalance) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -575,6 +599,7 @@ private[kafka] object KafkaConsumerActor { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
object State { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def empty[F[_], K, V]: State[F, K, V] = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
State( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assignments = Set.empty, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
fetches = Map.empty, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
records = Map.empty, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
pendingCommits = Chain.empty, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the documentation, the
assigned
parameter only includes newly added partitions, so there is no need to revoke anything. It appears to be an unnecessary sanity check.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Firstly RebalanceListener calls a method to revoke partitions. Previously in this step revoked partitions had records removed. The problem here was lying in combination of Greedy rebalance protocols + paused partitions. Strangely, when paused partitions are cleared, fetching new records does not start from latest committed offset. It started from saved locally position. It caused the following problem: paused partitions records were revoked here, and when fetching new records started, this started from latest removed record offset + 1. This issue fortunately could be mitigated with Cooperative protocol, which on RebalanceListener methods used only revoked and only newly assigned partitions as opposed to Greedy protocol - all partitions were revoked and all partitions were assigned, discarding local state pre-rebalance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only thing I have to recall is that why I only rely on
assigned
parameter. It was long time ago and I would need to deep dive in it again.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a very valuable piece of information. Could you please add it as a comment somewhere? And is it possible to prove this behavior with the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I took additional look.
Łukasz is right about assigned partitions - assigned callback is invoked only with new assigned partitons (Coming from https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L417):
but more magic happens when revoke callback is invoked (https://github.com/apache/kafka/blob/38103ffaa962ef5092baffb884c84f8de3568501/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L777):
It is really really hard to reproduce it. It mainly happens when small amount of resources are assigned comparing to workload that must be consumed.
The behaviour is as follows for paused partitions: