diff --git a/front-end/src/api/subscriptions.js b/front-end/src/api/subscriptions.js index 72aa641f..b288c248 100644 --- a/front-end/src/api/subscriptions.js +++ b/front-end/src/api/subscriptions.js @@ -59,7 +59,15 @@ export function deleteSubscriptionOnCluster(cluster, persistent, tenantNamespace method: 'delete' }) } - +/** + * peek message from cluster + * @param {*} cluster + * @param {*} persistent + * @param {*} tenantNamespaceTopic + * @param {*} subName + * @param {*} messagePosition + * @returns + */ export function peekMessagesOnCluster(cluster, persistent, tenantNamespaceTopic, subName, messagePosition) { return request({ headers: { diff --git a/front-end/src/api/topics.js b/front-end/src/api/topics.js index daf99468..77108f93 100644 --- a/front-end/src/api/topics.js +++ b/front-end/src/api/topics.js @@ -63,14 +63,26 @@ export function fetchTopicStats(persistent, tenantNamespaceTopic) { method: 'get' }) } - +/** + * topic detail + * @param {*}} persistent + * @param {*} tenantNamespaceTopic + * @returns + */ export function fetchTopicStatsInternal(persistent, tenantNamespaceTopic) { return request({ url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/internalStats`, method: 'get' }) } - +/** + * topic stat + * topic stat + * @param {*} persistent + * @param {*} tenantNamespaceTopic + * @param {*} perPartition + * @returns + */ export function fetchPartitionTopicStats(persistent, tenantNamespaceTopic, perPartition) { return request({ url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/partitioned-stats?perPartition=${perPartition}`, @@ -233,7 +245,14 @@ export function unloadOnCluster(cluster, persistent, tenantNamespaceTopic) { method: 'put' }) } - +/** + * skip message + * @param {*} persistent + * @param {*} tenantNamespaceTopic + * @param {*} subName + * @param {*} numMessages + * @returns + */ export function skip(persistent, tenantNamespaceTopic, subName, numMessages) { return request({ url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/skip/${numMessages}`, @@ -298,7 +317,14 @@ export function expireMessagesAllSubscriptionsOnCluster(cluster, persistent, ten method: 'post' }) } - +/** + * peek message + * @param {*} persistent + * @param {*} tenantNamespaceTopic + * @param {*} subName + * @param {*} messagePosition + * @returns + */ export function peekMessages(persistent, tenantNamespaceTopic, subName, messagePosition) { return request({ url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/position/${messagePosition}`, diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js index 6e359dab..360f1513 100644 --- a/front-end/src/lang/en.js +++ b/front-end/src/lang/en.js @@ -295,7 +295,10 @@ export default { outBytes: 'Out Throughput', storageSize: 'Storage Size', enabled: 'Enabled', - disabled: 'Disabled' + disabled: 'Disabled', + msgBacklog: 'Backlog', + numberOfEntries: 'All Entries', + unackedMessages: 'Unack Msg' }, tenant: { label: 'Tenant', diff --git a/front-end/src/views/management/subscriptions/subscription.vue b/front-end/src/views/management/subscriptions/subscription.vue index 2feab851..96568816 100644 --- a/front-end/src/views/management/subscriptions/subscription.vue +++ b/front-end/src/views/management/subscriptions/subscription.vue @@ -471,10 +471,15 @@ export default { }) return } + // If it is a partition topic, you can only view the messages under one of the partitions. For the specific reason, see'Pulsar PR[8181]'. + var tenantNamespaceTopic = this.tenantNamespaceTopic + if (this.postForm.persistent === 'persistent' && parseInt(this.postForm.partition) !== -1) { + tenantNamespaceTopic = tenantNamespaceTopic + '-partition-' + this.postForm.partition + } peekMessagesOnCluster( this.getCurrentCluster(), this.postForm.persistent, - this.tenantNamespaceTopic, + tenantNamespaceTopic, this.postForm.subscription, this.form.peekNumMessages).then(response => { if (!response.data) return diff --git a/front-end/src/views/management/topics/partitionedTopic.vue b/front-end/src/views/management/topics/partitionedTopic.vue index 82772314..90c21210 100644 --- a/front-end/src/views/management/topics/partitionedTopic.vue +++ b/front-end/src/views/management/topics/partitionedTopic.vue @@ -17,6 +17,7 @@
+ @@ -32,6 +33,7 @@ +
@@ -56,6 +58,7 @@
+ + + +

{{ $t('topic.subscription.subscriptions') }}

+ + +

{{ $t('topic.policy.authentication') }} @@ -269,6 +278,7 @@
{{ $t('topic.deleteTopic') }} + @@ -311,6 +321,7 @@ import { fetchTenants } from '@/api/tenants' import { fetchNamespaces, getClusters } from '@/api/namespaces' import { + fetchTopicStatsInternal, fetchPartitionTopicStats, deletePartitionTopicOnCluster, expireMessagesAllSubscriptionsOnCluster, @@ -457,6 +468,7 @@ export default { } }, initTopicStats() { + // get topic stat detail fetchPartitionTopicStats(this.postForm.persistent, this.tenantNamespaceTopic, true).then(response => { if (!response.data) return this.partitionTopicStats = [] @@ -464,11 +476,18 @@ export default { inMsg: numberFormatter(response.data.msgRateIn, 2), outMsg: numberFormatter(response.data.msgRateOut, 2), inBytes: formatBytes(response.data.msgThroughputIn), - outBytes: formatBytes(response.data.msgThroughputOut) + outBytes: formatBytes(response.data.msgThroughputOut), + // The total number of topics + numberOfEntries: 0, + // Max backlog + msgBacklog: 0, + // Max number of messages without ACK + unackedMessages: 0 }) var prefix = this.postForm.persistent + '://' + this.tenantNamespaceTopic var tempPartitionsList = Object.keys(response.data.partitions) this.partitionsList = [] + // Traverse each partition for (var i = 0; i < tempPartitionsList.length; i++) { var key = prefix + '-partition-' + i if (response.data.partitions.hasOwnProperty(key)) { @@ -488,6 +507,15 @@ export default { 'storageSize': formatBytes(response.data.partitions[key].storageSize, 0), 'partitionTopicLink': '/management/topics/' + this.postForm.persistent + '/' + this.tenantNamespaceTopic + '-partition-' + i + '/topic' }) + + // Calculate the sum of each partition message + fetchTopicStatsInternal(this.postForm.persistent, this.postForm.tenant + '/' + this.postForm.namespace + '/' + partition).then(response => { + if (!response.data) return + // is number + if (typeof response.data.numberOfEntries === 'number' && !isNaN(response.data.numberOfEntries)) { + this.partitionTopicStats[0].numberOfEntries = this.partitionTopicStats[0].numberOfEntries + response.data.numberOfEntries + } + }) } } var index = 0 @@ -497,17 +525,32 @@ export default { var type = 'Exclusive' var children = [] for (var j in response.data.partitions) { + // tenant + namespace + partition var subSplitPartition = j.split('://') + // partition var subPartition = subSplitPartition[1].split('/')[2] if (response.data.partitions[j].hasOwnProperty('subscriptions')) { for (var p in response.data.partitions[j].subscriptions) { if (p === s) { + // msgBacklog is number + if (typeof response.data.partitions[j].subscriptions[p].msgBacklog === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].msgBacklog)) { + if (this.partitionTopicStats[0].msgBacklog < response.data.partitions[j].subscriptions[p].msgBacklog) { + this.partitionTopicStats[0].msgBacklog = response.data.partitions[j].subscriptions[p].msgBacklog + } + } + // unackedMessages is number + if (typeof response.data.partitions[j].subscriptions[p].unackedMessages === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].unackedMessages)) { + if (this.partitionTopicStats[0].unackedMessages < response.data.partitions[j].subscriptions[p].unackedMessages) { + this.partitionTopicStats[0].unackedMessages = response.data.partitions[j].subscriptions[p].unackedMessages + } + } children.push({ 'id': 1000000 * (index + 1) + j, 'subscription': subPartition, 'outMsg': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateOut, 2), 'outBytes': formatBytes(response.data.partitions[j].subscriptions[p].msgThroughputOut), 'msgExpired': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateExpired, 2), + // Backlog of messages in the topic 'backlog': response.data.partitions[j].subscriptions[p].msgBacklog, 'type': response.data.partitions[j].subscriptions[p].type, 'subscriptionLink': '/management/subscriptions/' + this.postForm.persistent + '/' + subSplitPartition[1] + '/' + s + '/subscription', diff --git a/front-end/src/views/management/topics/topic.vue b/front-end/src/views/management/topics/topic.vue index f45b53b9..c210f82c 100644 --- a/front-end/src/views/management/topics/topic.vue +++ b/front-end/src/views/management/topics/topic.vue @@ -50,6 +50,7 @@

+ @@ -344,6 +345,8 @@ + + @@ -461,6 +464,8 @@ + +

{{ $t('topic.policy.authentication') }} @@ -513,6 +518,7 @@
{{ $t('topic.deleteTopic') }} + @@ -845,6 +851,7 @@ export default { }) } } + // Total number of messages tracked this.entries = numberFormatter(response.data.numberOfEntries, 0) for (var c in response.data.cursors) { this.cursorsList.push({