diff --git a/front-end/src/api/subscriptions.js b/front-end/src/api/subscriptions.js
index 72aa641f..b288c248 100644
--- a/front-end/src/api/subscriptions.js
+++ b/front-end/src/api/subscriptions.js
@@ -59,7 +59,15 @@ export function deleteSubscriptionOnCluster(cluster, persistent, tenantNamespace
method: 'delete'
})
}
-
+/**
+ * peek message from cluster
+ * @param {*} cluster
+ * @param {*} persistent
+ * @param {*} tenantNamespaceTopic
+ * @param {*} subName
+ * @param {*} messagePosition
+ * @returns
+ */
export function peekMessagesOnCluster(cluster, persistent, tenantNamespaceTopic, subName, messagePosition) {
return request({
headers: {
diff --git a/front-end/src/api/topics.js b/front-end/src/api/topics.js
index daf99468..77108f93 100644
--- a/front-end/src/api/topics.js
+++ b/front-end/src/api/topics.js
@@ -63,14 +63,26 @@ export function fetchTopicStats(persistent, tenantNamespaceTopic) {
method: 'get'
})
}
-
+/**
+ * topic detail
+ * @param {*}} persistent
+ * @param {*} tenantNamespaceTopic
+ * @returns
+ */
export function fetchTopicStatsInternal(persistent, tenantNamespaceTopic) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/internalStats`,
method: 'get'
})
}
-
+/**
+ * topic stat
+ * topic stat
+ * @param {*} persistent
+ * @param {*} tenantNamespaceTopic
+ * @param {*} perPartition
+ * @returns
+ */
export function fetchPartitionTopicStats(persistent, tenantNamespaceTopic, perPartition) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/partitioned-stats?perPartition=${perPartition}`,
@@ -233,7 +245,14 @@ export function unloadOnCluster(cluster, persistent, tenantNamespaceTopic) {
method: 'put'
})
}
-
+/**
+ * skip message
+ * @param {*} persistent
+ * @param {*} tenantNamespaceTopic
+ * @param {*} subName
+ * @param {*} numMessages
+ * @returns
+ */
export function skip(persistent, tenantNamespaceTopic, subName, numMessages) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/skip/${numMessages}`,
@@ -298,7 +317,14 @@ export function expireMessagesAllSubscriptionsOnCluster(cluster, persistent, ten
method: 'post'
})
}
-
+/**
+ * peek message
+ * @param {*} persistent
+ * @param {*} tenantNamespaceTopic
+ * @param {*} subName
+ * @param {*} messagePosition
+ * @returns
+ */
export function peekMessages(persistent, tenantNamespaceTopic, subName, messagePosition) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/position/${messagePosition}`,
diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js
index 6e359dab..360f1513 100644
--- a/front-end/src/lang/en.js
+++ b/front-end/src/lang/en.js
@@ -295,7 +295,10 @@ export default {
outBytes: 'Out Throughput',
storageSize: 'Storage Size',
enabled: 'Enabled',
- disabled: 'Disabled'
+ disabled: 'Disabled',
+ msgBacklog: 'Backlog',
+ numberOfEntries: 'All Entries',
+ unackedMessages: 'Unack Msg'
},
tenant: {
label: 'Tenant',
diff --git a/front-end/src/views/management/subscriptions/subscription.vue b/front-end/src/views/management/subscriptions/subscription.vue
index 2feab851..96568816 100644
--- a/front-end/src/views/management/subscriptions/subscription.vue
+++ b/front-end/src/views/management/subscriptions/subscription.vue
@@ -471,10 +471,15 @@ export default {
})
return
}
+ // If it is a partition topic, you can only view the messages under one of the partitions. For the specific reason, see'Pulsar PR[8181]'.
+ var tenantNamespaceTopic = this.tenantNamespaceTopic
+ if (this.postForm.persistent === 'persistent' && parseInt(this.postForm.partition) !== -1) {
+ tenantNamespaceTopic = tenantNamespaceTopic + '-partition-' + this.postForm.partition
+ }
peekMessagesOnCluster(
this.getCurrentCluster(),
this.postForm.persistent,
- this.tenantNamespaceTopic,
+ tenantNamespaceTopic,
this.postForm.subscription,
this.form.peekNumMessages).then(response => {
if (!response.data) return
diff --git a/front-end/src/views/management/topics/partitionedTopic.vue b/front-end/src/views/management/topics/partitionedTopic.vue
index 82772314..90c21210 100644
--- a/front-end/src/views/management/topics/partitionedTopic.vue
+++ b/front-end/src/views/management/topics/partitionedTopic.vue
@@ -17,6 +17,7 @@
+
@@ -32,6 +33,7 @@
+
@@ -56,6 +58,7 @@
+
+
+
+
{{ $t('topic.subscription.subscriptions') }}
+
+
+
{{ $t('topic.policy.authentication') }}
@@ -269,6 +278,7 @@
{{ $t('topic.deleteTopic') }}
+
@@ -311,6 +321,7 @@
import { fetchTenants } from '@/api/tenants'
import { fetchNamespaces, getClusters } from '@/api/namespaces'
import {
+ fetchTopicStatsInternal,
fetchPartitionTopicStats,
deletePartitionTopicOnCluster,
expireMessagesAllSubscriptionsOnCluster,
@@ -457,6 +468,7 @@ export default {
}
},
initTopicStats() {
+ // get topic stat detail
fetchPartitionTopicStats(this.postForm.persistent, this.tenantNamespaceTopic, true).then(response => {
if (!response.data) return
this.partitionTopicStats = []
@@ -464,11 +476,18 @@ export default {
inMsg: numberFormatter(response.data.msgRateIn, 2),
outMsg: numberFormatter(response.data.msgRateOut, 2),
inBytes: formatBytes(response.data.msgThroughputIn),
- outBytes: formatBytes(response.data.msgThroughputOut)
+ outBytes: formatBytes(response.data.msgThroughputOut),
+ // The total number of topics
+ numberOfEntries: 0,
+ // Max backlog
+ msgBacklog: 0,
+ // Max number of messages without ACK
+ unackedMessages: 0
})
var prefix = this.postForm.persistent + '://' + this.tenantNamespaceTopic
var tempPartitionsList = Object.keys(response.data.partitions)
this.partitionsList = []
+ // Traverse each partition
for (var i = 0; i < tempPartitionsList.length; i++) {
var key = prefix + '-partition-' + i
if (response.data.partitions.hasOwnProperty(key)) {
@@ -488,6 +507,15 @@ export default {
'storageSize': formatBytes(response.data.partitions[key].storageSize, 0),
'partitionTopicLink': '/management/topics/' + this.postForm.persistent + '/' + this.tenantNamespaceTopic + '-partition-' + i + '/topic'
})
+
+ // Calculate the sum of each partition message
+ fetchTopicStatsInternal(this.postForm.persistent, this.postForm.tenant + '/' + this.postForm.namespace + '/' + partition).then(response => {
+ if (!response.data) return
+ // is number
+ if (typeof response.data.numberOfEntries === 'number' && !isNaN(response.data.numberOfEntries)) {
+ this.partitionTopicStats[0].numberOfEntries = this.partitionTopicStats[0].numberOfEntries + response.data.numberOfEntries
+ }
+ })
}
}
var index = 0
@@ -497,17 +525,32 @@ export default {
var type = 'Exclusive'
var children = []
for (var j in response.data.partitions) {
+ // tenant + namespace + partition
var subSplitPartition = j.split('://')
+ // partition
var subPartition = subSplitPartition[1].split('/')[2]
if (response.data.partitions[j].hasOwnProperty('subscriptions')) {
for (var p in response.data.partitions[j].subscriptions) {
if (p === s) {
+ // msgBacklog is number
+ if (typeof response.data.partitions[j].subscriptions[p].msgBacklog === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].msgBacklog)) {
+ if (this.partitionTopicStats[0].msgBacklog < response.data.partitions[j].subscriptions[p].msgBacklog) {
+ this.partitionTopicStats[0].msgBacklog = response.data.partitions[j].subscriptions[p].msgBacklog
+ }
+ }
+ // unackedMessages is number
+ if (typeof response.data.partitions[j].subscriptions[p].unackedMessages === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].unackedMessages)) {
+ if (this.partitionTopicStats[0].unackedMessages < response.data.partitions[j].subscriptions[p].unackedMessages) {
+ this.partitionTopicStats[0].unackedMessages = response.data.partitions[j].subscriptions[p].unackedMessages
+ }
+ }
children.push({
'id': 1000000 * (index + 1) + j,
'subscription': subPartition,
'outMsg': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateOut, 2),
'outBytes': formatBytes(response.data.partitions[j].subscriptions[p].msgThroughputOut),
'msgExpired': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateExpired, 2),
+ // Backlog of messages in the topic
'backlog': response.data.partitions[j].subscriptions[p].msgBacklog,
'type': response.data.partitions[j].subscriptions[p].type,
'subscriptionLink': '/management/subscriptions/' + this.postForm.persistent + '/' + subSplitPartition[1] + '/' + s + '/subscription',
diff --git a/front-end/src/views/management/topics/topic.vue b/front-end/src/views/management/topics/topic.vue
index f45b53b9..c210f82c 100644
--- a/front-end/src/views/management/topics/topic.vue
+++ b/front-end/src/views/management/topics/topic.vue
@@ -50,6 +50,7 @@
+
@@ -344,6 +345,8 @@
+
+
@@ -461,6 +464,8 @@
+
+
{{ $t('topic.policy.authentication') }}
@@ -513,6 +518,7 @@
{{ $t('topic.deleteTopic') }}
+
@@ -845,6 +851,7 @@ export default {
})
}
}
+ // Total number of messages tracked
this.entries = numberFormatter(response.data.numberOfEntries, 0)
for (var c in response.data.cursors) {
this.cursorsList.push({