Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add statistics on the total number of topics, the maximum number of subscriptions, and the maximum number of unacknowledged subscriptions #402

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion front-end/src/api/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ export function deleteSubscriptionOnCluster(cluster, persistent, tenantNamespace
method: 'delete'
})
}

/**
* peek message from cluster
* @param {*} cluster
* @param {*} persistent
* @param {*} tenantNamespaceTopic
* @param {*} subName
* @param {*} messagePosition
* @returns
*/
export function peekMessagesOnCluster(cluster, persistent, tenantNamespaceTopic, subName, messagePosition) {
return request({
headers: {
Expand Down
34 changes: 30 additions & 4 deletions front-end/src/api/topics.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,26 @@ export function fetchTopicStats(persistent, tenantNamespaceTopic) {
method: 'get'
})
}

/**
* topic detail
* @param {*}} persistent
* @param {*} tenantNamespaceTopic
* @returns
*/
export function fetchTopicStatsInternal(persistent, tenantNamespaceTopic) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/internalStats`,
method: 'get'
})
}

/**
* topic stat
* <a href="https://pulsar.apache.org/docs/en/2.7.1/admin-api-topics/#get-stats">topic stat</a>
* @param {*} persistent
* @param {*} tenantNamespaceTopic
* @param {*} perPartition
* @returns
*/
export function fetchPartitionTopicStats(persistent, tenantNamespaceTopic, perPartition) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/partitioned-stats?perPartition=${perPartition}`,
Expand Down Expand Up @@ -233,7 +245,14 @@ export function unloadOnCluster(cluster, persistent, tenantNamespaceTopic) {
method: 'put'
})
}

