diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py
index 3ce69e155e..b331bdce83 100644
--- a/lib/charms/data_platform_libs/v0/data_interfaces.py
+++ b/lib/charms/data_platform_libs/v0/data_interfaces.py
@@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 34
+LIBPATCH = 36
 
 PYDEPS = ["ops>=2.0.0"]
 
@@ -642,8 +642,8 @@ def _move_to_new_label_if_needed(self):
             return
 
         # Create a new secret with the new label
-        old_meta = self._secret_meta
         content = self._secret_meta.get_content()
+        self._secret_uri = None
 
         # I wish we could just check if we are the owners of the secret...
         try:
@@ -651,7 +651,7 @@ def _move_to_new_label_if_needed(self):
         except ModelError as err:
             if "this unit is not the leader" not in str(err):
                 raise
-        old_meta.remove_all_revisions()
+        self.current_label = None
 
     def set_content(self, content: Dict[str, str]) -> None:
         """Setting cached secret content."""
@@ -1586,7 +1586,7 @@ def _register_secret_to_relation(
         """
         label = self._generate_secret_label(relation_name, relation_id, group)
 
-        # Fetchin the Secret's meta information ensuring that it's locally getting registered with
+        # Fetching the Secret's meta information ensuring that it's locally getting registered with
         CachedSecret(self._model, self.component, label, secret_id).meta
 
     def _register_secrets_to_relation(self, relation: Relation, params_name_list: List[str]):
@@ -2309,7 +2309,7 @@ def _secrets(self) -> dict:
         return self._cached_secrets
 
     def _get_secret(self, group) -> Optional[Dict[str, str]]:
-        """Retrieveing secrets."""
+        """Retrieving secrets."""
         if not self.app:
             return
         if not self._secrets.get(group):
@@ -3016,7 +3016,7 @@ class KafkaRequiresEvents(CharmEvents):
 # Kafka Provides and Requires
 
 
-class KafkaProvidesData(ProviderData):
+class KafkaProviderData(ProviderData):
     """Provider-side of the Kafka relation."""
 
     def __init__(self, model: Model, relation_name: str) -> None:
@@ -3059,12 +3059,12 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None:
         self.update_relation_data(relation_id, {"zookeeper-uris": zookeeper_uris})
 
 
-class KafkaProvidesEventHandlers(EventHandlers):
+class KafkaProviderEventHandlers(EventHandlers):
     """Provider-side of the Kafka relation."""
 
     on = KafkaProvidesEvents()  # pyright: ignore [reportAssignmentType]
 
-    def __init__(self, charm: CharmBase, relation_data: KafkaProvidesData) -> None:
+    def __init__(self, charm: CharmBase, relation_data: KafkaProviderData) -> None:
         super().__init__(charm, relation_data)
         # Just to keep lint quiet, can't resolve inheritance. The same happened in super().__init__() above
         self.relation_data = relation_data
@@ -3086,15 +3086,15 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
             )
 
 
-class KafkaProvides(KafkaProvidesData, KafkaProvidesEventHandlers):
+class KafkaProvides(KafkaProviderData, KafkaProviderEventHandlers):
     """Provider-side of the Kafka relation."""
 
     def __init__(self, charm: CharmBase, relation_name: str) -> None:
-        KafkaProvidesData.__init__(self, charm.model, relation_name)
-        KafkaProvidesEventHandlers.__init__(self, charm, self)
+        KafkaProviderData.__init__(self, charm.model, relation_name)
+        KafkaProviderEventHandlers.__init__(self, charm, self)
 
 
-class KafkaRequiresData(RequirerData):
+class KafkaRequirerData(RequirerData):
     """Requirer-side of the Kafka relation."""
 
     def __init__(
@@ -3124,12 +3124,12 @@ def topic(self, value):
         self._topic = value
 
 
-class KafkaRequiresEventHandlers(RequirerEventHandlers):
+class KafkaRequirerEventHandlers(RequirerEventHandlers):
     """Requires-side of the Kafka relation."""
 
     on = KafkaRequiresEvents()  # pyright: ignore [reportAssignmentType]
 
-    def __init__(self, charm: CharmBase, relation_data: KafkaRequiresData) -> None:
+    def __init__(self, charm: CharmBase, relation_data: KafkaRequirerData) -> None:
         super().__init__(charm, relation_data)
         # Just to keep lint quiet, can't resolve inheritance. The same happened in super().__init__() above
         self.relation_data = relation_data
@@ -3142,10 +3142,13 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:
             return
 
         # Sets topic, extra user roles, and "consumer-group-prefix" in the relation
-        relation_data = {
-            f: getattr(self, f.replace("-", "_"), "")
-            for f in ["consumer-group-prefix", "extra-user-roles", "topic"]
-        }
+        relation_data = {"topic": self.relation_data.topic}
+
+        if self.relation_data.extra_user_roles:
+            relation_data["extra-user-roles"] = self.relation_data.extra_user_roles
+
+        if self.relation_data.consumer_group_prefix:
+            relation_data["consumer-group-prefix"] = self.relation_data.consumer_group_prefix
 
         self.relation_data.update_relation_data(event.relation.id, relation_data)
 
@@ -3188,7 +3191,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
             return
 
 
-class KafkaRequires(KafkaRequiresData, KafkaRequiresEventHandlers):
+class KafkaRequires(KafkaRequirerData, KafkaRequirerEventHandlers):
     """Provider-side of the Kafka relation."""
 
     def __init__(
@@ -3200,7 +3203,7 @@ def __init__(
         consumer_group_prefix: Optional[str] = None,
         additional_secret_fields: Optional[List[str]] = [],
     ) -> None:
-        KafkaRequiresData.__init__(
+        KafkaRequirerData.__init__(
             self,
             charm.model,
             relation_name,
@@ -3209,7 +3212,7 @@ def __init__(
             consumer_group_prefix,
             additional_secret_fields,
         )
-        KafkaRequiresEventHandlers.__init__(self, charm, self)
+        KafkaRequirerEventHandlers.__init__(self, charm, self)
 
 
 # Opensearch related events
diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py
index 4ee2e9ff88..ef74644de4 100644
--- a/lib/charms/data_platform_libs/v0/upgrade.py
+++ b/lib/charms/data_platform_libs/v0/upgrade.py
@@ -285,7 +285,7 @@ def restart(self, event) -> None:
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 15
+LIBPATCH = 16
 
 PYDEPS = ["pydantic>=1.10,<2", "poetry-core"]
 
@@ -501,7 +501,7 @@ class DataUpgrade(Object, ABC):
 
     STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"]
 
-    on = UpgradeEvents()  # pyright: ignore [reportGeneralTypeIssues]
+    on = UpgradeEvents()  # pyright: ignore [reportAssignmentType]
 
     def __init__(
         self,
@@ -606,6 +606,21 @@ def upgrade_stack(self, stack: List[int]) -> None:
         self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)})
         self._upgrade_stack = stack
 
+    @property
+    def other_unit_states(self) -> list:
+        """Current upgrade state for other units.
+
+        Returns:
+            Unsorted list of upgrade states for other units.
+        """
+        if not self.peer_relation:
+            return []
+
+        return [
+            self.peer_relation.data[unit].get("state", "")
+            for unit in list(self.peer_relation.units)
+        ]
+
     @property
     def unit_states(self) -> list:
         """Current upgrade state for all units.
@@ -926,11 +941,8 @@ def on_upgrade_changed(self, event: EventBase) -> None:
             return
 
         if self.substrate == "vm" and self.cluster_state == "recovery":
-            # Only defer for vm, that will set unit states to "ready" on upgrade-charm
-            # on k8s only the upgrading unit will receive the upgrade-charm event
-            # and deferring will prevent the upgrade stack from being popped
-            logger.debug("Cluster in recovery, deferring...")
-            event.defer()
+            # skip run while in recovery. The event will be retrigged when the cluster is ready
+            logger.debug("Cluster in recovery, skip...")
             return
 
         # if all units completed, mark as complete
@@ -981,6 +993,7 @@ def on_upgrade_changed(self, event: EventBase) -> None:
             self.charm.unit == top_unit
             and top_state in ["ready", "upgrading"]
             and self.cluster_state == "ready"
+            and "upgrading" not in self.other_unit_states
         ):
             logger.debug(
                 f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..."
diff --git a/lib/charms/operator_libs_linux/v2/snap.py b/lib/charms/operator_libs_linux/v2/snap.py
index ef426775d8..6d4dc385a6 100644
--- a/lib/charms/operator_libs_linux/v2/snap.py
+++ b/lib/charms/operator_libs_linux/v2/snap.py
@@ -83,7 +83,7 @@
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 5
+LIBPATCH = 6
 
 
 # Regex to locate 7-bit C1 ANSI sequences
@@ -584,13 +584,16 @@ def ensure(
                     "Installing snap %s, revision %s, tracking %s", self._name, revision, channel
                 )
                 self._install(channel, cohort, revision)
-            else:
+                logger.info("The snap installation completed successfully")
+            elif revision is None or revision != self._revision:
                 # The snap is installed, but we are changing it (e.g., switching channels).
                 logger.info(
                     "Refreshing snap %s, revision %s, tracking %s", self._name, revision, channel
                 )
                 self._refresh(channel=channel, cohort=cohort, revision=revision, devmode=devmode)
-            logger.info("The snap installation completed successfully")
+                logger.info("The snap refresh completed successfully")
+            else:
+                logger.info("Refresh of snap %s was unnecessary", self._name)
 
         self._update_snap_apps()
         self._state = state