diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 86713ae1..85a27219 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -1,4 +1,3 @@
-
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRole
@@ -6,174 +5,175 @@ metadata:
   creationTimestamp: null
   name: manager-role
 rules:
-- apiGroups:
-  - flinkoperator.k8s.io
-  resources:
-  - flinkclusters
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - flinkoperator.k8s.io
-  resources:
-  - flinkclusters/status
-  verbs:
-  - get
-  - update
-  - patch
-- apiGroups:
-  - apps
-  resources:
-  - deployments
-  - statefulsets
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - apps
-  resources:
-  - deployments/status
-  - statefulsets/status
-  verbs:
-  - get
-- apiGroups:
-  - ""
-  resources:
-  - pods
-  - secrets
-  verbs:
-  - create
-  - get
-  - list
-  - watch
-  - patch
-- apiGroups:
-    - apps
-  resources:
-    - controllerrevisions
-  verbs:
-    - get
-    - list
-    - watch
-    - create
-    - update
-    - patch
-    - delete
-- apiGroups:
-  - ""
-  resources:
-  - pods/status
-  verbs:
-  - get
-- apiGroups:
-  - ""
-  resources:
-  - services
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - ""
-  resources:
-  - services/status
-  verbs:
-  - get
-- apiGroups:
-  - ""
-  resources:
-  - events
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - ""
-  resources:
-  - events/status
-  verbs:
-  - get
-- apiGroups:
-  - ""
-  resources:
-  - configmaps
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - batch
-  resources:
-  - jobs
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - batch
-  resources:
-  - jobs/status
-  verbs:
-  - get
-- apiGroups:
-  - extensions
-  resources:
-  - ingresses
-  verbs:
-  - get
-  - list
-  - watch
-  - create
-  - update
-  - patch
-  - delete
-- apiGroups:
-  - extensions
-  resources:
-  - ingresses/status
-  verbs:
-  - get
-- apiGroups: 
-  - admissionregistration.k8s.io
-  resources:
-  - mutatingwebhookconfigurations
-  - validatingwebhookconfigurations
-  verbs:
-  - get
-  - create
-  - update
-  - patch
-- apiGroups:
-  - scheduling.volcano.sh
-  resources:
-  - podgroups
-  verbs:
-  - get
-  - create
-  - update
+  - apiGroups:
+      - flinkoperator.k8s.io
+    resources:
+      - flinkclusters
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - flinkoperator.k8s.io
+    resources:
+      - flinkclusters/status
+    verbs:
+      - get
+      - update
+      - patch
+  - apiGroups:
+      - apps
+    resources:
+      - deployments
+      - statefulsets
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - apps
+    resources:
+      - deployments/status
+      - statefulsets/status
+    verbs:
+      - get
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+      - secrets
+    verbs:
+      - create
+      - get
+      - list
+      - watch
+      - patch
+  - apiGroups:
+      - apps
+    resources:
+      - controllerrevisions
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - ""
+    resources:
+      - pods/status
+    verbs:
+      - get
+  - apiGroups:
+      - ""
+    resources:
+      - services
+      - persistentvolumeclaims
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - ""
+    resources:
+      - services/status
+    verbs:
+      - get
+  - apiGroups:
+      - ""
+    resources:
+      - events
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - ""
+    resources:
+      - events/status
+    verbs:
+      - get
+  - apiGroups:
+      - ""
+    resources:
+      - configmaps
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - batch
+    resources:
+      - jobs
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - batch
+    resources:
+      - jobs/status
+    verbs:
+      - get
+  - apiGroups:
+      - extensions
+    resources:
+      - ingresses
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+      - update
+      - patch
+      - delete
+  - apiGroups:
+      - extensions
+    resources:
+      - ingresses/status
+    verbs:
+      - get
+  - apiGroups:
+      - admissionregistration.k8s.io
+    resources:
+      - mutatingwebhookconfigurations
+      - validatingwebhookconfigurations
+    verbs:
+      - get
+      - create
+      - update
+      - patch
+  - apiGroups:
+      - scheduling.volcano.sh
+    resources:
+      - podgroups
+    verbs:
+      - get
+      - create
+      - update
diff --git a/controllers/flinkcluster_controller.go b/controllers/flinkcluster_controller.go
index 057bbd0c..fd5c9322 100644
--- a/controllers/flinkcluster_controller.go
+++ b/controllers/flinkcluster_controller.go
@@ -51,6 +51,7 @@ type FlinkClusterReconciler struct {
 // +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get
 // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get
+// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
 // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get
 // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go
index 2e68fe33..2e496e13 100644
--- a/controllers/flinkcluster_converter.go
+++ b/controllers/flinkcluster_converter.go
@@ -214,15 +214,6 @@ func getDesiredJobManagerStatefulSet(
 		ServiceAccountName: getServiceAccountName(serviceAccount),
 	}
 
-	var pvcs []corev1.PersistentVolumeClaim
-	if jobManagerSpec.VolumeClaimTemplates != nil {
-		pvcs = make([]corev1.PersistentVolumeClaim, len(jobManagerSpec.VolumeClaimTemplates))
-		for i, pvc := range jobManagerSpec.VolumeClaimTemplates {
-			pvc.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
-			pvcs[i] = pvc
-		}
-	}
-
 	var jobManagerStatefulSet = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace:       clusterNamespace,
@@ -234,7 +225,7 @@ func getDesiredJobManagerStatefulSet(
 			Replicas:             jobManagerSpec.Replicas,
 			Selector:             &metav1.LabelSelector{MatchLabels: podLabels},
 			ServiceName:          jobManagerStatefulSetName,
-			VolumeClaimTemplates: pvcs,
+			VolumeClaimTemplates: jobManagerSpec.VolumeClaimTemplates,
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels:      podLabels,
@@ -522,15 +513,6 @@ func getDesiredTaskManagerStatefulSet(
 		ServiceAccountName: getServiceAccountName(serviceAccount),
 	}
 
-	var pvcs []corev1.PersistentVolumeClaim
-	if taskManagerSpec.VolumeClaimTemplates != nil {
-		pvcs = make([]corev1.PersistentVolumeClaim, len(taskManagerSpec.VolumeClaimTemplates))
-		for i, pvc := range taskManagerSpec.VolumeClaimTemplates {
-			pvc.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
-			pvcs[i] = pvc
-		}
-	}
-
 	var taskManagerStatefulSet = &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: clusterNamespace,
@@ -543,7 +525,7 @@ func getDesiredTaskManagerStatefulSet(
 			Replicas:             &taskManagerSpec.Replicas,
 			Selector:             &metav1.LabelSelector{MatchLabels: podLabels},
 			ServiceName:          taskManagerStatefulSetName,
-			VolumeClaimTemplates: pvcs,
+			VolumeClaimTemplates: taskManagerSpec.VolumeClaimTemplates,
 			PodManagementPolicy:  "Parallel",
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go
index e46542fb..6497ac08 100644
--- a/controllers/flinkcluster_converter_test.go
+++ b/controllers/flinkcluster_converter_test.go
@@ -608,15 +608,6 @@ func TestGetDesiredClusterState(t *testing.T) {
 				{
 					ObjectMeta: metav1.ObjectMeta{
 						Name: "pvc-test",
-						OwnerReferences: []metav1.OwnerReference{
-							{
-								APIVersion:         "flinkoperator.k8s.io/v1beta1",
-								Kind:               "FlinkCluster",
-								Name:               "flinkjobcluster-sample",
-								Controller:         &controller,
-								BlockOwnerDeletion: &blockOwnerDeletion,
-							},
-						},
 					},
 					Spec: v1.PersistentVolumeClaimSpec{
 						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go
index b6c38963..8c48fcb3 100644
--- a/controllers/flinkcluster_observer.go
+++ b/controllers/flinkcluster_observer.go
@@ -49,21 +49,22 @@ type ClusterStateObserver struct {
 
 // ObservedClusterState holds observed state of a cluster.
 type ObservedClusterState struct {
-	cluster           *v1beta1.FlinkCluster
-	revisions         []*appsv1.ControllerRevision
-	configMap         *corev1.ConfigMap
-	jmStatefulSet     *appsv1.StatefulSet
-	jmService         *corev1.Service
-	jmIngress         *extensionsv1beta1.Ingress
-	tmStatefulSet     *appsv1.StatefulSet
-	job               *batchv1.Job
-	jobPod            *corev1.Pod
-	flinkJobStatus    FlinkJobStatus
-	flinkJobSubmitLog *FlinkJobSubmitLog
-	savepoint         *flinkclient.SavepointStatus
-	revisionStatus    *RevisionStatus
-	savepointErr      error
-	observeTime       time.Time
+	cluster                *v1beta1.FlinkCluster
+	revisions              []*appsv1.ControllerRevision
+	configMap              *corev1.ConfigMap
+	jmStatefulSet          *appsv1.StatefulSet
+	jmService              *corev1.Service
+	jmIngress              *extensionsv1beta1.Ingress
+	tmStatefulSet          *appsv1.StatefulSet
+	persistentVolumeClaims *corev1.PersistentVolumeClaimList
+	job                    *batchv1.Job
+	jobPod                 *corev1.Pod
+	flinkJobStatus         FlinkJobStatus
+	flinkJobSubmitLog      *FlinkJobSubmitLog
+	savepoint              *flinkclient.SavepointStatus
+	revisionStatus         *RevisionStatus
+	savepointErr           error
+	observeTime            time.Time
 }
 
 type FlinkJobStatus struct {
@@ -196,6 +197,10 @@ func (observer *ClusterStateObserver) observe(
 	// Savepoint observe error do not affect deploy reconciliation loop.
 	observer.observeSavepoint(observed)
 
+	var pvcs = new(corev1.PersistentVolumeClaimList)
+	observer.observePersistentVolumeClaims(pvcs)
+	observed.persistentVolumeClaims = pvcs
+
 	// (Optional) job.
 	err = observer.observeJob(observed)
 
@@ -524,6 +529,31 @@ func (observer *ClusterStateObserver) observeJobPod(
 	return nil
 }
 
+func (observer *ClusterStateObserver) observePersistentVolumeClaims(
+	observedClaims *corev1.PersistentVolumeClaimList) error {
+	var log = observer.log
+	var clusterNamespace = observer.request.Namespace
+	var clusterName = observer.request.Name
+	var selector = labels.SelectorFromSet(map[string]string{"cluster": clusterName})
+
+	var err = observer.k8sClient.List(
+		observer.context,
+		observedClaims,
+		client.InNamespace(clusterNamespace),
+		client.MatchingLabelsSelector{Selector: selector})
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get persistent volume claim list")
+			return err
+		}
+		log.Info("Observed persistent volume claim list", "state", "nil")
+	} else {
+		log.Info("Observed persistent volume claim list", "state", *observedClaims)
+	}
+
+	return nil
+}
+
 type RevisionStatus struct {
 	currentRevision *appsv1.ControllerRevision
 	nextRevision    *appsv1.ControllerRevision
diff --git a/controllers/flinkcluster_reconciler.go b/controllers/flinkcluster_reconciler.go
index b45ce669..4f987116 100644
--- a/controllers/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster_reconciler.go
@@ -109,7 +109,15 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 		return ctrl.Result{}, err
 	}
 
+	err = reconciler.reconcilePersistentVolumeClaims()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	result, err := reconciler.reconcileJob()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
 
 	return result, nil
 }
@@ -444,6 +452,51 @@ func (reconciler *ClusterReconciler) deleteConfigMap(
 	return err
 }
 
+func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaims() error {
+	observed := reconciler.observed
+	pvcs := observed.persistentVolumeClaims
+	jm := observed.jmStatefulSet
+	tm := observed.tmStatefulSet
+
+	for _, pvc := range pvcs.Items {
+		if c, ok := pvc.Labels["component"]; ok && c == "jobmanager" && jm != nil {
+			reconciler.reconcilePersistentVolumeClaim(&pvc, jm)
+		}
+		if c, ok := pvc.Labels["component"]; ok && c == "taskmanager" && tm != nil {
+			reconciler.reconcilePersistentVolumeClaim(&pvc, tm)
+		}
+	}
+
+	return nil
+}
+
+func (reconciler *ClusterReconciler) reconcilePersistentVolumeClaim(pvc *corev1.PersistentVolumeClaim, sset *appsv1.StatefulSet) error {
+	var log = reconciler.log
+	ctx := reconciler.context
+	k8sClient := reconciler.k8sClient
+
+	if len(pvc.GetOwnerReferences()) != 0 {
+		return nil
+	}
+
+	patch := fmt.Sprintf(
+		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":false}],"uid":"%s"}}`,
+		sset.APIVersion,
+		sset.Kind,
+		sset.GetName(),
+		sset.GetUID(),
+		pvc.GetUID(),
+	)
+	err := k8sClient.Patch(ctx, pvc, client.RawPatch(types.MergePatchType, []byte(patch)))
+	if err != nil {
+		log.Error(err, "Failed to update PersistentVolumeClaim")
+	} else {
+		log.Info("PersistentVolumeClaim patched")
+	}
+
+	return err
+}
+
 func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
 	var log = reconciler.log
 	var desiredJob = reconciler.desired.Job