/**
* skip message
* @param {*} persistent
* @param {*} tenantNamespaceTopic
* @param {*} subName
* @param {*} numMessages
* @returns
*/
export function skip(persistent, tenantNamespaceTopic, subName, numMessages) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/skip/${numMessages}`,
Expand Down Expand Up @@ -298,7 +317,14 @@ export function expireMessagesAllSubscriptionsOnCluster(cluster, persistent, ten
method: 'post'
})
}

/**
* peek message
* @param {*} persistent
* @param {*} tenantNamespaceTopic
* @param {*} subName
* @param {*} messagePosition
* @returns
*/
export function peekMessages(persistent, tenantNamespaceTopic, subName, messagePosition) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/position/${messagePosition}`,
Expand Down
5 changes: 4 additions & 1 deletion front-end/src/lang/en.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ export default {
outBytes: 'Out Throughput',
storageSize: 'Storage Size',
enabled: 'Enabled',
disabled: 'Disabled'
disabled: 'Disabled',
msgBacklog: 'Backlog',
numberOfEntries: 'All Entries',
unackedMessages: 'Unack Msg'
},
tenant: {
label: 'Tenant',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,15 @@ export default {
})
return
}
// If it is a partition topic, you can only view the messages under one of the partitions. For the specific reason, see'Pulsar PR[8181]'.
var tenantNamespaceTopic = this.tenantNamespaceTopic
if (this.postForm.persistent === 'persistent' && parseInt(this.postForm.partition) !== -1) {
tenantNamespaceTopic = tenantNamespaceTopic + '-partition-' + this.postForm.partition
}
peekMessagesOnCluster(
this.getCurrentCluster(),
this.postForm.persistent,
this.tenantNamespaceTopic,
tenantNamespaceTopic,
this.postForm.subscription,
this.form.peekNumMessages).then(response => {
if (!response.data) return
Expand Down
45 changes: 44 additions & 1 deletion front-end/src/views/management/topics/partitionedTopic.vue
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<div class="app-container">
<div class="createPost-container">
<el-form :inline="true" :model="postForm" label-position="top" class="form-container">
<!--tenant namespace topic select start-->
<el-form-item :label="$t('tenant.label')">
<el-select v-model="postForm.tenant" placeholder="select tenant" @change="getNamespacesList(postForm.tenant)">
<el-option v-for="(item,index) in tenantsListOptions" :key="item+index" :label="item" :value="item"/>
Expand All @@ -32,6 +33,7 @@
<el-option v-for="(item,index) in topicsListOptions" :key="item+index" :label="item" :value="item"/>
</el-select>
</el-form-item>
<!--tenant namespace topic select end-->
<div class="refresh-container">
<el-form-item :label="$t('topic.autoRefresh')">
<el-select ref="autoRefreshSelect" v-model="autoRefreshInterval" placeholder="select auto refresh" @change="onAutoRefreshChanged">
Expand All @@ -56,6 +58,7 @@
</el-form>
</div>
<el-tabs v-model="activeName" @tab-click="handleClick">
<!-- topic overview start-->
<el-tab-pane :label="$t('tabs.overview')" name="overview">
<el-table
:data="partitionTopicStats"
Expand All @@ -65,6 +68,9 @@
<el-table-column :label="$t('common.outMsg')" prop="outMsg"/>
<el-table-column :label="$t('common.inBytes')" prop="inBytes"/>
<el-table-column :label="$t('common.outBytes')" prop="outBytes"/>
<el-table-column :label="$t('common.numberOfEntries')" prop="numberOfEntries"/>
<el-table-column :label="$t('common.msgBacklog')" prop="msgBacklog"/>
<el-table-column :label="$t('common.unackedMessages')" prop="unackedMessages"/>
</el-table>
<h4>{{ $t('topic.subscription.subscriptions') }}</h4>
<el-button
Expand Down Expand Up @@ -217,6 +223,9 @@
</el-col>
</el-row>
</el-tab-pane>
<!-- topic overview end-->

<!-- topic policies start-->
<el-tab-pane label="POLICIES" name="policies">
<h4>{{ $t('topic.policy.authentication') }}
<el-tooltip :content="authorizationContent" class="item" effect="dark" placement="top">
Expand Down Expand Up @@ -269,6 +278,7 @@
<hr class="danger-line">
<el-button type="danger" class="button" @click="handleDeletePartitionTopic">{{ $t('topic.deleteTopic') }}</el-button>
</el-tab-pane>
<!-- topic policies end-->
</el-tabs>
<el-dialog :title="textMap[dialogStatus]" :visible.sync="dialogFormVisible" width="30%">
<el-form ref="form" :model="form" :rules="rules" label-position="top">
Expand Down Expand Up @@ -311,6 +321,7 @@
import { fetchTenants } from '@/api/tenants'
import { fetchNamespaces, getClusters } from '@/api/namespaces'
import {
fetchTopicStatsInternal,
fetchPartitionTopicStats,
deletePartitionTopicOnCluster,
expireMessagesAllSubscriptionsOnCluster,
Expand Down Expand Up @@ -457,18 +468,26 @@ export default {
}
},
initTopicStats() {
// get topic stat detail
fetchPartitionTopicStats(this.postForm.persistent, this.tenantNamespaceTopic, true).then(response => {
if (!response.data) return
this.partitionTopicStats = []
this.partitionTopicStats.push({
inMsg: numberFormatter(response.data.msgRateIn, 2),
outMsg: numberFormatter(response.data.msgRateOut, 2),
inBytes: formatBytes(response.data.msgThroughputIn),
outBytes: formatBytes(response.data.msgThroughputOut)
outBytes: formatBytes(response.data.msgThroughputOut),
// The total number of topics
numberOfEntries: 0,
// Max backlog
msgBacklog: 0,
// Max number of messages without ACK
unackedMessages: 0
})
var prefix = this.postForm.persistent + '://' + this.tenantNamespaceTopic
var tempPartitionsList = Object.keys(response.data.partitions)
this.partitionsList = []
// Traverse each partition
for (var i = 0; i < tempPartitionsList.length; i++) {
var key = prefix + '-partition-' + i
if (response.data.partitions.hasOwnProperty(key)) {
Expand All @@ -488,6 +507,15 @@ export default {
'storageSize': formatBytes(response.data.partitions[key].storageSize, 0),
'partitionTopicLink': '/management/topics/' + this.postForm.persistent + '/' + this.tenantNamespaceTopic + '-partition-' + i + '/topic'
})

// Calculate the sum of each partition message
fetchTopicStatsInternal(this.postForm.persistent, this.postForm.tenant + '/' + this.postForm.namespace + '/' + partition).then(response => {
if (!response.data) return
// is number
if (typeof response.data.numberOfEntries === 'number' && !isNaN(response.data.numberOfEntries)) {
this.partitionTopicStats[0].numberOfEntries = this.partitionTopicStats[0].numberOfEntries + response.data.numberOfEntries
}
})
}
}
var index = 0
Expand All @@ -497,17 +525,32 @@ export default {
var type = 'Exclusive'
var children = []
for (var j in response.data.partitions) {
// tenant + namespace + partition
var subSplitPartition = j.split('://')
// partition
var subPartition = subSplitPartition[1].split('/')[2]
if (response.data.partitions[j].hasOwnProperty('subscriptions')) {
for (var p in response.data.partitions[j].subscriptions) {
if (p === s) {
// msgBacklog is number
if (typeof response.data.partitions[j].subscriptions[p].msgBacklog === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].msgBacklog)) {
if (this.partitionTopicStats[0].msgBacklog < response.data.partitions[j].subscriptions[p].msgBacklog) {
this.partitionTopicStats[0].msgBacklog = response.data.partitions[j].subscriptions[p].msgBacklog
}
}
// unackedMessages is number
if (typeof response.data.partitions[j].subscriptions[p].unackedMessages === 'number' && !isNaN(response.data.partitions[j].subscriptions[p].unackedMessages)) {
if (this.partitionTopicStats[0].unackedMessages < response.data.partitions[j].subscriptions[p].unackedMessages) {
this.partitionTopicStats[0].unackedMessages = response.data.partitions[j].subscriptions[p].unackedMessages
}
}
children.push({
'id': 1000000 * (index + 1) + j,
'subscription': subPartition,
'outMsg': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateOut, 2),
'outBytes': formatBytes(response.data.partitions[j].subscriptions[p].msgThroughputOut),
'msgExpired': numberFormatter(response.data.partitions[j].subscriptions[p].msgRateExpired, 2),
// Backlog of messages in the topic
'backlog': response.data.partitions[j].subscriptions[p].msgBacklog,
'type': response.data.partitions[j].subscriptions[p].type,
'subscriptionLink': '/management/subscriptions/' + this.postForm.persistent + '/' + subSplitPartition[1] + '/' + s + '/subscription',
Expand Down
7 changes: 7 additions & 0 deletions front-end/src/views/management/topics/topic.vue
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
</el-form>
</div>
<el-tabs v-model="activeName" @tab-click="handleClick">
<!--overview in topic partition start -->
<el-tab-pane :label="$t('tabs.overview')" name="overview">
<el-row :gutter="12">
<el-col :span="12">
Expand Down Expand Up @@ -344,6 +345,8 @@
</el-col>
</el-row>
</el-tab-pane>
<!--overview in topic partition end-->
<!--storage in topic partition start-->
<el-tab-pane v-if="nonPersistent===false" :label="$t('tabs.storage')" name="storage">
<el-row :gutter="12">
<el-col :span="8">
Expand Down Expand Up @@ -461,6 +464,8 @@
</el-col>
</el-row>
</el-tab-pane>
<!--storage in topic partition end-->
<!--storage in topic policies start-->
<el-tab-pane :label="$t('tabs.policies')" name="policies">
<h4>{{ $t('topic.policy.authentication') }}
<el-tooltip :content="authorizationContent" class="item" effect="dark" placement="top">
Expand Down Expand Up @@ -513,6 +518,7 @@
<hr class="danger-line">
<el-button type="danger" class="button" @click="handleDeleteTopic">{{ $t('topic.deleteTopic') }}</el-button>
</el-tab-pane>
<!--storage in topic end start-->
</el-tabs>
<el-dialog :title="textMap[dialogStatus]" :visible.sync="dialogFormVisible" width="30%">
<el-form label-position="top">
Expand Down Expand Up @@ -845,6 +851,7 @@ export default {
})
}
}
// Total number of messages tracked
this.entries = numberFormatter(response.data.numberOfEntries, 0)
for (var c in response.data.cursors) {
this.cursorsList.push({
Expand Down