Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable deserialization of old Akka cluster messages (mixed pekko/akka cluster) #1578

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

sadekmunawar
Copy link
Contributor

Forming a cluster with Akka nodes requires the deserialization of cluster messages sent by the Akka. This commit fixes the following exception that occurs during deserialization.

[akka://[email protected]:2551] with serializer id [5] and manifest [akka.cluster.InternalClusterAction$InitJoinAck].
java.lang.IllegalArgumentException: Unknown manifest [akka.cluster.InternalClusterAction$InitJoinAck]
    at org.apache.pekko.cluster.protobuf.ClusterMessageSerializer.fromBinary(ClusterMessageSerializer.scala:156)

case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
val updatedManifest = {
if (manifest.startsWith("akka"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only needed when migrating from Akka, may be better under a boolean guard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, @mdedetrich once added one, and then is can be if (guard && manifest.startsWith("akka")), WDYT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can provide a PR for this based on checking pekko.remote.accept-protocol-names config. That config is an array value and if "akka" is in the array then we can allow this check. We only need to do this config once so the boolean result can be stored as a val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. If we decide to keep the changes in this PR, then having boolean guard would be better.

@@ -177,6 +177,14 @@ class ClusterMessageSerializerSpec extends PekkoSpec("pekko.actor.provider = clu
ClusterMessageSerializer.OldWelcomeManifest)
}

"be de-serializable with class manifests from Akka nodes" in {
val oldAkkaJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadekmunawar this class only needs to be supported for Akka prior to v2.6.4.

Do we really need to support clusters that are running with very old Akka releases? Ideally we would only support pretty recent Akka releases. It is a pity that Akka changed the cluster messages in a patch release (2.6.5).

// Kept for one version iteration from Akka 2.6.4 to allow rolling migration to short manifests
// can be removed in Akka 2.6.6 or later.
val OldJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Join"
val OldWelcomeManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Welcome"
val OldLeaveManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Leave"
val OldDownManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Down"
val OldInitJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoin$$"
val OldInitJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
val OldInitJoinNackManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinNack"
val HeartBeatManifestPre2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$Heartbeat"
val HeartBeatRspManifest2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
val OldExitingConfirmedManifest = s"org.apache.pekko.cluster.InternalClusterAction$$ExitingConfirmed"
val OldGossipStatusManifest = "org.apache.pekko.cluster.GossipStatus"
val OldGossipEnvelopeManifest = "org.apache.pekko.cluster.GossipEnvelope"
val OldClusterRouterPoolManifest = "org.apache.pekko.cluster.routing.ClusterRouterPool"

@pjfanning
Copy link
Contributor

I have updated https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility and include the fact the we only support forming clusters with Akka nodes of version 2.6.5 and above.

@raboof
Copy link
Member

raboof commented Jan 10, 2025

Does that mean we can close this PR?

@pjfanning
Copy link
Contributor

I would prefer not to use this because I think trying to support Akka before v2.6.5 almost certainly will lead to us having to add extra fixes. One that I suspect that we'd need is that we might need to also allow Pekko nodes to optionally send cluster messages that look like the old Akka format and this PR does not address that.

@raboof
Copy link
Member

raboof commented Jan 10, 2025

I would prefer not to use this because I think trying to support Akka before v2.6.5 almost certainly will lead to us having to add extra fixes.

Sounds reasonable to me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants